diff --git a/core/src/main/mima-filters/1.3.11.backwards.excludes/start-from-snapshot.excludes b/core/src/main/mima-filters/1.3.11.backwards.excludes/start-from-snapshot.excludes new file mode 100644 index 00000000..83bb6dc0 --- /dev/null +++ b/core/src/main/mima-filters/1.3.11.backwards.excludes/start-from-snapshot.excludes @@ -0,0 +1,2 @@ +# internals +ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.postgres.YugabyteSnapshotDao") diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index b2d648b0..02f5f608 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -166,6 +166,16 @@ akka.persistence.r2dbc { # a snapshot. # See also https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots enabled = false + + # To filter out events that precede the snapshots, the query keeps track of the snapshot + # sequence numbers. This is the max size of the cache for those sequence numbers. When a + # persistence id is missing from the cache, the sequence number is loaded from the database. + cache-capacity = 10000 + + # When many events are filtered out before the snapshots it is still useful to occasionally + # emit fictive heartbeat events, so that downstream consumers can observe progress and store offsets. + # This is the number of filtered events before a heartbeat is emitted. + heartbeat-after = 1000 } # Cache TTL for latestEventTimestamp queries. Setting this to a positive duration enables caching of diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index a17310a1..2f8b679d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -506,6 +506,8 @@ final class QuerySettings(config: Config) { val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size") val deduplicateCapacity: Int = config.getInt("deduplicate-capacity") val startFromSnapshotEnabled: Boolean = config.getBoolean("start-from-snapshot.enabled") + val startFromSnapshotCacheCapacity: Int = config.getInt("start-from-snapshot.cache-capacity") + val startFromSnapshotHeartbeatAfter: Int = config.getInt("start-from-snapshot.heartbeat-after") val cacheLatestEventTimestamp: Option[FiniteDuration] = config.optDuration("cache-latest-event-timestamp") } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 9d561fb3..83f51938 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -273,8 +273,7 @@ import org.slf4j.Logger entityType: String, minSlice: Int, maxSlice: Int, - offset: Offset, - filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = { + offset: Offset): Source[Envelope, NotUsed] = { val initialOffset = toTimestampOffset(offset) def nextOffset(state: QueryState, envelope: Envelope): QueryState = { @@ -326,9 +325,6 @@ import org.slf4j.Logger behindCurrentTime = Duration.Zero, backtracking = false, correlationId) - .filter { row => - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) - } .via(deserializeAndAddOffset(state.latest))) } else { if (log.isDebugEnabled) @@ -369,8 +365,7 @@ import org.slf4j.Logger entityType: String, 
minSlice: Int, maxSlice: Int, - offset: Offset, - filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = { + offset: Offset): Source[Envelope, NotUsed] = { val initialOffset = toTimestampOffset(offset) if (log.isDebugEnabled()) @@ -561,9 +556,6 @@ import org.slf4j.Logger behindCurrentTime, backtracking = newState.backtracking, correlationId) - .filter { row => - filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source) - } .via(deserializeAndAddOffset(newState.currentOffset))) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala index 5deccc7c..eff3dea8 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/SnapshotDao.scala @@ -42,11 +42,12 @@ private[r2dbc] object SnapshotDao { * INTERNAL API */ @InternalApi -private[r2dbc] trait SnapshotDao extends BySliceQuery.Dao[SnapshotDao.SerializedSnapshotRow] { +private[r2dbc] trait SnapshotDao { import SnapshotDao._ def load(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SerializedSnapshotRow]] def store(serializedRow: SerializedSnapshotRow): Future[Unit] def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] + def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]] } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala index 030427a4..d95bf945 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/StartingFromSnapshotStage.scala @@ -6,99 +6,205 @@ package akka.persistence.r2dbc.internal import java.time.Instant -import akka.NotUsed +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success +import scala.util.Try + import akka.annotation.InternalApi -import akka.persistence.query.TimestampOffset.toTimestampOffset +import akka.persistence.query.TimestampOffset import akka.persistence.query.typed.EventEnvelope +import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.SourceShape -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Source import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.util.RecencyList /** * INTERNAL API */ -@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event]( - snapshotSource: Source[EventEnvelope[Event], NotUsed], - primarySource: Map[String, (Long, Instant)] => Source[EventEnvelope[Event], NotUsed]) - extends GraphStage[SourceShape[EventEnvelope[Event]]] { +@InternalApi private[r2dbc] object StartingFromSnapshotStage { + private case class SnapshotState(seqNr: Long, emitted: Boolean) +} +/** + * INTERNAL API + */ +@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event]( + cacheCapacity: Int, + sequenceNumberOfSnapshot: String => Future[Option[Long]], + loadSnapshot: String => Future[Option[SnapshotDao.SerializedSnapshotRow]], + createEnvelope: (SerializedSnapshotRow, TimestampOffset) => EventEnvelope[Event], + 
heartbeatAfter: Int, + createHeartbeat: Instant => EventEnvelope[Event]) + extends GraphStage[FlowShape[EventEnvelope[Event], EventEnvelope[Event]]] { + import StartingFromSnapshotStage._ + + val in: Inlet[EventEnvelope[Event]] = Inlet("in") val out: Outlet[EventEnvelope[Event]] = Outlet("out") - override val shape: SourceShape[EventEnvelope[Event]] = SourceShape(out) + override val shape: FlowShape[EventEnvelope[Event], EventEnvelope[Event]] = + FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { self => - setHandler( - out, - new OutHandler { - override def onPull(): Unit = { - val snapshotHandler = new SnapshotHandler - setHandler(out, snapshotHandler) - - subFusingMaterializer.materialize( - snapshotSource.toMat(snapshotHandler.subSink.sink)(Keep.left), - inheritedAttributes) + new GraphStageLogic(shape) with InHandler with OutHandler { self => + private implicit def ec: ExecutionContext = materializer.executionContext + + private var snapshotState = Map.empty[String, SnapshotState] + private val recency = RecencyList.emptyWithNanoClock[String] + private var inProgress = false + + // for emitting heartbeat events + private var filterCount = 0L + private var latestTimestamp = Instant.EPOCH + + private def updateState(persistenceId: String, seqNr: Long, emitted: Boolean): Unit = { + snapshotState = snapshotState.updated(persistenceId, SnapshotState(seqNr, emitted)) + recency.update(persistenceId) + if (recency.size > cacheCapacity) + recency.removeLeastRecent().foreach { pid => + snapshotState -= pid } - }) + } - class SnapshotHandler extends OutHandler with InHandler { - private var snapshotOffsets = Map.empty[String, (Long, Instant)] + private val seqNrOfSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[Long])]] { + case Success((env, Some(seqNr))) => + inProgress = false + if (env.sequenceNr == seqNr) { + // snapshot should be emitted, load full snapshot + loadCorrespondingSnapshot(env) + } else if (env.sequenceNr > seqNr) { + // event is ahead of snapshot, emit event + updateState(env.persistenceId, seqNr, emitted = false) + push(env) + } else { + // snapshot will be emitted later, ignore event + updateState(env.persistenceId, seqNr, emitted = false) + ignore(env) + } - val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots") - subSink.pull() - subSink.setHandler(this) + case Success((env, None)) => + inProgress = false + // no snapshot, emit event + updateState(env.persistenceId, 0L, emitted = true) + push(env) - override def onPull(): Unit = { - subSink.pull() - } + case Failure(exc) => + inProgress = false + failStage(exc) + } - override def onPush(): Unit = { - val env = subSink.grab() - snapshotOffsets = - snapshotOffsets.updated(env.persistenceId, env.sequenceNr -> toTimestampOffset(env.offset).timestamp) - push(out, env) - } + private val loadSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[SerializedSnapshotRow])]] { + case Success((env, Some(snap))) => + inProgress = false + if (env.sequenceNr == snap.seqNr) { + push(createEnvelope(snap, env.offset.asInstanceOf[TimestampOffset])) + updateState(snap.persistenceId, snap.seqNr, emitted = true) + } else if (env.sequenceNr > snap.seqNr) { + // event is ahead of snapshot, emit event + updateState(snap.persistenceId, snap.seqNr, emitted = false) + push(env) + } else { + // snapshot will be emitted later, ignore event + updateState(snap.persistenceId, snap.seqNr, emitted = false) + ignore(env) + } + + case 
Success((env, None)) => + inProgress = false + // no snapshot, emit event + updateState(env.persistenceId, 0L, emitted = true) + push(env) - override def onUpstreamFinish(): Unit = { - val primaryHandler = new PrimaryHandler(isAvailable(out)) - self.setHandler(out, primaryHandler) + case Failure(exc) => + inProgress = false + failStage(exc) + } - subFusingMaterializer.materialize( - primarySource(snapshotOffsets).toMat(primaryHandler.subSink.sink)(Keep.left), - inheritedAttributes) + override def onPush(): Unit = { + val env = grab(in) + snapshotState.get(env.persistenceId) match { + case Some(s) => + val eventIsAfterSnapshot = env.sequenceNr > s.seqNr + + if (eventIsAfterSnapshot) { + // event is after snapshot, emit event + // we can't ignore it when snapshot is not emitted, because it might have been emitted in + // previous incarnation, but then the stream was restarted + push(env) + } else if (!s.emitted && env.sequenceNr == s.seqNr) { + // trigger emit of snapshot + loadCorrespondingSnapshot(env) + } else { + // event is before (or same as) snapshot, ignore + ignore(env) + } + + case None => + seqNrOfCorrespondingSnapshot(env) } + } + + override def onUpstreamFinish(): Unit = { + if (!inProgress) + super.onUpstreamFinish() + } - override def onDownstreamFinish(cause: Throwable): Unit = { - subSink.cancel(cause) + private def tryPullOrComplete(): Unit = { + if (isClosed(in)) completeStage() - } + else + pull(in) } - class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler { - val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots") - if (pullImmediately) subSink.pull() - subSink.setHandler(this) + private def push(env: EventEnvelope[Event]): Unit = { + filterCount = 0L + push(out, env) + } - override def onPull(): Unit = { - subSink.pull() + private def ignore(env: EventEnvelope[Event]): Unit = { + updateLatestTimestamp(env) + filterCount += 1 + if (filterCount >= heartbeatAfter) { + push(createHeartbeat(latestTimestamp)) + } else { + tryPullOrComplete() } + } - override def onPush(): Unit = { - push(out, subSink.grab()) - } + private def updateLatestTimestamp(env: EventEnvelope[Event]): Unit = { + val timestamp = env.offset.asInstanceOf[TimestampOffset].timestamp + if (timestamp.isAfter(latestTimestamp)) + latestTimestamp = timestamp + } - override def onDownstreamFinish(cause: Throwable): Unit = { - subSink.cancel(cause) - completeStage() - } + private def seqNrOfCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = { + inProgress = true + sequenceNumberOfSnapshot(env.persistenceId) + .map(result => (env, result))(ExecutionContext.parasitic) + .onComplete(seqNrOfSnapshotCallback.invoke) + } + + private def loadCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = { + inProgress = true + loadSnapshot(env.persistenceId) + .map(result => (env, result))(ExecutionContext.parasitic) + .onComplete(loadSnapshotCallback.invoke) } + override def onPull(): Unit = { + tryPullOrComplete() + } + + setHandler(in, this) + setHandler(out, this) + } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index 16845789..ffde9d77 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -5,35 +5,31 @@ package akka.persistence.r2dbc.internal.postgres import java.time.Instant + import 
scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration + import io.r2dbc.spi.Row import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory -import akka.NotUsed + import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.persistence.Persistence import akka.persistence.SnapshotSelectionCriteria import akka.persistence.r2dbc.R2dbcSettings -import akka.persistence.r2dbc.internal.BySliceQuery.Buckets -import akka.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket -import akka.persistence.r2dbc.internal.CorrelationId -import akka.persistence.r2dbc.internal.InstantFactory -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow -import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow +import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichStatement import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichRow +import akka.persistence.r2dbc.internal.codec.TagsCodec.TagsCodecRichStatement import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichRow import akka.persistence.r2dbc.internal.codec.TimestampCodec.TimestampCodecRichStatement import akka.persistence.typed.PersistenceId -import akka.stream.scaladsl.Source /** * INTERNAL API @@ -50,9 +46,10 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider protected val settings: R2dbcSettings = executorProvider.settings protected val system: ActorSystem[_] = executorProvider.system implicit protected val ec: ExecutionContext = executorProvider.ec - import SnapshotDao._ import settings.codecSettings.SnapshotImplicits._ + import SnapshotDao._ + protected def log: Logger = PostgresSnapshotDao.log private val sqlCache = Sql.Cache(settings.numberOfDataPartitions > 1) @@ -140,6 +137,16 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider createSql // no cache } + protected def selectSeqNrSql(slice: Int): String = { + sqlCache.get(slice, "selectSeqNrSql") { + sql""" + SELECT seq_nr + FROM ${snapshotTable(slice)} + WHERE persistence_id = ? + LIMIT 1""" + } + } + private def deleteSql(slice: Int, criteria: SnapshotSelectionCriteria): String = { // not caching, too many combinations @@ -165,36 +172,6 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider $maxSeqNrCondition $minSeqNrCondition $maxTimestampCondition $minTimestampCondition""" } - private val currentDbTimestampSql = - sql"SELECT CURRENT_TIMESTAMP AS db_timestamp" - - protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: Int): String = - sqlCache.get(minSlice, s"snapshotsBySlicesRangeSql-$minSlice-$maxSlice") { - sql""" - SELECT slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM ${snapshotTable(minSlice)} - WHERE entity_type = ? - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? 
- ORDER BY db_timestamp, seq_nr - LIMIT ?""" - } - - protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = - sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { - sql""" - SELECT extract(EPOCH from db_timestamp)::BIGINT / 10 AS bucket, count(*) AS count - FROM ${snapshotTable(minSlice)} - WHERE entity_type = ? - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= ? AND db_timestamp <= ? - GROUP BY bucket ORDER BY bucket LIMIT ? - """ - } - - protected def sliceCondition(minSlice: Int, maxSlice: Int): String = - s"slice in (${(minSlice to maxSlice).mkString(",")})" - private def collectSerializedSnapshot(entityType: String, row: Row): SerializedSnapshotRow = { val writeTimestamp = row.get[java.lang.Long]("write_timestamp", classOf[java.lang.Long]) @@ -272,6 +249,16 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider .map(_.headOption)(ExecutionContext.parasitic) } + override def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]] = { + val slice = persistenceExt.sliceForPersistenceId(persistenceId) + val executor = executorProvider.executorFor(slice) + executor + .selectOne(s"sequenceNumberOfSnapshot [$persistenceId]")( + _.createStatement(selectSeqNrSql(slice)) + .bind(0, persistenceId), + _.get[java.lang.Long]("seq_nr", classOf[java.lang.Long])) + } + protected def bindUpsertSql(statement: Statement, serializedRow: SerializedSnapshotRow): Statement = { statement .bind(0, serializedRow.slice) @@ -350,132 +337,4 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider } }.map(_ => ())(ExecutionContext.parasitic) - /** - * This is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled - */ - override def currentDbTimestamp(slice: Int): Future[Instant] = { - val executor = executorProvider.executorFor(slice) - executor - .selectOne("select current db timestamp")( - connection => connection.createStatement(currentDbTimestampSql), - row => row.getTimestamp("db_timestamp")) - .map { - case Some(time) => time - case None => throw new IllegalStateException(s"Expected one row for: $currentDbTimestampSql") - } - } - - protected def bindSnapshotsBySlicesRangeSql( - stmt: Statement, - entityType: String, - fromTimestamp: Instant, - bufferSize: Int): Statement = { - stmt - .bind(0, entityType) - .bindTimestamp(1, fromTimestamp) - .bind(2, settings.querySettings.bufferSize) - } - - /** - * This is used from `BySliceQuery`, i.e. 
only if settings.querySettings.startFromSnapshotEnabled - */ - override def rowsBySlices( - entityType: String, - minSlice: Int, - maxSlice: Int, - fromTimestamp: Instant, - fromSeqNr: Option[Long], - toTimestamp: Option[Instant], - behindCurrentTime: FiniteDuration, - backtracking: Boolean, - correlationId: Option[String]): Source[SerializedSnapshotRow, NotUsed] = { - if (!settings.isSliceRangeWithinSameDataPartition(minSlice, maxSlice)) - throw new IllegalArgumentException( - s"Slice range [$minSlice-$maxSlice] spans over more than one " + - s"of the [${settings.numberOfDataPartitions}] data partitions" + CorrelationId.toLogText(correlationId)) - - val executor = executorProvider.executorFor(minSlice) - val result = executor.select(s"select snapshotsBySlices [$minSlice - $maxSlice]")( - connection => { - val stmt = connection.createStatement(snapshotsBySlicesRangeSql(minSlice, maxSlice)) - bindSnapshotsBySlicesRangeSql(stmt, entityType, fromTimestamp, settings.querySettings.bufferSize) - }, - collectSerializedSnapshot(entityType, _)) - - if (log.isDebugEnabled) { - val correlationText = CorrelationId.toLogText(correlationId) - result.foreach(rows => - log.debug("Read [{}] snapshots from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) - } - - Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) - } - - /** - * Counts for a bucket may become inaccurate when existing snapshots are updated since the timestamp is changed. This - * is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled - */ - override def countBucketsMayChange: Boolean = true - - protected def bindSelectBucketsSql( - stmt: Statement, - entityType: String, - fromTimestamp: Instant, - toTimestamp: Instant, - limit: Int): Statement = { - stmt - .bind(0, entityType) - .bindTimestamp(1, fromTimestamp) - .bindTimestamp(2, toTimestamp) - .bind(3, limit) - } - - /** - * This is used from `BySliceQuery`, i.e. 
only if settings.querySettings.startFromSnapshotEnabled - */ - override def countBuckets( - entityType: String, - minSlice: Int, - maxSlice: Int, - fromTimestamp: Instant, - limit: Int, - correlationId: Option[String]): Future[Seq[Bucket]] = { - - val now = InstantFactory.now() // not important to use database time - val toTimestamp = { - if (fromTimestamp == Instant.EPOCH) - now - else { - // max buckets, just to have some upper bound - val t = fromTimestamp.plusSeconds(Buckets.BucketDurationSeconds * limit + Buckets.BucketDurationSeconds) - if (t.isAfter(now)) now else t - } - } - - val executor = executorProvider.executorFor(minSlice) - - val result = executor.select(s"select bucket counts [$minSlice - $maxSlice]")( - connection => { - val stmt = connection.createStatement(selectBucketsSql(minSlice, maxSlice)) - bindSelectBucketsSql(stmt, entityType, fromTimestamp, toTimestamp, limit) - }, - row => { - val bucketStartEpochSeconds = row.get("bucket", classOf[java.lang.Long]).toLong * 10 - val count = row.get[java.lang.Long]("count", classOf[java.lang.Long]).toLong - Bucket(bucketStartEpochSeconds, count) - }) - - if (log.isDebugEnabled) { - val correlationText = CorrelationId.toLogText(correlationId) - result.foreach(rows => - log.debug("Read [{}] bucket counts from slices [{} - {}]{}", rows.size, minSlice, maxSlice, correlationText)) - } - - if (toTimestamp == now) - result - else - result.map(appendEmptyBucketIfLastIsMissing(_, toTimestamp)) - - } - } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala index 66b95916..271f853d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteDialect.scala @@ -40,7 +40,7 @@ private[r2dbc] object YugabyteDialect extends Dialect { new PostgresJournalDao(executorProvider) override def createSnapshotDao(executorProvider: R2dbcExecutorProvider): SnapshotDao = - new YugabyteSnapshotDao(executorProvider) + new PostgresSnapshotDao(executorProvider) override def createQueryDao(executorProvider: R2dbcExecutorProvider): QueryDao = new YugabyteQueryDao(executorProvider) diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala deleted file mode 100644 index eb9eb2eb..00000000 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/YugabyteSnapshotDao.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) 2022 - 2025 Lightbend Inc. 
- */ - -package akka.persistence.r2dbc.internal.postgres - -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import akka.annotation.InternalApi -import akka.persistence.r2dbc.internal.R2dbcExecutorProvider - -/** - * INTERNAL API - */ -@InternalApi -private[r2dbc] final class YugabyteSnapshotDao(executorProvider: R2dbcExecutorProvider) - extends PostgresSnapshotDao(executorProvider) { - - override protected lazy val log: Logger = LoggerFactory.getLogger(classOf[YugabyteSnapshotDao]) - - override protected def sliceCondition(minSlice: Int, maxSlice: Int): String = { - s"slice BETWEEN $minSlice AND $maxSlice" - } - -} diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala index 333fea55..bd391dff 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerSnapshotDao.scala @@ -4,17 +4,12 @@ package akka.persistence.r2dbc.internal.sqlserver -import java.time.Instant - -import scala.concurrent.Future - import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.annotation.InternalApi import akka.persistence.SnapshotSelectionCriteria -import akka.persistence.r2dbc.internal.InstantFactory import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotMetadata import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow @@ -86,6 +81,16 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide } + override protected def selectSeqNrSql(slice: Int): String = { + sqlCache.get(slice, "selectSeqNrSql") { + sql""" + SELECT TOP(1) seq_nr + FROM ${snapshotTable(slice)} + WHERE persistence_id = @persistenceId + """ + } + } + override protected def upsertSql(slice: Int): String = sqlCache.get(slice, "upsertSql") { if (settings.querySettings.startFromSnapshotEnabled) @@ -161,59 +166,4 @@ private[r2dbc] class SqlServerSnapshotDao(executorProvider: R2dbcExecutorProvide statement } - override protected def bindSelectBucketsSql( - stmt: Statement, - entityType: String, - fromTimestamp: Instant, - toTimestamp: Instant, - limit: Int): Statement = { - stmt - .bind("@limit", limit) - .bind("@entityType", entityType) - .bindTimestamp("@fromTimestamp", fromTimestamp) - .bindTimestamp("@toTimestamp", toTimestamp) - } - - override protected def selectBucketsSql(minSlice: Int, maxSlice: Int): String = - sqlCache.get(minSlice, s"selectBucketsSql-$minSlice-$maxSlice") { - // group by column alias (bucket) needs a sub query - val subQuery = - s""" - select TOP(@limit) CAST(DATEDIFF(s,'1970-01-01 00:00:00',db_timestamp) AS BIGINT) / 10 AS bucket - FROM ${snapshotTable(minSlice)} - WHERE entity_type = @entityType - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= @fromTimestamp AND db_timestamp <= @toTimestamp - """ - sql""" - SELECT bucket, count(*) as count from ($subQuery) as sub - GROUP BY bucket ORDER BY bucket - """ - } - - override protected def bindSnapshotsBySlicesRangeSql( - stmt: Statement, - entityType: String, - fromTimestamp: Instant, - bufferSize: Int): Statement = { - stmt - .bind("@bufferSize", settings.querySettings.bufferSize) - .bind("@entityType", entityType) - .bindTimestamp("@fromTimestamp", fromTimestamp) - } - - override protected def snapshotsBySlicesRangeSql(minSlice: Int, maxSlice: 
Int): String = - sqlCache.get(minSlice, s"snapshotsBySlicesRangeSql-$minSlice-$maxSlice") { - sql""" - SELECT TOP(@bufferSize) slice, persistence_id, seq_nr, db_timestamp, write_timestamp, snapshot, ser_id, ser_manifest, tags, meta_payload, meta_ser_id, meta_ser_manifest - FROM ${snapshotTable(minSlice)} - WHERE entity_type = @entityType - AND ${sliceCondition(minSlice, maxSlice)} - AND db_timestamp >= @fromTimestamp - ORDER BY db_timestamp, seq_nr - """ - } - - override def currentDbTimestamp(slice: Int): Future[Instant] = Future.successful(InstantFactory.now()) - } diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala index 3d13d5ee..c2d94e12 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala @@ -109,13 +109,11 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) /** * Same as `currentEventsBySlices` but with the purpose to use snapshots as starting points and thereby reducing - * number of events that have to be loaded. This can be useful if the consumer start from zero without any previously - * processed offset or if it has been disconnected for a long while and its offset is far behind. - * - * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one - * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * number of events that have to be processed. This can be useful if the consumer start from zero without any + * previously processed offset or if it has been disconnected for a long while and its offset is far behind. * - * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * Snapshot events and ordinary events are interleaved, skipping events before known snapshots. After emitting the + * snapshot events the ordinary events with sequence numbers after the snapshots are emitted. * * To use `currentEventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide @@ -133,13 +131,11 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) /** * Same as `eventsBySlices` but with the purpose to use snapshots as starting points and thereby reducing number of - * events that have to be loaded. This can be useful if the consumer start from zero without any previously processed - * offset or if it has been disconnected for a long while and its offset is far behind. - * - * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one - * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * events that have to be processed. This can be useful if the consumer start from zero without any previously + * processed offset or if it has been disconnected for a long while and its offset is far behind. * - * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * Snapshot events and ordinary events are interleaved, skipping events before known snapshots. After emitting the + * snapshot events the ordinary events with sequence numbers after the snapshots are emitted. 
* * To use `eventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index b53c250a..71594cee 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -9,12 +9,15 @@ import java.time.Instant import java.time.{ Duration => JDuration } import java.util.UUID import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + import scala.annotation.tailrec import scala.collection.immutable import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration + import akka.NotUsed import akka.actor.ExtendedActorSystem import akka.actor.typed.pubsub.Topic @@ -28,7 +31,6 @@ import akka.persistence.SnapshotSelectionCriteria import akka.persistence.query.Offset import akka.persistence.query.QueryCorrelationId import akka.persistence.query.TimestampOffset -import akka.persistence.query.TimestampOffset.toTimestampOffset import akka.persistence.query.scaladsl._ import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdStartingFromSnapshotQuery @@ -128,6 +130,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat // key is tuple of entity type and slice private val heartbeatPersistenceIds = new ConcurrentHashMap[(String, Int), String]() private val heartbeatUuid = UUID.randomUUID().toString + // gaps are allowed for heartbeat sequence numbers, but increasing for each heartbeat pid (uuid makes it unique) + private val heartbeatSeqNr = new AtomicLong log.debug("Using heartbeat UUID [{}]", heartbeatUuid) // Optional caching of latestEventTimestamp results @@ -207,7 +211,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat new EventEnvelope( TimestampOffset(timestamp, Map.empty), heartbeatPersistenceId(entityType, slice), - 1L, + heartbeatSeqNr.incrementAndGet(), eventOption = None, timestamp.toEpochMilli, _eventMetadata = None, @@ -230,23 +234,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } } - private def snapshotsBySlice[Snapshot, Event]( - entityType: String, - minSlice: Int, - transformSnapshot: Snapshot => Event): BySliceQuery[SerializedSnapshotRow, EventEnvelope[Event]] = { - val createEnvelope: (TimestampOffset, SerializedSnapshotRow) => EventEnvelope[Event] = - (offset, row) => createEnvelopeFromSnapshot(row, offset, transformSnapshot) - - val extractOffset: EventEnvelope[Event] => TimestampOffset = env => env.offset.asInstanceOf[TimestampOffset] - - val createHeartbeat: Instant => Option[EventEnvelope[Event]] = { timestamp => - Some(createEventEnvelopeHeartbeat(entityType, minSlice, timestamp).asInstanceOf[EventEnvelope[Event]]) - } - - new BySliceQuery(snapshotDao, createEnvelope, extractOffset, createHeartbeat, clock, settings, log)( - typedSystem.executionContext) - } - private def createEnvelopeFromSnapshot[Snapshot, Event]( row: SerializedSnapshotRow, offset: TimestampOffset, @@ -354,13 +341,11 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat /** * Same as 
`currentEventsBySlices` but with the purpose to use snapshots as starting points and thereby reducing - * number of events that have to be loaded. This can be useful if the consumer start from zero without any previously - * processed offset or if it has been disconnected for a long while and its offset is far behind. - * - * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one - * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * number of events that have to be processed. This can be useful if the consumer start from zero without any + * previously processed offset or if it has been disconnected for a long while and its offset is far behind. * - * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * Snapshot events and ordinary events are interleaved, skipping events before known snapshots. After emitting the + * snapshot events the ordinary events with sequence numbers after the snapshots are emitted. * * To use `currentEventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide @@ -373,59 +358,41 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat offset: Offset, transformSnapshot: Snapshot => Event): Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("currentEventsBySlicesStartingFromSnapshots") - val timestampOffset = toTimestampOffset(offset) val correlationId = QueryCorrelationId.get() val correlationIdText = CorrelationId.toLogText(correlationId) - val snapshotSource = - snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) + + val eventSource = + bySlice[Event](entityType, minSlice) .currentBySlices( - s"[$entityType] currentSnapshotsBySlices [$minSlice-$maxSlice]$correlationIdText: ", + s"[$entityType] currentEventsBySlicesStartingFromSnapshots [$minSlice-$maxSlice]$correlationIdText: ", correlationId, entityType, minSlice, maxSlice, offset) - Source.fromGraph( - new StartingFromSnapshotStage[Event]( - snapshotSource, - { snapshotOffsets => - val initOffset = - if (timestampOffset == TimestampOffset.Zero && snapshotOffsets.nonEmpty) { - val minTimestamp = snapshotOffsets.valuesIterator.minBy { case (_, timestamp) => timestamp }._2 - TimestampOffset(minTimestamp, Map.empty) - } else { - // don't adjust because then there is a risk that there was no found snapshot for a persistenceId - // but there can still be events between the given `offset` parameter and the min timestamp of the - // snapshots and those would then be missed - offset - } - - log.debug( - "currentEventsBySlicesStartingFromSnapshots initOffset [{}] with [{}] snapshots", - initOffset, - snapshotOffsets.size) - - bySlice(entityType, minSlice).currentBySlices( - s"[$entityType] currentEventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", - correlationId, - entityType, - minSlice, - maxSlice, - initOffset, - filterEventsBeforeSnapshots(snapshotOffsets, backtrackingEnabled = false)) - })) + eventSource + .via( + Flow.fromGraph( + new StartingFromSnapshotStage[Event]( + cacheCapacity = settings.querySettings.startFromSnapshotCacheCapacity, + sequenceNumberOfSnapshot = persistenceId => snapshotDao.sequenceNumberOfSnapshot(persistenceId), + loadSnapshot = persistenceId => snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest), + 
createEnvelope = + (snapshotRow, offset) => createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot), + heartbeatAfter = settings.querySettings.startFromSnapshotHeartbeatAfter, + createHeartbeat = timestamp => createEventEnvelopeHeartbeat(entityType, minSlice, timestamp)))) } /** * Same as `eventsBySlices` but with the purpose to use snapshots as starting points and thereby reducing number of - * events that have to be loaded. This can be useful if the consumer start from zero without any previously processed - * offset or if it has been disconnected for a long while and its offset is far behind. + * events that have to be processed. This can be useful if the consumer start from zero without any previously + * processed offset or if it has been disconnected for a long while and its offset is far behind. * - * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one - * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * The snapshots are transformed to events with the given `transformSnapshot` function. * - * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * Snapshot events and ordinary events are interleaved, skipping events before known snapshots. After emitting the + * snapshot events the ordinary events with sequence numbers after the snapshots are emitted. * * To use `eventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide @@ -438,87 +405,35 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat offset: Offset, transformSnapshot: Snapshot => Event): Source[EventEnvelope[Event], NotUsed] = { checkStartFromSnapshotEnabled("eventsBySlicesStartingFromSnapshots") - val timestampOffset = toTimestampOffset(offset) val correlationId = QueryCorrelationId.get() val correlationIdText = CorrelationId.toLogText(correlationId) - val snapshotSource = - snapshotsBySlice[Snapshot, Event](entityType, minSlice, transformSnapshot) - .currentBySlices( - s"[$entityType] snapshotsBySlices [$minSlice-$maxSlice]$correlationIdText: ", - correlationId, - entityType, - minSlice, - maxSlice, - offset) - - Source.fromGraph( - new StartingFromSnapshotStage[Event]( - snapshotSource, - { snapshotOffsets => - val initOffset = - if (timestampOffset == TimestampOffset.Zero && snapshotOffsets.nonEmpty) { - val minTimestamp = snapshotOffsets.valuesIterator.minBy { case (_, timestamp) => timestamp }._2 - TimestampOffset(minTimestamp, Map.empty) - } else { - // don't adjust because then there is a risk that there was no found snapshot for a persistenceId - // but there can still be events between the given `offset` parameter and the min timestamp of the - // snapshots and those would then be missed - offset - } - - log.debug( - s"eventsBySlicesStartingFromSnapshots $correlationIdText initOffset [{}] with [{}] snapshots", - initOffset, - snapshotOffsets.size) - - val dbSource = - bySlice[Event](entityType, minSlice).liveBySlices( - s"[$entityType] eventsBySlices [$minSlice-$maxSlice]$correlationIdText: ", - correlationId, - entityType, - minSlice, - maxSlice, - initOffset, - filterEventsBeforeSnapshots(snapshotOffsets, settings.querySettings.backtrackingEnabled)) - - if (settings.journalPublishEvents) { - // Note that events via PubSub are not filtered by snapshotOffsets. 
It's unlikely that - // Those will be earlier than the snapshots and duplicates must be handled downstream in that case. - // If we would use the filterEventsBeforeSnapshots function for PubSub it would be difficult to - // know when memory of that Map can be released or the filter function would have to be shared - // and thread safe, which is not worth it. - val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice) - mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId) - } else - dbSource - })) - } - - /** - * Stateful filter function that decides if (persistenceId, seqNr, source) should be emitted by - * `eventsBySlicesStartingFromSnapshots` and `currentEventsBySlicesStartingFromSnapshots`. - */ - private def filterEventsBeforeSnapshots( - snapshotOffsets: Map[String, (Long, Instant)], - backtrackingEnabled: Boolean): (String, Long, String) => Boolean = { - var _snapshotOffsets = snapshotOffsets - (persistenceId, seqNr, source) => { - if (_snapshotOffsets.isEmpty) - true - else - _snapshotOffsets.get(persistenceId) match { - case None => true - case Some((snapshotSeqNr, _)) => - // release memory by removing from the _snapshotOffsets Map - if (seqNr == snapshotSeqNr && - ((backtrackingEnabled && source == EnvelopeOrigin.SourceBacktracking) || - (!backtrackingEnabled && source == EnvelopeOrigin.SourceQuery))) { - _snapshotOffsets -= persistenceId - } - seqNr > snapshotSeqNr - } - } + val dbSource = + bySlice[Event](entityType, minSlice).liveBySlices( + s"[$entityType] eventsBySlicesStartingFromSnapshots [$minSlice-$maxSlice]$correlationIdText: ", + correlationId, + entityType, + minSlice, + maxSlice, + offset) + val eventSource = + if (settings.journalPublishEvents) { + val pubSubSource = eventsBySlicesPubSubSource[Event](entityType, minSlice, maxSlice) + mergeDbAndPubSubSources(dbSource, pubSubSource, correlationId) + } else + dbSource + + eventSource + .via( + Flow.fromGraph( + new StartingFromSnapshotStage[Event]( + cacheCapacity = settings.querySettings.startFromSnapshotCacheCapacity, + sequenceNumberOfSnapshot = persistenceId => snapshotDao.sequenceNumberOfSnapshot(persistenceId), + loadSnapshot = persistenceId => snapshotDao.load(persistenceId, SnapshotSelectionCriteria.Latest), + createEnvelope = + (snapshotRow, offset) => createEnvelopeFromSnapshot(snapshotRow, offset, transformSnapshot), + heartbeatAfter = settings.querySettings.startFromSnapshotHeartbeatAfter, + createHeartbeat = timestamp => createEventEnvelopeHeartbeat(entityType, minSlice, timestamp)))) } private def checkStartFromSnapshotEnabled(methodName: String): Unit = @@ -562,7 +477,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat private def deserializeEvent[Event](se: SerializedEvent): Event = serialization.deserialize(se.bytes, se.serializerId, se.serializerManifest).get.asInstanceOf[Event] - private def mergeDbAndPubSubSources[Event, Snapshot]( + private def mergeDbAndPubSubSources[Event]( dbSource: Source[EventEnvelope[Event], NotUsed], pubSubSource: Source[EventEnvelope[Event], NotUsed], correlationId: Option[String]) = { diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala index 66cd8911..66293c57 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala +++ 
b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceStartingFromSnapshotSpec.scala @@ -21,6 +21,7 @@ import akka.persistence.r2dbc.TestActors.Persister.PersistWithAck import akka.persistence.r2dbc.TestConfig import akka.persistence.r2dbc.TestData import akka.persistence.r2dbc.TestDbLifecycle +import akka.persistence.r2dbc.internal.EnvelopeOrigin import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId import akka.stream.scaladsl.Sink @@ -40,6 +41,7 @@ object EventsBySliceStartingFromSnapshotSpec { TestConfig.backtrackingDisabledConfig .withFallback(ConfigFactory.parseString(s""" akka.persistence.r2dbc.query.start-from-snapshot.enabled = true + akka.persistence.r2dbc.query.start-from-snapshot.heartbeat-after = 100 # This test is not using backtracking, so increase behind-current-time to # reduce risk of missing events @@ -216,6 +218,42 @@ class EventsBySliceStartingFromSnapshotSpec assertFinished(result) } + "emit heartbeat after many filtered events before snapshot" in new Setup { + // heartbeat-after is configured to 100 + val snapshotAt = 220 + for (i <- 1 to 230) { + if (i == snapshotAt) { + persister ! PersistWithAck(s"e-$i-snap", probe.ref) + snapshotAckProbe.expectMessage(snapshotAt.toLong) + } else + persister ! PersistWithAck(s"e-$i", probe.ref) + probe.expectMessage(Done) + } + val result: TestSubscriber.Probe[EventEnvelope[String]] = + doQuery(entityType, slice, slice, NoOffset) + .runWith(sinkProbe) + .request(300) + + // events 1-219 are before the snapshot, 219 filtered events + // heartbeats emitted after 100 and 200 filtered events + val hb1 = result.expectNext() + EnvelopeOrigin.fromHeartbeat(hb1) shouldBe true + hb1.filtered shouldBe true + hb1.sequenceNr should be >= 1L + + val hb2 = result.expectNext() + EnvelopeOrigin.fromHeartbeat(hb2) shouldBe true + hb2.filtered shouldBe true + hb2.persistenceId shouldBe hb1.persistenceId + hb2.sequenceNr should be > hb1.sequenceNr + + result.expectNext().event shouldBe expectedSnapshotEvent(snapshotAt) + for (i <- (snapshotAt + 1) to 230) { + result.expectNext().event shouldBe s"e-$i" + } + assertFinished(result) + } + } } diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/StartingFromSnapshotStageSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/StartingFromSnapshotStageSpec.scala index 5f6a6fd8..c7880349 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/StartingFromSnapshotStageSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/StartingFromSnapshotStageSpec.scala @@ -5,29 +5,41 @@ package akka.persistence.r2dbc.query import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger -import akka.actor.testkit.typed.TestException +import scala.concurrent.Future + +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem import akka.persistence.Persistence import akka.persistence.query.TimestampOffset import akka.persistence.query.typed.EventEnvelope +import akka.persistence.r2dbc.internal.EnvelopeOrigin +import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow import akka.persistence.r2dbc.internal.StartingFromSnapshotStage import akka.persistence.typed.PersistenceId +import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink -import org.scalatest.wordspec.AnyWordSpecLike class 
StartingFromSnapshotStageSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { private val entityType = "TestEntity" private val persistence = Persistence(system) private implicit val sys: ActorSystem[_] = system + private val pidA = PersistenceId(entityType, "a") + private val pidB = PersistenceId(entityType, "b") + private val pidC = PersistenceId(entityType, "c") + private def createEnvelope( pid: PersistenceId, seqNr: Long, evt: String, + source: String = "", tags: Set[String] = Set.empty): EventEnvelope[Any] = { val now = Instant.now() EventEnvelope( @@ -39,76 +51,302 @@ class StartingFromSnapshotStageSpec extends ScalaTestWithActorTestKit with AnyWo pid.entityTypeHint, persistence.sliceForPersistenceId(pid.id), filtered = false, - source = "", + source, tags = tags) } - private val snapshotEnvelopes = Vector( - createEnvelope(PersistenceId(entityType, "a"), 2, "snap-a2"), - createEnvelope(PersistenceId(entityType, "b"), 3, "snap-b3"), - createEnvelope(PersistenceId(entityType, "c"), 1, "snap-c1")) + private def createSerializedSnapshotRow(env: EventEnvelope[Any]) = + SerializedSnapshotRow( + env.slice, + env.entityType, + env.persistenceId, + env.sequenceNr, + env.offset.asInstanceOf[TimestampOffset].timestamp, + env.timestamp, + Array.empty, + 0, + "", + Set.empty, + None) + + def findEnvelope( + persistenceId: PersistenceId, + envelopes: IndexedSeq[EventEnvelope[Any]]): Option[EventEnvelope[Any]] = + envelopes.find(env => env.persistenceId == persistenceId.id) + + def findEnvelope( + persistenceId: PersistenceId, + seqNr: Long, + envelopes: IndexedSeq[EventEnvelope[Any]], + source: String = ""): Option[EventEnvelope[Any]] = + envelopes.find(env => env.persistenceId == persistenceId.id && env.sequenceNr == seqNr && env.source == source) + + private abstract class Setup { + def snapshotEnvelopes: IndexedSeq[EventEnvelope[Any]] + def eventEnvelopes: IndexedSeq[EventEnvelope[Any]] - private val primaryEnvelopes = Vector( - createEnvelope(PersistenceId(entityType, "a"), 3, "a3"), - createEnvelope(PersistenceId(entityType, "a"), 4, "a4"), - createEnvelope(PersistenceId(entityType, "b"), 4, "b4")) + def loadSnapshot(persistenceId: String): Future[Option[SerializedSnapshotRow]] = + Future.successful( + findEnvelope(PersistenceId.ofUniqueId(persistenceId), snapshotEnvelopes).map(createSerializedSnapshotRow)) + + def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]] = + Future.successful(findEnvelope(PersistenceId.ofUniqueId(persistenceId), snapshotEnvelopes).map(_.sequenceNr)) + + def createEnvelopeFromSnapshotRow(snap: SerializedSnapshotRow, offset: TimestampOffset): EventEnvelope[Any] = + findEnvelope(PersistenceId.ofUniqueId(snap.persistenceId), snap.seqNr, snapshotEnvelopes) + .getOrElse(throw new IllegalArgumentException(s"Unknown envelope for [$snap]")) + + def createHeartbeat(timestamp: Instant): EventEnvelope[Any] = { + new EventEnvelope( + TimestampOffset(timestamp, Map.empty), + "heartbeat-pid", // we don't care about slice mapping of heartbeats in this test + 1L, + eventOption = None, + timestamp.toEpochMilli, + _eventMetadata = None, + entityType, + 0, + filtered = true, + source = EnvelopeOrigin.SourceHeartbeat, + Set.empty) + } + + def source(cacheCapacity: Int = 10000, heartbeatAfter: Int = 1000): Source[EventEnvelope[Any], NotUsed] = + Source(eventEnvelopes) + .via( + Flow.fromGraph( + new StartingFromSnapshotStage( + cacheCapacity, + sequenceNumberOfSnapshot, + loadSnapshot, + createEnvelopeFromSnapshotRow, + heartbeatAfter, + 
createHeartbeat))) + } "StartingFromSnapshotStage" must { - "emit envelopes from snapshots and then from primary" in { - val source = - Source.fromGraph(new StartingFromSnapshotStage(Source(snapshotEnvelopes), _ => Source(primaryEnvelopes))) + "emit envelopes from snapshots and from ordinary events" in new Setup { + override lazy val snapshotEnvelopes = + Vector(createEnvelope(pidA, 2, "snap-a2"), createEnvelope(pidC, 1, "snap-c1")) - val probe = source.runWith(TestSink()) + override lazy val eventEnvelopes = Vector( + createEnvelope(pidA, 1, "a1"), + createEnvelope(pidA, 2, "a2"), + createEnvelope(pidA, 3, "a3"), + createEnvelope(pidA, 4, "a4"), + createEnvelope(pidB, 1, "b1"), + createEnvelope(pidB, 2, "b2"), + createEnvelope(pidC, 1, "c1"), + createEnvelope(pidC, 2, "c2")) + + val probe = source().runWith(TestSink()) probe.request(100) - probe.expectNextN(snapshotEnvelopes ++ primaryEnvelopes) + + probe.expectNext(findEnvelope(pidA, 2, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 3, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 4, eventEnvelopes).get) + + probe.expectNext(findEnvelope(pidB, 1, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 2, eventEnvelopes).get) + + probe.expectNext(findEnvelope(pidC, 1, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidC, 2, eventEnvelopes).get) + probe.expectComplete() } - "collect offsets from snapshots" in { - val source = - Source.fromGraph( - new StartingFromSnapshotStage( - Source(snapshotEnvelopes), - { snapshotOffsets => - val moreEnvelopes = - snapshotOffsets.iterator - .map { case (pid, (seqNr, _)) => - createEnvelope( - PersistenceId.ofUniqueId(pid), - seqNr + 1, - s"${PersistenceId.ofUniqueId(pid).entityId}${seqNr + 1}") - } - .toVector - .sortBy(_.persistenceId) - Source(moreEnvelopes) - })) - - val probe = source.runWith(TestSink()) + "handle backtracking envelopes" in new Setup { + override lazy val snapshotEnvelopes = + Vector(createEnvelope(pidA, 2, "snap-a2"), createEnvelope(pidC, 1, "snap-c1")) + + override lazy val eventEnvelopes = Vector( + createEnvelope(pidA, 1, "a1"), + createEnvelope(pidA, 1, "bt-a1", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidA, 2, "a2"), + createEnvelope(pidA, 2, "bt-a2", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidA, 3, "a3"), + createEnvelope(pidA, 4, "a4"), + createEnvelope(pidA, 3, "bt-a3", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidA, 4, "bt-a4", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidB, 1, "b1"), + createEnvelope(pidB, 2, "b2"), + createEnvelope(pidB, 1, "bt-b1", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidB, 2, "bt-b2", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidC, 1, "c1"), + createEnvelope(pidC, 1, "c1", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidC, 2, "c2"), + createEnvelope(pidC, 2, "c2", source = EnvelopeOrigin.SourceBacktracking)) + + val probe = source().runWith(TestSink()) probe.request(100) - probe.expectNextN(snapshotEnvelopes) - probe.expectNext().event shouldBe "a3" - probe.expectNext().event shouldBe "b4" - probe.expectNext().event shouldBe "c2" + + probe.expectNext(findEnvelope(pidA, 2, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 3, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 4, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 3, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + probe.expectNext(findEnvelope(pidA, 4, 
eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + + probe.expectNext(findEnvelope(pidB, 1, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 2, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 1, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + probe.expectNext(findEnvelope(pidB, 2, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + + probe.expectNext(findEnvelope(pidC, 1, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidC, 2, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidC, 2, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + + probe.expectComplete() } - "fail if snapshots fail" in { - val source = - Source.fromGraph( - new StartingFromSnapshotStage(Source.failed(TestException("err")), _ => Source(primaryEnvelopes))) + "handle sequence gap" in new Setup { + override lazy val snapshotEnvelopes = + Vector(createEnvelope(pidA, 2, "snap-a2"), createEnvelope(pidB, 2, "snap-b2")) + + override lazy val eventEnvelopes = Vector( + createEnvelope(pidA, 1, "a1"), + // a2 is missing + createEnvelope(pidA, 3, "a3"), + createEnvelope(pidA, 2, "bt-a2", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidA, 4, "a4"), + createEnvelope(pidA, 3, "bt-a3", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidA, 4, "bt-a4", source = EnvelopeOrigin.SourceBacktracking), + + // first is ahead of snapshot + createEnvelope(pidB, 3, "b3"), + createEnvelope(pidB, 4, "b4"), + createEnvelope(pidB, 2, "bt-b2", source = EnvelopeOrigin.SourceBacktracking), + createEnvelope(pidB, 3, "bt-b3", source = EnvelopeOrigin.SourceBacktracking)) - val probe = source.runWith(TestSink()) + val probe = source().runWith(TestSink()) probe.request(100) - probe.expectError(TestException("err")) + + // a3 still emitted, since it's ahead of snapshot + probe.expectNext(findEnvelope(pidA, 3, eventEnvelopes).get) + + // snapshot triggered by backtracking + probe.expectNext(findEnvelope(pidA, 2, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 4, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidA, 3, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + probe.expectNext(findEnvelope(pidA, 4, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + + probe.expectNext(findEnvelope(pidB, 3, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 4, eventEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 2, snapshotEnvelopes).get) + probe.expectNext(findEnvelope(pidB, 3, eventEnvelopes, source = EnvelopeOrigin.SourceBacktracking).get) + + probe.expectComplete() } - "fail if primary fail" in { - val source = - Source.fromGraph(new StartingFromSnapshotStage(Source.empty, _ => Source.failed(TestException("err")))) + "fail if snapshots loading fail" in new Setup { + override lazy val snapshotEnvelopes = Vector(createEnvelope(pidA, 2, "snap-a2")) + + override lazy val eventEnvelopes = + Vector(createEnvelope(pidA, 1, "a1"), createEnvelope(pidA, 2, "a2"), createEnvelope(pidA, 3, "a3")) + + override def loadSnapshot(persistenceId: String): Future[Option[SerializedSnapshotRow]] = + Future.failed(new RuntimeException(s"Simulated exc when loading snapshot [$persistenceId]")) - val probe = source.runWith(TestSink()) + val probe = source().runWith(TestSink()) probe.request(100) - probe.expectError(TestException("err")) + + probe + .expectError() + .getMessage shouldBe s"Simulated exc when loading snapshot [${eventEnvelopes.head.persistenceId}]" } + + } + + "evict cache" in new Setup 
+    val pids = (1 to 10).map { n =>
+      PersistenceId(entityType, s"pid-$n")
+    }
+
+    override lazy val snapshotEnvelopes =
+      pids.map(pid => createEnvelope(pid, 3, s"snap-3"))
+
+    override lazy val eventEnvelopes =
+      for {
+        seqNr <- (1 to 5)
+        pid <- pids
+      } yield {
+        createEnvelope(pid, seqNr, s"evt-$seqNr")
+      }
+
+    val loadCounter = new AtomicInteger
+
+    override def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]] = {
+      loadCounter.incrementAndGet()
+      super.sequenceNumberOfSnapshot(persistenceId)
+    }
+
+    override def loadSnapshot(persistenceId: String): Future[Option[SerializedSnapshotRow]] = {
+      loadCounter.incrementAndGet()
+      super.loadSnapshot(persistenceId)
+    }
+
+    val probe = source(cacheCapacity = 3).runWith(TestSink())
+    probe.request(100)
+
+    pids.foreach { pid =>
+      probe.expectNext(findEnvelope(pid, 3, snapshotEnvelopes).get)
+    }
+
+    pids.foreach { pid =>
+      probe.expectNext(findEnvelope(pid, 4, eventEnvelopes).get)
+    }
+    pids.foreach { pid =>
+      probe.expectNext(findEnvelope(pid, 5, eventEnvelopes).get)
+    }
+
+    probe.expectComplete()
+
+    // Normally, each pid's snapshot is loaded twice: on first occurrence and when emitted.
+    // When evicted from the cache, it is loaded again.
+    loadCounter.get() should be > (pids.size * 2)
+  }
+
+  "emit many events" in new Setup {
+    override lazy val snapshotEnvelopes = Vector(createEnvelope(pidA, 33, "snap-33"))
+
+    override lazy val eventEnvelopes = (1 to 100).map(seqNr => createEnvelope(pidA, seqNr, s"evt-$seqNr"))
+
+    val probe = source(heartbeatAfter = 33).runWith(TestSink())
+    probe.request(200)
+    val received = probe.expectNextN(100 - 32)
+    received.head shouldBe findEnvelope(pidA, snapshotEnvelopes).get
+    received.find(EnvelopeOrigin.fromHeartbeat) shouldBe None // no heartbeat
+    probe.expectComplete()
+  }
+
+  "emit heartbeat after many filtered events" in new Setup {
+    override lazy val snapshotEnvelopes = Vector(createEnvelope(pidA, 66, "snap-66"))
+
+    override lazy val eventEnvelopes = (1 to 100).map(seqNr => createEnvelope(pidA, seqNr, s"evt-$seqNr"))
+
+    val probe = source(heartbeatAfter = 20).runWith(TestSink())
+    probe.request(200)
+    val e1 = probe.expectNext()
+    EnvelopeOrigin.fromHeartbeat(e1) shouldBe true
+    e1.offset.asInstanceOf[TimestampOffset].timestamp shouldBe eventEnvelopes(19).offset
+      .asInstanceOf[TimestampOffset]
+      .timestamp
+    val e2 = probe.expectNext()
+    EnvelopeOrigin.fromHeartbeat(e2) shouldBe true
+    e2.offset.asInstanceOf[TimestampOffset].timestamp shouldBe eventEnvelopes(39).offset
+      .asInstanceOf[TimestampOffset]
+      .timestamp
+    val e3 = probe.expectNext()
+    EnvelopeOrigin.fromHeartbeat(e3) shouldBe true
+    e3.offset.asInstanceOf[TimestampOffset].timestamp shouldBe eventEnvelopes(59).offset
+      .asInstanceOf[TimestampOffset]
+      .timestamp
+    val e4 = probe.expectNext()
+    e4 shouldBe findEnvelope(pidA, snapshotEnvelopes).get
+
+    val more = probe.expectNextN(30)
+    more.find(EnvelopeOrigin.fromHeartbeat) shouldBe None
+
+    probe.cancel()
   }
 }
diff --git a/ddl-scripts/create_tables_postgres.sql b/ddl-scripts/create_tables_postgres.sql
index dfff08d0..dfeb1661 100644
--- a/ddl-scripts/create_tables_postgres.sql
+++ b/ddl-scripts/create_tables_postgres.sql
@@ -42,9 +42,6 @@ CREATE TABLE IF NOT EXISTS snapshot(
   PRIMARY KEY(persistence_id)
 );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
-
 CREATE TABLE IF NOT EXISTS durable_state (
   slice INT NOT NULL,
  entity_type VARCHAR(255) NOT NULL,
diff --git a/ddl-scripts/create_tables_postgres_0-1.sql b/ddl-scripts/create_tables_postgres_0-1.sql
index 70ac81e2..55e0e1b8 100644
--- a/ddl-scripts/create_tables_postgres_0-1.sql
+++ b/ddl-scripts/create_tables_postgres_0-1.sql
@@ -86,10 +86,6 @@ CREATE TABLE IF NOT EXISTS snapshot_1(
   PRIMARY KEY(persistence_id)
 );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-CREATE INDEX IF NOT EXISTS snapshot_0_slice_idx ON snapshot_0(slice, entity_type, db_timestamp);
-CREATE INDEX IF NOT EXISTS snapshot_1_slice_idx ON snapshot_1(slice, entity_type, db_timestamp);
-
 CREATE TABLE IF NOT EXISTS durable_state_0 (
   slice INT NOT NULL,
   entity_type VARCHAR(255) NOT NULL,
diff --git a/ddl-scripts/create_tables_postgres_2-3.sql b/ddl-scripts/create_tables_postgres_2-3.sql
index 079b22ba..c025496b 100644
--- a/ddl-scripts/create_tables_postgres_2-3.sql
+++ b/ddl-scripts/create_tables_postgres_2-3.sql
@@ -86,10 +86,6 @@ CREATE TABLE IF NOT EXISTS snapshot_3(
   PRIMARY KEY(persistence_id)
 );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-CREATE INDEX IF NOT EXISTS snapshot_2_slice_idx ON snapshot_2(slice, entity_type, db_timestamp);
-CREATE INDEX IF NOT EXISTS snapshot_3_slice_idx ON snapshot_3(slice, entity_type, db_timestamp);
-
 CREATE TABLE IF NOT EXISTS durable_state_2 (
   slice INT NOT NULL,
   entity_type VARCHAR(255) NOT NULL,
diff --git a/ddl-scripts/create_tables_postgres_jsonb.sql b/ddl-scripts/create_tables_postgres_jsonb.sql
index 336af00a..c79a5d90 100644
--- a/ddl-scripts/create_tables_postgres_jsonb.sql
+++ b/ddl-scripts/create_tables_postgres_jsonb.sql
@@ -42,9 +42,6 @@ CREATE TABLE IF NOT EXISTS snapshot(
   PRIMARY KEY(persistence_id)
 );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
-
 CREATE TABLE IF NOT EXISTS durable_state (
   slice INT NOT NULL,
   entity_type VARCHAR(255) NOT NULL,
diff --git a/ddl-scripts/create_tables_sqlserver.sql b/ddl-scripts/create_tables_sqlserver.sql
index 4987e9e4..59bdfa14 100644
--- a/ddl-scripts/create_tables_sqlserver.sql
+++ b/ddl-scripts/create_tables_sqlserver.sql
@@ -42,12 +42,6 @@ IF object_id('snapshot') is null
     PRIMARY KEY(persistence_id)
   );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-IF NOT EXISTS(SELECT * FROM sys.indexes WHERE name = 'snapshot_slice_idx' AND object_id = OBJECT_ID('snapshot'))
-  BEGIN
-    CREATE INDEX snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
-  END;
-
 IF object_id('durable_state') is null
   CREATE TABLE durable_state (
     slice INT NOT NULL,
diff --git a/ddl-scripts/create_tables_yugabyte.sql b/ddl-scripts/create_tables_yugabyte.sql
index c9f57335..77e16942 100644
--- a/ddl-scripts/create_tables_yugabyte.sql
+++ b/ddl-scripts/create_tables_yugabyte.sql
@@ -43,10 +43,6 @@ CREATE TABLE IF NOT EXISTS snapshot(
   PRIMARY KEY(persistence_id HASH)
 );
 
--- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
-CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice ASC, entity_type ASC, db_timestamp ASC, persistence_id)
-  SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
-
 CREATE TABLE IF NOT EXISTS durable_state (
   slice INT NOT NULL,
  entity_type VARCHAR(255) NOT NULL,
diff --git a/ddl-scripts/drop_tables_postgres.sql b/ddl-scripts/drop_tables_postgres.sql
index 7730b1c9..90f7ae5d 100644
--- a/ddl-scripts/drop_tables_postgres.sql
+++ b/ddl-scripts/drop_tables_postgres.sql
@@ -1,5 +1,4 @@
 DROP INDEX event_journal_slice_idx;
-DROP INDEX snapshot_slice_idx;
 DROP INDEX durable_state_slice_idx;
 DROP TABLE IF EXISTS event_journal;
 DROP TABLE IF EXISTS snapshot;
diff --git a/ddl-scripts/drop_tables_sqlserver.sql b/ddl-scripts/drop_tables_sqlserver.sql
index fb908f87..c520a3e6 100644
--- a/ddl-scripts/drop_tables_sqlserver.sql
+++ b/ddl-scripts/drop_tables_sqlserver.sql
@@ -1,6 +1,5 @@
 DROP INDEX event_journal.event_journal_slice_idx;
-DROP INDEX snapshot.snapshot_slice_idx;
 DROP INDEX durable_state.durable_state_slice_idx;
 DROP TABLE IF EXISTS event_journal;
 DROP TABLE IF EXISTS snapshot;
-DROP TABLE IF EXISTS durable_state;
\ No newline at end of file
+DROP TABLE IF EXISTS durable_state;
diff --git a/docs/src/main/paradox/migration-guide.md b/docs/src/main/paradox/migration-guide.md
index b32fe0e9..24da3c46 100644
--- a/docs/src/main/paradox/migration-guide.md
+++ b/docs/src/main/paradox/migration-guide.md
@@ -94,19 +94,6 @@ Yugabyte:
 UPDATE snapshot s SET db_timestamp = e.db_timestamp, tags = e.tags
 FROM event_journal e WHERE s.persistence_id = e.persistence_id and s.seq_nr = e.seq_nr;
 ```
 
-A new index must be added to the `snapshot` table:
-
-Postgres:
-: ```sql
-CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);
-```
-
-Yugabyte:
-: ```sql
-CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice ASC, entity_type ASC, db_timestamp ASC, persistence_id)
-  SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
-```
-
 The feature must be enabled with configuration:
 
 ```hocon
@@ -117,5 +104,5 @@ These changes can be made in a rolling update process.
 1. While running the old version, alter tables to add the two columns.
 2. Enable configuration and roll out new version. Don't use `eventsBySlicesStartingFromSnapshots` yet.
-3. Populate the columns with the update statement. Create the index.
+3. Populate the columns with the update statement.
 4. Roll out another version where you can use `eventsBySlicesStartingFromSnapshots`.
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index e3fcae65..2550982b 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -109,13 +109,13 @@ event payload is always full loaded.
 ### eventsBySlicesStartingFromSnapshots
 
 Same as `eventsBySlices` but with the purpose to use snapshots as starting points and thereby reducing number of
-events that have to be loaded. This can be useful if the consumer start from zero without any previously processed
+events that have to be processed. This can be useful if the consumer starts from zero without any previously processed
 offset or if it has been disconnected for a long while and its offset is far behind.
 
-First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one
-snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function.
+The snapshot is transformed to an event with the given `transformSnapshot` function.
 
-After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
+Snapshot events and ordinary events are interleaved, skipping events before known snapshots. Once the snapshot
+event for a persistence id has been emitted, the ordinary events with higher sequence numbers follow.
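+
+As an illustration, a consumer could use the query as in the following sketch. The `ShoppingCart` entity, its
+`State` snapshot type and the `Event.fromSnapshot` conversion are application-specific placeholders standing in
+for the `transformSnapshot` function, and are not part of this library:
+
+```scala
+import akka.NotUsed
+import akka.actor.typed.ActorSystem
+import akka.persistence.query.NoOffset
+import akka.persistence.query.PersistenceQuery
+import akka.persistence.query.typed.EventEnvelope
+import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import akka.stream.scaladsl.Source
+
+def cartEvents(system: ActorSystem[_]): Source[EventEnvelope[ShoppingCart.Event], NotUsed] = {
+  val readJournal =
+    PersistenceQuery(system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
+
+  // slice range for this consumer, e.g. the first of 4 ranges
+  val range = readJournal.sliceRanges(4).head
+
+  // events before a known snapshot are filtered out; the snapshot itself is
+  // turned into an event with the transformSnapshot function given as last argument
+  readJournal.eventsBySlicesStartingFromSnapshots[ShoppingCart.State, ShoppingCart.Event](
+    "ShoppingCart",
+    range.min,
+    range.max,
+    NoOffset,
+    (snapshot: ShoppingCart.State) => ShoppingCart.Event.fromSnapshot(snapshot))
+}
+```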
 
 To use `eventsBySlicesStartingFromSnapshots` or `currentEventsBySlicesStartingFromSnapshots` you must follow instructions in
 @ref:[migration guide](migration-guide.md#eventsBySlicesStartingFromSnapshots) and enable configuration: