Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internals
ProblemFilters.exclude[MissingClassProblem]("akka.persistence.r2dbc.internal.postgres.YugabyteSnapshotDao")
10 changes: 10 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ akka.persistence.r2dbc {
# a snapshot.
# See also https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots
enabled = false

# For filtering out the events that precede the snapshots, it keeps track of the sequence
# numbers of the snapshots. This is the max size of the cache for these sequence numbers.
# When a sequence number is missing from the cache it is loaded from the database.
cache-capacity = 10000

# When many events before the snapshots are filtered out, it is still useful to occasionally
# emit synthetic heartbeat events, so that downstream consumers can observe progress and
# store offsets. This is the number of filtered events after which a heartbeat is emitted.
heartbeat-after = 1000
}

# Cache TTL for latestEventTimestamp queries. Setting this to a positive duration enables caching of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ final class QuerySettings(config: Config) {
val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size")
val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
val startFromSnapshotEnabled: Boolean = config.getBoolean("start-from-snapshot.enabled")
val startFromSnapshotCacheCapacity: Int = config.getInt("start-from-snapshot.cache-capacity")
val startFromSnapshotHeartbeatAfter: Int = config.getInt("start-from-snapshot.heartbeat-after")
val cacheLatestEventTimestamp: Option[FiniteDuration] = config.optDuration("cache-latest-event-timestamp")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ import org.slf4j.Logger
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
offset: Offset): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
Expand Down Expand Up @@ -326,9 +325,6 @@ import org.slf4j.Logger
behindCurrentTime = Duration.Zero,
backtracking = false,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(state.latest)))
} else {
if (log.isDebugEnabled)
Expand Down Expand Up @@ -369,8 +365,7 @@ import org.slf4j.Logger
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset,
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
offset: Offset): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

if (log.isDebugEnabled())
Expand Down Expand Up @@ -561,9 +556,6 @@ import org.slf4j.Logger
behindCurrentTime,
backtracking = newState.backtracking,
correlationId)
.filter { row =>
filterEventsBeforeSnapshots(row.persistenceId, row.seqNr, row.source)
}
.via(deserializeAndAddOffset(newState.currentOffset)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ private[r2dbc] object SnapshotDao {
* INTERNAL API
*/
@InternalApi
private[r2dbc] trait SnapshotDao extends BySliceQuery.Dao[SnapshotDao.SerializedSnapshotRow] {
private[r2dbc] trait SnapshotDao {
import SnapshotDao._

def load(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SerializedSnapshotRow]]
def store(serializedRow: SerializedSnapshotRow): Future[Unit]
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
def sequenceNumberOfSnapshot(persistenceId: String): Future[Option[Long]]

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,99 +6,212 @@ package akka.persistence.r2dbc.internal

import java.time.Instant

import akka.NotUsed
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try

import akka.annotation.InternalApi
import akka.persistence.query.TimestampOffset.toTimestampOffset
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.internal.SnapshotDao.SerializedSnapshotRow
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.RecencyList

/**
* INTERNAL API
*/
@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event](
snapshotSource: Source[EventEnvelope[Event], NotUsed],
primarySource: Map[String, (Long, Instant)] => Source[EventEnvelope[Event], NotUsed])
extends GraphStage[SourceShape[EventEnvelope[Event]]] {
@InternalApi private[r2dbc] object StartingFromSnapshotStage {
private case class SnapshotState(seqNr: Long, emitted: Boolean)
}

/**
* INTERNAL API
*/
@InternalApi private[r2dbc] class StartingFromSnapshotStage[Event](
cacheCapacity: Int,
sequenceNumberOfSnapshot: String => Future[Option[Long]],
loadSnapshot: String => Future[Option[SnapshotDao.SerializedSnapshotRow]],
createEnvelope: (SerializedSnapshotRow, TimestampOffset) => EventEnvelope[Event],
heartbeatAfter: Int,
createHeartbeat: Instant => EventEnvelope[Event])
extends GraphStage[FlowShape[EventEnvelope[Event], EventEnvelope[Event]]] {
import StartingFromSnapshotStage._

val in: Inlet[EventEnvelope[Event]] = Inlet("in")
val out: Outlet[EventEnvelope[Event]] = Outlet("out")

override val shape: SourceShape[EventEnvelope[Event]] = SourceShape(out)
override val shape: FlowShape[EventEnvelope[Event], EventEnvelope[Event]] =
FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { self =>
setHandler(
out,
new OutHandler {
override def onPull(): Unit = {
val snapshotHandler = new SnapshotHandler
setHandler(out, snapshotHandler)

subFusingMaterializer.materialize(
snapshotSource.toMat(snapshotHandler.subSink.sink)(Keep.left),
inheritedAttributes)
new GraphStageLogic(shape) with InHandler with OutHandler { self =>
private implicit def ec: ExecutionContext = materializer.executionContext

private var snapshotState = Map.empty[String, SnapshotState]
private val recency = RecencyList.emptyWithNanoClock[String]
private var inProgress = false

// for emitting heartbeat events
private var filterCount = 0L
private var latestTimestamp = Instant.EPOCH

private def updateState(persistenceId: String, seqNr: Long, emitted: Boolean): Unit = {
snapshotState = snapshotState.updated(persistenceId, SnapshotState(seqNr, emitted))
recency.update(persistenceId)
if (recency.size > cacheCapacity)
recency.removeLeastRecent().foreach { pid =>
snapshotState -= pid
}
})
}

class SnapshotHandler extends OutHandler with InHandler {
private var snapshotOffsets = Map.empty[String, (Long, Instant)]
private val seqNrOfSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[Long])]] {
case Success((env, Some(seqNr))) =>
inProgress = false
if (env.sequenceNr == seqNr) {
// snapshot should be emitted, load full snapshot
loadCorrespondingSnapshot(env)
} else if (env.sequenceNr > seqNr) {
// event is ahead of snapshot, emit event
updateState(env.persistenceId, seqNr, emitted = false)
push(env)
} else if (filterCount >= heartbeatAfter) {
pushHeartbeat()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added heartbeats when many events have been filtered out. That will give progress observability and offset storage progress downstreams. I have to deal with these heartbeats in projections. As is, they would be treated like filtered events, and then probably handled as duplicates in the offset store. 8847b84

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, useful to have the heartbeats.

} else {
// snapshot will be emitted later, ignore event
updateState(env.persistenceId, seqNr, emitted = false)
ignore(env)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In places that we're pushing a heartbeat, rather than ignoring the envelope, should it also call updateState? Similar for loadSnapshotCallback. And also the update of the latest timestamp in ignore. Would seem that we should always do the same as ignoring/filtering, but also push the heartbeat. Could be simpler to not repeat logic, and move the heartbeat push into the ignore method?


val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
subSink.pull()
subSink.setHandler(this)
case Success((env, None)) =>
inProgress = false
// no snapshot, emit event
updateState(env.persistenceId, 0L, emitted = true)
push(env)

override def onPull(): Unit = {
subSink.pull()
}
case Failure(exc) =>
inProgress = false
failStage(exc)
}

override def onPush(): Unit = {
val env = subSink.grab()
snapshotOffsets =
snapshotOffsets.updated(env.persistenceId, env.sequenceNr -> toTimestampOffset(env.offset).timestamp)
push(out, env)
}
private val loadSnapshotCallback = getAsyncCallback[Try[(EventEnvelope[Event], Option[SerializedSnapshotRow])]] {
case Success((env, Some(snap))) =>
inProgress = false
if (env.sequenceNr == snap.seqNr) {
push(createEnvelope(snap, env.offset.asInstanceOf[TimestampOffset]))
updateState(snap.persistenceId, snap.seqNr, emitted = true)
} else if (env.sequenceNr > snap.seqNr) {
// event is ahead of snapshot, emit event
updateState(snap.persistenceId, snap.seqNr, emitted = false)
push(env)
} else if (filterCount >= heartbeatAfter) {
pushHeartbeat()
} else {
// snapshot will be emitted later, ignore event
updateState(snap.persistenceId, snap.seqNr, emitted = false)
ignore(env)
}

override def onUpstreamFinish(): Unit = {
val primaryHandler = new PrimaryHandler(isAvailable(out))
self.setHandler(out, primaryHandler)
case Success((env, None)) =>
inProgress = false
// no snapshot, emit event
updateState(env.persistenceId, 0L, emitted = true)
push(env)

subFusingMaterializer.materialize(
primarySource(snapshotOffsets).toMat(primaryHandler.subSink.sink)(Keep.left),
inheritedAttributes)
case Failure(exc) =>
inProgress = false
failStage(exc)
}

override def onPush(): Unit = {
val env = grab(in)
snapshotState.get(env.persistenceId) match {
case Some(s) =>
val eventIsAfterSnapshot = env.sequenceNr > s.seqNr

if (eventIsAfterSnapshot) {
// event is after snapshot, emit event
// we can't ignore it when snapshot is not emitted, because it might have been emitted in
// previous incarnation, but then the stream was restarted
push(env)
} else if (!s.emitted && env.sequenceNr == s.seqNr) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this interact with event deletion — do we only delete events up to (but not including) the snapshot's sequence number, rather than including it? (Otherwise this will not work together with deletion.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We as in Akka runtime/sdk don't delete events, until the whole entity is deleted, and then all events and snapshots are deleted, much later.

I'm not sure if snapshotting and retention strategies in EventSourcedBehavior can be setup to delete event seqNr == snapshot seqNr. However, deleting events combined with projections requires considerations anyway, and we have that documented somewhere.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, as long as we warn a bit about it somewhere that might be good enough.

// trigger emit of snapshot
loadCorrespondingSnapshot(env)
} else if (filterCount >= heartbeatAfter) {
pushHeartbeat()
} else {
// event is before (or same as) snapshot, ignore
ignore(env)
}

case None =>
seqNrOfCorrespondingSnapshot(env)
}
}

override def onDownstreamFinish(cause: Throwable): Unit = {
subSink.cancel(cause)
override def onUpstreamFinish(): Unit = {
if (!inProgress)
super.onUpstreamFinish()
}

private def tryPullOrComplete(): Unit = {
if (isClosed(in))
completeStage()
}
else
pull(in)
}

class PrimaryHandler(pullImmediately: Boolean) extends OutHandler with InHandler {
val subSink = new SubSinkInlet[EventEnvelope[Event]]("snapshots")
if (pullImmediately) subSink.pull()
subSink.setHandler(this)
private def push(env: EventEnvelope[Event]): Unit = {
filterCount = 0L
push(out, env)
}

override def onPull(): Unit = {
subSink.pull()
}
private def pushHeartbeat(): Unit = {
filterCount = 1L
push(out, createHeartbeat(latestTimestamp))
}

override def onPush(): Unit = {
push(out, subSink.grab())
}
private def ignore(env: EventEnvelope[Event]): Unit = {
filterCount += 1
updateLatestTimestamp(env)
tryPullOrComplete()
}
Comment on lines +171 to +179
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be simpler to remove the pushHeartbeat method and only have the ignore method to call from other places, and move the heartbeat logic into this method?

private def ignore(env: EventEnvelope[Event]): Unit = {
  updateLatestTimestamp(env)
  if (filterCount >= heartbeatAfter) {
    filterCount = 1L
    push(out, createHeartbeat(latestTimestamp))
  } else {
    filterCount += 1
    tryPullOrComplete()
  }
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, thanks. 8917f32


override def onDownstreamFinish(cause: Throwable): Unit = {
subSink.cancel(cause)
completeStage()
}
private def updateLatestTimestamp(env: EventEnvelope[Event]): Unit = {
val timestamp = env.offset.asInstanceOf[TimestampOffset].timestamp
if (timestamp.isAfter(latestTimestamp))
latestTimestamp = timestamp
}

private def seqNrOfCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = {
inProgress = true
sequenceNumberOfSnapshot(env.persistenceId)
.map(result => (env, result))(ExecutionContext.parasitic)
.onComplete(seqNrOfSnapshotCallback.invoke)
}

private def loadCorrespondingSnapshot(env: EventEnvelope[Event]): Unit = {
inProgress = true
loadSnapshot(env.persistenceId)
.map(result => (env, result))(ExecutionContext.parasitic)
.onComplete(loadSnapshotCallback.invoke)
}

override def onPull(): Unit = {
tryPullOrComplete()
}

setHandler(in, this)
setHandler(out, this)

}

}
Loading
Loading