Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internals
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.postgres.YugabyteSnapshotDao")
2 changes: 2 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ akka.persistence.r2dbc {
# a snapshot.
# See also https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots
enabled = false

cache-capacity = 10000
}

# Cache TTL for latestEventTimestamp queries. Setting this to a positive duration enables caching of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ final class QuerySettings(config: Config) {
val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size")
val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
val startFromSnapshotEnabled: Boolean = config.getBoolean("start-from-snapshot.enabled")
val startFromSnapshotCacheCapacity: Int = config.getInt("start-from-snapshot.cache-capacity")
val cacheLatestEventTimestamp: Option[FiniteDuration] = config.optDuration("cache-latest-event-timestamp")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ import org.slf4j.Logger
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
offset: Offset): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
Expand Down Expand Up @@ -326,9 +325,6 @@ import org.slf4j.Logger
behindCurrentTime = Duration.Zero,
backtracking = false,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(state.latest)))
} else {
if (log.isDebugEnabled)
Expand Down Expand Up @@ -369,8 +365,7 @@ import org.slf4j.Logger
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
offset: Offset): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
Expand Down Expand Up @@ -561,9 +556,6 @@ import org.slf4j.Logger
behindCurrentTime,
backtracking = newState.backtracking,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(newState.currentOffset)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ private[r2dbc] object SnapshotDao {
* INTERNAL API
*/
@InternalApi
private[r2dbc] trait SnapshotDao extends BySliceQuery.Dao[SnapshotDao.SerializedSnapshotRow] {
private[r2dbc] trait SnapshotDao {
import SnapshotDao._

def load(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SerializedSnapshotRow]]
def store(serializedRow: SerializedSnapshotRow): Future[Unit]
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,101 +4,178 @@

package akka.persistence.r2dbc.internal

import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.NotUsed
import akka.annotation.InternalApi
import akka.persistence.query.TimestampOffset.toTimestampOffset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.RecencyList

/**
* INTERNAL API
*/
@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event](
snapshotSource: Source[EventEnvelope[Event], NotUsed],
primarySource: Map[String, (Long, Instant)] => Source[EventEnvelope[Event], NotUsed])
extends GraphStage[SourceShape[EventEnvelope[Event]]] {
@InternalApi private[r2dbc] object StartingFromSnapshotStage {

  /**
   * Per-persistence-id bookkeeping cached by the stage.
   *
   * @param seqNr
   *   sequence number of the snapshot last looked up for the persistence id; the stage records `0` when no
   *   snapshot was found
   * @param emitted
   *   `true` once the stage has pushed the snapshot envelope downstream (also recorded as `true` when there is
   *   no snapshot to emit), `false` while the snapshot has not been emitted by this stage
   */
  private final case class SnapshotState(seqNr: Long, emitted: Boolean)
}

/**
* INTERNAL API
*/
@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event](
cacheCapacity: Int,
sequenceNumberOfSnapshot: String => Future[Option[Long]],
loadSnapshot: String => Future[Option[SnapshotDao.SerializedSnapshotRow]],
createEnvelope: (SerializedSnapshotRow, TimestampOffset) => EventEnvelope[Event])
extends GraphStage[FlowShape[EventEnvelope[Event], EventEnvelope[Event]]] {
import StartingFromSnapshotStage._

val in: Inlet[EventEnvelope[Event]] = Inlet("in")
val out: Outlet[EventEnvelope[Event]] = Outlet("out")

override val shape: SourceShape[EventEnvelope[Event]] = SourceShape(out)
override val shape: FlowShape[EventEnvelope[Event], EventEnvelope[Event]] =
FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { self =>
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
val snapshotHandler = new SnapshotHandler
setHandler(out, snapshotHandler)

subFusingMaterializer.materialize(
snapshotSource.toMat(snapshotHandler.subSink.sink)(Keep.left),
inheritedAttributes)
new GraphStageLogic(shape) with InHandler with OutHandler { self =>
private implicit def ec: ExecutionContext = materializer.executionContext

private var snapshotState = Map.empty[String, SnapshotState]
private val recency = RecencyList.emptyWithNanoClock[String]
private var inProgress = false

/**
 * Records the snapshot bookkeeping for `persistenceId` and marks the id as most recently used. When the
 * number of tracked ids exceeds `cacheCapacity`, the least recently used entry is dropped from the state.
 */
private def updateState(persistenceId: String, seqNr: Long, emitted: Boolean): Unit = {
  snapshotState += persistenceId -> SnapshotState(seqNr, emitted)
  recency.update(persistenceId)
  val overCapacity = recency.size > cacheCapacity
  if (overCapacity)
    recency.removeLeastRecent().foreach(evicted => snapshotState -= evicted)
}

private val seqNrOfSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[Long])]] {
case Success((env, Some(seqNr))) =>
inProgress = false
if (env.sequenceNr == seqNr) {
// snapshot should be emitted, load full snapshot
loadCorrespondingSnapshot(env)
} else if (env.sequenceNr > seqNr) {
// event is ahead of snapshot, emit event
updateState(env.persistenceId, seqNr, emitted = false)
push(out, env)
} else {
// snapshot will be emitted later, ignore event
updateState(env.persistenceId, seqNr, emitted = false)
tryPullOrComplete()
}
})

class SnapshotHandler extends OutHandler with InHandler {
private var snapshotOffsets = Map.empty[String, (Long, Instant)]
case Success((env, None)) =>
inProgress = false
// no snapshot, emit event
updateState(env.persistenceId, 0L, emitted = true)
push(out, env)

val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
subSink.pull()
subSink.setHandler(this)
case Failure(exc) =>
inProgress = false
failStage(exc)
}

override def onPull(): Unit = {
subSink.pull()
}
private val loadSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[SerializedSnapshotRow])]] {
case Success((env, Some(snap))) =>
inProgress = false
if (env.sequenceNr == snap.seqNr) {
push(out, createEnvelope(snap, env.offset.asInstanceOf[TimestampOffset]))
updateState(snap.persistenceId, snap.seqNr, emitted = true)
} else if (env.sequenceNr > snap.seqNr) {
// event is ahead of snapshot, emit event
updateState(snap.persistenceId, snap.seqNr, emitted = false)
push(out, env)
} else {
// snapshot will be emitted later, ignore event
updateState(snap.persistenceId, snap.seqNr, emitted = false)
tryPullOrComplete()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this covers my previous (vague) concern: if a new snapshot was taken after seqNrOfCorrespondingSnapshot returned its result, this load returns a snapshot newer than the env that triggered it, and we'll end up here. Seems fine.

}

override def onPush(): Unit = {
val env = subSink.grab()
snapshotOffsets =
snapshotOffsets.updated(env.persistenceId, env.sequenceNr -> toTimestampOffset(env.offset).timestamp)
case Success((env, None)) =>
inProgress = false
// no snapshot, emit event
updateState(env.persistenceId, 0L, emitted = true)
push(out, env)
}

override def onUpstreamFinish(): Unit = {
val primaryHandler = new PrimaryHandler(isAvailable(out))
self.setHandler(out, primaryHandler)
case Failure(exc) =>
inProgress = false
failStage(exc)
}

subFusingMaterializer.materialize(
primarySource(snapshotOffsets).toMat(primaryHandler.subSink.sink)(Keep.left),
inheritedAttributes)
override def onPush(): Unit = {
val env = grab(in)
snapshotState.get(env.persistenceId) match {
case Some(s) =>
val eventIsAfterSnapshot = env.sequenceNr > s.seqNr

if (eventIsAfterSnapshot) {
// event is after snapshot, emit event
// we can't ignore it when snapshot is not emitted, because it might have been emitted in
// previous incarnation, but then the stream was restarted
push(out, env)
} else if (!s.emitted && env.sequenceNr == s.seqNr) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this interact with event deletion? Do we only delete events up to, but not including, the sequence number of the snapshot? (Otherwise this will not work together with deletion.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We, as in the Akka runtime/SDK, don't delete events until the whole entity is deleted; all events and snapshots are then deleted, much later.

I'm not sure whether snapshotting and retention strategies in EventSourcedBehavior can be set up to delete the event whose seqNr == snapshot seqNr. However, combining event deletion with projections requires careful consideration anyway, and we have that documented somewhere.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, as long as we warn about it somewhere, that might be good enough.

// trigger emit of snapshot
loadCorrespondingSnapshot(env)
} else {
// event is before (or same as) snapshot, ignore
tryPullOrComplete()
}

case None =>
seqNrOfCorrespondingSnapshot(env)
}
}

override def onDownstreamFinish(cause: Throwable): Unit = {
subSink.cancel(cause)
completeStage()
}
// While an asynchronous lookup is in progress the default completion is skipped, so the pending result
// can still be handled; tryPullOrComplete() completes the stage later once it sees the closed inlet.
override def onUpstreamFinish(): Unit =
  if (!inProgress) super.onUpstreamFinish()

class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler {
val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
if (pullImmediately) subSink.pull()
subSink.setHandler(this)
/** Requests the next element from upstream, or completes the stage when the inlet is already closed. */
private def tryPullOrComplete(): Unit =
  if (!isClosed(in)) pull(in)
  else completeStage()

override def onPull(): Unit = {
subSink.pull()
}
/**
 * Starts an asynchronous lookup of the snapshot sequence number for the envelope's persistence id. The
 * outcome, paired with the triggering envelope, is delivered to `seqNrOfSnapshotCallback`.
 */
private def seqNrOfCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = {
  // flag the pending lookup before it is started
  inProgress = true
  val lookup =
    sequenceNumberOfSnapshot(env.persistenceId).map(seqNrOpt => (env, seqNrOpt))(ExecutionContext.parasitic)
  lookup.onComplete(seqNrOfSnapshotCallback.invoke)
}

override def onPush(): Unit = {
push(out, subSink.grab())
}
/**
 * Starts an asynchronous load of the snapshot for the envelope's persistence id. The outcome, paired with
 * the triggering envelope, is delivered to `loadSnapshotCallback`.
 */
private def loadCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = {
  // flag the pending load before it is started
  inProgress = true
  val loading =
    loadSnapshot(env.persistenceId).map(snapshotOpt => (env, snapshotOpt))(ExecutionContext.parasitic)
  loading.onComplete(loadSnapshotCallback.invoke)
}

override def onDownstreamFinish(cause: Throwable): Unit = {
subSink.cancel(cause)
completeStage()
}
// Downstream demand: pull the next envelope from upstream, or complete if upstream has already finished.
override def onPull(): Unit = tryPullOrComplete()

// The logic instance itself serves as the handler for both ports
// (it mixes in InHandler and OutHandler).
setHandler(in, this)
setHandler(out, this)

}

}
Loading
Loading