Skip to content

Commit bc9758d

Browse files
authored
feat: API for dropping specific metadata from the EventEnvelope (#32664)
* feat: API for dropping specific metadata from the EventEnvelope * Actually correct impl and tests, cover untyped EventEnvelope as well
1 parent 2702ecd commit bc9758d

File tree

4 files changed

+165
-9
lines changed

4 files changed

+165
-9
lines changed

akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,26 @@ final class EventEnvelope(
9999
metadata.toJava
100100
}
101101

102+
/**
103+
* Java API: Drop metadata of the given type if present
104+
*/
105+
def removeMetadata(metadataType: Class[_]): EventEnvelope =
106+
removeMetadata(ClassTag(metadataType))
107+
108+
/**
109+
* Scala API: Drop metadata of the given type if present
110+
*/
111+
def removeMetadata[M](implicit classTag: ClassTag[M]): EventEnvelope = {
112+
internalEventMetadata match {
113+
case Some(c: CompositeMetadata) =>
114+
val filtered = c.entries.filterNot(_.getClass == classTag.runtimeClass)
115+
if (filtered eq c.entries) this
116+
else internalCopy(internalEventMetadata = Some(CompositeMetadata(filtered)))
117+
case Some(m) if m.getClass == classTag.runtimeClass => internalCopy(internalEventMetadata = None)
118+
case _ => this
119+
}
120+
}
121+
102122
/**
103123
* INTERNAL API
104124
*/
@@ -139,9 +159,24 @@ final class EventEnvelope(
139159
event: Any = this.event): EventEnvelope =
140160
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this._eventMetadata)
141161

162+
private def internalCopy(
163+
offset: Offset = this.offset,
164+
persistenceId: String = this.persistenceId,
165+
sequenceNr: Long = this.sequenceNr,
166+
event: Any = this.event,
167+
internalEventMetadata: Option[Any]): EventEnvelope =
168+
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, internalEventMetadata)
169+
142170
@InternalApi
143171
private[akka] def withMetadata(metadata: Any): EventEnvelope =
144-
new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, Some(metadata))
172+
_eventMetadata match {
173+
case Some(c: CompositeMetadata) =>
174+
internalCopy(internalEventMetadata = Some(CompositeMetadata(metadata +: c.entries)))
175+
case Some(other) =>
176+
internalCopy(internalEventMetadata = Some(CompositeMetadata(metadata :: other :: Nil)))
177+
case None =>
178+
internalCopy(internalEventMetadata = Option(metadata))
179+
}
145180

146181
// Product4, for binary compatibility (used to be a case class)
147182
override def productPrefix = "EventEnvelope"

akka-persistence-query/src/main/scala/akka/persistence/query/typed/EventEnvelope.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,26 @@ final class EventEnvelope[Event](
256256
metadata.toJava
257257
}
258258

259+
/**
260+
* Java API: Drop metadata of the given type if present
261+
*/
262+
def removeMetadata(metadataType: Class[_]): EventEnvelope[Event] =
263+
removeMetadata(ClassTag(metadataType))
264+
265+
/**
266+
* Scala API: Drop metadata of the given type if present
267+
*/
268+
def removeMetadata[M](implicit classTag: ClassTag[M]): EventEnvelope[Event] = {
269+
internalEventMetadata match {
270+
case Some(c: CompositeMetadata) =>
271+
val filtered = c.entries.filterNot(_.getClass == classTag.runtimeClass)
272+
if (filtered eq c.entries) this
273+
else copy(eventMetadata = Some(CompositeMetadata(filtered)))
274+
case Some(m) if m.getClass == classTag.runtimeClass => copy(eventMetadata = None)
275+
case _ => this
276+
}
277+
}
278+
259279
/**
260280
* INTERNAL API
261281
*/
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query
6+
7+
import org.scalatest.matchers.should.Matchers
8+
import org.scalatest.wordspec.AnyWordSpecLike
9+
10+
import java.time.Instant
11+
import java.util.Optional
12+
import scala.annotation.nowarn
13+
14+
@nowarn("msg=deprecated")
15+
class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
16+
"EventEnvelope" must {
17+
"support single event metadata" in {
18+
val env = new EventEnvelope(
19+
offset = NoOffset,
20+
persistenceId = "pid",
21+
sequenceNr = 1L,
22+
event = "evt",
23+
System.currentTimeMillis(),
24+
_eventMetadata = Some("meta"))
25+
26+
env.metadata[String] shouldBe Some("meta")
27+
env.metadata[java.time.Instant] shouldBe None
28+
env.metadata[AnyRef] shouldBe None
29+
env.eventMetadata shouldBe Some("meta") // deprecated
30+
env.removeMetadata[String].metadata[String] shouldBe None
31+
env.removeMetadata[AnyRef] shouldBe theSameInstanceAs(env)
32+
33+
// Java API
34+
env.getMetadata(classOf[String]) shouldBe Optional.of("meta")
35+
env.getMetadata(classOf[java.time.Instant]) shouldBe Optional.empty
36+
env.getMetadata(classOf[AnyRef]) shouldBe Optional.empty
37+
env.getEventMetaData() shouldBe Optional.of("meta") // deprecated
38+
env.removeMetadata(classOf[String]).getMetadata(classOf[String]) shouldBe Optional.empty()
39+
env.removeMetadata(classOf[AnyRef]) shouldBe theSameInstanceAs(env)
40+
}
41+
42+
"support composite event metadata" in {
43+
val env =
44+
EventEnvelope(offset = NoOffset, persistenceId = "pid", sequenceNr = 1L, "evt", System.currentTimeMillis())
45+
46+
env.metadata[String] shouldBe None
47+
env.eventMetadata shouldBe None // deprecated
48+
49+
val env2 = env.withMetadata("meta")
50+
env2.metadata[String] shouldBe Some("meta")
51+
env2.metadata[java.time.Instant] shouldBe None
52+
env2.eventMetadata shouldBe Some("meta") // deprecated
53+
54+
val instant = Instant.now()
55+
val env3 = env2.withMetadata(instant)
56+
env3.metadata[String] shouldBe Some("meta")
57+
env3.metadata[java.time.Instant] shouldBe Some(instant)
58+
env3.metadata[AnyRef] shouldBe None
59+
// For backwards compatibility this will use the metadata that was added last (ReplicatedEventMetaData)
60+
env3.eventMetadata shouldBe Some(instant) // deprecated
61+
62+
// in case same class is added again the last will be used
63+
val instant2 = instant.plusSeconds(1)
64+
val env4 = env3.withMetadata(instant2)
65+
env4.metadata[String] shouldBe Some("meta")
66+
env4.metadata[java.time.Instant] shouldBe Some(instant2)
67+
env4.eventMetadata shouldBe Some(instant2) // deprecated
68+
69+
env4.removeMetadata[String].metadata[String] shouldBe None
70+
env4.removeMetadata[AnyRef] shouldBe theSameInstanceAs(env4)
71+
72+
// Java API
73+
env.getMetadata(classOf[String]) shouldBe Optional.empty
74+
env.getEventMetaData() shouldBe Optional.empty // deprecated
75+
76+
env2.getMetadata(classOf[String]) shouldBe Optional.of("meta")
77+
env2.getMetadata(classOf[java.time.Instant]) shouldBe Optional.empty
78+
env2.getEventMetaData() shouldBe Optional.of("meta") // deprecated
79+
80+
env3.getMetadata(classOf[String]) shouldBe Optional.of("meta")
81+
env3.getMetadata(classOf[java.time.Instant]) shouldBe Optional.of(instant)
82+
env3.getMetadata(classOf[AnyRef]) shouldBe Optional.empty
83+
env3.getEventMetaData() shouldBe Optional.of(instant) // deprecated
84+
85+
env4.getMetadata(classOf[String]) shouldBe Optional.of("meta")
86+
env4.getMetadata(classOf[java.time.Instant]) shouldBe Optional.of(instant2)
87+
env4.getEventMetaData() shouldBe Optional.of(instant2) // deprecated
88+
89+
env4.removeMetadata(classOf[String]).getMetadata(classOf[String]) shouldBe Optional.empty()
90+
env4.removeMetadata(classOf[AnyRef]) shouldBe theSameInstanceAs(env4)
91+
}
92+
}
93+
94+
}

akka-persistence-query/src/test/scala/akka/persistence/query/typed/internal/EventEnvelopeSpec.scala renamed to akka-persistence-query/src/test/scala/akka/persistence/query/typed/EventEnvelopeSpec.scala

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@
22
* Copyright (C) 2025 Lightbend Inc. <https://www.lightbend.com>
33
*/
44

5-
package akka.persistence.query.typed.internal
6-
7-
import java.time.Instant
8-
import java.util.Optional
9-
10-
import scala.annotation.nowarn
5+
package akka.persistence.query.typed
116

7+
import akka.persistence.query.NoOffset
128
import org.scalatest.matchers.should.Matchers
139
import org.scalatest.wordspec.AnyWordSpecLike
1410

15-
import akka.persistence.query.NoOffset
16-
import akka.persistence.query.typed.EventEnvelope
11+
import java.time.Instant
12+
import java.util.Optional
13+
import scala.annotation.nowarn
1714

1815
@nowarn("msg=deprecated")
1916
class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
@@ -36,12 +33,16 @@ class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
3633
env.metadata[java.time.Instant] shouldBe None
3734
env.metadata[AnyRef] shouldBe None
3835
env.eventMetadata shouldBe Some("meta") // deprecated
36+
env.removeMetadata[String].metadata[String] shouldBe None
37+
env.removeMetadata[AnyRef] shouldBe theSameInstanceAs(env)
3938

4039
// Java API
4140
env.getMetadata(classOf[String]) shouldBe Optional.of("meta")
4241
env.getMetadata(classOf[java.time.Instant]) shouldBe Optional.empty
4342
env.getMetadata(classOf[AnyRef]) shouldBe Optional.empty
4443
env.getEventMetaData() shouldBe Optional.of("meta") // deprecated
44+
env.removeMetadata(classOf[String]).getMetadata(classOf[String]) shouldBe Optional.empty()
45+
env.removeMetadata(classOf[AnyRef]) shouldBe theSameInstanceAs(env)
4546
}
4647

4748
"support composite event metadata" in {
@@ -77,6 +78,9 @@ class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
7778
env4.metadata[java.time.Instant] shouldBe Some(instant2)
7879
env4.eventMetadata shouldBe Some(instant2) // deprecated
7980

81+
env4.removeMetadata[String].metadata[String] shouldBe None
82+
env4.removeMetadata[AnyRef] shouldBe theSameInstanceAs(env4)
83+
8084
// Java API
8185
env.getMetadata(classOf[String]) shouldBe Optional.empty
8286
env.getEventMetaData() shouldBe Optional.empty // deprecated
@@ -93,6 +97,9 @@ class EventEnvelopeSpec extends AnyWordSpecLike with Matchers {
9397
env4.getMetadata(classOf[String]) shouldBe Optional.of("meta")
9498
env4.getMetadata(classOf[java.time.Instant]) shouldBe Optional.of(instant2)
9599
env4.getEventMetaData() shouldBe Optional.of(instant2) // deprecated
100+
101+
env4.removeMetadata(classOf[String]).getMetadata(classOf[String]) shouldBe Optional.empty()
102+
env4.removeMetadata(classOf[AnyRef]) shouldBe theSameInstanceAs(env4)
96103
}
97104
}
98105

0 commit comments

Comments
 (0)