CQRS
Processing events
Martijn Blankestijn
@MartijnBlankest
Roadmap
Why this talk
Event Sourcing & CQRS
Query side processing
Why?
Appointments
Make
Move
Reassign
Conclude
Cancel
Environment
DC1 DC2
Node 1 Node 2 Node 1 Node 2
Cassandra
Cassandra
Model around your queries
determine what queries to support
create a table for that query
read 1 partition per query
Event Sourcing
'ensures that all changes to application state are stored as a sequence of events.'
https://martinfowler.com/eaaDev/EventSourcing.html
Time
Events
Created: 10:00-12:00 9th Nov, with Smith, in Amsterdam
Current State
10:00-12:00 9th Nov, with Smith, in Amsterdam
Time
Events
Created: 10:00-12:00 9th Nov, with Smith, in Amsterdam
Moved: 14:00-16:00 2nd Nov, with Jones, in Amsterdam, Bid has been made
Current State
14:00-16:00 2nd Nov, with Jones, in Amsterdam, Bid has been made
Time
Events
Created: 10:00-12:00 9th Nov, with Smith, in Amsterdam
Moved: 14:00-16:00 2nd Nov, with Jones, in Amsterdam, Bid has been made
Concluded: 5-star, product sold
Current State
14:00-16:00 2nd Nov, with Jones, in Amsterdam
Comment: Bid has been made
Concluded(5-star, product-sold)
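The timeline above can be read as a left fold: the current state is what you get by applying each event, in order, to the previous state. The sketch below illustrates that idea in plain Scala. The event and state classes (Created, Moved, Concluded, Appointment) and their field names are hypothetical, modelled on the slide, and are not the talk's actual domain model.

```scala
object AppointmentReplay {
  // Hypothetical event types, modelled on the timeline above.
  sealed trait Event
  final case class Created(time: String, withWhom: String, location: String) extends Event
  final case class Moved(time: String, withWhom: String, location: String, comment: String) extends Event
  final case class Concluded(rating: String, outcome: String) extends Event

  // Hypothetical current-state record.
  final case class Appointment(
      time: String,
      withWhom: String,
      location: String,
      comment: Option[String] = None,
      conclusion: Option[(String, String)] = None)

  // Apply one event to the (possibly absent) current state.
  def applyEvent(state: Option[Appointment], event: Event): Option[Appointment] =
    (state, event) match {
      case (_, Created(t, w, l)) =>
        Some(Appointment(t, w, l))
      case (Some(s), Moved(t, w, l, c)) =>
        Some(s.copy(time = t, withWhom = w, location = l, comment = Some(c)))
      case (Some(s), Concluded(r, o)) =>
        Some(s.copy(conclusion = Some((r, o))))
      case (None, _) =>
        None // ignore events for an appointment that was never created
    }

  // The current state is a left fold over the stored event sequence.
  def replay(events: Seq[Event]): Option[Appointment] =
    events.foldLeft(Option.empty[Appointment])(applyEvent)

  val events: Seq[Event] = Seq(
    Created("10:00-12:00 9th Nov", "Smith", "Amsterdam"),
    Moved("14:00-16:00 2nd Nov", "Jones", "Amsterdam", "Bid has been made"),
    Concluded("5-star", "product sold"))
}
```

Replaying only a prefix of the events gives the state at that earlier point in time, which is what makes the event log usable as an audit trail.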
Event Sourcing
Advantages: Built-in audit log, Troubleshooting
Drawbacks: Space requirements, Querying entities
Command-Query
Responsibility
Segregation
CQRS
Architectural pattern
with driving forces
Collaboration
Staleness
The CQRS universe
Command events
EventStore
Handler
projector
projector
projector
projection
query Query
Database
Query
Query
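As a rough illustration of the picture above, the sketch below folds events from an in-memory "event store" into a projection that serves exactly one query. It is plain Scala with no framework. The event types, the AppointmentsByPerson read model and the sample data are assumptions made for the example.

```scala
object CqrsSketch {
  // Hypothetical events emitted by the command side.
  sealed trait Event { def appointmentId: String }
  final case class Created(appointmentId: String, withWhom: String) extends Event
  final case class Reassigned(appointmentId: String, withWhom: String) extends Event
  final case class Cancelled(appointmentId: String) extends Event

  // Read model shaped for one query: "which appointments does this person have?"
  type AppointmentsByPerson = Map[String, Set[String]]

  // Projector: fold a single event into the read model.
  def project(view: AppointmentsByPerson, event: Event): AppointmentsByPerson = {
    // Remove the appointment from whoever currently holds it.
    def without(id: String): AppointmentsByPerson =
      view.map { case (person, ids) => person -> (ids - id) }.filter(_._2.nonEmpty)

    event match {
      case Created(id, person) =>
        view.updated(person, view.getOrElse(person, Set.empty[String]) + id)
      case Reassigned(id, person) =>
        val cleaned = without(id)
        cleaned.updated(person, cleaned.getOrElse(person, Set.empty[String]) + id)
      case Cancelled(id) =>
        without(id)
    }
  }

  // The query side reads only the projection, never the event store.
  def appointmentsOf(view: AppointmentsByPerson, person: String): Set[String] =
    view.getOrElse(person, Set.empty[String])

  // In-memory stand-in for the event store.
  val eventStore: Vector[Event] = Vector(
    Created("a1", "Smith"),
    Created("a2", "Smith"),
    Reassigned("a1", "Jones"),
    Cancelled("a2"))

  val projection: AppointmentsByPerson =
    eventStore.foldLeft(Map.empty[String, Set[String]])(project)
}
```

A second query with a different shape would get its own projector and its own read model, fed from the same events.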
Advantages CQRS with ES
Scale read/write independently
Scale Query side per use-case
CHOOSE
CQRS with Event Sourcing Frameworks
Kafka as event store
Axon Framework
Eventuate
Akka Persistence
Persistence
Persistent Actor
Journal
Datacenter 1 Datacenter 2
http
command
PA
Node 1 Node 2 Node 1 Node 2
event
cassandra
Cassandra Events table
persistence_id |partition_nr|sequence_nr |timestamp|timebucket
7c7ec816-efc6... |0 |1 |8ef7f9...|20171018
7c7ec816-efc6... |0 |2 |99e314...|20171018
7c7ec816-efc6... |0 |3 |a41f2b...|20171018
tag1 |writer_uuid |ser_id |ser_manifest |event
appointment |f0088eec... |2 |nl...Created |[payload]
appointment |f0088eec... |2 |nl...Reassigned |[payload]
appointment |f0088eec... |2 |nl...Moved |[payload]
Query side processing
Using Persistence Query
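import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.scaladsl.Sink

// requires an implicit ActorSystem `system` and an implicit Materializer in scope
PersistenceQuery(system)
  .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
  .eventsByTag("appointment")
  .map(println)
  .runWith(Sink.ignore)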
Considerations read side
Resumability
Event order
Changing requirements
Scalability
Push or Pull (*)
Resumability
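// readOffset, processEvent and saveOffset are the projection's own functions
// start from the last saved offset, or from the beginning if none was saved
val offset = readOffset().getOrElse(Offset.noOffset)
PersistenceQuery(system)
  .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
  .eventsByTag(tag, offset)
  .map(processEvent)
  .map(saveOffset)
  .runWith(Sink.ignore)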
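To make the resume behaviour concrete without a running journal, here is a minimal in-memory sketch of the same read-offset, process, save-offset loop. Envelope, OffsetStore and ReadModel are hypothetical stand-ins, and offsets are simplified to increasing Long values starting at 1.

```scala
object ResumableProjection {
  // Hypothetical envelope: an event plus its position in the tagged stream.
  final case class Envelope(offset: Long, event: String)

  // In-memory stand-ins for the offset store and the read model.
  final class OffsetStore { var saved: Option[Long] = None }
  final class ReadModel { var processed: Vector[String] = Vector.empty }

  // Process everything after the stored offset, saving the offset after each event.
  def run(journal: Seq[Envelope], offsets: OffsetStore, model: ReadModel): Unit = {
    val from = offsets.saved.getOrElse(0L) // 0 plays the role of "no offset"
    journal
      .filter(_.offset > from)
      .foreach { env =>
        model.processed = model.processed :+ env.event // processEvent
        offsets.saved = Some(env.offset)               // saveOffset
      }
  }
}
```

In this sketch the event is processed before its offset is saved, so a crash between the two steps would cause the event to be processed again after a restart. Under that assumption the event handler should be idempotent.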
Eventual Consistency
The same stream elements
(in same order) are returned
for multiple executions of the query
on a best effort basis.
https://static.javadoc.io/com.typesafe.akka/akka-persistence-cassandra_2.12/0.58/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal$.html
id seq timestamp
A 1 04.709
B 1 04.731
C 1 04.801
NOW 885
eventual-consistency-delay=100ms
id seq timestamp
A 1 04.709
B 1 04.731
C 1 04.801
A 3 04.824 STOP
C 2 04.957
NOW (920)
eventual-consistency-delay=100ms
delayed-event-timeout=1000ms
id seq timestamp
A 1 04.709
B 1 04.731
A 2 04.768
C 1 04.801
A 3 04.824
C 2 04.957
A 4 04.973
NOW (05.063)
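The tables above can be approximated with two small functions: one that hides rows younger than the consistency delay, and one that stops emitting as soon as a sequence number is missing for some id. This is a simplified model of the behaviour shown on the slides, not the Cassandra plugin's implementation. Row, visible and emitUntilGap are hypothetical names, and timestamps are written as milliseconds (04.709 becomes 4709).

```scala
object DelayedRead {
  // Hypothetical row: persistence id, sequence number, write time in milliseconds.
  final case class Row(id: String, seq: Long, timestampMs: Long)

  // Only rows at least `delayMs` old are considered safe to emit.
  def visible(rows: Seq[Row], nowMs: Long, delayMs: Long): Seq[Row] =
    rows.filter(_.timestampMs <= nowMs - delayMs).sortBy(_.timestampMs)

  // Emit rows in timestamp order, but stop at the first per-id sequence gap.
  def emitUntilGap(rows: Seq[Row]): Seq[Row] = {
    val lastSeq = scala.collection.mutable.Map.empty[String, Long]
    rows.sortBy(_.timestampMs).takeWhile { row =>
      val expected = lastSeq.getOrElse(row.id, 0L) + 1
      val inOrder = row.seq == expected
      if (inOrder) lastSeq(row.id) = row.seq
      inOrder
    }
  }
}
```

With the first table's rows, a "now" of 4885 and a delay of 100 ms, only A 1 and B 1 are old enough to be emitted. With the second table's rows, emission stops at A 3 because A 2 has not been seen yet.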
NO ONE WANTS
EVENTUAL CONSISTENCY.
IT'S A NECESSARY EVIL.
IT'S NOT COOL. IT'S USEFUL.
Jonas Bonér
How much latency can you accept?
Environment
DC1 DC2
Node 1 Node 2 Node 1 Node 2
Cassandra
In parallel
A 3
B 8
A 4
D 1
A 3 B 8
D 1 A 4
Read-side Sharding
shard upon event data
shard on the entity id
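Sharding on the entity id keeps all events of one entity in the same shard, so their relative order survives even when shards are processed in parallel. A minimal sketch, assuming a simple hash-modulo assignment (the Event class and the shardOf function are hypothetical):

```scala
object ReadSideSharding {
  // Hypothetical event: only the entity id and sequence number matter here.
  final case class Event(entityId: String, seq: Long)

  // The same entity id always maps to the same shard.
  def shardOf(entityId: String, shards: Int): Int =
    Math.floorMod(entityId.hashCode, shards)

  // Split the stream per shard; relative order inside each shard is preserved.
  def partition(events: Seq[Event], shards: Int): Map[Int, Seq[Event]] =
    events.groupBy(e => shardOf(e.entityId, shards))
}
```

Applied to the stream A 3, B 8, A 4, D 1 from the "In parallel" slide, both A events land in the same shard with A 3 still ahead of A 4, regardless of how the other shards progress.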
Changing
requirements
Projection upgrade
Original
New & Shiny
The Dark Side
of Event Sourcing:
Managing Data Conversion
http://files.movereem.nl/2017saner-eventsourcing.pdf
Event store upgrade techniques
Multiple versions
Upcasting
Lazy transformation
In place transformation
Copy and transformation
Event store upgrade techniques
Read-side impact
Multiple versions -
Upcasting +
Lazy transformation +
In place transformation ++
Copy and transformation ++
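Of the techniques listed, upcasting is the easiest to show in a few lines: old event versions stay in the store and are lifted to the newest version when they are read, so the projector only needs to understand one version. The sketch below assumes a hypothetical Created event that gained a location field in version 2. The class names and the default value are illustrative only.

```scala
object Upcasting {
  // Hypothetical stored representations of the same event in two schema versions.
  sealed trait Stored
  final case class CreatedV1(time: String, withWhom: String) extends Stored
  final case class CreatedV2(time: String, withWhom: String, location: String) extends Stored

  // Upcaster: lift an old version to the newest one at read time.
  // The event store itself is not rewritten.
  def upcast(stored: Stored, defaultLocation: String): CreatedV2 = stored match {
    case CreatedV1(time, who) => CreatedV2(time, who, defaultLocation)
    case v2: CreatedV2        => v2
  }

  // The projector only has to understand the newest version.
  def describe(e: CreatedV2): String =
    s"${e.time} with ${e.withWhom} in ${e.location}"
}
```

With the "Multiple versions" technique, by contrast, a function like describe would need a case for every stored version, which is one way to read the "-" in the read-side impact column.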
Pull
Write Side
Event Processor
eventsByTag
Push
Akka Cluster
register
Write Side
notify
Event Processor
eventsByTag
In conclusion
CQRS and Event Sourcing
Query side processing
resumability
event order
requirement changes
scale
push vs pull
@MartijnBlankest