Skip to content

Commit f98e8c4

Browse files
committed
Configuring persistence plugins at runtime for EventSourcedBehavior
1 parent 138e419 commit f98e8c4

File tree

13 files changed

+250
-52
lines changed

13 files changed

+250
-52
lines changed

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,18 @@ object PersistenceTestKitPlugin {
104104
* Persistence testkit plugin for snapshots.
105105
*/
106106
@InternalApi
107-
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
107+
class PersistenceTestKitSnapshotPlugin(
108+
// providing this parameter in first position as unused
109+
// because Persistence extension that instantiates the plugins
110+
// does not support constructors without it
111+
@nowarn("msg=never used") cfg: Config,
112+
cfgPath: String)
113+
extends SnapshotStore {
108114

109-
private final val storage = SnapshotStorageEmulatorExtension(context.system)
115+
private final val storage = {
116+
log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath)
117+
SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath)
118+
}
110119

111120
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
112121
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package akka.persistence.testkit.internal
66

7+
import java.util.concurrent.ConcurrentHashMap
8+
79
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
810
import akka.actor.Extension
911
import akka.annotation.InternalApi
@@ -14,17 +16,34 @@ import akka.persistence.testkit.scaladsl.SnapshotTestKit
1416
* INTERNAL API
1517
*/
1618
@InternalApi
17-
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
19+
private[testkit] object SnapshotStorageEmulatorExtension
20+
extends ExtensionId[SnapshotStorageEmulatorExtension]
21+
with ExtensionIdProvider {
1822

19-
override def get(system: ActorSystem): SnapshotStorage = super.get(system)
23+
override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system)
2024

21-
override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
22-
if (SnapshotTestKit.Settings(system).serialize) {
23-
new SerializedSnapshotStorageImpl(system)
24-
} else {
25-
new SimpleSnapshotStorageImpl
26-
}
25+
override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension =
26+
new SnapshotStorageEmulatorExtension(system)
2727

2828
override def lookup: ExtensionId[_ <: Extension] =
2929
SnapshotStorageEmulatorExtension
3030
}
31+
32+
/**
33+
* INTERNAL API
34+
*/
35+
@InternalApi
36+
final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension {
37+
private val stores = new ConcurrentHashMap[String, SnapshotStorage]()
38+
private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize
39+
40+
def storageFor(key: String): SnapshotStorage =
41+
stores.computeIfAbsent(key, _ => {
42+
// we don't really care about the key here, we just want separate instances
43+
if (shouldCreateSerializedSnapshotStorage) {
44+
new SerializedSnapshotStorageImpl(system)
45+
} else {
46+
new SimpleSnapshotStorageImpl
47+
}
48+
})
49+
}

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,8 @@ class SnapshotTestKit(system: ActorSystem)
328328

329329
import SnapshotTestKit._
330330

331-
override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
331+
override protected val storage: SnapshotStorage =
332+
SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId)
332333

333334
override def getItem(persistenceId: String, nextInd: Int): Option[Any] = {
334335
storage.firstInExpectNextQueue(persistenceId).map(reprToAny)
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (C) 2020-2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.testkit.scaladsl
6+
7+
import akka.Done
8+
import akka.actor.testkit.typed.scaladsl.LogCapturing
9+
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
10+
import akka.actor.typed.ActorRef
11+
import akka.actor.typed.Behavior
12+
import akka.actor.typed.scaladsl.adapter._
13+
import akka.persistence.JournalProtocol.RecoverySuccess
14+
import akka.persistence.JournalProtocol.ReplayMessages
15+
import akka.persistence.JournalProtocol.ReplayedMessage
16+
import akka.persistence.Persistence
17+
import akka.persistence.SelectedSnapshot
18+
import akka.persistence.SnapshotProtocol.LoadSnapshot
19+
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
20+
import akka.persistence.SnapshotSelectionCriteria
21+
import akka.persistence.testkit.PersistenceTestKitPlugin
22+
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
23+
import akka.persistence.typed.PersistenceId
24+
import akka.persistence.typed.scaladsl.Effect
25+
import akka.persistence.typed.scaladsl.EventSourcedBehavior
26+
import akka.persistence.typed.scaladsl.RetentionCriteria
27+
import com.typesafe.config.ConfigFactory
28+
import org.scalatest.Inside
29+
import org.scalatest.wordspec.AnyWordSpecLike
30+
31+
object RuntimeJournalsSpec {
32+
33+
private object Actor {
34+
sealed trait Command
35+
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
36+
case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
37+
case object Stop extends Command
38+
39+
def apply(persistenceId: String, journal: String): Behavior[Command] =
40+
EventSourcedBehavior[Command, String, String](
41+
PersistenceId.ofUniqueId(persistenceId),
42+
"",
43+
(state, cmd) =>
44+
cmd match {
45+
case Save(text, replyTo) =>
46+
Effect.persist(text).thenRun(_ => replyTo ! Done)
47+
case ShowMeWhatYouGot(replyTo) =>
48+
replyTo ! state
49+
Effect.none
50+
case Stop =>
51+
Effect.stop()
52+
},
53+
(state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
54+
.withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
55+
.withJournalPluginId(s"$journal.journal")
56+
.withJournalPluginConfig(Some(config(journal)))
57+
.withSnapshotPluginId(s"$journal.snapshot")
58+
.withSnapshotPluginConfig(Some(config(journal)))
59+
60+
}
61+
62+
private def config(journal: String) = {
63+
ConfigFactory.parseString(s"""
64+
$journal {
65+
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
66+
snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}"
67+
}
68+
""")
69+
}
70+
}
71+
72+
class RuntimeJournalsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing with Inside {
73+
74+
import RuntimeJournalsSpec._
75+
76+
"The testkit journal and snapshot store plugins" must {
77+
78+
"be possible to configure at runtime and use in multiple isolated instances" in {
79+
val probe = createTestProbe[Any]()
80+
81+
{
82+
// one actor in each journal with same id
83+
val j1 = spawn(Actor("id1", "journal1"))
84+
val j2 = spawn(Actor("id1", "journal2"))
85+
j1 ! Actor.Save("j1m1", probe.ref)
86+
probe.receiveMessage()
87+
j2 ! Actor.Save("j2m1", probe.ref)
88+
probe.receiveMessage()
89+
}
90+
91+
{
92+
def assertJournal(journal: String, expectedEvent: String) = {
93+
val ref = Persistence(system).journalFor(s"$journal.journal", config(journal))
94+
ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic)
95+
inside(probe.receiveMessage()) {
96+
case ReplayedMessage(persistentRepr) =>
97+
persistentRepr.persistenceId shouldBe "id1"
98+
persistentRepr.payload shouldBe expectedEvent
99+
}
100+
probe.expectMessage(RecoverySuccess(1))
101+
}
102+
103+
assertJournal("journal1", "j1m1")
104+
assertJournal("journal2", "j2m1")
105+
}
106+
107+
{
108+
def assertSnapshot(journal: String, expectedShapshot: String) = {
109+
val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal))
110+
ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref.toClassic)
111+
inside(probe.receiveMessage()) {
112+
case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
113+
snapshot shouldBe expectedShapshot
114+
}
115+
}
116+
117+
assertSnapshot("journal1", "j1m1")
118+
assertSnapshot("journal2", "j2m1")
119+
}
120+
}
121+
}
122+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig")
2+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig")

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ import akka.persistence.typed.scaladsl.ReplicationInterceptor
2323
import akka.persistence.typed.scaladsl.RetentionCriteria
2424
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentation
2525
import akka.persistence.typed.scaladsl.SnapshotWhenPredicate
26+
import akka.util.Helpers.ConfigOps
2627
import akka.util.OptionVal
28+
import com.typesafe.config.ConfigFactory
29+
30+
import scala.concurrent.duration.FiniteDuration
2731

2832
/**
2933
* INTERNAL API
@@ -70,8 +74,11 @@ private[akka] final class BehaviorSetup[C, E, S](
7074

7175
val persistence: Persistence = Persistence(context.system.toClassic)
7276

73-
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
74-
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
77+
val journal: ClassicActorRef =
78+
persistence.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
79+
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(
80+
settings.snapshotPluginId,
81+
settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))
7582

7683
val (isSnapshotOptional: Boolean, isOnlyOneSnapshot: Boolean) = {
7784
val snapshotStoreConfig = Persistence(context.system.classicSystem).configFor(snapshotStore)
@@ -125,16 +132,19 @@ private[akka] final class BehaviorSetup[C, E, S](
125132

126133
private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None
127134

135+
val recoveryEventTimeout: FiniteDuration = persistence
136+
.journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
137+
.getMillisDuration("recovery-event-timeout")
138+
128139
def startRecoveryTimer(snapshot: Boolean): Unit = {
129140
cancelRecoveryTimer()
130141
implicit val ec: ExecutionContext = context.executionContext
131142
val timer =
132143
if (snapshot)
133-
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
144+
context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
134145
else
135-
context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) {
136-
() =>
137-
context.self ! RecoveryTickEvent(snapshot = false)
146+
context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) { () =>
147+
context.self ! RecoveryTickEvent(snapshot = false)
138148
}
139149
recoveryTimer = OptionVal.Some(timer)
140150
}

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package akka.persistence.typed.internal
77
import java.util.Optional
88
import java.util.UUID
99
import java.util.concurrent.atomic.AtomicInteger
10+
1011
import org.slf4j.LoggerFactory
1112
import akka.Done
1213
import akka.actor.typed
@@ -44,6 +45,7 @@ import akka.persistence.typed.scaladsl._
4445
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
4546
import akka.persistence.typed.scaladsl.RetentionCriteria
4647
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentationProvider
48+
import com.typesafe.config.Config
4749

4850
@InternalApi
4951
private[akka] object EventSourcedBehaviorImpl {
@@ -93,6 +95,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
9395
loggerClass: Class[_],
9496
journalPluginId: Option[String] = None,
9597
snapshotPluginId: Option[String] = None,
98+
journalPluginConfig: Option[Config] = None,
99+
snapshotPluginConfig: Option[Config] = None,
96100
tagger: (State, Event) => Set[String] = (_: State, _: Event) => Set.empty[String],
97101
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
98102
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
@@ -139,7 +143,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
139143
ctx.system,
140144
journalPluginId.getOrElse(""),
141145
snapshotPluginId.getOrElse(""),
142-
customStashCapacity)
146+
customStashCapacity,
147+
journalPluginConfig,
148+
snapshotPluginConfig)
143149

144150
// stashState outside supervise because StashState should survive restarts due to persist failures
145151
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -271,6 +277,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
271277
copy(snapshotPluginId = if (id != "") Some(id) else None)
272278
}
273279

280+
override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
281+
copy(journalPluginConfig = config)
282+
}
283+
284+
override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
285+
copy(snapshotPluginConfig = config)
286+
}
287+
274288
override def withSnapshotSelectionCriteria(
275289
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
276290
copy(recovery = Recovery(selection.toClassic))

0 commit comments

Comments
 (0)