fix: Snapshots as starting point for eventsBySlices, take 2 #717
Conversation
* The initial implementation suffered from two problems:
  1. Not all entities have snapshots, so it would miss those events when adjusting the start offset.
  2. When emitting all snapshots first, followed by events, the offset storage can get too far ahead, and events might be missed when restarting from stored offsets.
* Instead, this implementation drives the stream from the ordinary eventsBySlices query:
  - It reads the snapshot sequence number on the first occurrence of a persistence id, and keeps that in memory.
  - It loads the snapshot and emits it when the corresponding event sequence number is seen.
  - It skips events with a sequence number lower than the snapshot sequence number.
* This means that it must still read all events, but the benefit is that it doesn't have to process events before snapshots.
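For illustration, a minimal, self-contained Scala sketch of that per-persistence-id decision logic. All names here (Env, SnapshotState, snapshotSeqNrFor, the Decision values) are hypothetical stand-ins, not the actual akka-persistence-r2dbc types, and edge cases around restarts are simplified.

final case class Env(persistenceId: String, seqNr: Long, payload: Any)
final case class SnapshotState(seqNr: Long, emitted: Boolean)

sealed trait Decision
case object EmitEvent extends Decision
case object EmitSnapshotInsteadOfEvent extends Decision
case object SkipEvent extends Decision

// snapshotSeqNrFor reads only the seqNr of the latest snapshot (0 if none)
final class SnapshotStartingPoint(snapshotSeqNrFor: String => Long) {
  // snapshot seqNr per persistence id, read on first occurrence and kept in memory
  private var state = Map.empty[String, SnapshotState]

  def decide(env: Env): Decision = {
    val s = state.getOrElse(
      env.persistenceId, {
        val initial = SnapshotState(snapshotSeqNrFor(env.persistenceId), emitted = false)
        state = state.updated(env.persistenceId, initial)
        initial
      })
    if (env.seqNr > s.seqNr)
      EmitEvent // past the snapshot, normal event processing
    else if (!s.emitted && env.seqNr == s.seqNr) {
      // load the full snapshot lazily and emit it in place of this event
      state = state.updated(env.persistenceId, s.copy(emitted = true))
      EmitSnapshotInsteadOfEvent
    } else
      SkipEvent // the event is already covered by the snapshot
  }
}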
pvlugter
left a comment
Looks good.
A deeper change, but could we lazily load the event payloads, like for backtracking, so that we can process all the filtered events more efficiently?
That would be good for the catch-up scenario, but probably not for normal processing. Not sure we should complicate it with some adaptive hybrid approach. At least, let's try this first with some realistic data. I made one optimization: load only the seqNr of the snapshot first, and then load the full snapshot when it's emitted: de75603
johanandren
left a comment
Looks good. Is there anything that needs to be thought through or tested around what happens in a running system where new snapshots are taken while the replay is ongoing?
-- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
Add something in the migration guide about dropping the index, or do we expect nobody was using this yet?
Start from snapshots is most useful for rebuilds, so optimising this further could make sense. If we did find that useful, we could do something that combines the old approach with the new approach. The old approach did all the snapshots first, then regular event queries, which doesn't work. But we could take the idea of first doing an initial phase, where we process all the earlier events in a filtered, lazy-load way together with the initial snapshots, until we pass the max snapshot timestamp, and then switch fully over to regular event processing after that. So rather than processing all snapshots and then starting event processing from the min snapshot timestamp, process catch-up until the max snapshot timestamp to initialise the projection.
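As a rough Scala sketch of that two-phase idea, assuming Akka Streams and hypothetical helpers (catchUpSource and regularSource are illustrative, not existing APIs):

import java.time.Instant
import akka.NotUsed
import akka.stream.scaladsl.Source

// Phase 1 initialises the projection from filtered, lazily loaded events and
// snapshots up to the max snapshot timestamp; phase 2 is regular event processing.
def snapshotInitialisedEvents[Envelope](
    maxSnapshotTimestamp: Instant,
    catchUpSource: Instant => Source[Envelope, NotUsed],
    regularSource: Instant => Source[Envelope, NotUsed]): Source[Envelope, NotUsed] =
  catchUpSource(maxSnapshotTimestamp).concat(regularSource(maxSnapshotTimestamp))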
* because then it does not depend on a db migration of the db_timestamp
// we can't ignore it when snapshot is not emitted, because it might have been emitted in
// previous incarnation, but then the stream was restarted
push(out, env)
} else if (!s.emitted && env.sequenceNr == s.seqNr) {
How does this work with event deletion? Do we only delete up to the sequence number before the snapshot, not the sequence nr of the snapshot itself? (Otherwise this will not work together with deletion.)
We, as in the Akka runtime/SDK, don't delete events until the whole entity is deleted, and then all events and snapshots are deleted, much later.
I'm not sure if snapshotting and retention strategies in EventSourcedBehavior can be set up to delete the event with seqNr == snapshot seqNr. However, deleting events combined with projections requires consideration anyway, and we have that documented somewhere.
Alright, as long as we warn a bit about it somewhere, that might be good enough.
} else {
  // snapshot will be emitted later, ignore event
  updateState(snap.persistenceId, snap.seqNr, emitted = false)
  tryPullOrComplete()
I think this covers my previous (vague) concern: if there was a new snapshot taken after the result of seqNrOfCorrespondingSnapshot, that means this load returns a newer snapshot than the env triggering it and we'll end up here. Seems fine.
new GraphStageLogic(shape) with InHandler with OutHandler { self =>
  private implicit def ec: ExecutionContext = materializer.executionContext

  private var snapshotState = Map.empty[String, SnapshotState]
I still need to implement some kind of eviction of this map.
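One possible shape for that, purely as a hypothetical sketch (the lastSeen field and evictAfter threshold are not in the PR):

import java.time.{ Duration, Instant }

final case class SnapshotState(seqNr: Long, emitted: Boolean, lastSeen: Instant)

// drop entries for persistence ids that haven't been seen for a while
def evict(state: Map[String, SnapshotState], now: Instant, evictAfter: Duration): Map[String, SnapshotState] =
  state.filter { case (_, s) => Duration.between(s.lastSeen, now).compareTo(evictAfter) < 0 }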
updateState(env.persistenceId, seqNr, emitted = false)
push(env)
} else if (filterCount >= heartbeatAfter) {
  pushHeartbeat()
I have added heartbeats for when many events have been filtered out. That will give observability of progress and offset storage progress downstream. I will have to deal with these heartbeats in projections; as is, they would be like filtered events, and then probably handled as duplicates in the offset store. 8847b84
Nice, useful to have the heartbeats.
pvlugter
left a comment
LGTM. Updated approach looks good. Just a question around the state updates for heartbeats, and possible simplification of the logic here.
} else if (filterCount >= heartbeatAfter) {
  pushHeartbeat()
} else {
  // snapshot will be emitted later, ignore event
  updateState(env.persistenceId, seqNr, emitted = false)
  ignore(env)
}
In the places where we're pushing a heartbeat rather than ignoring the envelope, should it also call updateState? Similarly for loadSnapshotCallback, and also the update of the latest timestamp in ignore. It would seem that we should always do the same as when ignoring/filtering, but also push the heartbeat. Could it be simpler to not repeat the logic, and move the heartbeat push into the ignore method?
private def ignore(env: EventEnvelope[Event]): Unit = {
  filterCount += 1
  updateLatestTimestamp(env)
  tryPullOrComplete()
}
Could it be simpler to remove the pushHeartbeat method, only have the ignore method to call from the other places, and move the heartbeat logic into this method?
private def ignore(env: EventEnvelope[Event]): Unit = {
  updateLatestTimestamp(env)
  if (filterCount >= heartbeatAfter) {
    filterCount = 1L
    push(out, createHeartbeat(latestTimestamp))
  } else {
    filterCount += 1
    tryPullOrComplete()
  }
}
It is.
SQL Server tests (or the underlying r2dbc implementation) are flaky. This failed in the setup phase of the test, when persisting events, so it's likely not related to this PR.
* easier than trying to adjust them when storing offsets
private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]()
private val heartbeatUuid = UUID.randomUUID().toString
// gaps are allowed for heartbeat sequence numbers, but increasing for each heartbeat pid (uuid makes it unique)
private val heartbeatSeqNr = new AtomicLong
At first I thought I could always use seqNr 1 for the heartbeats and adjust them when storing the offsets, but that is just difficult.
Offset validation for heartbeats is in akka/akka-projection#1413.
With the increasing sequence number, the offset can just be stored like any other.
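To make the mechanism concrete, here is an illustrative sketch of how the fields in the diff above could be used. The HeartbeatIds wrapper and the pid format are assumptions for illustration, not the exact implementation:

import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

final class HeartbeatIds(entityType: String) {
  private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]()
  private val heartbeatUuid = UUID.randomUUID().toString
  // gaps are allowed for heartbeat sequence numbers, but they increase for each
  // heartbeat pid (the uuid makes the pid unique per stream instance)
  private val heartbeatSeqNr = new AtomicLong

  // one synthetic persistence id per (entityType, slice); format is hypothetical
  def persistenceId(slice: Int): String =
    heartbeatPersistenceIds.computeIfAbsent(
      (entityType, slice),
      key => s"${key._1}|heartbeat-$heartbeatUuid-${key._2}")

  // strictly increasing, so the offset can be stored like any other
  def nextSeqNr(): Long = heartbeatSeqNr.incrementAndGet()
}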
Looks good. Increasing seq number on this side seems better.
Would be good to confirm the end-to-end behaviour again, with some larger tests.
See #410