Skip to content

opt: Don't drop events from pubsub; queries no longer heartbeat#189

Closed
leviramsey wants to merge 1 commit intoakka:mainfrom
leviramsey:pubsub-pass
Closed

opt: Don't drop events from pubsub; queries no longer heartbeat#189
leviramsey wants to merge 1 commit intoakka:mainfrom
leviramsey:pubsub-pass

Conversation

@leviramsey
Copy link
Copy Markdown
Contributor

Presently, we drop events received from pubsub to avoid moving the offset for a slice too far ahead (because the timestamp offset in the event envelope is the event timestamp) of the latest event from backtracking. In turn, in order to allow an event from pubsub to pass through after a slice has been idle, the query periodically heartbeats (based on wall-clock time elasped since latest backtracking).

This change:

  • removes responsibility for emitting heartbeats from the byslice queries; instead an initial heartbeat is injected as the starting timestamp minus the backtracking window (open question: could this be taken as Instant.EPOCH - backtracking window if no timestamp?)
  • instead of dropping pubsub events, they are emitted with the offset adjusted such that if the query resumes from that offset, the backtracking window from the query will not move past latest backtracking (i.e. the offset is no later than latest backtracking plus the backtracking window). The event timestamp itself is not modified.

We don't presently promise that the query won't emit pubsub events which are more than one sequence number ahead of the events emitted by the DB queries: with this change we'll be emitting more such events. Projections ignore rejected events from pubsub, so this won't result in replays being triggered. The projections will process events from new persistence IDs even if they're far ahead of where the DB query is, but projections couldn't ever depend on any particular ordering of events from distinct persistence IDs.

maxAheadOfBacktracking: JDuration,
/** INTERNAL API */
@nowarn("msg=eventMetadata in class EventEnvelope is deprecated")
@InternalApi private[akka] def handlePubSubTooFarAhead[Event](
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this flow should effectively treat forward query events as backtracking if backtracking is disabled?

@leviramsey leviramsey requested review from patriknw and pvlugter March 11, 2026 17:59
"Dropping pubsub event for persistenceId [{}] seqNr [{}]{} because too far ahead of backtracking.",
} else if (JDuration.between(l, t.timestamp).compareTo(backtrackingWindow) > 0) {
// far ahead of backtracking, so adjust the offset (the timestamp in the envelope is unaffected)
// so as to prevent the offset from being moved too far ahead
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of dropping pubsub events, they are emitted with the offset adjusted such that if the query resumes from that offset, the backtracking window from the query will not move past latest backtracking (i.e. the offset is no later than latest backtracking plus the backtracking window)

This is scary to me. We have the general invariant that when comparing two events e1 and e2 then

e2.timestamp >= e1.timestamp if e2.seqNr > e1.seqNr

The event timestamp is always propagated as the offset timestamp. I don't know what consequences this could have. For example in the offset store we sort records by timestamp.

@leviramsey leviramsey closed this Mar 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants