Skip to content

Commit 97286a4

Browse files
Cache SV node state based queries (#1150)
* Cache SV node state based queries [ci] fixes #1141 Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * fix cache metrics [ci] Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * review [ci] Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * fix [ci] Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> --------- Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent b3dc3fd commit 97286a4

File tree

13 files changed

+74
-18
lines changed

13 files changed

+74
-18
lines changed

apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import org.lfdecentralizedtrust.splice.scan.config.{
1515
BftSequencerConfig,
1616
ScanAppBackendConfig,
1717
ScanAppClientConfig,
18+
ScanCacheConfig,
1819
ScanSynchronizerConfig,
1920
}
2021
import org.lfdecentralizedtrust.splice.splitwell.config.{
@@ -427,6 +428,8 @@ object SpliceConfig {
427428
ConfigReader.forProduct3("migration-id", "sequencer-admin-client", "p2p-url")(
428429
BftSequencerConfig(_, _, _)
429430
)
431+
implicit val scanCacheConfigReader: ConfigReader[ScanCacheConfig] =
432+
deriveReader[ScanCacheConfig]
430433
implicit val scanConfigReader: ConfigReader[ScanAppBackendConfig] =
431434
deriveReader[ScanAppBackendConfig]
432435

@@ -794,6 +797,8 @@ object SpliceConfig {
794797
)
795798
implicit val scanConfigWriter: ConfigWriter[ScanAppBackendConfig] =
796799
deriveWriter[ScanAppBackendConfig]
800+
implicit val scanCacheConfigWriter: ConfigWriter[ScanCacheConfig] =
801+
deriveWriter[ScanCacheConfig]
797802

798803
implicit val svClientConfigWriter: ConfigWriter[SvAppClientConfig] =
799804
deriveWriter[SvAppClientConfig]

apps/app/src/test/resources/include/scans/_scan.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@
1212
participant-client {
1313
ledger-api.auth-config.type = "none"
1414
}
15+
# Reduce in tests as SVs change relatively frequently here.
16+
cache.sv-node-state-ttl = 1s
1517
}

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ class ScanApp(
171171
},
172172
migrationInfo,
173173
participantId,
174+
config.cache.svNodeStateTtl,
174175
nodeMetrics.dbScanStore,
175176
)
176177
acsSnapshotStore = AcsSnapshotStore(

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,7 @@ object BftScanConnection {
10331033
): Future[Seq[DsoScan]] = {
10341034
for {
10351035
decentralizedSynchronizerId <- store.getDecentralizedSynchronizerId()
1036-
scans <- store.listDsoScans()
1036+
scans <- store.listCachedDsoScans()
10371037
domainScans <- scans
10381038
.find(_._1 == decentralizedSynchronizerId.toProtoPrimitive)
10391039
.map(e => Future.successful(e._2.filter(_.svName != ownSvName)))

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -610,14 +610,13 @@ class HttpScanHandler(
610610
.map(ScanResource.ListValidatorLicensesResponse.OK)
611611
}
612612

613-
// TODO: (#7809) Add caching for sequencers per domain
614613
override def listDsoSequencers(
615614
respond: v0.ScanResource.ListDsoSequencersResponse.type
616615
)()(extracted: TraceContext): Future[v0.ScanResource.ListDsoSequencersResponse] = {
617616
implicit val tc = extracted
618617
withSpan(s"$workflowId.listDsoSequencers") { _ => _ =>
619618
store
620-
.listFromSvNodeStates { nodeState =>
619+
.listFromCachedSvNodeStates { nodeState =>
621620
for {
622621
(synchronizerId, domainConfig) <- nodeState.state.synchronizerNodes.asScala.toVector
623622
sequencers = for {
@@ -670,7 +669,7 @@ class HttpScanHandler(
670669
implicit val tc: TraceContext = extracted
671670
withSpan(s"$workflowId.listDsoScans") { _ => _ =>
672671
store
673-
.listDsoScans()
672+
.listCachedDsoScans()
674673
.map(list =>
675674
definitions.ListDsoScansResponse(list.map { case (synchronizerId, scans) =>
676675
definitions.DomainScans(

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/config/ScanAppConfig.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ case class ScanAppBackendConfig(
4646
txLogBackfillEnabled: Boolean = true,
4747
txLogBackfillBatchSize: Int = 100,
4848
bftSequencers: Seq[BftSequencerConfig] = Seq.empty,
49+
cache: ScanCacheConfig = ScanCacheConfig(),
4950
) extends SpliceBackendConfig
5051
with BaseScanAppConfig // TODO(DACH-NY/canton-network-node#736): fork or generalize this trait.
5152
{
@@ -54,6 +55,10 @@ case class ScanAppBackendConfig(
5455
override def clientAdminApi: ClientConfig = adminApi.clientConfig
5556
}
5657

58+
final case class ScanCacheConfig(
59+
svNodeStateTtl: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(30)
60+
)
61+
5762
case class ScanAppClientConfig(
5863
adminApi: NetworkAppClientConfig,
5964

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanStore.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.lfdecentralizedtrust.splice.util.{
3636
ContractWithState,
3737
TemplateJsonDecoder,
3838
}
39+
import com.digitalasset.canton.config.NonNegativeFiniteDuration
3940
import com.digitalasset.canton.data.CantonTimestamp
4041
import com.digitalasset.canton.lifecycle.CloseContext
4142
import com.digitalasset.canton.logging.NamedLoggerFactory
@@ -97,26 +98,29 @@ trait ScanStore
9798
)
9899
)
99100

101+
protected def listCachedSvNodeStates()(implicit
102+
tc: TraceContext
103+
): Future[Seq[splice.dso.svstate.SvNodeState]]
104+
100105
/** Returns all items extracted by `f` from the DsoRules ensuring that they're sorted by synchronizerId,
101106
* so that the order is deterministic.
102107
*/
103-
def listFromSvNodeStates[T](
108+
def listFromCachedSvNodeStates[T](
104109
f: splice.dso.svstate.SvNodeState => Vector[(String, T)]
105110
)(implicit tc: TraceContext): Future[Vector[(String, Vector[T])]] = {
106111
for {
107-
dsoRules <- getDsoRulesWithState()
108-
nodeStates <- Future.traverse(dsoRules.payload.svs.asScala.keys) { svPartyId =>
109-
getSvNodeState(PartyId.tryFromProtoPrimitive(svPartyId))
110-
}
112+
nodeStates <- listCachedSvNodeStates()
111113
} yield {
112-
val items = nodeStates.toVector.flatMap(nodeState => f(nodeState.contract.payload))
114+
val items = nodeStates.toVector.flatMap(nodeState => f(nodeState))
113115
val itemsByDomain = items.groupBy(_._1).view.mapValues(_.map(_._2))
114116
itemsByDomain.toVector.sortBy(_._1)
115117
}
116118
}
117119

118-
def listDsoScans()(implicit tc: TraceContext): Future[Vector[(String, Vector[ScanInfo])]] = {
119-
listFromSvNodeStates { nodeState =>
120+
def listCachedDsoScans()(implicit
121+
tc: TraceContext
122+
): Future[Vector[(String, Vector[ScanInfo])]] = {
123+
listFromCachedSvNodeStates { nodeState =>
120124
for {
121125
(synchronizerId, domainConfig) <- nodeState.state.synchronizerNodes.asScala.toVector
122126
scan <- domainConfig.scan.toScala
@@ -314,6 +318,7 @@ object ScanStore {
314318
createScanAggregatesReader: DbScanStore => ScanAggregatesReader,
315319
domainMigrationInfo: DomainMigrationInfo,
316320
participantId: ParticipantId,
321+
svNodeStateCacheTtl: NonNegativeFiniteDuration,
317322
metrics: DbScanStoreMetrics,
318323
)(implicit
319324
ec: ExecutionContext,
@@ -331,6 +336,7 @@ object ScanStore {
331336
createScanAggregatesReader,
332337
domainMigrationInfo,
333338
participantId,
339+
svNodeStateCacheTtl,
334340
metrics,
335341
)
336342
case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStore.scala

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ package org.lfdecentralizedtrust.splice.scan.store.db
55

66
import com.daml.ledger.javaapi.data.codegen.ContractId
77
import com.digitalasset.canton.caching.ScaffeineCache
8-
import com.digitalasset.canton.config.NonNegativeDuration
8+
import com.digitalasset.canton.config.{NonNegativeDuration, NonNegativeFiniteDuration}
99
import com.digitalasset.canton.data.CantonTimestamp
1010
import com.digitalasset.canton.lifecycle.{
1111
AsyncCloseable,
@@ -83,6 +83,7 @@ import org.lfdecentralizedtrust.splice.store.db.TxLogQueries.TxLogStoreId
8383

8484
import java.time.Instant
8585
import scala.concurrent.{ExecutionContext, Future}
86+
import scala.jdk.CollectionConverters.*
8687

8788
object DbScanStore {
8889
type CacheKey = java.lang.Long // caffeine metrics function demands AnyRefs
@@ -97,6 +98,7 @@ class DbScanStore(
9798
createScanAggregatesReader: DbScanStore => ScanAggregatesReader,
9899
domainMigrationInfo: DomainMigrationInfo,
99100
participantId: ParticipantId,
101+
svNodeStateCacheTtl: NonNegativeFiniteDuration,
100102
storeMetrics: DbScanStoreMetrics,
101103
)(implicit
102104
override protected val ec: ExecutionContext,
@@ -587,14 +589,40 @@ class DbScanStore(
587589
Scaffeine()
588590
.maximumSize(1000),
589591
key => getUncachedTotalAmuletBalance(key),
590-
metrics = Some(storeMetrics.cache),
592+
metrics = Some(storeMetrics.totalAmuletBalanceCache),
591593
)(logger, "amuletBalanceCache")
592594
}
593595
// TODO(#800) remove when amulet expiry works again
594596
def getTotalAmuletBalance(asOfEndOfRound: Long): Future[BigDecimal] = {
595597
totalAmuletBalanceCache.get(asOfEndOfRound)
596598
}
597599

600+
private val svNodeStateCache: ScaffeineCache.TunnelledAsyncLoadingCache[
601+
Future,
602+
Unit,
603+
Seq[SvNodeState],
604+
] = {
605+
implicit val tc = TraceContext.empty
606+
ScaffeineCache.buildAsync[Future, Unit, Seq[SvNodeState]](
607+
Scaffeine()
608+
.expireAfterWrite(svNodeStateCacheTtl.asFiniteApproximation),
609+
_ => this.listSvNodeStates(),
610+
metrics = Some(storeMetrics.svNodeStateCache),
611+
)(logger, "svNodeStateCache")
612+
}
613+
614+
private def listSvNodeStates()(implicit tc: TraceContext): Future[Seq[SvNodeState]] =
615+
for {
616+
dsoRules <- getDsoRulesWithState()
617+
nodeStates <- Future.traverse(dsoRules.payload.svs.asScala.keys) { svPartyId =>
618+
getSvNodeState(PartyId.tryFromProtoPrimitive(svPartyId))
619+
}
620+
} yield nodeStates.map(_.contract.payload).toVector
621+
622+
protected override def listCachedSvNodeStates()(implicit
623+
tc: TraceContext
624+
): Future[Seq[SvNodeState]] = svNodeStateCache.get(())
625+
598626
protected override def getUncachedTotalAmuletBalance(asOfEndOfRound: Long)(implicit
599627
tc: TraceContext
600628
): Future[BigDecimal] =

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStoreMetrics.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ class DbScanStoreMetrics(metricsFactory: LabeledMetricsFactory) extends AutoClos
3636
-1L,
3737
)(MetricsContext.Empty)
3838

39-
val cache = new CacheMetrics(prefix :+ "cache", metricsFactory)
39+
val totalAmuletBalanceCache =
40+
new CacheMetrics(prefix :+ "total_amulet_balance_cache", metricsFactory)
41+
val svNodeStateCache = new CacheMetrics(prefix :+ "sv_node_state_cache", metricsFactory)
4042

4143
val history = new HistoryMetrics(metricsFactory)(MetricsContext.Empty)
4244

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/ScanAggregatesReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ object ScanAggregatesReader {
131131
decentralizedSynchronizerId <- store
132132
.getDecentralizedSynchronizerId()
133133
.map(_.unwrap.toProtoPrimitive)
134-
scans <- store.listDsoScans()
134+
scans <- store.listCachedDsoScans()
135135
scanUrls = scans
136136
.filter(_._1 == decentralizedSynchronizerId)
137137
.flatMap { case (_, scans) =>

0 commit comments

Comments
 (0)