 
 package org.lfdecentralizedtrust.splice.scan.store.bulk
 
-import scala.concurrent.ExecutionContext
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
 import com.digitalasset.canton.tracing.TraceContext
-import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
-import com.digitalasset.canton.util.PekkoUtil.{RetrySourcePolicy, WithKillSwitch}
+import org.apache.pekko.NotUsed
 import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
-import org.apache.pekko.stream.scaladsl.{Keep, Source}
-import org.apache.pekko.util.ByteString
-import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
+import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.pattern.after
 import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
-import org.lfdecentralizedtrust.splice.store.HardLimit
 
-import scala.concurrent.Future
-import io.circe.syntax.*
-
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration.FiniteDuration
-import Position.*
-import org.apache.pekko.Done
-
-object Position {
-  sealed trait Position
-
-  case object Start extends Position
-
-  case object End extends Position
-
-  final case class Index(value: Long) extends Position
-}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.*
 
 class AcsSnapshotBulkStorage(
     val config: BulkStorageConfig,
@@ -45,92 +23,36 @@ class AcsSnapshotBulkStorage(
 )(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
     extends NamedLogging {
 
-  private def getAcsSnapshotChunk(
-      migrationId: Long,
-      timestamp: CantonTimestamp,
-      after: Option[Long],
-  ): Future[(Position, ByteString)] = {
-    for {
-      snapshot <- acsSnapshotStore.queryAcsSnapshot(
-        migrationId,
-        snapshot = timestamp,
-        after,
-        HardLimit.tryCreate(config.dbReadChunkSize),
-        Seq.empty,
-        Seq.empty,
-      )
-    } yield {
-      val encoded = snapshot.createdEventsInPage.map(event =>
-        CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event)
-      )
-      val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
-      val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
-      logger.debug(
-        s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
-      )
-      (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
-    }
-
-  }
-
-  private[bulk] def getSingleAcsSnapshotDumpSource(
-      migrationId: Long,
-      timestamp: CantonTimestamp,
-  ): Source[WithKillSwitch[Int], (KillSwitch, Future[Done])] = {
-
-    def mksrc = {
-      val idx = new AtomicInteger(0)
-      val base = Source
-        .unfoldAsync(Start: Position) {
-          case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
-          case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
-          case End => Future.successful(None)
-        }
-        .via(ZstdGroupedWeight(config.maxFileSize))
-        // Add a buffer so that the next object continues accumulating while we write the previous one
-        .buffer(
-          1,
-          OverflowStrategy.backpressure,
-        )
-        .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) =>
-          val objectKey = if (isLast) s"snapshot_${idx}_last.zstd" else s"snapshot_$idx.zstd"
-          // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
-          // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
-          // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
-          // partially written object.
-          for {
-            _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
-          } yield {
-            idx.addAndGet(1)
+  // TODO(#3429): persist progress, and start from latest successfully dumped snapshot upon restart
+  private def getStartTimestamp: (Long, CantonTimestamp) = (0, CantonTimestamp.MinValue)
+
+  private def timestampsSource: Source[(Long, CantonTimestamp), NotUsed] = {
+    Source
+      .unfoldAsync(getStartTimestamp) {
+        case (lastMigrationId: Long, lastTimestamp: CantonTimestamp) =>
+          acsSnapshotStore.lookupSnapshotAfter(lastMigrationId, lastTimestamp).flatMap {
+            case Some(snapshot) =>
+              logger.debug(
+                s"next snapshot available, at migration ${snapshot.migrationId}, record time ${snapshot.snapshotRecordTime}"
+              )
+              Future.successful(
+                Some(
+                  (snapshot.migrationId, snapshot.snapshotRecordTime),
+                  Some(snapshot.migrationId, snapshot.snapshotRecordTime),
+                )
+              )
+            case None =>
+              after(5.seconds, actorSystem.scheduler) {
+                Future.successful(Some((lastMigrationId, lastTimestamp), None))
+              }
           }
-        }
-      val withKs = base.viaMat(KillSwitches.single)(Keep.right)
-      withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
-    }
-
-    // TODO(#3429): tweak the retry parameters here
-    val delay = FiniteDuration(5, "seconds")
-    val policy = new RetrySourcePolicy[Unit, Int] {
-      override def shouldRetry(
-          lastState: Unit,
-          lastEmittedElement: Option[Int],
-          lastFailure: Option[Throwable],
-      ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
-        lastFailure.map { t =>
-          logger.warn(s"Writing ACS snapshot to bulk storage failed with: ${ErrorUtil
-            .messageWithStacktrace(t)}, will retry after delay of ${delay}")
-          // Always retry (TODO(#3429): consider a max number of retries?)
-          delay -> ()
-        }
       }
-    }
+      .collect { case Some((migrationId, timestamp)) => (migrationId, timestamp) }
+  }
 
-    PekkoUtil
-      .restartSource(
-        name = "acs-snapshot-dump",
-        initial = (),
-        mkSource = (_: Unit) => mksrc,
-        policy = policy,
-      )
+  def getSource: Source[Unit, NotUsed] = {
+    timestampsSource
+      .via(SingleAcsSnapshotBulkStorage(config, acsSnapshotStore, s3Connection, loggerFactory))
+      .map(_ => ())
   }
 }
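
For reference, the new timestampsSource relies on a common Pekko Streams polling pattern: Source.unfoldAsync carries the last seen position as state, emits the next snapshot when one is available, and otherwise schedules an empty tick via pattern.after before a downstream .collect drops those ticks. The standalone sketch below illustrates only that pattern; the fetchAfter function and the Int positions are placeholders for illustration, not the real AcsSnapshotStore API.

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.after
import org.apache.pekko.stream.scaladsl.{Sink, Source}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.*

object PollingSourceSketch {

  // Emits each newly available item; when nothing new exists yet, it polls again after a delay.
  // `fetchAfter` is a hypothetical lookup: given the last seen position, return the next one, if any.
  def pollingSource(
      fetchAfter: Int => Future[Option[Int]]
  )(implicit system: ActorSystem, ec: ExecutionContext): Source[Int, NotUsed] =
    Source
      .unfoldAsync[Int, Option[Int]](0) { last =>
        fetchAfter(last).flatMap {
          // A newer item exists: advance the state and emit it downstream.
          case Some(next) => Future.successful(Some((next, Option(next))))
          // Nothing new yet: keep the state and emit an empty tick after a delay.
          case None =>
            after(5.seconds, system.scheduler)(Future.successful(Some((last, Option.empty[Int]))))
        }
      }
      // Drop the empty ticks so downstream only sees real items.
      .collect { case Some(next) => next }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("polling-sketch")
    implicit val ec: ExecutionContext = system.dispatcher
    // Pretend the store already holds positions 1..3; anything beyond that is "not yet available".
    def fetchAfter(last: Int): Future[Option[Int]] =
      Future.successful(if (last < 3) Some(last + 1) else None)
    pollingSource(fetchAfter)
      .take(3)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.terminate())
  }
}

In the actual change, the element emitted per poll is the (migrationId, snapshotRecordTime) pair, and the downstream stage (SingleAcsSnapshotBulkStorage) consumes each pair to dump that snapshot to bulk storage.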