Commit 2e0ef74 (parent: cf5a5c8)

explicitly label the last s3 object for an acs snapshot (#3438)

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
3 files changed: +27 -23 lines changed
apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/AcsSnapshotBulkStorage.scala

Lines changed: 11 additions & 17 deletions
@@ -87,10 +87,6 @@ class AcsSnapshotBulkStorage(
 
   def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
 
-    // TODO(#3429): currently, if this crashes half-way through, there is no indication in the S3 objects that
-    // the snapshot is incomplete. We probably want to label the last object with `last` or something like that
-    // so that we can detect incomplete snapshots and recreate them.
-
     def mksrc = {
       val idx = new AtomicInteger(0)
       val base = Source
@@ -105,19 +101,17 @@ class AcsSnapshotBulkStorage(
           1,
           OverflowStrategy.backpressure,
         )
-        .mapAsync(1) { zstdObj =>
-          {
-            val objectKey = s"snapshot_$idx.zstd"
-            // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
-            // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
-            // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
-            // partially written object.
-            // TODO(#3429): Error handling
-            for {
-              _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
-            } yield {
-              idx.addAndGet(1)
-            }
+        .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) =>
+          val objectKey = if (isLast) s"snapshot_${idx}_last.zstd" else s"snapshot_$idx.zstd"
+          // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
+          // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
+          // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
+          // partially written object.
+          // TODO(#3429): Error handling
+          for {
+            _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
+          } yield {
+            idx.addAndGet(1)
           }
         }
       val withKs = base.viaMat(KillSwitches.single)(Keep.right)
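The `_last` suffix gives downstream readers a way to tell a complete dump from one that crashed part-way through, which is the concern raised in the removed TODO. A minimal sketch, not part of this commit (the helper name is hypothetical), of how a consumer could check completeness given the listed object keys:

  // Sketch only: a complete snapshot dump contains exactly one object whose key
  // carries the `_last` marker; a dump that crashed mid-way never wrote it.
  def snapshotLooksComplete(objectKeys: Seq[String]): Boolean =
    objectKeys.nonEmpty && objectKeys.count(_.endsWith("_last.zstd")) == 1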

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/ZstdGroupedWeight.scala

Lines changed: 11 additions & 5 deletions
@@ -11,17 +11,23 @@ import org.apache.pekko.util.ByteString
 
 import java.util.concurrent.atomic.AtomicReference
 
+case class ByteStringWithTermination(
+    bytes: ByteString,
+    isLast: Boolean,
+)
+
 /** A Pekko GraphStage that zstd-compresses a stream of bytestrings, and splits the output into zstd objects of size (minWeight + delta).
   * Somewhat similar to Pekko's built-in GroupedWeight, but outputs valid zstd compressed objects.
   */
-case class ZstdGroupedWeight(minSize: Long) extends GraphStage[FlowShape[ByteString, ByteString]] {
+case class ZstdGroupedWeight(minSize: Long)
+    extends GraphStage[FlowShape[ByteString, ByteStringWithTermination]] {
   require(minSize > 0, "minSize must be greater than 0")
 
   val zstdTmpBufferSize = 10 * 1024 * 1024; // TODO(#3429): make configurable?
 
   val in = Inlet[ByteString]("ZstdGroupedWeight.in")
-  val out = Outlet[ByteString]("ZstdGroupedWeight.out")
-  override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
+  val out = Outlet[ByteStringWithTermination]("ZstdGroupedWeight.out")
+  override val shape: FlowShape[ByteString, ByteStringWithTermination] = FlowShape(in, out)
 
   override def initialAttributes: Attributes = Attributes.name("ZstdGroupedWeight")
 
@@ -100,7 +106,7 @@ case class ZstdGroupedWeight(minSize: Long) extends GraphStage[FlowShape[ByteStr
         state.set(state.get().append(compressed))
         if (state.get().left <= 0) {
           state.set(state.get().append(zstd.get().zstdFinish()))
-          push(out, state.get().bytes)
+          push(out, ByteStringWithTermination(state.get().bytes, false))
           reset()
         } else {
           pull(in)
@@ -112,7 +118,7 @@ case class ZstdGroupedWeight(minSize: Long) extends GraphStage[FlowShape[ByteStr
       override def onUpstreamFinish(): Unit = {
         if (state.get().bytes.nonEmpty) {
           state.set(state.get().append(zstd.get().zstdFinish()))
-          push(out, state.get().bytes)
+          push(out, ByteStringWithTermination(state.get().bytes, true))
         }
         completeStage()
       }
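With the changed shape, everything downstream of the stage sees ByteStringWithTermination elements instead of raw ByteStrings. A minimal sketch of driving the stage in isolation (not taken from the repository; assumes a Pekko ActorSystem is in scope to provide the materializer, and the chunk sizes are arbitrary):

  import org.apache.pekko.actor.ActorSystem
  import org.apache.pekko.stream.scaladsl.{Sink, Source}
  import org.apache.pekko.util.ByteString
  import scala.concurrent.Future

  implicit val system: ActorSystem = ActorSystem("zstd-grouped-weight-sketch")

  // Chunks flushed because the size threshold was reached carry isLast = false;
  // the final chunk flushed from onUpstreamFinish carries isLast = true.
  val chunks: Future[Seq[ByteStringWithTermination]] =
    Source(List(ByteString("a" * 1024), ByteString("b" * 1024)))
      .via(ZstdGroupedWeight(minSize = 4096))
      .runWith(Sink.seq)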

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotBulkStorageTest.scala

Lines changed: 5 additions & 1 deletion
@@ -63,6 +63,7 @@ class AcsSnapshotBulkStorageTest extends StoreTest with HasExecutionContext with
         s3BucketConnection,
         loggerFactory,
       ).dumpAcsSnapshot(0, timestamp)
+
       s3Objects <- s3BucketConnection.s3Client
         .listObjects(
           ListObjectsRequest.builder().bucket("bucket").build()
@@ -80,13 +81,16 @@ class AcsSnapshotBulkStorageTest extends StoreTest with HasExecutionContext with
         )
         .map(_.createdEventsInPage)
     } yield {
-      val allContractsFromS3 = s3Objects.contents.asScala
+      val objectKeys = s3Objects.contents.asScala.sortBy(_.key())
+      val allContractsFromS3 = objectKeys
         .map(readUncompressAndDecode(s3BucketConnection))
         .flatten
 
       allContractsFromS3.map(
         reconstructFromS3
       ) should contain theSameElementsInOrderAs allContracts.map(_.event)
+      objectKeys.take(objectKeys.size - 1).forall { !_.key().endsWith("_last.zstd") }
+      objectKeys.last.key() should endWith("_last.zstd")
     }
   })
 }
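As shown in the diff, the new forall check produces a Boolean that is not asserted on; only the endWith expectation on the last key can fail the test. A sketch, not part of the commit, of expressing the non-last check as an explicit ScalaTest expectation instead:

  // Sketch only: fail the test if any key before the last one carries the marker.
  objectKeys.init.foreach(obj => obj.key() should not endWith ("_last.zstd"))
  objectKeys.last.key() should endWith("_last.zstd")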
