Skip to content

Commit a864b42

Browse files
Persisting Scan Urls in the Validator App (#3254)
[ci] Signed-off-by: Pasindu Tennage <pasindu.tennage@digitalasset.com>
1 parent 5bfee0a commit a864b42

File tree

9 files changed

+307
-110
lines changed

9 files changed

+307
-110
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/plugins/UseToxiproxy.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package org.lfdecentralizedtrust.splice.integration.plugins.toxiproxy
22

3-
import org.lfdecentralizedtrust.splice.config.{SpliceConfig, ParticipantClientConfig}
3+
import org.lfdecentralizedtrust.splice.config.{ParticipantClientConfig, SpliceConfig}
44
import org.lfdecentralizedtrust.splice.sv.config.SvParticipantClientConfig
55
import org.lfdecentralizedtrust.splice.environment.SpliceEnvironment
66
import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig
@@ -167,7 +167,7 @@ case class UseToxiproxy(
167167
BftScanClientConfig.TrustSingle(newUrl, amuletRulesCacheTimeToLive)
168168
),
169169
)
170-
case BftScanClientConfig.Bft(seedUrls, _, amuletRulesCacheTimeToLive) =>
170+
case BftScanClientConfig.Bft(seedUrls, _, amuletRulesCacheTimeToLive, _) =>
171171
val newUrl = addScanAppHttpProxy(n.unwrap, seedUrls.head, basePortBump)
172172
(
173173
n,
@@ -182,6 +182,7 @@ case class UseToxiproxy(
182182
_,
183183
amuletRulesCacheTimeToLive,
184184
scansRefreshInterval,
185+
_,
185186
) =>
186187
val newUrl = addScanAppHttpProxy(n.unwrap, seedUrls.head, basePortBump)
187188
(

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/BftScanConnectionIntegrationTest.scala

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,30 @@
1+
// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
14
package org.lfdecentralizedtrust.splice.integration.tests
25

6+
import cats.data.{NonEmptyList, OptionT}
7+
import com.digitalasset.canton.config.NonNegativeFiniteDuration
38
import com.digitalasset.canton.logging.SuppressionRule
49
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
510
import org.apache.pekko.http.scaladsl.Http
11+
import org.apache.pekko.http.scaladsl.model.Uri
612
import org.apache.pekko.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}
713
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
14+
import org.lfdecentralizedtrust.splice.config.ConfigTransforms
815
import org.lfdecentralizedtrust.splice.http.v0.wallet as http
916
import org.lfdecentralizedtrust.splice.http.v0.wallet.AcceptTokenStandardTransferResponse
1017
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
1118
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTest
19+
import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig
1220
import org.lfdecentralizedtrust.splice.util.{SvTestUtil, WalletTestUtil}
21+
import org.lfdecentralizedtrust.splice.validator.store.ValidatorConfigProvider.ScanUrlInternalConfig
1322
import org.lfdecentralizedtrust.tokenstandard.transferinstruction
1423
import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1.GetTransferInstructionAcceptContextResponse
1524
import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1.definitions.GetChoiceContextRequest
1625
import org.slf4j.event.Level
1726

27+
import scala.concurrent.Future
1828
import scala.jdk.CollectionConverters.*
1929
import scala.jdk.OptionConverters.*
2030

@@ -30,6 +40,19 @@ class BftScanConnectionIntegrationTest
3040
override def environmentDefinition: SpliceEnvironmentDefinition =
3141
EnvironmentDefinition
3242
.simpleTopology4Svs(this.getClass.getSimpleName)
43+
.addConfigTransforms((_, config) =>
44+
ConfigTransforms.updateAllValidatorConfigs {
45+
case (name, c) if name == "aliceValidator" =>
46+
val dbEnabledConfig = BftScanClientConfig.Bft(
47+
seedUrls = NonEmptyList.of(
48+
Uri("http://127.0.0.1:5012")
49+
),
50+
scansRefreshInterval = NonNegativeFiniteDuration.ofSeconds(60),
51+
)
52+
c.copy(scanClient = dbEnabledConfig)
53+
case (_, c) => c
54+
}(config)
55+
)
3356
.withManualStart
3457

3558
"init fast enough even if there are unavailable scans" in { implicit env =>
@@ -66,7 +89,9 @@ class BftScanConnectionIntegrationTest
6689
.forall(msg =>
6790
msg
6891
.contains(s"Failed to connect to scan of ${getSvName(2)} (http://localhost:5112).") ||
69-
msg.contains("Encountered 4 consecutive transient failures")
92+
msg.contains("Encountered 4 consecutive transient failures") || msg.contains(
93+
"Failed to connect to scan of FAILED Seed URL #0 (http://localhost:5112)."
94+
)
7095
) should be(true)).withClue(s"Actual Logs: $logs"),
7196
)
7297

@@ -157,4 +182,81 @@ class BftScanConnectionIntegrationTest
157182
}
158183
}
159184

185+
"validator onboarding and recovery succeed with internal config turned on" in { implicit env =>
186+
startAllSync(
187+
sv1Backend,
188+
sv1ScanBackend,
189+
sv2Backend,
190+
sv2ScanBackend,
191+
sv3Backend,
192+
sv3ScanBackend,
193+
sv4Backend,
194+
sv4ScanBackend,
195+
)
196+
197+
loggerFactory.assertEventuallyLogsSeq(SuppressionRule.LevelAndAbove(Level.INFO))(
198+
{
199+
aliceValidatorBackend.startSync()
200+
},
201+
logs => {
202+
val aliceValidatorLogs = logs.filter(_.loggerName.contains("validator=aliceValidator"))
203+
val messages = aliceValidatorLogs.map(_.message)
204+
withClue("Validator should first bootstrap with 1 and then 4 scans") {
205+
messages.filter(_.contains(bootstrapsWith1UrlLog)) should have length 1
206+
messages.filter(_.contains(bootstrapsWith4UrlsLog)) should have length 1
207+
}.withClue(
208+
s"Actual Logs: \n ${messages.filter(_.contains(bootstrapsWith1UrlLog))} \n ${messages
209+
.filter(_.contains(bootstrapsWith4UrlsLog))}"
210+
)
211+
},
212+
)
213+
214+
val persistedState: OptionT[Future, Seq[ScanUrlInternalConfig]] =
215+
aliceValidatorBackend.appState.configProvider.getScanUrlInternalConfig()
216+
217+
val expectedConfigs = Seq(
218+
ScanUrlInternalConfig(getSvName(1), "http://localhost:5012"),
219+
ScanUrlInternalConfig(getSvName(2), "http://localhost:5112"),
220+
ScanUrlInternalConfig(getSvName(3), "http://localhost:5212"),
221+
ScanUrlInternalConfig(getSvName(4), "http://localhost:5312"),
222+
)
223+
224+
withClue("Persisted state should contain the expected internal scan configurations") {
225+
persistedState.value.futureValue.value should contain theSameElementsAs expectedConfigs
226+
}
227+
228+
loggerFactory.assertEventuallyLogsSeq(SuppressionRule.LevelAndAbove(Level.INFO))(
229+
{
230+
aliceValidatorBackend.stop()
231+
aliceValidatorBackend.startSync()
232+
},
233+
logs => {
234+
val aliceValidatorLogs = logs.filter(_.loggerName.contains("validator=aliceValidator"))
235+
val messages = aliceValidatorLogs.map(_.message)
236+
withClue(
237+
"Validator should bootstrap with all the scan urls persisted to the internal store"
238+
) {
239+
messages.filter(_.contains(bootstrapsWith1UrlLog)) should have length 0
240+
messages.filter(_.contains(bootstrapsWith4UrlsLog)) should have length 2
241+
}.withClue(
242+
s"Actual Logs: \n ${messages.filter(_.contains(bootstrapsWith1UrlLog))} \n ${messages
243+
.filter(_.contains(bootstrapsWith4UrlsLog))} "
244+
)
245+
},
246+
)
247+
248+
withClue("Alice's validator should be able to onboard a user after establishing connections.") {
249+
eventuallySucceeds() {
250+
aliceValidatorBackend.onboardUser(aliceWalletClient.config.ledgerApiUser)
251+
}
252+
}
253+
254+
}
255+
256+
private val bootstrapsWith1UrlLog =
257+
s"Validator bootstrapping with 1 seed URLs: List(http://127.0.0.1:5012)"
258+
259+
private val bootstrapsWith4UrlsLog =
260+
s"Validator bootstrapping with 4 seed URLs: List(http://localhost:5012, http://localhost:5112, http://localhost:5212, http://localhost:5312)"
261+
160262
}

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,7 @@ object BftScanConnection {
870870
protected val initialScanConnections: Seq[SingleScanConnection]
871871
protected val initialFailedConnections: Map[Uri, Throwable]
872872
protected val connectionBuilder: Uri => Future[SingleScanConnection]
873+
protected val scanUrlsChangedCallback: Seq[(String, String)] => Future[Unit]
873874
protected val getScans: BftScanConnection => Future[Seq[DsoScan]]
874875
val scansRefreshInterval: NonNegativeFiniteDuration
875876
val retryProvider: RetryProvider
@@ -944,10 +945,16 @@ object BftScanConnection {
944945

945946
filteredScans = filterScans(scansInDsoRules)
946947

948+
dsoScanSeq: Seq[(String, String)] = filteredScans.map(scan =>
949+
(scan.svName, scan.publicUrl.toString)
950+
)
951+
952+
_ = scanUrlsChangedCallback(dsoScanSeq)
953+
947954
newState <- computeNewState(retriedCurrentState, filteredScans)
948955
} yield {
949956
currentScanConnectionsRef.set(newState)
950-
logger.info(s"Updated scan list to $newState")
957+
logger.info(s"Updated scan list with ${dsoScanSeq.length} scans: $newState")
951958

952959
val connections = newState.scanConnections
953960
validateState(newState)
@@ -993,7 +1000,6 @@ object BftScanConnection {
9931000
attemptToClose(connection)
9941001
}
9951002
}
996-
9971003
BftState(
9981004
(currentScanConnections -- removedScans) ++ newScansSuccessfulConnections,
9991005
(currentFailed -- removedScans) ++ newScansFailedConnections,
@@ -1064,6 +1070,7 @@ object BftScanConnection {
10641070
override val initialScanConnections: Seq[SingleScanConnection],
10651071
override val initialFailedConnections: Map[Uri, Throwable],
10661072
override val connectionBuilder: Uri => Future[SingleScanConnection],
1073+
protected val scanUrlsChangedCallback: Seq[(String, String)] => Future[Unit],
10671074
override val getScans: BftScanConnection => Future[Seq[DsoScan]],
10681075
override val scansRefreshInterval: NonNegativeFiniteDuration,
10691076
override val retryProvider: RetryProvider,
@@ -1090,6 +1097,7 @@ object BftScanConnection {
10901097
override val initialScanConnections: Seq[SingleScanConnection],
10911098
override val initialFailedConnections: Map[Uri, Throwable],
10921099
override val connectionBuilder: Uri => Future[SingleScanConnection],
1100+
protected val scanUrlsChangedCallback: Seq[(String, String)] => Future[Unit],
10931101
override val getScans: BftScanConnection => Future[Seq[DsoScan]],
10941102
override val scansRefreshInterval: NonNegativeFiniteDuration,
10951103
override val retryProvider: RetryProvider,
@@ -1180,12 +1188,16 @@ object BftScanConnection {
11801188
retryProvider: RetryProvider,
11811189
loggerFactory: NamedLoggerFactory,
11821190
builder: (Uri, NonNegativeFiniteDuration) => Future[SingleScanConnection],
1191+
refreshScanUrlsCallback: Seq[(String, String)] => Future[Unit],
11831192
)(implicit
11841193
ec: ExecutionContextExecutor,
11851194
tc: TraceContext,
11861195
mat: Materializer,
11871196
): Future[BftScanConnection] = {
11881197
val logger = loggerFactory.getTracedLogger(getClass)
1198+
1199+
logger.info(s"Validator bootstrapping with ${seedUrls.size} seed URLs: ${seedUrls.toList}")
1200+
11891201
for {
11901202
initialSeedConnections <- seedUrls.traverse(uri =>
11911203
builder(uri, amuletRulesCacheTimeToLive).transformWith {
@@ -1211,6 +1223,7 @@ object BftScanConnection {
12111223
successfulSeedConnections,
12121224
failedSeeds.toMap,
12131225
uri => builder(uri, amuletRulesCacheTimeToLive),
1226+
refreshScanUrlsCallback,
12141227
Bft.getScansInDsoRules,
12151228
scansRefreshInterval,
12161229
retryProvider,
@@ -1238,6 +1251,9 @@ object BftScanConnection {
12381251
clock: Clock,
12391252
retryProvider: RetryProvider,
12401253
loggerFactory: NamedLoggerFactory,
1254+
lastPersistedScanUrlList: () => Future[Option[List[(String, String)]]] = () =>
1255+
Future.successful(None),
1256+
persistScanUrlsCallback: Seq[(String, String)] => Future[Unit] = _ => Future.unit,
12411257
)(implicit
12421258
ec: ExecutionContextExecutor,
12431259
tc: TraceContext,
@@ -1263,21 +1279,38 @@ object BftScanConnection {
12631279
loggerFactory,
12641280
)
12651281

1266-
case ts @ BftScanClientConfig.BftCustom(_, _, _, _, _) =>
1282+
case ts @ BftScanClientConfig.BftCustom(_, _, _, _, _, _) =>
12671283
// We bootstrap with the set of provided seed-urls.
12681284
// Since not all trusted SV seeds are provided (most likely), they will not be used in the initial scan connection checking.
12691285
// In the future, add a new threshold for how many trusted seed-urls should be there.
1270-
12711286
for {
1287+
lastPersistedScans <- lastPersistedScanUrlList()
1288+
bootstrapUris: NonEmptyList[Uri] =
1289+
if (ts.useLastKnownConnectionsForInitialization) {
1290+
lastPersistedScans match {
1291+
case Some(list) if list.nonEmpty =>
1292+
val urlStrings: List[String] = list.map(_._2)
1293+
val uris: List[Uri] = urlStrings.map(u => Uri(u))
1294+
NonEmptyList.fromList(uris).getOrElse {
1295+
ts.seedUrls
1296+
}
1297+
case _ =>
1298+
ts.seedUrls
1299+
}
1300+
} else {
1301+
ts.seedUrls
1302+
}
12721303
tempBftConnection <- bootstrapWithSeedNodes(
1273-
ts.seedUrls,
1304+
bootstrapUris,
12741305
ts.amuletRulesCacheTimeToLive,
12751306
spliceLedgerClient,
12761307
ts.scansRefreshInterval,
12771308
clock,
12781309
retryProvider,
12791310
loggerFactory,
12801311
builder,
1312+
if (ts.useLastKnownConnectionsForInitialization) { persistScanUrlsCallback }
1313+
else { _ => Future.unit },
12811314
)
12821315

12831316
// Use the temporary connection to get a consensus on the full list of scans
@@ -1318,6 +1351,8 @@ object BftScanConnection {
13181351
connections,
13191352
failed.toMap,
13201353
uri => builder(uri, ts.amuletRulesCacheTimeToLive),
1354+
if (ts.useLastKnownConnectionsForInitialization) { persistScanUrlsCallback }
1355+
else { _ => Future.unit },
13211356
Bft.getScansInDsoRules,
13221357
ts.scansRefreshInterval,
13231358
retryProvider,
@@ -1352,18 +1387,36 @@ object BftScanConnection {
13521387
)
13531388
} yield bftConnection
13541389

1355-
case bft @ BftScanClientConfig.Bft(_, _, _) =>
1390+
case bft @ BftScanClientConfig.Bft(_, _, _, _) =>
13561391
for {
1392+
lastPersistedScans <- lastPersistedScanUrlList()
1393+
bootstrapUris: NonEmptyList[Uri] =
1394+
if (bft.useLastKnownConnectionsForInitialization) {
1395+
lastPersistedScans match {
1396+
case Some(list) if list.nonEmpty =>
1397+
val urlStrings: List[String] = list.map(_._2)
1398+
val uris: List[Uri] = urlStrings.map(u => Uri(u))
1399+
NonEmptyList.fromList(uris).getOrElse {
1400+
bft.seedUrls
1401+
}
1402+
case _ =>
1403+
bft.seedUrls
1404+
}
1405+
} else {
1406+
bft.seedUrls
1407+
}
13571408

13581409
bftConnection <- bootstrapWithSeedNodes(
1359-
bft.seedUrls,
1410+
bootstrapUris,
13601411
bft.amuletRulesCacheTimeToLive,
13611412
spliceLedgerClient,
13621413
bft.scansRefreshInterval,
13631414
clock,
13641415
retryProvider,
13651416
loggerFactory,
13661417
builder,
1418+
if (bft.useLastKnownConnectionsForInitialization) { persistScanUrlsCallback }
1419+
else { _ => Future.unit },
13671420
)
13681421
_ <- retryProvider.waitUntil(
13691422
RetryFor.WaitingOnInitDependency,
@@ -1437,6 +1490,7 @@ object BftScanConnection {
14371490
connections,
14381491
failed.toMap,
14391492
uri => builder(uri, amuletRulesCacheTimeToLive),
1493+
_ => Future.unit,
14401494
_ => Bft.getPeerScansFromStore(store, svName),
14411495
scansRefreshInterval,
14421496
retryProvider,
@@ -1505,6 +1559,7 @@ object BftScanConnection {
15051559
ScanAppClientConfig.DefaultAmuletRulesCacheTimeToLive,
15061560
scansRefreshInterval: NonNegativeFiniteDuration =
15071561
ScanAppClientConfig.DefaultScansRefreshInterval,
1562+
useLastKnownConnectionsForInitialization: Boolean = true,
15081563
) extends BftScanClientConfig {
15091564
def setAmuletRulesCacheTimeToLive(ttl: NonNegativeFiniteDuration): BftCustom =
15101565
copy(amuletRulesCacheTimeToLive = ttl)
@@ -1516,6 +1571,7 @@ object BftScanConnection {
15161571
ScanAppClientConfig.DefaultScansRefreshInterval,
15171572
amuletRulesCacheTimeToLive: NonNegativeFiniteDuration =
15181573
ScanAppClientConfig.DefaultAmuletRulesCacheTimeToLive,
1574+
useLastKnownConnectionsForInitialization: Boolean = true,
15191575
) extends BftScanClientConfig {
15201576
def setAmuletRulesCacheTimeToLive(ttl: NonNegativeFiniteDuration): Bft =
15211577
copy(amuletRulesCacheTimeToLive = ttl)

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ class BftScanConnectionTest
209209
initialConnections,
210210
initialFailedConnections,
211211
connectionBuilder,
212+
_ => Future.unit,
212213
Bft.getScansInDsoRules,
213214
NonNegativeFiniteDuration.ofSeconds(refreshSeconds),
214215
retryProvider,

0 commit comments

Comments
 (0)