@@ -28,7 +28,8 @@ class AcsSnapshotBulkStorage(
     extends NamedLogging {
 
   // TODO(#3429): persist progress (or conclude it from the S3 storage), and start from latest successfully dumped snapshot upon restart
-  private def getStartTimestamp: Future[Option[(Long, CantonTimestamp)]] = kvProvider.getLatestAcsSnapshotInBulkStorage().value
+  private def getStartTimestamp: Future[Option[(Long, CantonTimestamp)]] =
+    kvProvider.getLatestAcsSnapshotInBulkStorage().value
 
   // When a new snapshot is not yet available, how long to wait for one.
   // TODO(#3429): make it longer for prod (so consider making it configurable/overridable for tests)
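As an aside on the TODO above, a minimal sketch of one way the wait could be made configurable and overridable for tests; the config class and field names here are assumptions for illustration, not part of this change:

```scala
import scala.concurrent.duration._

// Hypothetical knob (not in this PR): long poll interval in prod,
// short override in tests so they don't sit idle between snapshots.
final case class AcsSnapshotDumpConfig(
    snapshotPollInterval: FiniteDuration = 1.minute
)

// A test can then shorten the wait without touching production defaults:
val testConfig = AcsSnapshotDumpConfig(snapshotPollInterval = 50.millis)
```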
@@ -64,37 +65,47 @@ class AcsSnapshotBulkStorage(
       .collect { case Some((migrationId, timestamp)) => (migrationId, timestamp) }
   }
 
-  /** This is the main implementation of the pipeline. It is a Pekko Source that gets a `start` timestamp, and starts dumping to S3
-    * all snapshots (strictly) after `start`. It is an infinite source that should never complete.
+  /** This is the main implementation of the pipeline. It is a Pekko Source that reads a `start` timestamp
+    * from the DB, and starts dumping to S3 all snapshots (strictly) after `start`. After every snapshot that
+    * is successfully dumped, it persists its timestamp to the DB.
+    * It is an infinite source that should never complete.
     */
-  private def mksrc(): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = {
-    logger.debug("Starting ACS snapshot dump source")
-    val base =
-      Source.single[Unit](())
-        .mapAsync(1){_ => getStartTimestamp}
-        .flatMapConcat {
-          case Some((startMigrationId, startAfterTimestamp)) => getAcsSnapshotTimestampsAfter(startMigrationId, startAfterTimestamp)
-          case None => getAcsSnapshotTimestampsAfter(0, CantonTimestamp.MinValue)
-        }
-        .via(
-          SingleAcsSnapshotBulkStorage.asFlow(
-            config,
-            acsSnapshotStore,
-            s3Connection,
-            loggerFactory,
+  private def mksrc(): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = {
+    val base =
+      Source
+        .single[Unit](())
+        .mapAsync(1) { _ => getStartTimestamp }
+        .flatMapConcat {
+          case Some((startMigrationId, startAfterTimestamp)) =>
+            logger.info(
+              s"Latest dumped snapshot was from migration $startMigrationId, timestamp $startAfterTimestamp"
             )
+            getAcsSnapshotTimestampsAfter(startMigrationId, startAfterTimestamp)
+          case None =>
+            logger.info("No snapshots dumped yet, starting from genesis")
+            getAcsSnapshotTimestampsAfter(0, CantonTimestamp.MinValue)
+        }
+        .via(
+          SingleAcsSnapshotBulkStorage.asFlow(
+            config,
+            acsSnapshotStore,
+            s3Connection,
+            loggerFactory,
           )
-        .mapAsync(1){case (migrationId, timestamp) =>
-          for {
-            _ <- kvProvider.setLatestAcsSnapshotsInBulkStorage(migrationId, timestamp)
-          } yield {
-            (migrationId, timestamp)
-          }
+        )
+        .mapAsync(1) { case (migrationId, timestamp) =>
+          for {
+            _ <- kvProvider.setLatestAcsSnapshotsInBulkStorage(migrationId, timestamp)
+          } yield {
+            logger.info(
+              s"Successfully completed dumping snapshot from migration $migrationId, timestamp $timestamp"
+            )
+            (migrationId, timestamp)
+          }
           }
-
+        }
 
     val withKs = base.viaMat(KillSwitches.single)(Keep.right)
-    withKs.watchTermination(){ case (ks, done) => (ks: KillSwitch, done) }
+    withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
   }
 
   /** wraps mksrc (where the main pipeline logic is implemented) in a retry loop, to retry upon failures.
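For context on the retry wrapper this scaladoc refers to: a common Pekko Streams way to build such a loop is `RestartSource.withBackoff`, which re-creates the source after a failure (or unexpected completion) with exponential backoff. A minimal sketch, assuming the surrounding class's `mksrc()`; the backoff values are illustrative, not taken from this change:

```scala
import scala.concurrent.duration._
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.RestartSource

// Re-create mksrc() whenever it fails (or, unexpectedly, completes),
// waiting 1s, 2s, 4s, ... up to 30s between attempts, with 20% jitter.
private def resilientSource =
  RestartSource.withBackoff(
    RestartSettings(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2)
  ) { () => mksrc() }
```

One caveat: `RestartSource` materializes `NotUsed`, so the `(KillSwitch, Future[Done])` pair materialized by `mksrc()` is not propagated through the wrapper; the usual pattern is to attach a fresh `KillSwitches.single` (and `watchTermination`) outside the restart stage.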