@@ -75,7 +75,7 @@ class SingleAcsSnapshotBulkStorage(
7575
7676 }
7777
78- private def getSource : Source [String , NotUsed ] = {
78+ private def getSource : Source [( Long , CantonTimestamp ) , NotUsed ] = {
7979 val idx = new AtomicInteger (0 )
8080 Source
8181 .unfoldAsync(Start : Position ) {
@@ -106,12 +106,17 @@ class SingleAcsSnapshotBulkStorage(
106106 objectKey
107107 }
108108 }
109+ // emit a Unit upon completion of the write to s3
110+ .fold(()) { case ((), _) => () }
111+ // emit back the (migration, timestamp) upon completion
112+ .map(_ => (migrationId, timestamp))
113+
109114 }
110115
111116 // TODO(#3429): I'm no longer sure the retrying source is actually useful,
112117 // we probably want to just rely on the of the full stream of ACS snapshot dumps (in AcsSnapshotBulkStorage),
113118 // but keeping it for now (and the corresponding unit test) until that is fully resolved
114- private def getRetryingSource : Source [WithKillSwitch [String ], (KillSwitch , Future [Done ])] = {
119+ private def getRetryingSource : Source [WithKillSwitch [( Long , CantonTimestamp ) ], (KillSwitch , Future [Done ])] = {
115120
116121 def mksrc = {
117122 val base = getSource
@@ -131,10 +136,10 @@ class SingleAcsSnapshotBulkStorage(
131136
132137 // TODO(#3429): tweak the retry parameters here
133138 val delay = FiniteDuration (5 , " seconds" )
134- val policy = new RetrySourcePolicy [Unit , String ] {
139+ val policy = new RetrySourcePolicy [Unit , ( Long , CantonTimestamp ) ] {
135140 override def shouldRetry (
136141 lastState : Unit ,
137- lastEmittedElement : Option [String ],
142+ lastEmittedElement : Option [( Long , CantonTimestamp ) ],
138143 lastFailure : Option [Throwable ],
139144 ): Option [(scala.concurrent.duration.FiniteDuration , Unit )] = {
140145 lastFailure.map { t =>
@@ -182,10 +187,6 @@ object SingleAcsSnapshotBulkStorage {
182187 s3Connection,
183188 loggerFactory,
184189 ).getSource
185- // emit a Unit upon completion
186- .fold(()) { case ((), _) => () }
187- // emit back the (migration, timestamp) upon completion
188- .map(_ => (migrationId, timestamp))
189190 }
190191
191192 /** The same flow as a source, currently used only for unit testing.
@@ -201,7 +202,7 @@ object SingleAcsSnapshotBulkStorage {
201202 actorSystem : ActorSystem ,
202203 tc : TraceContext ,
203204 ec : ExecutionContext ,
204- ): Source [WithKillSwitch [String ], (KillSwitch , Future [Done ])] =
205+ ): Source [WithKillSwitch [( Long , CantonTimestamp ) ], (KillSwitch , Future [Done ])] =
205206 new SingleAcsSnapshotBulkStorage (
206207 migrationId,
207208 timestamp,
0 commit comments