Skip to content

Commit 47e9f98

Browse files
committed
Don't drop events from pubsub; queries no longer heartbeat
1 parent 0622881 commit 47e9f98

File tree

4 files changed

+135
-110
lines changed

4 files changed

+135
-110
lines changed

core/src/main/scala/akka/persistence/dynamodb/internal/BySliceQuery.scala

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ import org.slf4j.Logger
109109
dao: BySliceQuery.Dao[Item],
110110
createEnvelope: (TimestampOffset, Item) => Envelope,
111111
extractOffset: Envelope => TimestampOffset,
112-
createHeartbeat: Instant => Option[Envelope],
113112
clock: Clock,
114113
settings: DynamoDBSettings,
115114
log: Logger)(implicit val ec: ExecutionContext) {
@@ -401,22 +400,6 @@ import org.slf4j.Logger
401400
.via(deserializeAndAddOffset(newState.currentOffset)))
402401
}
403402

404-
def heartbeat(state: QueryState): Option[Envelope] = {
405-
if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) {
406-
// use wall clock to measure duration since start, up to idle backtracking limit
407-
val timestamp = state.startTimestamp.plus(
408-
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
409-
410-
val h = createHeartbeat(timestamp)
411-
if (h.isDefined)
412-
log.debug("{} heartbeat timestamp [{}]", logPrefix, timestamp)
413-
h
414-
} else None
415-
}
416-
417-
val nextHeartbeat: QueryState => Option[Envelope] =
418-
if (settings.journalPublishEvents) heartbeat else _ => None
419-
420403
val currentTimestamp = InstantFactory.now() // Can we use DDB as a timestamp source?
421404
val currentWallClock = clock.instant()
422405

@@ -426,8 +409,7 @@ import org.slf4j.Logger
426409
updateState = nextOffset,
427410
delayNextQuery = delayNextQuery,
428411
nextQuery = nextQuery,
429-
beforeQuery = _ => None,
430-
heartbeat = nextHeartbeat)
412+
beforeQuery = _ => None)
431413
}
432414

433415
private def deserializeAndAddOffset(timestampOffset: TimestampOffset): Flow[Item, Envelope, NotUsed] = {

core/src/main/scala/akka/persistence/dynamodb/internal/ContinuousQuery.scala

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@ private[dynamodb] object ContinuousQuery {
3030
updateState: (S, T) => S,
3131
delayNextQuery: S => Option[FiniteDuration],
3232
nextQuery: S => (S, Option[Source[T, NotUsed]]),
33-
beforeQuery: S => Option[Future[S]] = (_: S) => None,
34-
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
35-
Source.fromGraph(
36-
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))
33+
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
34+
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
3735

3836
private case object NextQuery
3937

@@ -71,8 +69,7 @@ final private[dynamodb] class ContinuousQuery[S, T](
7169
updateState: (S, T) => S,
7270
delayNextQuery: S => Option[FiniteDuration],
7371
nextQuery: S => (S, Option[Source[T, NotUsed]]),
74-
beforeQuery: S => Option[Future[S]],
75-
heartbeat: S => Option[T])
72+
beforeQuery: S => Option[Future[S]])
7673
extends GraphStage[SourceShape[T]] {
7774
import ContinuousQuery._
7875

@@ -155,13 +152,8 @@ final private[dynamodb] class ContinuousQuery[S, T](
155152
}
156153
})
157154

158-
val sourceWithHeartbeat = heartbeat(newState) match {
159-
case None => source
160-
case Some(h) => Source.single(h).concat(source)
161-
}
162-
163155
val graph = Source
164-
.fromGraph(sourceWithHeartbeat)
156+
.fromGraph(source)
165157
.to(sinkIn.sink)
166158
interpreter.subFusingMaterializer.materialize(graph)
167159
sinkIn.pull()

core/src/main/scala/akka/persistence/dynamodb/query/scaladsl/DynamoDBReadJournal.scala

Lines changed: 106 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,8 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
130130
correlationId: Option[String]): BySliceQuery[SerializedJournalItem, EventEnvelope[Event]] = {
131131
val createEnvelope: (TimestampOffset, SerializedJournalItem) => EventEnvelope[Event] = createEventEnvelope
132132
val extractOffset = (env: EventEnvelope[Event]) => env.offset.asInstanceOf[TimestampOffset]
133-
val createHeartbeat = (timestamp: Instant) =>
134-
Some(createEventEnvelopeHeartbeat[Event](entityType, slice, timestamp, correlationId))
135133

136-
new BySliceQuery(queryDao, createEnvelope, extractOffset, createHeartbeat, clock, settings, log)
134+
new BySliceQuery(queryDao, createEnvelope, extractOffset, clock, settings, log)
137135
}
138136

139137
private def snapshotsBySlice[Snapshot, Event](
@@ -145,10 +143,8 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
145143
(offset, row) => createEnvelopeFromSnapshot(row, offset, transformSnapshot)
146144

147145
val extractOffset: EventEnvelope[Event] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset]
148-
val createHeartbeat = (timestamp: Instant) =>
149-
Some(createEventEnvelopeHeartbeat[Event](entityType, slice, timestamp, correlationId))
150146

151-
new BySliceQuery(snapshotDao, createEnvelope, extractOffset, createHeartbeat, clock, settings, log)
147+
new BySliceQuery(snapshotDao, createEnvelope, extractOffset, clock, settings, log)
152148
}
153149

154150
private def createEnvelopeFromSnapshot[Snapshot, Event](
@@ -324,8 +320,20 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
324320

325321
val dbSource = bySliceQueries.head.mergeAll(bySliceQueries.tail, eagerComplete = false)
326322
if (settings.journalPublishEvents) {
323+
val initialHeartbeats = (minSlice to maxSlice).flatMap { slice =>
324+
sliceStartOffset(slice, offset) match {
325+
case t: TimestampOffset =>
326+
Some(
327+
createEventEnvelopeHeartbeat[Event](
328+
entityType,
329+
slice,
330+
t.timestamp.minus(JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)),
331+
correlationId))
332+
case _ => None
333+
}
334+
}
327335
val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice)
328-
mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId)
336+
mergeDbAndPubSubSources(dbSource, pubSubSource, initialHeartbeats, correlationId)
329337
} else
330338
dbSource
331339
}
@@ -480,8 +488,20 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
480488

481489
val dbSource = bySliceQueries.head.mergeAll(bySliceQueries.tail, eagerComplete = false)
482490
if (settings.journalPublishEvents) {
491+
val initialHeartbeats = (minSlice to maxSlice).flatMap { slice =>
492+
sliceStartOffset(slice, offset) match {
493+
case t: TimestampOffset =>
494+
Some(
495+
createEventEnvelopeHeartbeat[Event](
496+
entityType,
497+
slice,
498+
t.timestamp.minus(JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)),
499+
correlationId))
500+
case _ => None
501+
}
502+
}
483503
val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice)
484-
mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId)
504+
mergeDbAndPubSubSources(dbSource, pubSubSource, initialHeartbeats, correlationId)
485505
} else
486506
dbSource
487507
}
@@ -554,15 +574,24 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
554574
private def mergeDbAndPubSubSources[Event, Snapshot](
555575
dbSource: Source[EventEnvelope[Event], NotUsed],
556576
pubSubSource: Source[EventEnvelope[Event], NotUsed],
577+
initialHeartbeats: Iterable[EventEnvelope[Event]],
557578
correlationId: Option[String]) = {
558-
dbSource
559-
.mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
560-
.via(
561-
skipPubSubTooFarAhead(
562-
settings.querySettings.backtrackingEnabled,
563-
JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis),
564-
correlationId))
565-
.via(deduplicate(settings.querySettings.deduplicateCapacity))
579+
val dbMergedWithPubsub =
580+
dbSource
581+
.mergePrioritized(pubSubSource, leftPriority = 1, rightPriority = 10)
582+
583+
val base =
584+
if (settings.querySettings.backtrackingEnabled) {
585+
Source
586+
.fromIterator(() => initialHeartbeats.iterator)
587+
.concat(dbMergedWithPubsub)
588+
.via(
589+
handlePubSubTooFarAhead(
590+
JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis),
591+
correlationId))
592+
} else dbMergedWithPubsub
593+
594+
base.via(deduplicate(settings.querySettings.deduplicateCapacity))
566595
}
567596

568597
/**
@@ -605,77 +634,85 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
605634
}
606635
}
607636

608-
/**
609-
* INTERNAL API
610-
*/
611-
@InternalApi private[akka] def skipPubSubTooFarAhead[Event](
612-
enabled: Boolean,
613-
maxAheadOfBacktracking: JDuration,
637+
/** INTERNAL API */
638+
@nowarn("msg=eventMetadata in class EventEnvelope is deprecated")
639+
@InternalApi private[akka] def handlePubSubTooFarAhead[Event](
640+
backtrackingWindow: JDuration,
614641
correlationId: Option[String]): Flow[EventEnvelope[Event], EventEnvelope[Event], NotUsed] = {
615642
def correlationIdLogText = CorrelationId.toLogText(correlationId)
616-
if (!enabled)
617-
Flow[EventEnvelope[Event]]
618-
else
619-
Flow[EventEnvelope[Event]]
620-
.statefulMapConcat(() => {
621-
// track backtracking offset per slice
622-
var latestBacktrackingPerSlice = Map.empty[Int, Instant]
623-
def latestBacktracking(slice: Int): Instant = latestBacktrackingPerSlice.get(slice) match {
624-
case Some(instant) => instant
625-
case None => Instant.EPOCH
626-
}
627-
env => {
628-
val slice = persistenceExt.sliceForPersistenceId(env.persistenceId)
629-
env.offset match {
630-
case t: TimestampOffset =>
631-
if (EnvelopeOrigin.fromQuery(env)) {
632-
if (log.isDebugEnabled()) {
633-
val l = latestBacktracking(slice)
634-
if (l.isAfter(t.timestamp))
635-
log.debug(
636-
"event from query for persistenceId [{}] seqNr [{}] " +
637-
s"timestamp [{}]{} was before last event from backtracking or heartbeat [{}].",
638-
env.persistenceId,
639-
env.sequenceNr,
640-
t.timestamp,
641-
correlationIdLogText,
642-
l)
643-
}
644643

645-
env :: Nil
646-
} else {
647-
if (EnvelopeOrigin.fromBacktracking(env)) {
644+
Flow[EventEnvelope[Event]]
645+
.statefulMapConcat(() => {
646+
// track backtracking offsets per slice
647+
var latestBacktrackingPerSlice = Map.empty[Int, Instant]
648+
def latestBacktracking(slice: Int): Instant = latestBacktrackingPerSlice.get(slice) match {
649+
case Some(instant) => instant
650+
case None => Instant.EPOCH
651+
}
652+
653+
env => {
654+
val slice = persistenceExt.sliceForPersistenceId(env.persistenceId)
655+
val l = latestBacktracking(slice)
656+
env.offset match {
657+
case t: TimestampOffset =>
658+
if (EnvelopeOrigin.fromQuery(env)) {
659+
if (log.isDebugEnabled() && l.isAfter(t.timestamp)) {
660+
log.debug(
661+
"event from query for persistenceId [{}] seqNr [{}] " +
662+
s"timestamp [{}] was before last event from backtracking or heartbeat [{}].",
663+
env.persistenceId,
664+
env.sequenceNr,
665+
t.timestamp,
666+
l)
667+
}
668+
env :: Nil
669+
} else {
670+
if (EnvelopeOrigin.fromBacktracking(env)) {
671+
if (l.isBefore(t.timestamp)) {
648672
latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp)
649-
env :: Nil
650-
} else if (EnvelopeOrigin.fromHeartbeat(env)) {
673+
}
674+
env :: Nil
675+
} else if (EnvelopeOrigin.fromHeartbeat(env)) {
676+
if (l.isBefore(t.timestamp)) {
651677
latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp)
652-
Nil // always drop heartbeats
653-
} else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking(slice) == Instant.EPOCH) {
678+
}
679+
Nil // heartbeats are an internal implementation and must never leak out
680+
} else if (EnvelopeOrigin.fromPubSub(env)) {
681+
if (l == Instant.EPOCH) {
654682
log.trace(
655683
"Dropping pubsub event for persistenceId [{}] seqNr [{}]{} because no event from backtracking yet.",
656684
env.persistenceId,
657685
env.sequenceNr,
658686
correlationIdLogText)
659687
Nil
660-
} else if (EnvelopeOrigin.fromPubSub(env) && JDuration
661-
.between(latestBacktracking(slice), t.timestamp)
662-
.compareTo(maxAheadOfBacktracking) > 0) {
663-
// drop from pubsub when too far ahead from backtracking
664-
log.debug(
665-
"Dropping pubsub event for persistenceId [{}] seqNr [{}]{} because too far ahead of backtracking.",
688+
} else if (JDuration.between(l, t.timestamp).compareTo(backtrackingWindow) > 0) {
689+
// far ahead of backtracking, so adjust the offset (the timestamp in the envelope is unaffected)
690+
// so as to prevent the offset from being moved too far ahead
691+
val offset = TimestampOffset(l.plus(backtrackingWindow), t.readTimestamp, t.seen)
692+
val emittedEnv = new EventEnvelope(
693+
offset,
666694
env.persistenceId,
667695
env.sequenceNr,
668-
correlationIdLogText)
669-
Nil
696+
env.eventOption,
697+
env.timestamp,
698+
env.eventMetadata,
699+
env.entityType,
700+
env.slice,
701+
env.filtered,
702+
env.source,
703+
env.tags)
704+
emittedEnv :: Nil
670705
} else {
671706
env :: Nil
672707
}
708+
} else {
709+
env :: Nil
673710
}
674-
case _ =>
675-
env :: Nil
676-
}
711+
}
712+
case _ => env :: Nil
677713
}
678-
})
714+
}
715+
})
679716
}
680717

681718
// EventTimestampQuery

core/src/test/scala/akka/persistence/dynamodb/query/EventsBySlicePubSubSpec.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import akka.stream.typed.scaladsl.ActorFlow
4848
import com.typesafe.config.Config
4949
import com.typesafe.config.ConfigFactory
5050
import org.scalatest.wordspec.AnyWordSpecLike
51+
import akka.persistence.query.Offset
5152

5253
object EventsBySlicePubSubSpec {
5354
def config: Config = ConfigFactory
@@ -102,6 +103,18 @@ class EventsBySlicePubSubSpec
102103
source = EnvelopeOrigin.SourcePubSub)
103104
}
104105

106+
private def changeOffset(envelope: EventEnvelope[String], newOffset: Offset): EventEnvelope[String] =
107+
EventEnvelope(
108+
newOffset,
109+
envelope.persistenceId,
110+
envelope.sequenceNr,
111+
envelope.event,
112+
envelope.timestamp,
113+
envelope.entityType,
114+
envelope.slice,
115+
envelope.filtered,
116+
envelope.source)
117+
105118
def backtrackingEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
106119
new EventEnvelope[String](
107120
env.offset,
@@ -215,16 +228,17 @@ class EventsBySlicePubSubSpec
215228
envelope.eventOption should be(empty)
216229
}
217230

218-
"skipPubSubTooFarAhead" in {
231+
"handlePubSubTooFarAhead" in {
219232
persistenceExt.sliceForPersistenceId(envA1.persistenceId) should not be persistenceExt.sliceForPersistenceId(
220233
envB1.persistenceId)
221234

235+
val backtrackWindowMillis = settings.querySettings.backtrackingWindow.toMillis
236+
222237
val (in, out) =
223238
TestSource[EventEnvelope[String]]()
224239
.via(
225-
query.skipPubSubTooFarAhead(
226-
enabled = true,
227-
maxAheadOfBacktracking = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis),
240+
query.handlePubSubTooFarAhead(
241+
backtrackingWindow = JDuration.ofMillis(backtrackWindowMillis),
228242
correlationId = None))
229243
.toMat(TestSink[EventEnvelope[String]]())(Keep.both)
230244
.run()
@@ -255,21 +269,21 @@ class EventsBySlicePubSubSpec
255269
val time1 = envA1.offset
256270
.asInstanceOf[TimestampOffset]
257271
.timestamp
258-
val time2 = time1
259-
.plusMillis(settings.querySettings.backtrackingWindow.toMillis)
272+
val time2 = time1.plusMillis(backtrackWindowMillis)
260273
val envA3 = createEnvelope(pidA, 3L, "a3", time2.plusMillis(1))
261274
val envA4 = createEnvelope(pidA, 4L, "a4", time2.plusMillis(2))
262275
in.sendNext(envA3)
263276
in.sendNext(envA4)
264-
// dropped because > backtrackingWindow
265-
out.expectNoMessage()
277+
// emitted with adjusted offset based on latest backtracking
278+
out.expectNext(changeOffset(envA3, TimestampOffset(time2, time2.plusMillis(1), Map(pidA.id -> 3L))))
279+
out.expectNext(changeOffset(envA4, TimestampOffset(time2, time2.plusMillis(2), Map(pidA.id -> 4L))))
266280

267281
val pidCSameSlice =
268282
randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id))
269283
val envC1 = createEnvelope(pidCSameSlice, 1L, "c1", time2.plusMillis(1))
270284
in.sendNext(envC1)
271-
// dropped because > backtrackingWindow
272-
out.expectNoMessage()
285+
// emitted with adjusted offset based on latest backtracking
286+
out.expectNext(changeOffset(envC1, TimestampOffset(time2, time2.plusMillis(1), Map(pidCSameSlice.id -> 1L))))
273287

274288
val pidDSameSlice =
275289
randomPersistenceIdForSlice(entityType, persistenceExt.sliceForPersistenceId(pidA.id))

0 commit comments

Comments
 (0)