File tree Expand file tree Collapse file tree 2 files changed +85
-0
lines changed
akka-persistence-query/src
main/scala/akka/persistence/query
test/scala/akka/persistence/query Expand file tree Collapse file tree 2 files changed +85
-0
lines changed Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+ */
4+
5+ package akka .persistence .query
6+
7+ /**
8+ * (Optional) mechanism for query implementations to pick up a correlation id from the caller, to use in logging and
9+ * error messages. Used by akka-projections to make correlating projection logs with debug and trace logging from the
10+ * underlying akka persistence query implementations possible.
11+ */
12+ object QueryCorrelationId {
13+
14+ private val threadLocal = new ThreadLocal [String ]
15+
16+ /**
17+ * Expected to be used "around" calls to plugin query method, will clear the correlation id from thread local
18+ * to make sure there is no leak between logic executed on shared threads.
19+ *
20+ * @param correlationId
21+ */
22+ def withCorrelationId [T ](correlationId : String )(block : () => T ): T = {
23+ threadLocal.set(correlationId)
24+ try {
25+ block()
26+ } finally {
27+ threadLocal.remove()
28+ }
29+ }
30+
31+ /**
32+ * @return Expected to be called directly after receiving a query call, before starting any asynchronous tasks,
33+ * returns and clears out the correlation id to make sure there is no leak between tasks. Further passing
34+ * around of the uuid inside the query plugin implementation is up to the implementer.
35+ */
36+ def get (): Option [String ] =
37+ Option (threadLocal.get)
38+
39+ }
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright (C) 2009-2025 Lightbend Inc. <https://www.lightbend.com>
3+ */
4+
5+ package akka .persistence .query
6+
7+ import akka .testkit .TestException
8+ import org .scalatest .matchers .should .Matchers
9+ import org .scalatest .wordspec .AnyWordSpecLike
10+
11+ import java .util .UUID
12+
13+ class QueryCorrelationIdSpec extends AnyWordSpecLike with Matchers {
14+
15+ def pretendQueryMethod (): Option [String ] =
16+ QueryCorrelationId .get()
17+
18+ " The query correlation id utility" should {
19+
20+ " pass and clear correlation id" in {
21+ val uuid = UUID .randomUUID().toString
22+ val observed =
23+ QueryCorrelationId .withCorrelationId(uuid) { () =>
24+ pretendQueryMethod()
25+ }
26+ observed shouldEqual Some (uuid)
27+
28+ // cleared after returning
29+ QueryCorrelationId .get() shouldBe None
30+ }
31+
32+ " clear correlation id when call fails" in {
33+ val uuid = UUID .randomUUID().toString
34+ intercept[TestException ] {
35+ QueryCorrelationId .withCorrelationId(uuid) { () =>
36+ throw TestException (" boom" )
37+ }
38+ }
39+
40+ // cleared after throwing
41+ QueryCorrelationId .get() shouldBe None
42+ }
43+
44+ }
45+
46+ }
You can’t perform that action at this time.
0 commit comments