Skip to content

Commit 34fb459

Browse files
Fix BFT refresh retry loop (#1162)
--------- Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent d323414 commit 34fb459

File tree

2 files changed

+171
-69
lines changed

2 files changed

+171
-69
lines changed

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnection.scala

Lines changed: 93 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -884,75 +884,88 @@ object BftScanConnection {
884884
)(implicit tc: TraceContext): Future[ScanConnections] = {
885885
val currentState @ BftState(currentScanConnections, currentFailed) =
886886
currentScanConnectionsRef.get()
887-
val currentScans = (currentScanConnections.keys ++ currentFailed.keys).toSet
888887
logger.info(s"Started refreshing scan list from $currentState")
889-
getScans(connection).flatMap { scansInDsoRules =>
890-
val newScans = scansInDsoRules.filter(scan => !currentScans.contains(scan.publicUrl))
891-
val removedScans = currentScans.filter(url => !scansInDsoRules.exists(_.publicUrl == url))
892-
if (scansInDsoRules.isEmpty) {
893-
// This is expected on app init, and is retried when building the BftScanConnection
894-
Future.failed(
895-
io.grpc.Status.FAILED_PRECONDITION
896-
.withDescription(
897-
s"Scan list in DsoRules is empty. Last known list: $currentState"
898-
)
899-
.asRuntimeException()
900-
)
901-
} else if (newScans.isEmpty && removedScans.isEmpty && currentFailed.isEmpty) {
902-
logger.debug("Not updating scan list as there are no changes.")
903-
Future.successful(currentState.scanConnections)
904-
} else {
905-
for {
906-
(newScansFailedConnections, newScansSuccessfulConnections) <- attemptConnections(
907-
newScans
888+
889+
for {
890+
// if the previous state had too many failed scans, we cannot fetch the new list of scans.
891+
// thus, we retry all failed connections first.
892+
(retriedScansFailedConnections, retriedScansSuccessfulConnections) <- attemptConnections(
893+
currentFailed.map { case (uri, (_, svName)) =>
894+
DsoScan(uri, svName)
895+
}.toSeq
896+
)
897+
retriedCurrentState = BftState(
898+
currentScanConnections ++ retriedScansSuccessfulConnections,
899+
retriedScansFailedConnections.toMap,
900+
)
901+
// this state will be used to fetch the scans in the next call
902+
_ = currentScanConnectionsRef.set(retriedCurrentState)
903+
// these will be BFT-read, failing if there's no consensus
904+
scansInDsoRules <- getScans(connection)
905+
newState <- computeNewState(retriedCurrentState, scansInDsoRules)
906+
} yield {
907+
currentScanConnectionsRef.set(newState)
908+
logger.info(s"Updated scan list to $newState")
909+
910+
val connections = newState.scanConnections
911+
val defaultCallConfig = BftCallConfig.default(connections)
912+
// Most but not all calls will use the default config.
913+
// Fail early if there are not enough Scans for the default config
914+
if (!defaultCallConfig.enoughAvailableScans) {
915+
throw io.grpc.Status.FAILED_PRECONDITION
916+
.withDescription(
917+
s"There are not enough Scans to satisfy f=${connections.f}. Will be retried. State: $newState"
908918
)
909-
toRetry = currentFailed -- removedScans
910-
(retriedScansFailedConnections, retriedScansSuccessfulConnections) <-
911-
attemptConnections(
912-
toRetry.map { case (url, (_, svName)) => DsoScan(url, svName) }.toSeq
913-
)
914-
} yield {
915-
removedScans.foreach { url =>
916-
currentScanConnections.get(url).foreach { case (connection, svName) =>
917-
logger.info(
918-
s"Closing connection to scan of $svName ($url) as it's been removed from the DsoRules scan list."
919-
)
920-
attemptToClose(connection)
921-
}
922-
}
923-
(newScansFailedConnections ++ retriedScansFailedConnections).foreach {
924-
case (url, (err, svName)) =>
925-
// TODO(#815): abstract this pattern into the RetryProvider
926-
if (retryProvider.isClosing)
927-
logger.info(
928-
s"Suppressed warning, as we're shutting down: Failed to connect to scan of $svName ($url).",
929-
err,
930-
)
931-
else
932-
logger.warn(s"Failed to connect to scan of $svName ($url).", err)
933-
}
919+
.asRuntimeException()
920+
} else {
921+
connections
922+
}
923+
}
924+
}
925+
926+
private def computeNewState(
927+
currentState: BftState,
928+
scansInDsoRules: Seq[DsoScan],
929+
)(implicit tc: TraceContext): Future[BftState] = {
930+
val BftState(currentScanConnections, currentFailed) = currentState
931+
val currentScans = (currentScanConnections.keys ++ currentFailed.keys).toSet
934932

935-
val newState = BftState(
936-
currentScanConnections -- removedScans ++ newScansSuccessfulConnections ++ retriedScansSuccessfulConnections,
937-
(retriedScansFailedConnections ++ newScansFailedConnections).toMap,
933+
val newScans = scansInDsoRules.filter(scan => !currentScans.contains(scan.publicUrl))
934+
val removedScans = currentScans.filter(url => !scansInDsoRules.exists(_.publicUrl == url))
935+
if (scansInDsoRules.isEmpty) {
936+
// This is expected on app init, and is retried when building the BftScanConnection
937+
Future.failed(
938+
io.grpc.Status.FAILED_PRECONDITION
939+
.withDescription(
940+
s"Scan list in DsoRules is empty. Last known list: $currentState"
938941
)
939-
currentScanConnectionsRef.set(newState)
940-
logger.info(s"Updated scan list to $newState")
941-
942-
val connections = newState.scanConnections
943-
val defaultCallConfig = BftCallConfig.default(connections)
944-
// Most but not all calls will use the default config.
945-
// Fail early if there are not enough Scans for the default config
946-
if (!defaultCallConfig.enoughAvailableScans) {
947-
throw io.grpc.Status.FAILED_PRECONDITION
948-
.withDescription(
949-
s"There are not enough Scans to satisfy f=${connections.f}. Will be retried. State: $newState"
950-
)
951-
.asRuntimeException()
952-
} else {
953-
connections
942+
.asRuntimeException()
943+
)
944+
} else {
945+
for {
946+
(newScansFailedConnections, newScansSuccessfulConnections) <- attemptConnections(
947+
newScans
948+
)
949+
} yield {
950+
logger.info(
951+
s"New successful scans: ${newScansSuccessfulConnections.map(_._1)}, " +
952+
s"new failed scans: ${newScansFailedConnections.map(_._1)}, " +
953+
s"removed scans: $removedScans"
954+
)
955+
956+
removedScans.foreach { url =>
957+
currentScanConnections.get(url).foreach { case (connection, svName) =>
958+
logger.info(
959+
s"Closing connection to scan of $svName ($url) as it's been removed from the DsoRules scan list."
960+
)
961+
attemptToClose(connection)
954962
}
955963
}
964+
965+
BftState(
966+
(currentScanConnections -- removedScans) ++ newScansSuccessfulConnections,
967+
(currentFailed -- removedScans) ++ newScansFailedConnections,
968+
)
956969
}
957970
}
958971
}
@@ -969,12 +982,27 @@ object BftScanConnection {
969982
.traverse { scan =>
970983
logger.info(s"Attempting to connect to Scan: $scan.")
971984
connectionBuilder(scan.publicUrl)
972-
.transformWith(result =>
985+
.transformWith { result =>
986+
// logging
987+
result.failed.foreach { err =>
988+
// TODO(#815): abstract this pattern into the RetryProvider
989+
if (retryProvider.isClosing)
990+
logger.info(
991+
s"Suppressed warning, as we're shutting down: Failed to connect to scan of ${scan.svName} (${scan.publicUrl}).",
992+
err,
993+
)
994+
else
995+
logger.warn(
996+
s"Failed to connect to scan of ${scan.svName} (${scan.publicUrl}).",
997+
err,
998+
)
999+
}
1000+
// actual result
9731001
Future.successful(
9741002
result.toEither
9751003
.bimap(scan.publicUrl -> (_, scan.svName), scan.publicUrl -> (_, scan.svName))
9761004
)
977-
)
1005+
}
9781006
}
9791007
.map(_.partitionEither(identity))
9801008
}

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/admin/api/client/BftScanConnectionTest.scala

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import com.google.protobuf.ByteString
1515
import org.apache.pekko.http.scaladsl.model.*
1616
import org.lfdecentralizedtrust.splice.admin.http.HttpErrorWithHttpCode
1717
import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules as amuletrulesCodegen
18+
import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules.AmuletRules
1819
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{
1920
holdingv1,
2021
metadatav1,
@@ -49,6 +50,7 @@ import org.scalatest.wordspec.AsyncWordSpec
4950
import org.slf4j.event.Level
5051

5152
import java.time.{Duration, Instant}
53+
import java.util.concurrent.atomic.AtomicInteger
5254
import scala.concurrent.{ExecutionContext, Future}
5355

5456
// mock verification triggers this
@@ -76,7 +78,11 @@ class BftScanConnectionTest
7678
}
7779
connections.foreach { connection =>
7880
// all of this is noise...
79-
when(connection.getAmuletRulesWithState(eqTo(None))(any[ExecutionContext], any[TraceContext]))
81+
when(
82+
connection.getAmuletRulesWithState(
83+
any[Option[ContractWithState[AmuletRules.ContractId, AmuletRules]]]
84+
)(any[ExecutionContext], any[TraceContext])
85+
)
8086
.thenReturn(
8187
Future.successful(
8288
ContractWithState(
@@ -184,6 +190,7 @@ class BftScanConnectionTest
184190
synchronizerId = synchronizerId,
185191
)
186192
}
193+
val refreshSeconds = 10000L
187194
def getBft(
188195
initialConnections: Seq[SingleScanConnection],
189196
connectionBuilder: Uri => Future[SingleScanConnection] = _ =>
@@ -198,7 +205,7 @@ class BftScanConnectionTest
198205
initialFailedConnections,
199206
connectionBuilder,
200207
Bft.getScansInDsoRules,
201-
NonNegativeFiniteDuration.ofSeconds(1),
208+
NonNegativeFiniteDuration.ofSeconds(refreshSeconds),
202209
retryProvider,
203210
loggerFactory,
204211
),
@@ -278,18 +285,85 @@ class BftScanConnectionTest
278285
connections.foreach(makeMockReturn(_, partyIdA))
279286

280287
// we initialize with just the first one, and the second one will be "built" when we refresh
281-
val bft = getBft(connections.take(1), _ => Future.successful(connections(1)))
282-
clock.advance(Duration.ofSeconds(2))
288+
val refreshCalled = new AtomicInteger(0)
289+
val bft = getBft(
290+
connections.take(1),
291+
_ => {
292+
refreshCalled.incrementAndGet()
293+
Future.successful(connections(1))
294+
},
295+
)
296+
clock.advance(Duration.ofSeconds(refreshSeconds + 1))
297+
// the advance above triggers the one expected refresh; the small advances below must not trigger another, as together they are less than refreshSeconds
298+
clock.advance(Duration.ofSeconds(1))
299+
clock.advance(Duration.ofSeconds(1))
300+
clock.advance(Duration.ofSeconds(1))
301+
clock.advance(Duration.ofSeconds(1))
283302

284303
// eventually the refresh goes through and the second connection is used
285304
eventually() {
305+
refreshCalled.intValue() should be(1)
286306
val result = bft.getDsoPartyId().futureValue
287307
try { verify(connections(1), atLeast(1)).getDsoPartyId() }
288308
catch { case cause: MockitoAssertionError => fail("Mockito fail", cause) }
289309
result should be(partyIdA)
290310
}
291311
}
292312

313+
"refresh the list of scans faster if there are not enough available scans" in {
314+
val connections = getMockedConnections(n = 4)
315+
val connectionsMap = connections.map(c => c.config.adminApi.url -> c).toMap
316+
317+
connections.foreach(makeMockReturn(_, partyIdA))
318+
val refreshCalled = connections.map(_.config.adminApi.url -> new AtomicInteger(0)).toMap
319+
320+
loggerFactory.assertLogsSeq(SuppressionRule.Level(Level.WARN))(
321+
{
322+
// all failed until retried enough times
323+
val bft = getBft(
324+
Seq.empty,
325+
uri => {
326+
val calls = refreshCalled(uri).incrementAndGet()
327+
if (calls > 3) {
328+
Future.successful(connectionsMap(uri))
329+
} else {
330+
Future.failed(new RuntimeException("some'rror"))
331+
}
332+
},
333+
initialFailedConnections = connections
334+
.map(connection => connection.config.adminApi.url -> new RuntimeException("Failed"))
335+
.toMap,
336+
)
337+
// trigger the first refresh; this is only required in tests, as prod code already retries on BftScanConnection init
338+
clock.advance(Duration.ofSeconds(refreshSeconds + 1))
339+
// and then refresh until it's called enough times
340+
eventually() {
341+
clock.advance(Duration.ofSeconds(1))
342+
forAll(refreshCalled) { case (_, calls) =>
343+
calls.intValue() should be >= 3
344+
}
345+
}
346+
347+
// eventually the refreshes go through: all four connections are open, none are failed, and BFT calls succeed
348+
eventually() {
349+
bft.scanList.scanConnections.open should have size 4
350+
bft.scanList.scanConnections.failed should be(0)
351+
val result = bft.getDsoPartyId().futureValue
352+
try { verify(connections(1), atLeast(1)).getDsoPartyId() }
353+
catch { case cause: MockitoAssertionError => fail("Mockito fail", cause) }
354+
result should be(partyIdA)
355+
}
356+
},
357+
entries =>
358+
forAll(entries) { entry =>
359+
entry.warningMessage should include("Failed to connect to scan").or(
360+
include("which are fewer than the necessary")
361+
)
362+
},
363+
)
364+
365+
}
366+
293367
"fail if too many Scans failed to connect" in {
294368
// f = (1ok + 3bad - 1) / 3 = 1
295369
// 1 Scan is not enough for f+1=2

0 commit comments

Comments
 (0)