Skip to content

Commit b6a6255

Browse files
Add relevant scan connection metrics for validator [ci]
Signed-off-by: Julien Tinguely <julien.tinguely@digitalasset.com>
1 parent e5d4f93 commit b6a6255

File tree

6 files changed

+160
-8
lines changed

6 files changed

+160
-8
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.metrics
5+
6+
import com.daml.metrics.api.MetricHandle.{LabeledMetricsFactory, Meter, Timer}
7+
import com.daml.metrics.api.MetricQualification.{Latency, Traffic}
8+
import com.daml.metrics.api.{MetricInfo, MetricName, MetricsContext}
9+
import org.lfdecentralizedtrust.splice.environment.SpliceMetrics
10+
11+
class ScanConnectionMetrics(metricsFactory: LabeledMetricsFactory) extends AutoCloseable {
12+
private val prefix: MetricName = SpliceMetrics.MetricsPrefix :+ "validator" :+ "scan"
13+
14+
val latencyPerConnection: Timer =
15+
metricsFactory.timer(
16+
MetricInfo(
17+
name = prefix :+ "per_connection_latency",
18+
summary = "Latency per scan connection query",
19+
qualification = Latency,
20+
labelsWithDescription = Map(
21+
"scan_connection" -> "The scan connection of the request",
22+
"request" -> "Type of request",
23+
),
24+
)
25+
)(MetricsContext.Empty)
26+
27+
val failuresPerConnection: Meter = metricsFactory.meter(
28+
MetricInfo(
29+
name = prefix :+ "per_connection_errors",
30+
summary = "Count of per connection errors",
31+
qualification = Traffic,
32+
labelsWithDescription = Map(
33+
"scan_connection" -> "The scan connection of the request",
34+
"request" -> "Type of request",
35+
),
36+
)
37+
)(MetricsContext.Empty)
38+
39+
val bftReadLatency: Timer =
40+
metricsFactory.timer(
41+
MetricInfo(
42+
name = prefix :+ "bft_read_latency",
43+
summary = "Total latency after BFT reads",
44+
qualification = Latency,
45+
)
46+
)(MetricsContext.Empty)
47+
48+
val bftCallFailures: Meter = metricsFactory.meter(
49+
MetricInfo(
50+
name = prefix :+ "bft_errors",
51+
summary = "Count of BFT errors",
52+
qualification = Traffic,
53+
)
54+
)(MetricsContext.Empty)
55+
56+
override def close(): Unit = ???
57+
}

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.{
9191
DsoRules_CloseVoteRequestResult,
9292
VoteRequest,
9393
}
94+
import org.lfdecentralizedtrust.splice.metrics.ScanConnectionMetrics
9495
import org.lfdecentralizedtrust.tokenstandard.{
9596
allocation,
9697
allocationinstruction,
@@ -114,6 +115,7 @@ class BftScanConnection(
114115
protected val clock: Clock,
115116
val retryProvider: RetryProvider,
116117
val loggerFactory: NamedLoggerFactory,
118+
val connectionMetrics: Option[ScanConnectionMetrics] = None,
117119
)(implicit protected val ec: ExecutionContextExecutor, protected val mat: Materializer)
118120
extends FlagCloseableAsync
119121
with NamedLogging
@@ -633,6 +635,10 @@ class BftScanConnection(
633635
LoggerUtil.logThrowableAtLevel(consensusFailureLogLevel, msg, exception)
634636
Future.failed(exception)
635637
} else {
638+
val timer = connectionMetrics match {
639+
case Some(metrics) => Some(metrics.bftReadLatency.startAsync())
640+
case None => None
641+
}
636642
retryProvider
637643
.retryForClientCalls(
638644
"bft_call",
@@ -647,12 +653,23 @@ class BftScanConnection(
647653
logger,
648654
(_: String) => ConsensusNotReachedRetryable,
649655
)
656+
.andThen { case _ =>
657+
timer match {
658+
case Some(t) => t.stop()
659+
case None =>
660+
}
661+
}
650662
.recoverWith { case c: ConsensusNotReached =>
651663
val httpError = HttpErrorWithHttpCode(
652664
StatusCodes.BadGateway,
653665
s"Failed to reach consensus from ${callConfig.requestsToDo} Scan nodes, requiring ${callConfig.targetSuccess} matching responses.",
654666
)
655667
LoggerUtil.logThrowableAtLevel(consensusFailureLogLevel, s"Consensus not reached.", c)
668+
connectionMetrics match {
669+
case Some(metrics) =>
670+
metrics.bftCallFailures.mark()
671+
case None =>
672+
}
656673
Future.failed(httpError)
657674
}
658675
}
@@ -1251,6 +1268,7 @@ object BftScanConnection {
12511268
clock: Clock,
12521269
retryProvider: RetryProvider,
12531270
loggerFactory: NamedLoggerFactory,
1271+
connectionMetrics: Option[ScanConnectionMetrics] = None,
12541272
lastPersistedScanUrlList: () => Future[Option[List[(String, String)]]] = () =>
12551273
Future.successful(None),
12561274
persistScanUrlsCallback: Seq[(String, String)] => Future[Unit] = _ => Future.unit,
@@ -1262,7 +1280,8 @@ object BftScanConnection {
12621280
templateDecoder: TemplateJsonDecoder,
12631281
): Future[BftScanConnection] = {
12641282

1265-
val builder = buildScanConnection(upgradesConfig, clock, retryProvider, loggerFactory)
1283+
val builder =
1284+
buildScanConnection(upgradesConfig, clock, retryProvider, loggerFactory, connectionMetrics)
12661285
val logger = loggerFactory.getTracedLogger(getClass)
12671286

12681287
config match {
@@ -1277,6 +1296,7 @@ object BftScanConnection {
12771296
clock,
12781297
retryProvider,
12791298
loggerFactory,
1299+
connectionMetrics,
12801300
)
12811301

12821302
case ts @ BftScanClientConfig.BftCustom(_, _, _, _, _, _) =>
@@ -1457,7 +1477,8 @@ object BftScanConnection {
14571477
httpClient: HttpClient,
14581478
templateDecoder: TemplateJsonDecoder,
14591479
): Future[BftScanConnection] = {
1460-
val builder = buildScanConnection(upgradesConfig, clock, retryProvider, loggerFactory)
1480+
val builder =
1481+
buildScanConnection(upgradesConfig, clock, retryProvider, loggerFactory, None)
14611482

14621483
for {
14631484
scans <- retryProvider.retry(
@@ -1512,6 +1533,7 @@ object BftScanConnection {
15121533
clock: Clock,
15131534
retryProvider: RetryProvider,
15141535
loggerFactory: NamedLoggerFactory,
1536+
connectionMetrics: Option[ScanConnectionMetrics],
15151537
)(implicit
15161538
ec: ExecutionContextExecutor,
15171539
tc: TraceContext,
@@ -1535,6 +1557,7 @@ object BftScanConnection {
15351557
// We only need f+1 Scans to be available, so as long as those are connected we don't need to slow init down.
15361558
// Furthermore, the refresh (either on init, or periodically) will retry anyway.
15371559
retryConnectionOnInitialFailure = false,
1560+
connectionMetrics,
15381561
)
15391562

15401563
sealed trait BftScanClientConfig {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/ScanConnection.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ import com.digitalasset.canton.data.CantonTimestamp
3535
import com.digitalasset.canton.lifecycle.FlagCloseableAsync
3636
import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
3737
import com.digitalasset.canton.time.Clock
38-
import com.digitalasset.canton.topology.{SynchronizerId, PartyId}
38+
import com.digitalasset.canton.topology.{PartyId, SynchronizerId}
3939
import com.digitalasset.canton.tracing.TraceContext
4040
import io.grpc.Status
4141
import org.apache.pekko.stream.Materializer
4242
import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.{
4343
DsoRules_CloseVoteRequestResult,
4444
VoteRequest,
4545
}
46+
import org.lfdecentralizedtrust.splice.metrics.ScanConnectionMetrics
4647

4748
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
4849
import scala.jdk.OptionConverters.*
@@ -268,6 +269,7 @@ object ScanConnection {
268269
clock: Clock,
269270
retryProvider: RetryProvider,
270271
loggerFactory: NamedLoggerFactory,
272+
connectionMetrics: Option[ScanConnectionMetrics] = None,
271273
retryConnectionOnInitialFailure: Boolean = true,
272274
)(implicit
273275
ec: ExecutionContextExecutor,
@@ -284,6 +286,7 @@ object ScanConnection {
284286
clock,
285287
retryProvider,
286288
loggerFactory,
289+
connectionMetrics,
287290
),
288291
retryConnectionOnInitialFailure,
289292
)
@@ -295,6 +298,7 @@ object ScanConnection {
295298
retryProvider: RetryProvider,
296299
loggerFactory: NamedLoggerFactory,
297300
retryConnectionOnInitialFailure: Boolean,
301+
connectionMetrics: Option[ScanConnectionMetrics] = None,
298302
)(implicit
299303
ec: ExecutionContextExecutor,
300304
tc: TraceContext,
@@ -303,7 +307,14 @@ object ScanConnection {
303307
templateDecoder: TemplateJsonDecoder,
304308
): Future[SingleScanConnection] =
305309
HttpAppConnection.checkVersionOrClose(
306-
new SingleScanConnection(config, upgradesConfig, clock, retryProvider, loggerFactory),
310+
new SingleScanConnection(
311+
config,
312+
upgradesConfig,
313+
clock,
314+
retryProvider,
315+
loggerFactory,
316+
connectionMetrics,
317+
),
307318
retryConnectionOnInitialFailure,
308319
)
309320

@@ -313,14 +324,22 @@ object ScanConnection {
313324
clock: Clock,
314325
retryProvider: RetryProvider,
315326
loggerFactory: NamedLoggerFactory,
327+
connectionMetrics: Option[ScanConnectionMetrics] = None,
316328
)(implicit
317329
ec: ExecutionContextExecutor,
318330
tc: TraceContext,
319331
mat: Materializer,
320332
httpClient: HttpClient,
321333
templateDecoder: TemplateJsonDecoder,
322334
): SingleScanConnection =
323-
new SingleScanConnection(config, upgradesConfig, clock, retryProvider, loggerFactory)
335+
new SingleScanConnection(
336+
config,
337+
upgradesConfig,
338+
clock,
339+
retryProvider,
340+
loggerFactory,
341+
connectionMetrics,
342+
)
324343

325344
private[client] case class CachedAmuletRules(
326345
cacheValidUntil: CantonTimestamp,

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/SingleScanConnection.scala

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package org.lfdecentralizedtrust.splice.scan.admin.api.client
55

66
import cats.data.OptionT
77
import cats.syntax.either.*
8+
import com.daml.metrics.api.MetricsContext
9+
import com.daml.metrics.api.MetricsContext.Implicits.empty
810
import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.FeaturedAppRight
911
import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules.{
1012
AmuletRules,
@@ -31,7 +33,7 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{
3133
LookupTransferCommandStatusResponse,
3234
MigrationSchedule,
3335
}
34-
import org.lfdecentralizedtrust.splice.scan.admin.api.client.commands.{HttpScanAppClient}
36+
import org.lfdecentralizedtrust.splice.scan.admin.api.client.commands.HttpScanAppClient
3537
import org.lfdecentralizedtrust.splice.scan.config.ScanAppClientConfig
3638
import org.lfdecentralizedtrust.splice.scan.store.db.ScanAggregator
3739
import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.SourceMigrationInfo
@@ -67,13 +69,18 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.{
6769
VoteRequest,
6870
}
6971
import io.grpc.Status
72+
import org.apache.pekko.http.scaladsl.model.{HttpHeader, Uri}
73+
import org.lfdecentralizedtrust.splice.admin.api.client.commands.HttpCommand
7074
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1
7175
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
7276
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.allocationv1.Allocation
7377
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.allocationinstructionv1
78+
import org.lfdecentralizedtrust.splice.metrics.ScanConnectionMetrics
7479
import org.lfdecentralizedtrust.splice.scan.admin.api.client.commands.HttpScanAppClient.BftSequencer
7580
import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1.definitions.TransferFactoryWithChoiceContext
7681

82+
import scala.util.{Failure, Success}
83+
7784
/** Connection to the admin API of CC Scan. This is used by other apps
7885
* to query for the DSO party id.
7986
*/
@@ -83,6 +90,7 @@ class SingleScanConnection private[client] (
8390
protected val clock: Clock,
8491
retryProvider: RetryProvider,
8592
outerLoggerFactory: NamedLoggerFactory,
93+
connectionMetrics: Option[ScanConnectionMetrics],
8694
)(implicit
8795
protected val ec: ExecutionContextExecutor,
8896
tc: TraceContext,
@@ -101,6 +109,38 @@ class SingleScanConnection private[client] (
101109
with HasUrl {
102110
import ScanRoundAggregatesDecoder.*
103111

112+
override def runHttpCmd[Res, Result](
113+
url: Uri,
114+
command: HttpCommand[Res, Result],
115+
headers: List[HttpHeader] = List.empty[HttpHeader],
116+
)(implicit
117+
templateDecoder: TemplateJsonDecoder,
118+
httpClient: HttpClient,
119+
tc: TraceContext,
120+
ec: ExecutionContext,
121+
mat: Materializer,
122+
): Future[Result] = {
123+
connectionMetrics match {
124+
case Some(metrics) =>
125+
MetricsContext.withExtraMetricLabels(
126+
("scan_connection", url.scheme),
127+
("request", command.fullName),
128+
) { m =>
129+
val timer = metrics.latencyPerConnection.startAsync()(m)
130+
super
131+
.runHttpCmd(url, command, headers)
132+
.andThen {
133+
case Failure(_) =>
134+
metrics.failuresPerConnection.mark()(m)
135+
timer.stop()(m)
136+
case Success(_) =>
137+
timer.stop()(m)
138+
}
139+
}
140+
case None => super.runHttpCmd(url, command, headers)
141+
}
142+
}
143+
104144
def url = config.adminApi.url
105145

106146
// cached DSO reference. Never changes.
@@ -743,13 +783,21 @@ class CachedScanConnection private[client] (
743783
clock: Clock,
744784
retryProvider: RetryProvider,
745785
outerLoggerFactory: NamedLoggerFactory,
786+
connectionMetrics: Option[ScanConnectionMetrics],
746787
)(implicit
747788
ec: ExecutionContextExecutor,
748789
tc: TraceContext,
749790
mat: Materializer,
750791
httpClient: HttpClient,
751792
templateDecoder: TemplateJsonDecoder,
752-
) extends SingleScanConnection(config, upgradesConfig, clock, retryProvider, outerLoggerFactory)
793+
) extends SingleScanConnection(
794+
config,
795+
upgradesConfig,
796+
clock,
797+
retryProvider,
798+
outerLoggerFactory,
799+
connectionMetrics,
800+
)
753801
with CachingScanConnection {
754802

755803
override protected val amuletRulesCacheTimeToLive: NonNegativeFiniteDuration =

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ class ValidatorApp(
219219
clock,
220220
retryProvider,
221221
loggerFactory,
222+
Some(metrics.scanConnections),
222223
ValidatorScanConnection.getPersistedScanList(configProvider),
223224
ValidatorScanConnection.persistScanUrlListBuilder(configProvider),
224225
)
@@ -750,6 +751,7 @@ class ValidatorApp(
750751
clock,
751752
retryProvider,
752753
loggerFactory,
754+
Some(metrics.scanConnections),
753755
ValidatorScanConnection.getPersistedScanList(configProvider),
754756
ValidatorScanConnection.persistScanUrlListBuilder(configProvider),
755757
)

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/metrics/ValidatorAppMetrics.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ package org.lfdecentralizedtrust.splice.validator.metrics
66
import com.daml.metrics.api.MetricHandle.LabeledMetricsFactory
77
import com.digitalasset.canton.metrics.DbStorageHistograms
88
import org.lfdecentralizedtrust.splice.BaseSpliceMetrics
9+
import org.lfdecentralizedtrust.splice.metrics.ScanConnectionMetrics
910

1011
/** Modelled after [[com.digitalasset.canton.synchronizer.metrics.DomainMetrics]].
1112
*/
1213
class ValidatorAppMetrics(
1314
metricsFactory: LabeledMetricsFactory,
1415
storageHistograms: DbStorageHistograms,
15-
) extends BaseSpliceMetrics("validator", metricsFactory, storageHistograms) {}
16+
) extends BaseSpliceMetrics("validator", metricsFactory, storageHistograms) {
17+
val scanConnections = new ScanConnectionMetrics(metricsFactory)
18+
}

0 commit comments

Comments
 (0)