Skip to content

Commit cc7862b

Browse files
authored
feat: Add LatestEventTimestampQuery (#32700)
* Retrieve the latest timestamp for an entity type and slice range. * Primary purpose is for observability.
1 parent ec392d1 commit cc7862b

File tree

4 files changed

+59
-2
lines changed

4 files changed

+59
-2
lines changed

akka-persistence-query/src/main/scala/akka/persistence/query/typed/javadsl/EventsBySliceFirehoseQuery.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ final class EventsBySliceFirehoseQuery(delegate: scaladsl.EventsBySliceFirehoseQ
4848
with EventsBySliceQuery
4949
with EventsBySliceStartingFromSnapshotsQuery
5050
with EventTimestampQuery
51-
with LoadEventQuery {
51+
with LoadEventQuery
52+
with LatestEventTimestampQuery {
5253

5354
override def sliceForPersistenceId(persistenceId: String): Int =
5455
delegate.sliceForPersistenceId(persistenceId)
@@ -82,4 +83,10 @@ final class EventsBySliceFirehoseQuery(delegate: scaladsl.EventsBySliceFirehoseQ
8283
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] =
8384
delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava
8485

86+
override def latestEventTimestamp(
87+
entityType: String,
88+
minSlice: Int,
89+
maxSlice: Int): CompletionStage[Optional[Instant]] =
90+
delegate.latestEventTimestamp(entityType, minSlice, maxSlice).map(_.toJava)(ExecutionContext.parasitic).asJava
91+
8592
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (C) 2021-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query.typed.javadsl
6+
7+
import java.time.Instant
8+
import java.util.Optional
9+
import java.util.concurrent.CompletionStage
10+
11+
import akka.persistence.query.javadsl.ReadJournal
12+
13+
/**
14+
* Retrieve the latest timestamp for an entity type and slice range.
15+
*/
16+
trait LatestEventTimestampQuery extends ReadJournal {
17+
18+
def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): CompletionStage[Optional[Instant]]
19+
20+
}

akka-persistence-query/src/main/scala/akka/persistence/query/typed/scaladsl/EventsBySliceFirehoseQuery.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ final class EventsBySliceFirehoseQuery(system: ExtendedActorSystem, config: Conf
5050
with EventsBySliceQuery
5151
with EventsBySliceStartingFromSnapshotsQuery
5252
with EventTimestampQuery
53-
with LoadEventQuery {
53+
with LoadEventQuery
54+
with LatestEventTimestampQuery {
5455

5556
private lazy val persistenceExt = Persistence(system)
5657
private lazy val settings = EventsBySliceFirehose.Settings(system, cfgPath)
@@ -100,6 +101,15 @@ final class EventsBySliceFirehoseQuery(system: ExtendedActorSystem, config: Conf
100101
"doesn't implement LoadEventQuery")
101102
}
102103

104+
override def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]] = {
105+
eventsBySliceQuery match {
106+
case q: LatestEventTimestampQuery => q.latestEventTimestamp(entityType, minSlice, maxSlice)
107+
case _ =>
108+
throw new IllegalArgumentException(
109+
s"Underlying ReadJournal [${settings.delegateQueryPluginId}] doesn't implement LatestEventTimestampQuery")
110+
}
111+
}
112+
103113
private def eventsBySliceQuery: EventsBySliceQuery = {
104114
val delegateQueryPluginId =
105115
EventsBySliceFirehose.Settings.delegateQueryPluginId(system.settings.config.getConfig(cfgPath))
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (C) 2021-2025 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
5+
package akka.persistence.query.typed.scaladsl
6+
7+
import java.time.Instant
8+
9+
import scala.concurrent.Future
10+
11+
import akka.persistence.query.scaladsl.ReadJournal
12+
13+
/**
14+
* Retrieve the latest timestamp for an entity type and slice range.
15+
*/
16+
trait LatestEventTimestampQuery extends ReadJournal {
17+
18+
def latestEventTimestamp(entityType: String, minSlice: Int, maxSlice: Int): Future[Option[Instant]]
19+
20+
}

0 commit comments

Comments
 (0)