Commit 7a9e018

[ci] refactor
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 1350df0 commit 7a9e018

2 files changed: +192 -7 lines changed

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.store.bulk

import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import com.digitalasset.canton.util.PekkoUtil.{RetrySourcePolicy, WithKillSwitch}
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
import org.apache.pekko.stream.scaladsl.{Flow, Keep, Source}
import org.apache.pekko.util.ByteString
import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

import scala.concurrent.Future
import io.circe.syntax.*

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.FiniteDuration
import Position.*
import org.apache.pekko.{Done, NotUsed}

object Position {
  sealed trait Position

  case object Start extends Position

  case object End extends Position

  final case class Index(value: Long) extends Position
}

class SingleAcsSnapshotBulkStorage(
    val migrationId: Long,
    val timestamp: CantonTimestamp,
    val config: BulkStorageConfig,
    val acsSnapshotStore: AcsSnapshotStore,
    val s3Connection: S3BucketConnection,
    override val loggerFactory: NamedLoggerFactory,
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
    extends NamedLogging {

  private def getAcsSnapshotChunk(
      migrationId: Long,
      timestamp: CantonTimestamp,
      after: Option[Long],
  ): Future[(Position, ByteString)] = {
    for {
      snapshot <- acsSnapshotStore.queryAcsSnapshot(
        migrationId,
        snapshot = timestamp,
        after,
        HardLimit.tryCreate(config.dbReadChunkSize),
        Seq.empty,
        Seq.empty,
      )
    } yield {
      val encoded = snapshot.createdEventsInPage.map(event =>
        CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event)
      )
      val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
      val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
      logger.debug(
        s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
      )
      (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
    }

  }

  private def getSource: Source[WithKillSwitch[Int], (KillSwitch, Future[Done])] = {

    def mksrc = {
      val idx = new AtomicInteger(0)
      val base = Source
        .unfoldAsync(Start: Position) {
          case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
          case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
          case End => Future.successful(None)
        }
        .via(ZstdGroupedWeight(config.maxFileSize))
        // Add a buffer so that the next object continues accumulating while we write the previous one
        .buffer(
          1,
          OverflowStrategy.backpressure,
        )
        .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) =>
          val objectKey = if (isLast) s"snapshot_${idx}_last.zstd" else s"snapshot_$idx.zstd"
          // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
          // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
          // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
          // partially written object.
          for {
            _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
          } yield {
            idx.addAndGet(1)
          }
        }
      val withKs = base.viaMat(KillSwitches.single)(Keep.right)
      withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
    }

    // TODO(#3429): tweak the retry parameters here
    val delay = FiniteDuration(5, "seconds")
    val policy = new RetrySourcePolicy[Unit, Int] {
      override def shouldRetry(
          lastState: Unit,
          lastEmittedElement: Option[Int],
          lastFailure: Option[Throwable],
      ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
        lastFailure.map { t =>
          logger.warn(s"Writing ACS snapshot to bulk storage failed with: ${ErrorUtil
              .messageWithStacktrace(t)}, will retry after delay of ${delay}")
          // Always retry (TODO(#3429): consider a max number of retries?)
          delay -> ()
        }
      }
    }

    PekkoUtil
      .restartSource(
        name = "acs-snapshot-dump",
        initial = (),
        mkSource = (_: Unit) => mksrc,
        policy = policy,
      )
  }
}

object SingleAcsSnapshotBulkStorage {
  def apply(
      config: BulkStorageConfig,
      acsSnapshotStore: AcsSnapshotStore,
      s3Connection: S3BucketConnection,
      loggerFactory: NamedLoggerFactory,
  )(implicit
      actorSystem: ActorSystem,
      tc: TraceContext,
      ec: ExecutionContext,
  ): Flow[(Long, CantonTimestamp), WithKillSwitch[Int], NotUsed] =
    Flow[(Long, CantonTimestamp)].flatMapConcat {
      case (migrationId: Long, timestamp: CantonTimestamp) =>
        SingleAcsSnapshotBulkStorage.asSource(
          migrationId,
          timestamp,
          config,
          acsSnapshotStore,
          s3Connection,
          loggerFactory,
        )
    }

  def asSource(
      migrationId: Long,
      timestamp: CantonTimestamp,
      config: BulkStorageConfig,
      acsSnapshotStore: AcsSnapshotStore,
      s3Connection: S3BucketConnection,
      loggerFactory: NamedLoggerFactory,
  )(implicit
      actorSystem: ActorSystem,
      tc: TraceContext,
      ec: ExecutionContext,
  ): Source[WithKillSwitch[Int], (KillSwitch, Future[Done])] =
    new SingleAcsSnapshotBulkStorage(
      migrationId,
      timestamp,
      config,
      acsSnapshotStore,
      s3Connection,
      loggerFactory,
    ).getSource

}
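
A minimal usage sketch of the API added above (not part of this commit): the Flow returned by SingleAcsSnapshotBulkStorage.apply consumes (migrationId, timestamp) pairs and dumps one snapshot per pair. The names snapshotsToDump, config, store, s3Connection and loggerFactory, as well as the implicit ActorSystem/TraceContext/ExecutionContext, are assumed to be provided by the surrounding application.

    // Hypothetical wiring; everything except the splice/canton types is an assumption.
    import org.apache.pekko.stream.scaladsl.{Sink, Source}

    val snapshotsToDump: Source[(Long, CantonTimestamp), NotUsed] =
      Source.single((0L, CantonTimestamp.now()))

    val done: Future[Done] =
      snapshotsToDump
        .via(SingleAcsSnapshotBulkStorage(config, store, s3Connection, loggerFactory))
        .runWith(Sink.ignore)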

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorageTest.scala

Lines changed: 11 additions & 7 deletions
@@ -39,18 +39,22 @@ class AcsSnapshotBulkStorageTest
  )

  "AcsSnapshotBulkStorage" should {
-   "work" in {
+   "successfully dump a single ACS snapshot" in {
      withS3Mock {
        val store = mockAcsSnapshotStore(acsSnapshotSize)
        val timestamp = CantonTimestamp.now()
        val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory)
        for {
-         _ <- new AcsSnapshotBulkStorage(
-           bulkStorageTestConfig,
-           store,
-           s3BucketConnection,
-           loggerFactory,
-         ).getSingleAcsSnapshotDumpSource(0, timestamp).runWith(Sink.ignore)
+         _ <- SingleAcsSnapshotBulkStorage
+           .asSource(
+             0,
+             timestamp,
+             bulkStorageTestConfig,
+             store,
+             s3BucketConnection,
+             loggerFactory,
+           )
+           .runWith(Sink.ignore)

          s3Objects <- s3BucketConnection.s3Client
            .listObjects(
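
For completeness: asSource materializes a (KillSwitch, Future[Done]) pair, which the test above discards by materializing only the Sink.ignore side via runWith. A caller that wants to stop an in-flight dump could keep it instead; a sketch under assumed names, where source stands for the result of SingleAcsSnapshotBulkStorage.asSource(...):

    // Keep the source's materialized value instead of the sink's.
    import org.apache.pekko.stream.scaladsl.{Keep, Sink}

    val (killSwitch, streamCompletion) = source.toMat(Sink.ignore)(Keep.left).run()
    // e.g. on shutdown: stop the dump cooperatively and await streamCompletion
    killSwitch.shutdown()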
