Skip to content

Commit 3cac79d

Browse files
authored
feat: Transform event and metadata of replicated event (#32650)
* transform incoming replicated event and metadata before it is stored * test and hardening of metadata
1 parent 8deef53 commit 3cac79d

File tree

13 files changed

+306
-93
lines changed

13 files changed

+306
-93
lines changed

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventPublishingSpec.scala

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package akka.persistence.typed
66

77
import org.scalatest.wordspec.AnyWordSpecLike
8+
89
import akka.Done
910
import akka.actor.testkit.typed.TestException
1011
import akka.actor.testkit.typed.scaladsl.LogCapturing
@@ -18,11 +19,12 @@ import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, Versi
1819
import akka.persistence.typed.scaladsl.Effect
1920
import akka.persistence.typed.scaladsl.EventSourcedBehavior
2021
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
21-
2222
import scala.concurrent.ExecutionContext
2323
import scala.concurrent.Future
2424
import scala.concurrent.duration.DurationInt
2525

26+
import akka.persistence.typed.scaladsl.EventWithMetadata
27+
2628
object ReplicatedEventPublishingSpec {
2729

2830
val EntityType = "EventPublishingSpec"
@@ -433,6 +435,37 @@ class ReplicatedEventPublishingSpec
433435
probe.expectTerminated(actor)
434436
}
435437

438+
"transform replicated events between two entities" in {
439+
val id = nextEntityId()
440+
val probe = createTestProbe[Any]()
441+
case class Intercepted(origin: ReplicaId, seqNr: Long, event: String)
442+
val addTransformation
443+
: EventSourcedBehavior[MyReplicatedBehavior.Command, String, Set[String]] => EventSourcedBehavior[
444+
MyReplicatedBehavior.Command,
445+
String,
446+
Set[String]] =
447+
_.withReplicatedEventTransformation { (_, eventWithMeta) =>
448+
EventWithMetadata(eventWithMeta.event.toUpperCase, Nil)
449+
}
450+
val actor = spawn(MyReplicatedBehavior(id, DCA, Set(DCA, DCB), modifyBehavior = addTransformation))
451+
actor ! MyReplicatedBehavior.Add("one", probe.ref)
452+
probe.expectMessage(Done)
453+
454+
// simulate a published event from another replica
455+
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
456+
ReplicationId(EntityType, id, DCB).persistenceId,
457+
1L,
458+
"two",
459+
System.currentTimeMillis(),
460+
Some(new ReplicatedPublishedEventMetaData(DCB, VersionVector.empty, None)),
461+
None)
462+
actor ! MyReplicatedBehavior.Add("three", probe.ref)
463+
probe.expectMessage(Done)
464+
465+
actor ! MyReplicatedBehavior.Get(probe.ref)
466+
probe.expectMessage(Set("one", "TWO", "three"))
467+
}
468+
436469
}
437470

438471
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingSpec.scala

Lines changed: 130 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ package akka.persistence.typed
77
import java.util.concurrent.CountDownLatch
88
import java.util.concurrent.TimeUnit
99
import java.util.concurrent.atomic.AtomicInteger
10+
1011
import org.scalatest.concurrent.Eventually
1112
import org.scalatest.wordspec.AnyWordSpecLike
13+
1214
import akka.Done
1315
import akka.actor.testkit.typed.TestException
1416
import akka.actor.testkit.typed.scaladsl.LogCapturing
@@ -20,35 +22,53 @@ import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
2022
import akka.persistence.testkit.scaladsl.PersistenceTestKit
2123
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext }
2224
import akka.serialization.jackson.CborSerializable
23-
2425
import scala.concurrent.ExecutionContext
2526
import scala.concurrent.Future
2627
import scala.concurrent.duration.DurationInt
2728

29+
import akka.actor.typed.scaladsl.ActorContext
30+
import akka.actor.typed.scaladsl.Behaviors
31+
import akka.persistence.typed.internal.ReplicatedEventMetadata
32+
import akka.persistence.typed.scaladsl.EventWithMetadata
33+
2834
object ReplicatedEventSourcingSpec {
2935

3036
val AllReplicas = Set(ReplicaId("R1"), ReplicaId("R2"), ReplicaId("R3"))
3137

3238
sealed trait Command
33-
case class GetState(replyTo: ActorRef[State]) extends Command
34-
case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1))
39+
final case class GetState(replyTo: ActorRef[State]) extends Command
40+
final case class StoreMe(description: String, replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1))
3541
extends Command
36-
case class StoreUs(descriptions: List[String], replyTo: ActorRef[Done], latch: CountDownLatch = new CountDownLatch(1))
42+
final case class StoreUs(
43+
descriptions: List[String],
44+
replyTo: ActorRef[Done],
45+
latch: CountDownLatch = new CountDownLatch(1))
3746
extends Command
38-
case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
39-
case object Stop extends Command
47+
final case class StoreMeWithMeta(description: String, replyTo: ActorRef[Done], meta: Meta) extends Command
48+
final case class GetReplica(replyTo: ActorRef[(ReplicaId, Set[ReplicaId])]) extends Command
49+
final case object Stop extends Command
4050

41-
case class State(all: List[String]) extends CborSerializable
51+
final case class State(all: Vector[String]) extends CborSerializable
52+
53+
final case class Meta(value: String) extends CborSerializable
54+
55+
final case class EventAndContext(
56+
event: Any,
57+
origin: ReplicaId,
58+
recoveryRunning: Boolean,
59+
concurrent: Boolean,
60+
meta: Option[Meta] = None)
4261

4362
def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] =
4463
testBehavior(entityId, replicaId, Some(probe))
4564

4665
def eventSourcedBehavior(
66+
context: ActorContext[Command],
4767
replicationContext: ReplicationContext,
4868
probe: Option[ActorRef[EventAndContext]]): EventSourcedBehavior[Command, String, State] = {
4969
EventSourcedBehavior[Command, String, State](
5070
replicationContext.persistenceId,
51-
State(Nil),
71+
State(Vector.empty),
5272
(state, command) =>
5373
command match {
5474
case GetState(replyTo) =>
@@ -65,17 +85,21 @@ object ReplicatedEventSourcingSpec {
6585
latch.countDown()
6686
latch.await(10, TimeUnit.SECONDS)
6787
Effect.persist(evts).thenRun(_ => replyTo ! Done)
88+
case StoreMeWithMeta(evt, ack, meta) =>
89+
Effect.persistWithMetadata(EventWithMetadata(evt, meta)).thenRun(_ => ack ! Done)
6890
case Stop =>
6991
Effect.stop()
7092
},
7193
(state, event) => {
94+
val meta = EventSourcedBehavior.currentMetadata[Meta](context)
7295
probe.foreach(
7396
_ ! EventAndContext(
7497
event,
7598
replicationContext.origin,
7699
replicationContext.recoveryRunning,
77-
replicationContext.concurrent))
78-
state.copy(all = event :: state.all)
100+
replicationContext.concurrent,
101+
meta))
102+
state.copy(all = state.all :+ event)
79103
})
80104
}
81105

@@ -85,19 +109,35 @@ object ReplicatedEventSourcingSpec {
85109
probe: Option[ActorRef[EventAndContext]] = None,
86110
allReplicas: Set[ReplicaId] = AllReplicas,
87111
modifyBehavior: EventSourcedBehavior[Command, String, State] => EventSourcedBehavior[Command, String, State] =
88-
identity): Behavior[Command] =
89-
ReplicatedEventSourcing.commonJournalConfig(
90-
ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)),
91-
allReplicas,
92-
PersistenceTestKitReadJournal.Identifier)(replicationContext =>
93-
modifyBehavior(eventSourcedBehavior(replicationContext, probe)))
112+
identity): Behavior[Command] = {
113+
testBehaviorWithContext(entityId, replicaId, probe, allReplicas, (b, _) => modifyBehavior(b))
114+
}
115+
116+
def testBehaviorWithContext(
117+
entityId: String,
118+
replicaId: String,
119+
probe: Option[ActorRef[EventAndContext]] = None,
120+
allReplicas: Set[ReplicaId] = AllReplicas,
121+
modifyBehavior: (
122+
EventSourcedBehavior[Command, String, State],
123+
ActorContext[Command]) => EventSourcedBehavior[Command, String, State]): Behavior[Command] = {
124+
Behaviors.setup[Command] { context =>
125+
ReplicatedEventSourcing.commonJournalConfig(
126+
ReplicationId("ReplicatedEventSourcingSpec", entityId, ReplicaId(replicaId)),
127+
allReplicas,
128+
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
129+
val behv = eventSourcedBehavior(context, replicationContext, probe)
130+
modifyBehavior(behv, context)
131+
}
132+
}
133+
}
94134

95135
def nonReplicatedEventSourcedBehavior(
96136
persistenceId: PersistenceId,
97137
probe: Option[ActorRef[String]]): EventSourcedBehavior[Command, String, State] = {
98138
EventSourcedBehavior[Command, String, State](
99139
persistenceId,
100-
State(Nil),
140+
State(Vector.empty),
101141
(state, command) =>
102142
command match {
103143
case GetState(replyTo) =>
@@ -113,12 +153,14 @@ object ReplicatedEventSourcingSpec {
113153
latch.countDown()
114154
latch.await(10, TimeUnit.SECONDS)
115155
Effect.persist(evts).thenRun(_ => replyTo ! Done)
156+
case StoreMeWithMeta(evt, ack, meta) =>
157+
Effect.persistWithMetadata(EventWithMetadata(evt, meta)).thenRun(_ => ack ! Done)
116158
case Stop =>
117159
Effect.stop()
118160
},
119161
(state, event) => {
120162
probe.foreach(_ ! event)
121-
state.copy(all = event :: state.all)
163+
state.copy(all = state.all :+ event)
122164
})
123165
}
124166

@@ -127,8 +169,6 @@ object ReplicatedEventSourcingSpec {
127169

128170
}
129171

130-
case class EventAndContext(event: Any, origin: ReplicaId, recoveryRunning: Boolean, concurrent: Boolean)
131-
132172
class ReplicatedEventSourcingSpec
133173
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
134174
with AnyWordSpecLike
@@ -490,9 +530,9 @@ class ReplicatedEventSourcingSpec
490530

491531
// ensure recovery is complete
492532
r1 ! GetState(stateProbe.ref)
493-
stateProbe.expectMessage(State(Nil))
533+
stateProbe.expectMessage(State(Vector.empty))
494534
r2 ! GetState(stateProbe.ref)
495-
stateProbe.expectMessage(State(Nil))
535+
stateProbe.expectMessage(State(Vector.empty))
496536

497537
// make reads fail for the replication
498538
testkit.failNextNReads(s"$entityId|R2", 1)
@@ -517,21 +557,21 @@ class ReplicatedEventSourcingSpec
517557
val r1 = spawn(testBehavior(entityId, "", allReplicas = allReplicas))
518558
val stateProbe = createTestProbe[State]()
519559
r1 ! GetState(stateProbe.ref)
520-
stateProbe.expectMessageType[State].all.reverse shouldEqual List("from es1")
560+
stateProbe.expectMessageType[State].all shouldEqual List("from es1")
521561

522562
r1 ! StoreMe("from r1", probe.ref)
523563
probe.expectMessage(Done)
524564
val r2 = spawn(testBehavior(entityId, "R2", allReplicas = allReplicas))
525565
eventually {
526566
r2 ! GetState(stateProbe.ref)
527-
stateProbe.expectMessageType[State].all.reverse shouldEqual List("from es1", "from r1")
567+
stateProbe.expectMessageType[State].all shouldEqual List("from es1", "from r1")
528568
}
529569

530570
r2 ! StoreMe("from r2", probe.ref)
531571
probe.expectMessage(Done)
532572
eventually {
533573
r1 ! GetState(stateProbe.ref)
534-
stateProbe.expectMessageType[State].all.reverse shouldEqual List("from es1", "from r1", "from r2")
574+
stateProbe.expectMessageType[State].all shouldEqual List("from es1", "from r1", "from r2")
535575
}
536576
}
537577

@@ -603,5 +643,70 @@ class ReplicatedEventSourcingSpec
603643
r2 ! StoreMe("from r2", probe.ref)
604644
probe.expectTerminated(r1)
605645
}
646+
647+
"transform replicated events between two entities" in {
648+
val entityId = nextEntityId
649+
val probe = createTestProbe[Done]()
650+
val eventProbe1 = createTestProbe[EventAndContext]()
651+
val eventProbe2 = createTestProbe[EventAndContext]()
652+
val addTransformation: (
653+
EventSourcedBehavior[Command, String, State],
654+
ActorContext[Command]) => EventSourcedBehavior[Command, String, State] = { (behv, context) =>
655+
behv.withReplicatedEventTransformation { (_, eventWithMeta) =>
656+
val resMeta1 = EventSourcedBehavior.currentMetadata[ReplicatedEventMetadata](context)
657+
val resMeta2 = eventWithMeta.metadata[ReplicatedEventMetadata]
658+
if (resMeta1 != resMeta2)
659+
throw new IllegalStateException(s"Expected RES metadata to be the same, $resMeta1 != $resMeta2")
660+
661+
val newMeta = eventWithMeta.metadata[Meta].map(m => m.copy(m.value.toUpperCase)).toList
662+
EventWithMetadata(eventWithMeta.event.toUpperCase, newMeta)
663+
}
664+
}
665+
val r1 = spawn(
666+
testBehaviorWithContext(entityId, "R1", probe = Some(eventProbe1.ref), modifyBehavior = addTransformation))
667+
val r2 = spawn(
668+
testBehaviorWithContext(entityId, "R2", probe = Some(eventProbe2.ref), modifyBehavior = addTransformation))
669+
670+
r1 ! StoreMeWithMeta("from r1", probe.ref, Meta("meta from r1"))
671+
eventProbe1.expectMessage(
672+
EventAndContext(
673+
"from r1",
674+
ReplicaId("R1"),
675+
recoveryRunning = false,
676+
concurrent = false,
677+
Some(Meta("meta from r1"))))
678+
// replicated to r2, and transformed
679+
eventProbe2.expectMessage(
680+
EventAndContext(
681+
"FROM R1",
682+
ReplicaId("R1"),
683+
recoveryRunning = false,
684+
concurrent = false,
685+
meta = Some(Meta("META FROM R1"))))
686+
687+
r2 ! StoreMeWithMeta("from r2", probe.ref, Meta("meta from r2"))
688+
eventProbe2.expectMessage(
689+
EventAndContext(
690+
"from r2",
691+
ReplicaId("R2"),
692+
recoveryRunning = false,
693+
concurrent = false,
694+
Some(Meta("meta from r2"))))
695+
// replicated to r1, and transformed
696+
eventProbe1.expectMessage(
697+
EventAndContext(
698+
"FROM R2",
699+
ReplicaId("R2"),
700+
recoveryRunning = false,
701+
concurrent = false,
702+
meta = Some(Meta("META FROM R2"))))
703+
704+
val stateProbe = createTestProbe[State]()
705+
r1 ! GetState(stateProbe.ref)
706+
stateProbe.expectMessage(State(Vector("from r1", "FROM R2")))
707+
708+
r2 ! GetState(stateProbe.ref)
709+
stateProbe.expectMessage(State(Vector("FROM R1", "from r2")))
710+
}
606711
}
607712
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicationSnapshotSpec.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
1010

1111
import akka.Done
1212
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
13+
import akka.actor.typed.scaladsl.Behaviors
1314
import akka.actor.typed.{ ActorRef, Behavior }
1415
import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin }
1516
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
@@ -36,11 +37,15 @@ object ReplicationSnapshotSpec {
3637
entityId: String,
3738
replicaId: ReplicaId,
3839
probe: Option[ActorRef[EventAndContext]]): Behavior[Command] = {
39-
ReplicatedEventSourcing.commonJournalConfig(
40-
ReplicationId(EntityType, entityId, replicaId),
41-
AllReplicas,
42-
PersistenceTestKitReadJournal.Identifier)(replicationContext =>
43-
eventSourcedBehavior(replicationContext, probe).snapshotWhen((_, _, sequenceNr) => sequenceNr % 2 == 0))
40+
Behaviors.setup[Command] { context =>
41+
ReplicatedEventSourcing.commonJournalConfig(
42+
ReplicationId(EntityType, entityId, replicaId),
43+
AllReplicas,
44+
PersistenceTestKitReadJournal.Identifier)(
45+
replicationContext =>
46+
eventSourcedBehavior(context, replicationContext, probe).snapshotWhen((_, _, sequenceNr) =>
47+
sequenceNr % 2 == 0))
48+
}
4449

4550
}
4651
}
@@ -79,8 +84,8 @@ class ReplicationSnapshotSpec
7984
r2EventProbe.expectMessageType[EventAndContext]
8085
r2EventProbe.expectMessageType[EventAndContext]
8186

82-
snapshotTestKit.expectNextPersisted(persistenceIdR1, State(List("r1 2", "r1 1")))
83-
snapshotTestKit.expectNextPersisted(persistenceIdR2, State(List("r1 2", "r1 1")))
87+
snapshotTestKit.expectNextPersisted(persistenceIdR1, State(Vector("r1 1", "r1 2")))
88+
snapshotTestKit.expectNextPersisted(persistenceIdR2, State(Vector("r1 1", "r1 2")))
8489

8590
r2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
8691
ReplicationId(EntityType, entityId, R1).persistenceId,
@@ -108,7 +113,7 @@ class ReplicationSnapshotSpec
108113

109114
val stateProbe = createTestProbe[State]()
110115
r2 ! GetState(stateProbe.ref)
111-
stateProbe.expectMessage(State(List("r1 2", "r1 1")))
116+
stateProbe.expectMessage(State(Vector("r1 1", "r1 2")))
112117
}
113118
}
114119
}

akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class EventSourcedBehaviorWatchSpec
7878
internalLoggerFactory = () => logger,
7979
retentionInProgress = false,
8080
EmptyEventSourcedBehaviorInstrumentation,
81+
None,
8182
None)
8283

8384
"A typed persistent parent actor watching a child" must {
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withReplicatedEventTransformation")
2+
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.JournalInteractions#EventToPersist.*")

0 commit comments

Comments
 (0)