Skip to content

Commit 5a89f7f

Browse files
Add more ingestion metrics (#2946)
--------- Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent 033b851 commit 5a89f7f

File tree

13 files changed

+342
-137
lines changed

13 files changed

+342
-137
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/TxLogBackfillingTrigger.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,16 @@ class TxLogBackfillingTrigger[TXE](
115115
historyMetrics.TxLogBackfilling.completed.updateValue(0)
116116
// Using MetricsContext.Empty is okay, because it's merged with the StoreMetrics context
117117
historyMetrics.TxLogBackfilling.latestRecordTime.updateValue(
118-
workDone.lastBackfilledRecordTime.toMicros
118+
workDone.lastBackfilledRecordTime
119119
)(MetricsContext.Empty)
120120
historyMetrics.TxLogBackfilling.updateCount.inc(
121121
workDone.backfilledUpdates
122122
)(MetricsContext.Empty)
123-
historyMetrics.TxLogBackfilling.eventCount.inc(workDone.backfilledEvents)(
124-
MetricsContext.Empty
123+
historyMetrics.TxLogBackfilling.eventCount.inc(workDone.backfilledCreatedEvents)(
124+
MetricsContext("event_type" -> "created")
125+
)
126+
historyMetrics.TxLogBackfilling.eventCount.inc(workDone.backfilledExercisedEvents)(
127+
MetricsContext("event_type" -> "exercised")
125128
)
126129
TaskSuccess("Backfilling step completed")
127130
case HistoryBackfilling.Outcome.MoreWorkAvailableLater =>

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SpliceMetrics.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,40 @@
33

44
package org.lfdecentralizedtrust.splice.environment
55

6-
import com.daml.metrics.api.MetricName
6+
import com.daml.metrics.api.MetricHandle.{Gauge, LabeledMetricsFactory}
7+
import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext}
8+
import com.digitalasset.canton.data.CantonTimestamp
79

810
/** Shared metric definitions and helpers for Splice apps. */
object SpliceMetrics {

  /** Root metric name under which Splice metrics are registered. */
  val MetricsPrefix: MetricName = MetricName("splice")

  // Timestamps are handed to the underlying gauge as a numeric microsecond value.
  private type CantonTimestampMicros = Long

  /** Creates a gauge that is read and written as [[CantonTimestamp]] while the gauge registered
    * with `metricsFactory` holds the value as microseconds (`CantonTimestamp.toMicros`).
    *
    * @param metricsFactory factory used to register the underlying numeric gauge
    * @param _info metric metadata; also what `info` on the returned gauge reports
    * @param initial initial value of the gauge
    * @param mc metrics context under which the underlying gauge is registered
    * @return a gauge that forwards every operation to the underlying microsecond gauge
    */
  def cantonTimestampGauge(
      metricsFactory: LabeledMetricsFactory,
      _info: MetricInfo,
      initial: CantonTimestamp,
  )(implicit mc: MetricsContext): Gauge[CantonTimestamp] = new Gauge[CantonTimestamp] {

    private val delegate: Gauge[CantonTimestampMicros] =
      metricsFactory.gauge(_info, initial.toMicros)

    // NOTE(review): assertFromLong presumably throws on values outside the valid
    // CantonTimestamp range -- confirm against the Canton API.
    private def toTimestamp(micros: CantonTimestampMicros): CantonTimestamp =
      CantonTimestamp.assertFromLong(micros)

    override def updateValue(newValue: CantonTimestamp)(implicit mc: MetricsContext): Unit =
      delegate.updateValue(newValue.toMicros)

    override def updateValue(f: CantonTimestamp => CantonTimestamp): Unit =
      delegate.updateValue(current => f(toTimestamp(current)).toMicros)

    override def getValue: CantonTimestamp = toTimestamp(delegate.getValue)

    override def getValueAndContext: (CantonTimestamp, MetricsContext) =
      delegate.getValueAndContext match {
        case (micros, context) => (toTimestamp(micros), context)
      }

    override def close(): Unit = delegate.close()

    override def info: MetricInfo = delegate.info
  }

}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryBackfilling.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,8 @@ object HistoryBackfilling {
357357
object DestinationHistory {
358358
case class InsertResult(
359359
backfilledUpdates: Long,
360-
backfilledEvents: Long,
360+
backfilledCreatedEvents: Long,
361+
backfilledExercisedEvents: Long,
361362
lastBackfilledRecordTime: CantonTimestamp,
362363
)
363364
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@
33

44
package org.lfdecentralizedtrust.splice.store
55

6-
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Meter}
6+
import com.daml.metrics.api.MetricHandle.{Counter, Gauge, LabeledMetricsFactory, Meter, Timer}
77
import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext}
8-
import com.daml.metrics.api.MetricQualification.{Debug, Traffic}
8+
import com.daml.metrics.api.MetricQualification.{Debug, Latency, Traffic}
99
import org.lfdecentralizedtrust.splice.environment.SpliceMetrics
1010
import com.digitalasset.canton.data.CantonTimestamp
11+
import org.lfdecentralizedtrust.splice.environment.ledger.api.{
12+
ReassignmentUpdate,
13+
TransactionTreeUpdate,
14+
TreeUpdate,
15+
TreeUpdateOrOffsetCheckpoint,
16+
}
1117

1218
class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
1319
metricsContext: MetricsContext
@@ -17,17 +23,15 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
1723
object UpdateHistoryBackfilling {
1824
private val historyBackfillingPrefix: MetricName = prefix :+ "backfilling"
1925

20-
type CantonTimestampMicros =
21-
Long // OpenTelemetry Gauges only allow numeric types and there's no way to map it
22-
lazy val latestRecordTime: Gauge[CantonTimestampMicros] =
23-
metricsFactory.gauge(
24-
MetricInfo(
25-
name = historyBackfillingPrefix :+ "latest-record-time",
26-
summary = "The latest record time that has been backfilled",
27-
Traffic,
28-
),
29-
initial = CantonTimestamp.MinValue.toMicros,
30-
)(metricsContext)
26+
lazy val latestRecordTime = SpliceMetrics.cantonTimestampGauge(
27+
metricsFactory,
28+
MetricInfo(
29+
name = historyBackfillingPrefix :+ "latest-record-time",
30+
summary = "The latest record time that has been backfilled",
31+
Traffic,
32+
),
33+
initial = CantonTimestamp.MinValue,
34+
)(metricsContext)
3135

3236
val updateCount: Counter =
3337
metricsFactory.counter(
@@ -61,16 +65,15 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
6165
object TxLogBackfilling {
6266
private val historyBackfillingPrefix: MetricName = prefix :+ "txlog-backfilling"
6367

64-
type CantonTimestampMicros =
65-
Long // OpenTelemetry Gauges only allow numeric types and there's no way to map it
66-
val latestRecordTime: Gauge[CantonTimestampMicros] =
67-
metricsFactory.gauge(
68+
val latestRecordTime: Gauge[CantonTimestamp] =
69+
SpliceMetrics.cantonTimestampGauge(
70+
metricsFactory,
6871
MetricInfo(
6972
name = historyBackfillingPrefix :+ "latest-record-time",
7073
summary = "The latest record time that has been backfilled",
7174
Traffic,
7275
),
73-
initial = CantonTimestamp.MinValue.toMicros,
76+
initial = CantonTimestamp.MinValue,
7477
)(metricsContext)
7578

7679
val updateCount: Counter =
@@ -138,16 +141,15 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
138141
object CorruptAcsSnapshots {
139142
private val corruptAcsSnapshotsPrefix: MetricName = prefix :+ "corrupt-acs-snapshots"
140143

141-
type CantonTimestampMicros =
142-
Long // OpenTelemetry Gauges only allow numeric types and there's no way to map it
143-
val latestRecordTime: Gauge[CantonTimestampMicros] =
144-
metricsFactory.gauge(
144+
val latestRecordTime: Gauge[CantonTimestamp] =
145+
SpliceMetrics.cantonTimestampGauge(
146+
metricsFactory,
145147
MetricInfo(
146148
name = corruptAcsSnapshotsPrefix :+ "latest-record-time",
147149
summary = "The record time of the latest corrupt snapshot that has been deleted",
148150
Traffic,
149151
),
150-
initial = CantonTimestamp.MinValue.toMicros,
152+
initial = CantonTimestamp.MinValue,
151153
)(metricsContext)
152154

153155
val count: Counter =
@@ -198,9 +200,64 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
198200
)
199201
)(metricsContext)
200202

203+
/** Counter for the number of ingested events. */
val eventCount: Counter = {
  val eventCountInfo = MetricInfo(
    name = updateHistoryPrefix :+ "event-count",
    summary = "The number of events that have been ingested",
    qualification = Traffic,
  )
  metricsFactory.counter(eventCountInfo)(metricsContext)
}

/** Gauge holding the latest ingested record time; starts at `CantonTimestamp.MinValue`.
  * Declared `lazy`, so the gauge is only created on first access.
  */
lazy val latestRecordTime: Gauge[CantonTimestamp] = {
  val latestRecordTimeInfo = MetricInfo(
    name = updateHistoryPrefix :+ "latest-record-time",
    summary = "The latest record time that has been ingested",
    qualification = Traffic,
  )
  SpliceMetrics.cantonTimestampGauge(
    metricsFactory,
    latestRecordTimeInfo,
    initial = CantonTimestamp.MinValue,
  )(metricsContext)
}

/** Timer for how long ingesting a single update history entry takes. */
val latency: Timer = {
  val latencyInfo = MetricInfo(
    name = updateHistoryPrefix :+ "latency",
    summary = "How long it takes to ingest a single update history entry",
    qualification = Latency,
  )
  metricsFactory.timer(latencyInfo)(metricsContext)
}
231+
232+
}
233+
234+
/** Builds the metric labels for an update or an offset checkpoint.
  *
  * A checkpoint is labelled with `update_type = Checkpoint`; an update is delegated to the
  * `TreeUpdate` overload. In both cases `backfilling` is added as a string label.
  *
  * @param treeUpdateOrOffsetCheckpoint the item to derive labels from
  * @param backfilling value recorded under the `backfilling` label
  */
def metricsContextFromUpdate(
    treeUpdateOrOffsetCheckpoint: TreeUpdateOrOffsetCheckpoint,
    backfilling: Boolean,
): MetricsContext =
  treeUpdateOrOffsetCheckpoint match {
    case TreeUpdateOrOffsetCheckpoint.Checkpoint(_) =>
      MetricsContext("update_type" -> "Checkpoint", "backfilling" -> backfilling.toString)
    case TreeUpdateOrOffsetCheckpoint.Update(update, _) =>
      metricsContextFromUpdate(update, backfilling)
  }
245+
246+
/** Builds the metric labels for a tree update.
  *
  * The `update_type` label is `ReassignmentUpdate` or `TransactionTreeUpdate` depending on
  * the kind of update; `backfilling` is added as a string label.
  *
  * @param treeUpdate the update to derive labels from
  * @param backfilling value recorded under the `backfilling` label
  */
def metricsContextFromUpdate(
    treeUpdate: TreeUpdate,
    backfilling: Boolean,
): MetricsContext = {
  val updateTypeLabel = treeUpdate match {
    case TransactionTreeUpdate(_) => "TransactionTreeUpdate"
    case ReassignmentUpdate(_) => "ReassignmentUpdate"
  }
  MetricsContext(
    "update_type" -> updateTypeLabel,
    "backfilling" -> backfilling.toString,
  )
}
202258

203259
override def close(): Unit = {
260+
UpdateHistory.latestRecordTime.close()
204261
UpdateHistoryBackfilling.completed.close()
205262
UpdateHistoryBackfilling.latestRecordTime.close()
206263
TxLogBackfilling.completed.close()

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/StoreHelper.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33

44
package org.lfdecentralizedtrust.splice.store
55

6+
import com.daml.ledger.javaapi.data.{ArchivedEvent, CreatedEvent, ExercisedEvent, Transaction}
67
import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.VoteRequestOutcome as VRO
78
import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.voterequestoutcome.VRO_Accepted
89

910
import java.time.Instant
11+
import scala.jdk.CollectionConverters.*
1012

1113
sealed trait VoteRequestOutcome {
1214
val effectiveAt: Option[Instant]
@@ -23,3 +25,19 @@ object VoteRequestOutcome {
2325
}
2426
}
2527
}
28+
29+
/** Tally of events by kind, as produced by [[IngestedEvents.eventCount]].
  *
  * @param numCreatedEvents number of created events
  * @param numExercisedEvents number of exercised events; archived events are counted here too
  */
final case class IngestedEvents(numCreatedEvents: Long, numExercisedEvents: Long)

object IngestedEvents {

  /** Counts the events contained in the given transactions.
    *
    * Every value of each transaction's `getEventsById` map is visited once: created events
    * increment `numCreatedEvents`, while archived and exercised events both increment
    * `numExercisedEvents`.
    *
    * @param txs transactions whose events are tallied
    * @throws IllegalArgumentException if an event is none of the three recognized kinds
    */
  def eventCount(txs: Iterable[Transaction]): IngestedEvents =
    txs.iterator
      .flatMap(_.getEventsById.asScala.valuesIterator)
      .foldLeft(IngestedEvents(0L, 0L)) { (tally, event) =>
        event match {
          case _: ArchivedEvent | _: ExercisedEvent =>
            tally.copy(numExercisedEvents = tally.numExercisedEvents + 1)
          case _: CreatedEvent =>
            tally.copy(numCreatedEvents = tally.numCreatedEvents + 1)
          case other =>
            throw new IllegalArgumentException(s"Unrecognized event type: $other")
        }
      }
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/StoreMetrics.scala

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,19 @@
33

44
package org.lfdecentralizedtrust.splice.store
55

6-
import com.daml.metrics.api.MetricHandle.{Gauge, Histogram, LabeledMetricsFactory, Meter, Timer}
6+
import com.daml.metrics.api.MetricHandle.{
7+
Counter,
8+
Gauge,
9+
Histogram,
10+
LabeledMetricsFactory,
11+
Meter,
12+
Timer,
13+
}
714
import com.daml.metrics.api.MetricQualification.{Latency, Traffic}
815
import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext}
916
import com.digitalasset.canton.topology.SynchronizerId
1017
import org.lfdecentralizedtrust.splice.environment.SpliceMetrics
18+
import org.lfdecentralizedtrust.splice.environment.ledger.api.TreeUpdateOrOffsetCheckpoint
1119

1220
import scala.collection.concurrent.TrieMap
1321

@@ -46,6 +54,15 @@ class StoreMetrics(metricsFactory: LabeledMetricsFactory)(metricsContext: Metric
4654
)
4755
)(metricsContext)
4856

57+
/** Counter for the number of events that have been ingested. */
val eventCount: Counter = {
  val eventCountInfo = MetricInfo(
    name = prefix :+ "event-count",
    summary = "The number of events that have been ingested",
    qualification = Traffic,
  )
  metricsFactory.counter(eventCountInfo)(metricsContext)
}
65+
4966
val batchSize: Histogram = metricsFactory.histogram(
5067
MetricInfo(
5168
name = prefix :+ "ingestion-batch-size",
@@ -74,14 +91,50 @@ class StoreMetrics(metricsFactory: LabeledMetricsFactory)(metricsContext: Metric
7491
name = prefix :+ "last-ingested-record-time-ms",
7592
summary = "The most recent record time ingested by this store",
7693
Traffic,
77-
"The most recent record time ingested by this store for each synchronizer in milliseconds. Note that this only updates when the store processes a new transaction so if there is no activity the time won't update.",
94+
"The most recent record time ingested by this store for each synchronizer in milliseconds. " +
95+
"Note that this only updates when the store processes a new transaction so if there is no activity the time won't update.",
7896
),
7997
0L,
8098
)(metricsContext.merge(MetricsContext((Map("synchronizer_id" -> synchronizerId.toString))))),
8199
)
82100

101+
/** "Last seen record time" gauges, keyed by synchronizer and created on demand. */
private val perSynchronizerLastSeenRecordTimeMs: TrieMap[SynchronizerId, Gauge[Long]] =
  TrieMap.empty

/** Returns the "last seen record time" gauge for the given synchronizer, registering it on
  * first use with an initial value of 0 and a `synchronizer_id` label.
  *
  * @param synchronizerId synchronizer whose gauge is requested
  */
private def getLastSeenRecordTimeMsForSynchronizer(synchronizerId: SynchronizerId): Gauge[Long] = {
  // Only invoked by getOrElseUpdate when no gauge is cached for this synchronizer yet.
  def registerGauge(): Gauge[Long] = {
    val gaugeInfo = MetricInfo(
      name = prefix :+ "last-seen-record-time-ms",
      summary =
        "The most recent record time this store has seen from the ledger (but not necessarily ingested)",
      Traffic,
      "The most recent record time seen by this store for each synchronizer in milliseconds. " +
        "This updates for every entry seen, regardless of whether it was ingested or filtered out.",
    )
    val synchronizerLabel = MetricsContext((Map("synchronizer_id" -> synchronizerId.toString)))
    metricsFactory.gauge(gaugeInfo, 0L)(metricsContext.merge(synchronizerLabel))
  }

  perSynchronizerLastSeenRecordTimeMs.getOrElseUpdate(synchronizerId, registerGauge())
}
119+
120+
/** Records the record time(s) carried by an update or offset checkpoint in the per-synchronizer
  * "last seen record time" gauges, in epoch milliseconds.
  *
  * An update sets the gauge of its own synchronizer; a checkpoint sets one gauge per entry of
  * its synchronizer times.
  *
  * @param updateOrCheckpoint the item whose record time(s) should be recorded
  */
def updateLastSeenMetrics(updateOrCheckpoint: TreeUpdateOrOffsetCheckpoint): Unit =
  updateOrCheckpoint match {
    case TreeUpdateOrOffsetCheckpoint.Checkpoint(checkpoint) =>
      checkpoint.getSynchronizerTimes.forEach { syncTime =>
        val synchronizerId = SynchronizerId.tryFromString(syncTime.getSynchronizerId)
        val lastSeenGauge = getLastSeenRecordTimeMsForSynchronizer(synchronizerId)
        lastSeenGauge.updateValue(syncTime.getRecordTime.toEpochMilli)
      }
    case TreeUpdateOrOffsetCheckpoint.Update(update, synchronizerId) =>
      val lastSeenGauge = getLastSeenRecordTimeMsForSynchronizer(synchronizerId)
      lastSeenGauge.updateValue(update.recordTime.toEpochMilli)
  }
134+
83135
override def close(): Unit = {
84136
acsSize.close()
85137
perSynchronizerLastIngestedRecordTimeMs.values.foreach(_.close())
138+
perSynchronizerLastSeenRecordTimeMs.values.foreach(_.close())
86139
}
87140
}

0 commit comments

Comments
 (0)