Merged
Changes from 29 of 36 commits, all by isegall-da:
dbe7254  [ci] test me (Jan 14, 2026)
79fe68a  [ci] . (Jan 14, 2026)
e34d096  [ci] . (Jan 14, 2026)
2c5cccd  [ci] . (Jan 14, 2026)
b3710e6  Merge remote-tracking branch 'origin/main' into isegall/acs-stream (Jan 14, 2026)
e93f1d5  [ci] (Jan 14, 2026)
b8280be  [ci] . (Jan 15, 2026)
3695a03  [ci] fix integration test (Jan 15, 2026)
a9b87eb  [ci] . (Jan 15, 2026)
4f56201  cleanup (Jan 15, 2026)
e4a5415  [ci] cleanup (Jan 15, 2026)
9b6302c  cleanup (Jan 15, 2026)
427efda  cleanup (Jan 15, 2026)
24af1f1  cleanup (Jan 15, 2026)
02eb6d3  cleanup (Jan 15, 2026)
0698fcc  cleanup (Jan 15, 2026)
b4a5388  cleanup (Jan 15, 2026)
d5ee22a  Merge remote-tracking branch 'origin/main' into isegall/acs-stream (Jan 15, 2026)
bb07b6e  [ci] (Jan 15, 2026)
3c93319  wip (Jan 15, 2026)
1350df0  Merge remote-tracking branch 'origin/main' into isegall/acs-stream-2 (Jan 16, 2026)
7a9e018  [ci] refactor (Jan 16, 2026)
684c31c  put it together (Jan 16, 2026)
aea773d  [ci] woohoo (Jan 17, 2026)
261821b  pekkoUtil test (Jan 20, 2026)
0f0f1a6  [ci] cleanup, works (Jan 20, 2026)
8bce80a  [ci] cleanups and docs (Jan 20, 2026)
cd4fbec  [ci] test config (Jan 20, 2026)
3ebd34d  [ci] (Jan 21, 2026)
5c55ef5  [ci] review comments (Jan 21, 2026)
664acc7  [ci] fmt (Jan 21, 2026)
8141879  [ci] review comment (Jan 21, 2026)
faf2ad4  [ci] Merge remote-tracking branch 'origin/main' into isegall/acs-stre… (Jan 21, 2026)
987ba60  [ci] give it more time (Jan 21, 2026)
3cc031b  [ci] fix the test (Jan 22, 2026)
8421e94  [ci] shutdown (Jan 23, 2026)
@@ -3,39 +3,20 @@

package org.lfdecentralizedtrust.splice.scan.store.bulk

-import scala.concurrent.ExecutionContext
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
-import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import com.digitalasset.canton.util.PekkoUtil.RetrySourcePolicy
+import com.digitalasset.canton.util.{ErrorUtil, PekkoUtil}
import org.apache.pekko.{Done, NotUsed}
import org.apache.pekko.actor.ActorSystem
-import org.apache.pekko.stream.{KillSwitch, KillSwitches, OverflowStrategy}
-import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source}
-import org.apache.pekko.util.ByteString
-import org.lfdecentralizedtrust.splice.scan.admin.http.CompactJsonScanHttpEncodings
+import org.apache.pekko.stream.scaladsl.{Keep, Source}
+import org.apache.pekko.pattern.after
+import org.apache.pekko.stream.{KillSwitch, KillSwitches}
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.HardLimit

-import scala.concurrent.Future
-import io.circe.syntax.*
-
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration.FiniteDuration
-
-import Position.*
-
-object Position {
-  sealed trait Position
-
-  case object Start extends Position
-
-  case object End extends Position
-
-  final case class Index(value: Long) extends Position
-}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.*

class AcsSnapshotBulkStorage(
    val config: BulkStorageConfig,
@@ -45,91 +26,95 @@ class AcsSnapshotBulkStorage(
)(implicit actorSystem: ActorSystem, tc: TraceContext, ec: ExecutionContext)
    extends NamedLogging {

-  private def getAcsSnapshotChunk(
-      migrationId: Long,
-      timestamp: CantonTimestamp,
-      after: Option[Long],
-  ): Future[(Position, ByteString)] = {
-    for {
-      snapshot <- acsSnapshotStore.queryAcsSnapshot(
-        migrationId,
-        snapshot = timestamp,
-        after,
-        HardLimit.tryCreate(config.dbReadChunkSize),
-        Seq.empty,
-        Seq.empty,
-      )
-    } yield {
-      val encoded = snapshot.createdEventsInPage.map(event =>
-        CompactJsonScanHttpEncodings().javaToHttpCreatedEvent(event.eventId, event.event)
-      )
-      val contractsStr = encoded.map(_.asJson.noSpacesSortKeys).mkString("\n") + "\n"
-      val contractsBytes = ByteString(contractsStr.getBytes(StandardCharsets.UTF_8))
-      logger.debug(
-        s"Read ${encoded.length} contracts from ACS, to a bytestring of size ${contractsBytes.length} bytes"
-      )
-      (snapshot.afterToken.fold(End: Position)(Index(_)), contractsBytes)
-    }
-  }

+  // TODO(#3429): persist progress (or conclude it from the S3 storage), and start from latest successfully dumped snapshot upon restart
+  private def getStartTimestamp: (Long, CantonTimestamp) = (0, CantonTimestamp.MinValue)
+
+  // When a new snapshot is not yet available, how long to wait for one.
+  // TODO(#3429): make it longer for prod (so consider making it configurable/overridable for tests)
+  private val snapshotPollingInterval = 5.seconds
+
+  private def getAcsSnapshotTimestampAfter(
+      startMigrationId: Long,
+      startTimestamp: CantonTimestamp,
+  ): Source[(Long, CantonTimestamp), NotUsed] = {
+    Source
+      .unfoldAsync((startMigrationId, startTimestamp)) {
+        case (lastMigrationId: Long, lastTimestamp: CantonTimestamp) =>
+          acsSnapshotStore.lookupSnapshotAfter(lastMigrationId, lastTimestamp).flatMap {
+            case Some(snapshot) =>
+              logger.info(
+                s"next snapshot available, at migration ${snapshot.migrationId}, record time ${snapshot.snapshotRecordTime}"
+              )
+              Future.successful(
+                Some(
+                  (
+                    (snapshot.migrationId, snapshot.snapshotRecordTime),
+                    Some((snapshot.migrationId, snapshot.snapshotRecordTime)),
+                  )
+                )
+              )
+            case None =>
+              after(snapshotPollingInterval, actorSystem.scheduler) {
+                logger.debug("No new snapshot available, sleeping...")
+                Future.successful(Some(((lastMigrationId, lastTimestamp), None)))
+              }
+          }
+      }
+      .collect { case Some((migrationId, timestamp)) => (migrationId, timestamp) }
+  }

-  def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
+  /** *

[Review comment, Contributor] Suggested change: replace `/** *` with `/**`.

+   * This is the main implementation of the pipeline. It is a Pekko Source that gets a `start` timestamp, and starts dumping to S3
+   * all snapshots (strictly) after `start`. It is an infinite source that should never complete.
+   */
+  private def mksrc(
+      start: (Long, CantonTimestamp)
+  ): Source[(Long, CantonTimestamp), (KillSwitch, Future[Done])] = start match {
+    case (startMigrationId, startAfterTimestamp) =>
+      logger.debug(
+        s"Starting ACS snapshot dump source, from migration $startMigrationId, timestamp $startAfterTimestamp"
+      )
+      val base =
+        getAcsSnapshotTimestampAfter(startMigrationId, startAfterTimestamp)
+          .via(SingleAcsSnapshotBulkStorage(config, acsSnapshotStore, s3Connection, loggerFactory))

[Review comment, Contributor] nice!

-    def mksrc = {
-      val idx = new AtomicInteger(0)
-      val base = Source
-        .unfoldAsync(Start: Position) {
-          case Start => getAcsSnapshotChunk(migrationId, timestamp, None).map(Some(_))
-          case Index(i) => getAcsSnapshotChunk(migrationId, timestamp, Some(i)).map(Some(_))
-          case End => Future.successful(None)
-        }
-        .via(ZstdGroupedWeight(config.maxFileSize))
-        // Add a buffer so that the next object continues accumulating while we write the previous one
-        .buffer(
-          1,
-          OverflowStrategy.backpressure,
-        )
-        .mapAsync(1) { case ByteStringWithTermination(zstdObj, isLast) =>
-          val objectKey = if (isLast) s"snapshot_${idx}_last.zstd" else s"snapshot_$idx.zstd"
-          // TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
-          // Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
-          // i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
-          // partially written object.
-          for {
-            _ <- s3Connection.writeFullObject(objectKey, ByteBuffer.wrap(zstdObj.toArrayUnsafe()))
-          } yield {
-            idx.addAndGet(1)
-          }
-        }
      val withKs = base.viaMat(KillSwitches.single)(Keep.right)
      withKs.watchTermination() { case (ks, done) => (ks: KillSwitch, done) }
-    }
+  }

+  /** Wraps mksrc (where the main pipeline logic is implemented) in a retry loop, to retry upon failures.
+    */
+  def getSource
+      : Source[PekkoUtil.WithKillSwitch[(Long, CantonTimestamp)], (KillSwitch, Future[Done])] = {
+    // TODO(#3429): once we persist the state, i.e. the last dumped snapshot, consider moving from Canton's PekkoUtil.restartSource
+    // to Pekko's built-in RestartSource (for now, it's convenient to use Canton's ability to track state via lastEmittedElement)
    // TODO(#3429): tweak the retry parameters here
    val delay = FiniteDuration(5, "seconds")
-    val policy = new RetrySourcePolicy[Unit, Int] {
+    val policy = new RetrySourcePolicy[(Long, CantonTimestamp), (Long, CantonTimestamp)] {
      override def shouldRetry(
-          lastState: Unit,
-          lastEmittedElement: Option[Int],
+          lastState: (Long, CantonTimestamp),
+          lastEmittedElement: Option[(Long, CantonTimestamp)],
          lastFailure: Option[Throwable],
-      ): Option[(scala.concurrent.duration.FiniteDuration, Unit)] = {
+      ): Option[(scala.concurrent.duration.FiniteDuration, (Long, CantonTimestamp))] = {
        lastFailure.map { t =>
-          logger.warn(s"Writing ACS snapshot to bulk storage failed with : ${ErrorUtil
-            .messageWithStacktrace(t)}, will retry after delay of ${delay}")
+          logger.warn(
+            s"Writing ACS snapshot to bulk storage failed with: ${ErrorUtil
+                .messageWithStacktrace(t)}, will retry after delay of $delay from last successful timestamp $lastEmittedElement"
+          )
          // Always retry (TODO(#3429): consider a max number of retries?)
-          delay -> ()
+          delay -> lastEmittedElement.fold(lastState)(identity)
        }
      }
    }

    PekkoUtil
      .restartSource(
        name = "acs-snapshot-dump",
-        initial = (),
-        mkSource = (_: Unit) => mksrc,
+        initial = getStartTimestamp,
+        mkSource = (start: (Long, CantonTimestamp)) => mksrc(start),
        policy = policy,
      )
-      .runWith(Sink.ignore)
-
-  }.map(_ => ())
+  }

}
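The polling pattern in getAcsSnapshotTimestampAfter is easy to miss on a first read: the element type of unfoldAsync is an Option, so a poll that finds nothing sleeps, emits a None placeholder to keep the unfold alive, and the trailing collect drops it. A minimal, self-contained sketch of the same idiom, with a hypothetical `lookupAfter` standing in for acsSnapshotStore.lookupSnapshotAfter and plain type parameter `A` standing in for (Long, CantonTimestamp):

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.pattern.after
import org.apache.pekko.stream.scaladsl.Source

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object PollingSketch {
  // The unfold state is the last element seen. Each step either advances the
  // state and emits Some(next), or sleeps for `interval`, keeps the state, and
  // emits None, which `collect` filters out downstream.
  def pollAfter[A](
      start: A,
      lookupAfter: A => Future[Option[A]], // hypothetical stand-in
      interval: FiniteDuration,
  )(implicit system: ActorSystem, ec: ExecutionContext): Source[A, NotUsed] =
    Source
      .unfoldAsync[A, Option[A]](start) { last =>
        lookupAfter(last).flatMap {
          case Some(next) => Future.successful(Some((next, Option(next))))
          case None => after(interval, system.scheduler)(Future.successful(Some((last, None))))
        }
      }
      .collect { case Some(a) => a }
}
```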
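The buffer comment in the removed mksrc describes a small but useful trick: with mapAsync(1) alone, the compression stage would sit idle while a write is in flight, whereas a one-element backpressured buffer lets object n+1 accumulate while object n is being written, without ever holding more than one finished object in memory. A toy sketch of the shape of that pipeline; `accumulate` stands in for ZstdGroupedWeight and `write` for the S3 upload, both hypothetical:

```scala
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object BufferOverlapSketch {
  def run(
      accumulate: Int => Array[Byte],
      write: Array[Byte] => Future[Unit],
  )(implicit system: ActorSystem): Future[Done] =
    Source(1 to 100)
      .map(accumulate)
      // Hold exactly one finished object so the stage above can start on the next one
      .buffer(1, OverflowStrategy.backpressure)
      .mapAsync(1)(write) // at most one in-flight write; ordering preserved
      .runWith(Sink.ignore)
}
```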
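The TODO about streaming to S3 hinges on multipart-upload semantics: uploaded parts only become a visible object on CompleteMultipartUpload, and AbortMultipartUpload discards everything uploaded so far, which is exactly the crash behavior the comment asks for. A blocking sketch of those semantics using the AWS SDK v2 (an assumption; the PR's S3Connection API is not shown beyond writeFullObject). Note that every part except the last must be at least 5 MiB:

```scala
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.*

import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

object MultipartSketch {
  def uploadStreaming(s3: S3Client, bucket: String, key: String, chunks: Iterator[Array[Byte]]): Unit = {
    val uploadId = s3
      .createMultipartUpload(CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build())
      .uploadId()
    try {
      val parts = chunks.zipWithIndex.map { case (bytes, i) =>
        val partNumber = i + 1 // S3 part numbers are 1-based
        val etag = s3
          .uploadPart(
            UploadPartRequest.builder().bucket(bucket).key(key).uploadId(uploadId).partNumber(partNumber).build(),
            RequestBody.fromBytes(bytes),
          )
          .eTag()
        CompletedPart.builder().partNumber(partNumber).eTag(etag).build()
      }.toList
      // Nothing is visible at `key` until this call succeeds
      s3.completeMultipartUpload(
        CompleteMultipartUploadRequest
          .builder()
          .bucket(bucket)
          .key(key)
          .uploadId(uploadId)
          .multipartUpload(CompletedMultipartUpload.builder().parts(parts.asJava).build())
          .build()
      )
      ()
    } catch {
      case NonFatal(e) =>
        // A crash before complete leaves no visible object; abort cleans up the parts
        s3.abortMultipartUpload(
          AbortMultipartUploadRequest.builder().bucket(bucket).key(key).uploadId(uploadId).build()
        )
        throw e
    }
  }
}
```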
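The TODO in getSource notes that Canton's PekkoUtil.restartSource is kept mainly because its RetrySourcePolicy hands back lastEmittedElement, letting a restart resume from the last snapshot that made it through. Once progress is persisted, similar resume behavior could be approximated with Pekko's built-in RestartSource; a hedged sketch under that assumption, with `restartFromLast` and `mkSource` as illustrative names. Note that RestartSource materializes NotUsed, so the (KillSwitch, Future[Done]) pair would have to be wired outside the restart wrapper, e.g. with viaMat(KillSwitches.single) and watchTermination as mksrc already does:

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.RestartSettings
import org.apache.pekko.stream.scaladsl.{RestartSource, Source}

import scala.concurrent.duration.*

object RestartSketch {
  // The AtomicReference plays the role of restartSource's lastEmittedElement
  // tracking: every element that flows through updates it, and each restart
  // rebuilds the inner source from the last element seen.
  def restartFromLast[A](
      initial: A,
      mkSource: A => Source[A, Any],
  ): Source[A, NotUsed] = {
    val last = new AtomicReference[A](initial)
    val settings = RestartSettings(minBackoff = 5.seconds, maxBackoff = 5.seconds, randomFactor = 0.0)
    RestartSource.withBackoff(settings) { () =>
      mkSource(last.get()).map { a =>
        last.set(a)
        a
      }
    }
  }
}
```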