Skip to content

Commit 7e4e6f1

Browse files
authored
feat: Recovery based on only the last event (#32629)
* Introduced in a compatible way so that it still works if used with a plugin that doesn't implement this. In that case the snapshot will still not be loaded but all events will be replayed instead of only the last.
1 parent 2686dcb commit 7e4e6f1

File tree

23 files changed

+272
-37
lines changed

23 files changed

+272
-37
lines changed

akka-docs/src/main/paradox/persistence-plugins.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ AWS DynamoDB can be used as backend for Akka Persistence with the [Akka Persiste
2020

2121
@ref:[Durable State](./typed/index-persistence-durable-state.md) is not supported by the DynamoDB plugin.
2222

23+
Recovery from only last event is not supported by the DynamoDB plugin.
24+
2325
## JDBC plugin
2426

2527
Relational databases with JDBC-drivers are supported through [Akka Persistence JDBC](https://doc.akka.io/libraries/akka-persistence-jdbc/current/). For new projects, the @ref:[R2DBC plugin](#r2dbc-plugin) is recommended.
@@ -38,6 +40,7 @@ Example of concrete features _not_ supported by the Cassandra and JDBC plugins:
3840
* Projections starting from snapshots
3941
* Scalability of many Projections
4042
* Durable State entities (partly supported by JDBC plugin)
43+
* Recovery from only last event
4144

4245
## Enabling a plugin
4346

akka-docs/src/main/paradox/typed/persistence.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,35 @@ Please refer to @ref[snapshots](persistence-snapshot.md#snapshots) if you need t
546546

547547
In any case, the highest sequence number will always be recovered so you can keep persisting new events without corrupting your event log.
548548

549+
@@@ warning
550+
551+
Disable of recovery is not normal behavior of an event sourced actor, since events and snapshots are not used for
552+
the recovery of the actor.
553+
554+
@@@
555+
556+
### Recovery from only last event
557+
558+
For some use cases it is enough to recover the actor from the last event, as an optimization to not replay all events.
559+
You can enable this recovery mode with:
560+
561+
Scala
562+
: @@snip [BasicPersistentBehaviorCompileOnly.scala](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #replay-last }
563+
564+
Java
565+
: @@snip [BasicPersistentBehaviorTest.java](/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorTest.java) { #replay-last }
566+
567+
Snapshots are not loaded when recovery from last event is selected.
568+
569+
@@@ warning
570+
571+
Recovery from only last event is not normal behavior of an event sourced actor, since it typically would need
572+
all events, or a snapshot and events after the snapshot, to recover its state.
573+
574+
This feature is currently only supported by the R2DBC plugin.
575+
576+
@@@
577+
549578
## Tagging
550579

551580
Persistence allows you to use event tags without using an @ref[`EventAdapter`](../persistence.md#event-adapters):

akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ trait JournalCapabilityFlags extends CapabilityFlags {
5858
*/
5959
protected def supportsMetadata: CapabilityFlag
6060

61+
/**
62+
* When `true` enables tests which check if the Journal can replay only the last event.
63+
*/
64+
protected def supportsReplayOnlyLast: CapabilityFlag
65+
6166
}
6267
//#journal-flags
6368

akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ abstract class JournalSpec(config: Config)
5555

5656
override protected def supportsMetadata: CapabilityFlag = false
5757

58+
override protected def supportsReplayOnlyLast: CapabilityFlag = false
59+
5860
override protected def beforeEach(): Unit = {
5961
super.beforeEach()
6062
senderProbe = TestProbe()
@@ -188,6 +190,12 @@ abstract class JournalSpec(config: Config)
188190
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, "non-existing-pid", receiverProbe.ref)
189191
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 0L))
190192
}
193+
"retrieve highest sequenceNr without replaying events" in {
194+
journal ! ReplayMessages(0, 0, 1, pid, receiverProbe.ref)
195+
// no replayedMessage
196+
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
197+
}
198+
191199
"not replay permanently deleted messages (range deletion)" in {
192200
val receiverProbe2 = TestProbe()
193201
val cmd = DeleteMessagesTo(pid, 3, receiverProbe2.ref)
@@ -236,6 +244,7 @@ abstract class JournalSpec(config: Config)
236244
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
237245
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
238246
}
247+
239248
}
240249

241250
"A Journal optionally".may {
@@ -348,5 +357,14 @@ abstract class JournalSpec(config: Config)
348357

349358
}
350359
}
360+
361+
optional(flag = supportsReplayOnlyLast) {
362+
"replay only last when fromSequenceNr is -1" in {
363+
journal ! ReplayMessages(-1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
364+
receiverProbe.expectMsg(replayedMessage(snr = 5))
365+
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
366+
}
367+
}
368+
351369
}
352370
}

akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
package akka.persistence.snapshot
66

7-
import scala.collection.immutable.Seq
8-
97
import com.typesafe.config.Config
108
import com.typesafe.config.ConfigFactory
119

@@ -222,5 +220,6 @@ abstract class SnapshotStoreSpec(config: Config)
222220
}
223221
}
224222
}
223+
225224
}
226225
}

akka-persistence-tck/src/test/scala/akka/persistence/journal/inmem/InmemJournalSpec.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ import akka.persistence.PersistenceSpec
99
import akka.persistence.journal.JournalSpec
1010

1111
class InmemJournalSpec extends JournalSpec(config = PersistenceSpec.config("inmem", "InmemJournalSpec")) {
12-
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.off()
12+
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = false
13+
14+
override protected def supportsReplayOnlyLast: CapabilityFlag = true
1315
}

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,15 @@ class PersistenceTestKitPlugin(@nowarn("msg=never used") cfg: Config, cfgPath: S
5959

6060
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
6161
recoveryCallback: PersistentRepr => Unit): Future[Unit] =
62-
Future.fromTry(
63-
Try(
62+
Future.fromTry(Try {
63+
val highest = storage.tryReadSeqNumber(persistenceId)
64+
if (highest != 0L && max != 0L) {
65+
val to = math.min(toSequenceNr, highest)
66+
// read only last when fromSequenceNr is -1
67+
val from = if (fromSequenceNr == -1) to else fromSequenceNr
68+
6469
storage
65-
.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max)
70+
.tryRead(persistenceId, from, to, max)
6671
.map { repr =>
6772
// we keep the tags in the repr, so remove those here
6873
repr.payload match {
@@ -71,7 +76,11 @@ class PersistenceTestKitPlugin(@nowarn("msg=never used") cfg: Config, cfgPath: S
7176
}
7277

7378
}
74-
.foreach(recoveryCallback)))
79+
.foreach(recoveryCallback)
80+
} else {
81+
Future.successful(())
82+
}
83+
})
7584

7685
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
7786
Future.fromTry(Try {

akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/PersistenceTestKitJournalCompatSpec.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class PersistenceTestKitJournalCompatSpec extends JournalSpec(config = Persisten
3838

3939
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true
4040
override protected def supportsMetadata: CapabilityFlag = true
41+
override protected def supportsReplayOnlyLast: CapabilityFlag = true
42+
4143
}
4244

4345
class PersistenceTestKitSnapshotStoreCompatSpec
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright (C) 2017-2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.typed.scaladsl
6+
7+
import java.util.concurrent.atomic.AtomicInteger
8+
9+
import scala.annotation.nowarn
10+
11+
import org.scalatest.matchers.should.Matchers
12+
import org.scalatest.wordspec.AnyWordSpecLike
13+
14+
import akka.actor.testkit.typed.scaladsl._
15+
import akka.actor.typed.ActorRef
16+
import akka.actor.typed.scaladsl.ActorContext
17+
import akka.actor.typed.scaladsl.Behaviors
18+
import akka.persistence.testkit.PersistenceTestKitPlugin
19+
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
20+
import akka.persistence.typed.PersistenceId
21+
import akka.serialization.jackson.CborSerializable
22+
23+
object EventSourcedBehaviorReplayLastSpec extends Matchers {
24+
25+
sealed trait Command extends CborSerializable
26+
final case class Increment(id: String) extends Command
27+
final case class GetValue(replyTo: ActorRef[State]) extends Command
28+
29+
sealed trait Event extends CborSerializable
30+
final case class Incremented(id: String) extends Event
31+
32+
final case class State(value: Int, history: Vector[String]) extends CborSerializable
33+
34+
def counter(
35+
@nowarn("msg=never used") ctx: ActorContext[Command],
36+
persistenceId: PersistenceId,
37+
probe: Option[ActorRef[(State, Event)]] = None): EventSourcedBehavior[Command, Event, State] = {
38+
EventSourcedBehavior[Command, Event, State](
39+
persistenceId,
40+
emptyState = State(0, Vector.empty),
41+
commandHandler = (state, cmd) =>
42+
cmd match {
43+
case Increment(id) =>
44+
Effect.persist(Incremented(id))
45+
46+
case GetValue(replyTo) =>
47+
replyTo ! state
48+
Effect.none
49+
50+
},
51+
eventHandler = (state, evt) =>
52+
evt match {
53+
case Incremented(id) =>
54+
probe.foreach(_ ! ((state, evt)))
55+
State(state.value + 1, state.history :+ id)
56+
})
57+
}.withRecovery(Recovery.replayOnlyLast)
58+
59+
}
60+
61+
class EventSourcedBehaviorReplayLastSpec
62+
extends ScalaTestWithActorTestKit(
63+
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
64+
with AnyWordSpecLike
65+
with LogCapturing {
66+
67+
import EventSourcedBehaviorReplayLastSpec._
68+
69+
val pidCounter = new AtomicInteger(0)
70+
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()}")
71+
72+
"EventSourcedBehavior with Recovery.replayOnlyLast" must {
73+
"recover from last event only" in {
74+
val pid = nextPid()
75+
val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid)))
76+
val replyProbe = TestProbe[State]()
77+
78+
c ! Increment("a")
79+
c ! Increment("b")
80+
c ! Increment("c")
81+
c ! GetValue(replyProbe.ref)
82+
replyProbe.expectMessage(State(3, Vector("a", "b", "c")))
83+
testKit.stop(c)
84+
replyProbe.expectTerminated(c)
85+
86+
val probe = TestProbe[(State, Event)]()
87+
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid, Some(probe.ref))))
88+
// replayed the last event
89+
probe.expectMessage((State(0, Vector.empty), Incremented("c")))
90+
probe.expectNoMessage()
91+
92+
c2 ! GetValue(replyProbe.ref)
93+
replyProbe.expectMessage(State(1, Vector("c")))
94+
95+
c2 ! Increment("d")
96+
c2 ! GetValue(replyProbe.ref)
97+
replyProbe.expectMessage(State(2, Vector("c", "d")))
98+
}
99+
100+
}
101+
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import akka.actor.testkit.typed.scaladsl.TestProbe
1717
import akka.actor.typed._
1818
import akka.actor.typed.scaladsl.ActorContext
1919
import akka.actor.typed.scaladsl.Behaviors
20-
import akka.persistence.{ Recovery => ClassicRecovery }
2120
import akka.persistence.typed.NoOpEventAdapter
2221
import akka.persistence.typed.PersistenceId
2322
import akka.persistence.typed.RecoveryCompleted
@@ -69,7 +68,7 @@ class EventSourcedBehaviorWatchSpec
6968
NoOpEventAdapter.instance[String],
7069
NoOpSnapshotAdapter.instance[String],
7170
snapshotWhen = SnapshotWhenPredicate.noSnapshot,
72-
ClassicRecovery(),
71+
Recovery.default,
7372
RetentionCriteria.disabled,
7473
holdingRecoveryPermit = false,
7574
settings = settings,

0 commit comments

Comments
 (0)