Skip to content

Commit bf5dbae

Browse files
committed
use offset from event envelope
* because then not depending on db migration of the db_timestamp
1 parent 94c3cd2 commit bf5dbae

File tree

3 files changed

+8
-11
lines changed

3 files changed

+8
-11
lines changed

core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import scala.util.Success
1111
import scala.util.Try
1212

1313
import akka.annotation.InternalApi
14+
import akka.persistence.query.TimestampOffset
1415
import akka.persistence.query.typed.EventEnvelope
1516
import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow
1617
import akka.stream.Attributes
@@ -37,7 +38,7 @@ import akka.util.RecencyList
3738
cacheCapacity: Int,
3839
sequenceNumberOfSnapshot: String => Future[Option[Long]],
3940
loadSnapshot: String => Future[Option[SnapshotDao.SerializedSnapshotRow]],
40-
createEnvelope: SerializedSnapshotRow => EventEnvelope[Event])
41+
createEnvelope: (SerializedSnapshotRow, TimestampOffset) => EventEnvelope[Event])
4142
extends GraphStage[FlowShape[EventEnvelope[Event], EventEnvelope[Event]]] {
4243
import StartingFromSnapshotStage._
4344

@@ -95,7 +96,7 @@ import akka.util.RecencyList
9596
case Success((env, Some(snap))) =>
9697
inProgress = false
9798
if (env.sequenceNr == snap.seqNr) {
98-
push(out, createEnvelope(snap))
99+
push(out, createEnvelope(snap, env.offset.asInstanceOf[TimestampOffset]))
99100
updateState(snap.persistenceId, snap.seqNr, emitted = true)
100101
} else if (env.sequenceNr > snap.seqNr) {
101102
// event is ahead of snapshot, emit event

core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,10 +373,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
373373
cacheCapacity = settings.querySettings.startFromSnapshotCacheCapacity,
374374
sequenceNumberOfSnapshot = persistenceId => snapshotDao.sequenceNumberOfSnapshot(persistenceId),
375375
loadSnapshot = persistenceId => snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest),
376-
createEnvelope = snapshotRow => {
377-
val offset = TimestampOffset(snapshotRow.dbTimestamp, Map(snapshotRow.persistenceId -> snapshotRow.seqNr))
378-
createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot)
379-
})))
376+
createEnvelope =
377+
(snapshotRow, offset) => createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot))))
380378
}
381379

382380
/**
@@ -425,10 +423,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat
425423
cacheCapacity = settings.querySettings.startFromSnapshotCacheCapacity,
426424
sequenceNumberOfSnapshot = persistenceId => snapshotDao.sequenceNumberOfSnapshot(persistenceId),
427425
loadSnapshot = persistenceId => snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest),
428-
createEnvelope = snapshotRow => {
429-
val offset = TimestampOffset(snapshotRow.dbTimestamp, Map(snapshotRow.persistenceId -> snapshotRow.seqNr))
430-
createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot)
431-
})))
426+
createEnvelope =
427+
(snapshotRow, offset) => createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot))))
432428
}
433429

434430
private def checkStartFromSnapshotEnabled(methodName: String): Unit =

core/src/test/scala/akka/persistence/r2dbc/query/StartingFromSnapshotStageSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class StartingFromSnapshotStageSpec extends ScalaTestWithActorTestKit with AnyWo
9292
def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]] =
9393
Future.successful(findEnvelope(PersistenceId.ofUniqueId(persistenceId), snapshotEnvelopes).map(_.sequenceNr))
9494

95-
def createEnvelopeFromSnapshotRow(snap: SerializedSnapshotRow): EventEnvelope[Any] =
95+
def createEnvelopeFromSnapshotRow(snap: SerializedSnapshotRow, offset: TimestampOffset): EventEnvelope[Any] =
9696
findEnvelope(PersistenceId.ofUniqueId(snap.persistenceId), snap.seqNr, snapshotEnvelopes)
9797
.getOrElse(throw new IllegalArgumentException(s"Unknown envelope for [$snap]"))
9898

0 commit comments

Comments
 (0)