Skip to content

Commit 2686dcb

Browse files
authored
feat: Persist metadata together with event (#32628)
* we only used it for ReplicatedEventMetadata and there was no public api to add custom metadata * adding composite metadata to be able to store more than one metadata entry * public api to persist event with metadata * public api to retrieve metadata from EventEnvelope * public api to retrieve metadata in EventSourcedBehavior, e.g. event handler during replay * include metadata in PublishedEventImpl, and serialization * store metadata with snapshot * Recovery.default can't be used with Java * InternalStableApi for some things used from Akka Projection * regenerate reflect-config.json
1 parent 27eee92 commit 2686dcb

File tree

39 files changed

+1968
-185
lines changed

39 files changed

+1968
-185
lines changed

akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ReplicatedShardingDirectReplicationSpec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit
4444
1L,
4545
"event",
4646
System.currentTimeMillis(),
47-
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty)),
47+
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty, None)),
4848
None)
4949
system.eventStream ! EventStream.Publish(event)
5050

@@ -71,7 +71,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit
7171
1L,
7272
"event",
7373
System.currentTimeMillis(),
74-
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty)),
74+
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty, None)),
7575
None)
7676
system.eventStream ! EventStream.Publish(event)
7777

@@ -96,7 +96,7 @@ class ReplicatedShardingDirectReplicationSpec extends ScalaTestWithActorTestKit
9696
1L,
9797
"event",
9898
System.currentTimeMillis(),
99-
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty)),
99+
Some(new ReplicatedPublishedEventMetaData(ReplicaId("ReplicaA"), VersionVector.empty, None)),
100100
None)
101101
system.eventStream ! EventStream.Publish(event)
102102

akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package akka.persistence.query
66

77
import java.util.Optional
88

9+
import scala.reflect.ClassTag
910
import scala.runtime.AbstractFunction4
1011

1112
import akka.annotation.InternalApi
13+
import akka.annotation.InternalStableApi
14+
import akka.persistence.CompositeMetadata
1215
import akka.util.HashCode
1316

1417
// for binary compatibility (used to be a case class)
@@ -47,7 +50,8 @@ final class EventEnvelope(
4750
val sequenceNr: Long,
4851
val event: Any,
4952
val timestamp: Long,
50-
val eventMetadata: Option[Any])
53+
@deprecatedName("eventMetadata")
54+
_eventMetadata: Option[Any])
5155
extends Product4[Offset, String, Long, Any]
5256
with Serializable {
5357

@@ -59,14 +63,48 @@ final class EventEnvelope(
5963
def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long) =
6064
this(offset, persistenceId, sequenceNr, event, timestamp, None)
6165

66+
/**
67+
* Scala API
68+
*/
69+
@deprecated("Use metadata with metadataType parameter")
70+
def eventMetadata: Option[Any] = {
71+
// For backwards compatibility this will use the metadata that was added last (ReplicatedEventMetaData)
72+
_eventMetadata.collect {
73+
case CompositeMetadata(entries) => entries.head
74+
case other => other
75+
}
76+
}
77+
6278
/**
6379
* Java API
6480
*/
65-
def getEventMetaData(): Optional[Any] = {
81+
@deprecated("Use getMetadata with metadataType parameter")
82+
def getEventMetaData(): Optional[AnyRef] = {
83+
import scala.jdk.OptionConverters._
84+
eventMetadata.map(_.asInstanceOf[AnyRef]).toJava
85+
}
86+
87+
/**
88+
* Scala API: The metadata of a given type that is associated with the event.
89+
*/
90+
def metadata[M: ClassTag]: Option[M] =
91+
CompositeMetadata.extract[M](_eventMetadata)
92+
93+
/**
94+
* Java API: The metadata of a given type that is associated with the event.
95+
*/
96+
def getMetadata[M](metadataType: Class[M]): Optional[M] = {
6697
import scala.jdk.OptionConverters._
67-
eventMetadata.toJava
98+
implicit val ct: ClassTag[M] = ClassTag(metadataType)
99+
metadata.toJava
68100
}
69101

102+
/**
103+
* INTERNAL API
104+
*/
105+
@InternalStableApi private[akka] def internalEventMetadata: Option[Any] =
106+
_eventMetadata
107+
70108
override def hashCode(): Int = {
71109
var result = HashCode.SEED
72110
result = HashCode.hash(result, offset)
@@ -85,9 +123,10 @@ final class EventEnvelope(
85123

86124
override def toString: String = {
87125
val eventStr = event.getClass.getName
88-
val metaStr = eventMetadata match {
89-
case Some(meta) => meta.getClass.getName
90-
case None => ""
126+
val metaStr = _eventMetadata match {
127+
case Some(CompositeMetadata(entries)) => entries.map(_.getClass.getName).mkString("[", ",", "]")
128+
case Some(other) => other.getClass.getName
129+
case None => ""
91130
}
92131
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr)"
93132
}
@@ -98,7 +137,7 @@ final class EventEnvelope(
98137
persistenceId: String = this.persistenceId,
99138
sequenceNr: Long = this.sequenceNr,
100139
event: Any = this.event): EventEnvelope =
101-
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this.eventMetadata)
140+
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this._eventMetadata)
102141

103142
@InternalApi
104143
private[akka] def withMetadata(metadata: Any): EventEnvelope =

akka-persistence-query/src/main/scala/akka/persistence/query/internal/QuerySerializer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ import scala.jdk.CollectionConverters._
8585
}
8686

8787
env.eventOption.foreach(event => builder.setEvent(payloadBuilder(event, serialization, log)))
88-
env.eventMetadata.foreach(meta => builder.setMetadata(payloadBuilder(meta, serialization, log)))
88+
env.internalEventMetadata.foreach(meta => builder.setMetadata(payloadBuilder(meta, serialization, log)))
8989

9090
builder.build().toByteArray()
9191

akka-persistence-query/src/main/scala/akka/persistence/query/typed/EventEnvelope.scala

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import java.util.Optional
1010
import akka.persistence.query.Offset
1111
import akka.util.HashCode
1212
import scala.jdk.CollectionConverters._
13+
import scala.reflect.ClassTag
14+
15+
import akka.annotation.InternalStableApi
16+
import akka.persistence.CompositeMetadata
1317

1418
object EventEnvelope {
1519

@@ -126,7 +130,8 @@ final class EventEnvelope[Event](
126130
val sequenceNr: Long,
127131
val eventOption: Option[Event],
128132
val timestamp: Long,
129-
val eventMetadata: Option[Any],
133+
@deprecatedName("eventMetadata")
134+
_eventMetadata: Option[Any],
130135
val entityType: String,
131136
val slice: Int,
132137
val filtered: Boolean,
@@ -215,14 +220,48 @@ final class EventEnvelope[Event](
215220
eventOption.toJava
216221
}
217222

223+
/**
224+
* Scala API
225+
*/
226+
@deprecated("Use metadata with metadataType parameter")
227+
def eventMetadata: Option[Any] = {
228+
// For backwards compatibility this will use the metadata that was added last (ReplicatedEventMetaData)
229+
_eventMetadata.collect {
230+
case CompositeMetadata(entries) => entries.head
231+
case other => other
232+
}
233+
}
234+
218235
/**
219236
* Java API
220237
*/
238+
@deprecated("Use getMetadata with metadataType parameter")
221239
def getEventMetaData(): Optional[AnyRef] = {
222240
import scala.jdk.OptionConverters._
223241
eventMetadata.map(_.asInstanceOf[AnyRef]).toJava
224242
}
225243

244+
/**
245+
* Scala API: The metadata of a given type that is associated with the event.
246+
*/
247+
def metadata[M: ClassTag]: Option[M] =
248+
CompositeMetadata.extract[M](_eventMetadata)
249+
250+
/**
251+
* Java API: The metadata of a given type that is associated with the event.
252+
*/
253+
def getMetadata[M](metadataType: Class[M]): Optional[M] = {
254+
import scala.jdk.OptionConverters._
255+
implicit val ct: ClassTag[M] = ClassTag(metadataType)
256+
metadata.toJava
257+
}
258+
259+
/**
260+
* INTERNAL API
261+
*/
262+
@InternalStableApi private[akka] def internalEventMetadata: Option[Any] =
263+
_eventMetadata
264+
226265
/**
227266
* Java API:
228267
*/
@@ -248,16 +287,25 @@ final class EventEnvelope[Event](
248287
def withTags(tags: Set[String]): EventEnvelope[Event] =
249288
copy(tags = tags)
250289

251-
def withMetadata(metadata: Any): EventEnvelope[Event] =
252-
copy(eventMetadata = Option(metadata))
290+
def withMetadata(metadata: Any): EventEnvelope[Event] = {
291+
_eventMetadata match {
292+
case Some(c: CompositeMetadata) =>
293+
copy(eventMetadata = Some(CompositeMetadata(metadata +: c.entries)))
294+
case Some(other) =>
295+
copy(eventMetadata = Some(CompositeMetadata(metadata :: other :: Nil)))
296+
case None =>
297+
copy(eventMetadata = Option(metadata))
298+
}
299+
300+
}
253301

254302
private def copy(
255303
offset: Offset = offset,
256304
persistenceId: String = persistenceId,
257305
sequenceNr: Long = sequenceNr,
258306
eventOption: Option[Event] = eventOption,
259307
timestamp: Long = timestamp,
260-
eventMetadata: Option[Any] = eventMetadata,
308+
eventMetadata: Option[Any] = _eventMetadata,
261309
entityType: String = entityType,
262310
slice: Int = slice,
263311
filtered: Boolean = filtered,
@@ -288,7 +336,7 @@ final class EventEnvelope[Event](
288336
override def equals(obj: Any): Boolean = obj match {
289337
case other: EventEnvelope[_] =>
290338
offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr &&
291-
eventOption == other.eventOption && timestamp == other.timestamp && eventMetadata == other.eventMetadata &&
339+
eventOption == other.eventOption && timestamp == other.timestamp && _eventMetadata == other.internalEventMetadata &&
292340
entityType == other.entityType && slice == other.slice && filtered == other.filtered &&
293341
tags == other.tags
294342
case _ => false
@@ -299,9 +347,10 @@ final class EventEnvelope[Event](
299347
case Some(evt) => evt.getClass.getName
300348
case None => ""
301349
}
302-
val metaStr = eventMetadata match {
303-
case Some(meta) => meta.getClass.getName
304-
case None => ""
350+
val metaStr = _eventMetadata match {
351+
case Some(CompositeMetadata(entries)) => entries.map(_.getClass.getName).mkString("[", ",", "]")
352+
case Some(other) => other.getClass.getName
353+
case None => ""
305354
}
306355
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source,${tags
307356
.mkString("[", ", ", "]")})"
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query.typed.internal
6+
7+
import java.time.Instant
8+
import java.util.Optional
9+
10+
import scala.annotation.nowarn
11+
12+
import org.scalatest.matchers.should.Matchers
13+
import org.scalatest.wordspec.AnyWordSpecLike
14+
15+
import akka.persistence.query.NoOffset
16+
import akka.persistence.query.typed.EventEnvelope
17+
18+
@nowarn("msg=deprecated")
19+
class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
20+
"EventEnvelope" must {
21+
"support single event metadata" in {
22+
val env = new EventEnvelope[String](
23+
offset = NoOffset,
24+
persistenceId = "pid",
25+
sequenceNr = 1L,
26+
eventOption = Some("evt"),
27+
System.currentTimeMillis(),
28+
_eventMetadata = Some("meta"),
29+
entityType = "E",
30+
slice = 0,
31+
filtered = false,
32+
source = "",
33+
tags = Set.empty)
34+
35+
env.metadata[String] shouldBe Some("meta")
36+
env.metadata[java.time.Instant] shouldBe None
37+
env.metadata[AnyRef] shouldBe None
38+
env.eventMetadata shouldBe Some("meta") // deprecated
39+
40+
// Java API
41+
env.getMetadata(classOf[String]) shouldBe Optional.of("meta")
42+
env.getMetadata(classOf[java.time.Instant]) shouldBe Optional.empty
43+
env.getMetadata(classOf[AnyRef]) shouldBe Optional.empty
44+
env.getEventMetaData() shouldBe Optional.of("meta") // deprecated
45+
}
46+
47+
"support composite event metadata" in {
48+
val env = EventEnvelope(
49+
offset = NoOffset,
50+
persistenceId = "pid",
51+
sequenceNr = 1L,
52+
"evt",
53+
System.currentTimeMillis(),
54+
entityType = "E",
55+
slice = 0)
56+
57+
env.metadata[String] shouldBe None
58+
env.eventMetadata shouldBe None // deprecated
59+
60+
val env2 = env.withMetadata("meta")
61+
env2.metadata[String] shouldBe Some("meta")
62+
env2.metadata[java.time.Instant] shouldBe None
63+
env2.eventMetadata shouldBe Some("meta") // deprecated
64+
65+
val instant = Instant.now()
66+
val env3 = env2.withMetadata(instant)
67+
env3.metadata[String] shouldBe Some("meta")
68+
env3.metadata[java.time.Instant] shouldBe Some(instant)
69+
env3.metadata[AnyRef] shouldBe None
70+
// For backwards compatibility this will use the metadata that was added last (ReplicatedEventMetaData)
71+
env3.eventMetadata shouldBe Some(instant) // deprecated
72+
73+
// in case same class is added again the last will be used
74+
val instant2 = instant.plusSeconds(1)
75+
val env4 = env3.withMetadata(instant2)
76+
env4.metadata[String] shouldBe Some("meta")
77+
env4.metadata[java.time.Instant] shouldBe Some(instant2)
78+
env4.eventMetadata shouldBe Some(instant2) // deprecated
79+
80+
// Java API
81+
env.getMetadata(classOf[String]) shouldBe Optional.empty
82+
env.getEventMetaData() shouldBe Optional.empty // deprecated
83+
84+
env2.getMetadata(classOf[String]) shouldBe Optional.of("meta")
85+
env2.getMetadata(classOf[java.time.Instant]) shouldBe Optional.empty
86+
env2.getEventMetaData() shouldBe Optional.of("meta") // deprecated
87+
88+
env3.getMetadata(classOf[String]) shouldBe Optional.of("meta")
89+
env3.getMetadata(classOf[java.time.Instant]) shouldBe Optional.of(instant)
90+
env3.getMetadata(classOf[AnyRef]) shouldBe Optional.empty
91+
env3.getEventMetaData() shouldBe Optional.of(instant) // deprecated
92+
93+
env4.getMetadata(classOf[String]) shouldBe Optional.of("meta")
94+
env4.getMetadata(classOf[java.time.Instant]) shouldBe Optional.of(instant2)
95+
env4.getEventMetaData() shouldBe Optional.of(instant2) // deprecated
96+
}
97+
}
98+
99+
}

0 commit comments

Comments
 (0)