@@ -29,6 +29,10 @@ class AcsSnapshotBulkStorage(
2929 // TODO(#3429): persist progress (or conclude it from the S3 storage), and start from latest successfully dumped snapshot upon restart
3030 private def getStartTimestamp : (Long , CantonTimestamp ) = (0 , CantonTimestamp .MinValue )
3131
32+ // When new snapshot is not yet available, how long to wait for a new one.
33+ // TODO(#3429): make it longer for prod (so consider making it configurable/overridable for tests)
34+ private val snapshotPollingInterval = 5 .seconds
35+
3236 private def getAcsSnapshotTimestampAfter (
3337 startMigrationId : Long ,
3438 startTimestamp : CantonTimestamp ,
@@ -38,7 +42,7 @@ class AcsSnapshotBulkStorage(
3842 case (lastMigrationId : Long , lastTimestamp : CantonTimestamp ) =>
3943 acsSnapshotStore.lookupSnapshotAfter(lastMigrationId, lastTimestamp).flatMap {
4044 case Some (snapshot) =>
41- logger.debug (
45+ logger.info (
4246 s " next snapshot available, at migration ${snapshot.migrationId}, record time ${snapshot.snapshotRecordTime}"
4347 )
4448 Future .successful(
@@ -50,7 +54,7 @@ class AcsSnapshotBulkStorage(
5054 )
5155 )
5256 case None =>
53- after(5 .seconds , actorSystem.scheduler) {
57+ after(snapshotPollingInterval , actorSystem.scheduler) {
5458 logger.debug(" No new snapshot available, sleeping..." )
5559 Future .successful(Some (((lastMigrationId, lastTimestamp), None )))
5660 }
@@ -59,21 +63,31 @@ class AcsSnapshotBulkStorage(
5963 .collect { case Some ((migrationId, timestamp)) => (migrationId, timestamp) }
6064 }
6165
62- private def mksrc (start : (Long , CantonTimestamp )): Source [(Long , CantonTimestamp ), (KillSwitch , Future [Done ])] = start match {
66+ /** *
67+ * This is the main implementation of the pipeline. It is a Pekko Source that gets a `start` timestamp, and starts dumping to S3
68+ * all snapshots (strictly) after `start`. It is an infinite source that should never complete.
69+ */
70+ private def mksrc (
71+ start : (Long , CantonTimestamp )
72+ ): Source [(Long , CantonTimestamp ), (KillSwitch , Future [Done ])] = start match {
6373 case (startMigrationId, startAfterTimestamp) =>
64- logger.debug(s " Starting ACS snapshot dump source, from migration $startMigrationId, timestamp $startAfterTimestamp" )
74+ logger.debug(
75+ s " Starting ACS snapshot dump source, from migration $startMigrationId, timestamp $startAfterTimestamp"
76+ )
6577 val base =
6678 getAcsSnapshotTimestampAfter(startMigrationId, startAfterTimestamp)
67- .via(SingleAcsSnapshotBulkStorage (config, acsSnapshotStore, s3Connection, loggerFactory))
79+ .via(SingleAcsSnapshotBulkStorage (config, acsSnapshotStore, s3Connection, loggerFactory))
6880
6981 val withKs = base.viaMat(KillSwitches .single)(Keep .right)
70- withKs.watchTermination() { case (ks, done) => (ks : KillSwitch , done)
71- }
82+ withKs.watchTermination() { case (ks, done) => (ks : KillSwitch , done) }
7283 }
7384
74- // TODO(#3429): once we persist the state, i.e. the last dumped snapshot, consider moving from Canton's PekkoUtil.restartSource
75- // to Pekko's built-in RestartSource (for now, it's convenient to use Canton's ability to track state via lastEmittedElement)
76- def getSource = {
85+ /** wraps mksrc (where the main pipeline logic is implemented) in a retry loop, to retry upon failures.
86+ */
87+ def getSource
88+ : Source [PekkoUtil .WithKillSwitch [(Long , CantonTimestamp )], (KillSwitch , Future [Done ])] = {
89+ // TODO(#3429): once we persist the state, i.e. the last dumped snapshot, consider moving from Canton's PekkoUtil.restartSource
90+ // to Pekko's built-in RestartSource (for now, it's convenient to use Canton's ability to track state via lastEmittedElement)
7791 // TODO(#3429): tweak the retry parameters here
7892 val delay = FiniteDuration (5 , " seconds" )
7993 val policy = new RetrySourcePolicy [(Long , CantonTimestamp ), (Long , CantonTimestamp )] {
@@ -83,8 +97,10 @@ class AcsSnapshotBulkStorage(
8397 lastFailure : Option [Throwable ],
8498 ): Option [(scala.concurrent.duration.FiniteDuration , (Long , CantonTimestamp ))] = {
8599 lastFailure.map { t =>
86- logger.warn(s " Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
87- .messageWithStacktrace(t)}, will retry after delay of $delay from last successful timestamp $lastEmittedElement" )
100+ logger.warn(
101+ s " Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
102+ .messageWithStacktrace(t)}, will retry after delay of $delay from last successful timestamp $lastEmittedElement"
103+ )
88104 // Always retry (TODO(#3429): consider a max number of retries?)
89105 delay -> lastEmittedElement.fold(lastState)(identity)
90106 }
0 commit comments