Conversation

@isegall-da
Contributor

@isegall-da isegall-da commented Jan 15, 2026

Currently dumps all snapshots. Computing which snapshots to actually dump, and which to skip, is tracked as a follow-up step in the tracking issue.

Fixes #3617

Pull Request Checklist

Cluster Testing

  • If a cluster test is required, comment /cluster_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.
  • If a hard-migration test is required (from the latest release), comment /hdm_test on this PR to request it, and ping someone with access to the DA-internal system to approve it.

PR Guidelines

  • Include any change that might be observable by our partners or affect their deployment in the release notes.
  • Specify fixed issues with Fixes #n, and mention issues worked on using #n
  • Include a screenshot for frontend-related PRs - see README or use your favorite screenshot tool

Merge Guidelines

  • Make the git commit message look sensible when squash-merging on GitHub (most likely: just copy your PR description).

Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
Base automatically changed from isegall/acs-stream to main January 16, 2026 15:57
@isegall-da isegall-da changed the title from "wip: stream ACS snapshots" to "stream ACS snapshot dumps to S3" Jan 20, 2026
@isegall-da isegall-da marked this pull request as ready for review January 20, 2026 17:49
@isegall-da
Contributor Author

ping @ray-roestenburg-da @rautenrieth-da can I have a review pls?

@isegall-da
Contributor Author

ping @ray-roestenburg-da @rautenrieth-da can I have a review pls?

Ah, sorry, misremembered when I opened this, thought it was much earlier.....

s3BucketConnection,
loggerFactory,
).getSource
.runWith(TestSink.probe[WithKillSwitch[(Long, CantonTimestamp)]])
Contributor

nice

}

def dumpAcsSnapshot(migrationId: Long, timestamp: CantonTimestamp): Future[Unit] = {
/** *
Contributor

Suggested change
- /** *
+ /**

)
val base =
getAcsSnapshotTimestampAfter(startMigrationId, startAfterTimestamp)
.via(SingleAcsSnapshotBulkStorage(config, acsSnapshotStore, s3Connection, loggerFactory))
Contributor

nice!

): Flow[(Long, CantonTimestamp), (Long, CantonTimestamp), NotUsed] =
Flow[(Long, CantonTimestamp)].flatMapConcat {
case (migrationId: Long, timestamp: CantonTimestamp) =>
new SingleAcsSnapshotBulkStorage(
Contributor

Style nitpick: you could call SingleAcsSnapshotBulkStorage.getSource and just pass the args to that method. You don't really need to create a new instance of SingleAcsSnapshotBulkStorage and then call its one method.

Contributor Author

But it has other private methods that access these arguments, so moving them to be arguments of getSource would require passing these around to other private methods as well. Not sure what you gain here...

Contributor

@ray-roestenburg-da ray-roestenburg-da Jan 21, 2026

Yeah, I think that's more straightforward and slightly more functional in style: just methods calling methods and, if needed, passing along arguments. From the perspective of the user of the function, the user just calls the function with the args and does not need to create an instance just to call one method. So from a usage perspective it's simpler: just call the function on object SingleAcsSnapshotBulkStorage.

Contributor Author

Hmmm... still not sure I'm convinced :)
This is hidden in the object's apply, so the user is unaware of the fact that an object is created behind the scenes. The user indeed just calls a method on the object, so that's the UX you propose. Isn't it?

Contributor

yes I'm proposing Object.method, no apply, just a method on an object, it's very easy to find. It's not super important 😄

Contributor Author

ACK, no strong feelings here. The only difference is that a user needs to type also .method after Object when using this, but if that seems more intuitive for you, I'm fine with that.
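
The two shapes discussed in this thread can be put side by side in a minimal sketch. Everything in it is hypothetical: Config, SnapshotDumper and the chunking logic are illustrative names, not the PR's code, and Iterator[String] stands in for the stream type returned by the real getSource so that the snippet runs without dependencies.

```scala
// Hypothetical, dependency-free illustration of the thread above.
final case class Config(chunkSize: Int)

// The class holds the shared arguments, so private helpers can read them
// without having them passed along as extra parameters.
final class SnapshotDumper private (config: Config, rows: Vector[String]) {
  // Private helper that uses the constructor arguments directly.
  private def chunks: Iterator[Vector[String]] = rows.grouped(config.chunkSize)

  def getSource: Iterator[String] = chunks.map(_.mkString(","))
}

object SnapshotDumper {
  // Callers only ever see Object.method; the instance created here is an
  // implementation detail they never touch.
  def asSource(config: Config, rows: Vector[String]): Iterator[String] =
    new SnapshotDumper(config, rows).getSource
}
```

With this arrangement a caller writes SnapshotDumper.asSource(Config(2), rows) and never constructs the class, while the private helpers still avoid threading arguments through every call. Whether that is preferable to a plain apply is a matter of taste, as the thread concludes.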

acsSnapshotStore,
s3Connection,
loggerFactory,
).getSource
Contributor

@ray-roestenburg-da ray-roestenburg-da Jan 21, 2026

You get the source (getSource), which returns Source[String, ..], then you fold over it and just emit the (migration ID, timestamp). This is not complete, right? (Maybe make a note of that.) Or am I missing something?

Contributor

Maybe I missed the TODOs in getSource. I think it would make sense for the write to S3 to be a flow that emits the (migrationId, timestamp) that was written, but you're probably planning this for later?

Contributor Author

You mean to move the two lines below into the source itself?

Contributor Author

Guess that makes sense, will do

val s3BucketConnection = getS3BucketConnectionWithInjectedErrors(loggerFactory)
for {
_ <- SingleAcsSnapshotBulkStorage
.asSource(
Contributor

@ray-roestenburg-da ray-roestenburg-da Jan 21, 2026

Looking at this test, maybe it's nice to separate the code into a source that gives you acs snapshots (from the store) and a flow to write the s3 stuff, and then connect them in the code. (you could also unit test just the flow for instance without having an actual source that gives you real acs snapshots from the db)

Contributor Author

@isegall-da isegall-da Jan 21, 2026

As things are right now, wrapping the write to s3 as a flow is 2 lines of code, so I don't see great value in adding a class for that. Having said that, there's this TODO where I do plan on doing basically that:

// TODO(#3429): For now, we accumulate the full object in memory, then write it as a whole.
// Consider streaming it to S3 instead. Need to make sure that it then handles crashes correctly,
// i.e. that until we tell S3 that we're done writing, if we stop, then S3 throws away the
// partially written object.

Contributor

@ray-roestenburg-da ray-roestenburg-da Jan 21, 2026

You can add a couple of methods on an object (you don't need a class) to create the separate sources and flows, and then compose them somewhere with source.via(flow). Not a huge deal.
I'm used to doing something like:

object Bulk {
  def acsFromScan(...): Source[.., ..]
  def acsToS3(...): Flow[..,..,..] 
}

And somewhere you compose the whole stream: acsFromScan().via(acsToS3)...
Then you can test both separately and use them where you like. Just FYI. In a test you can do Source(Vector(acs1,...)).via(acsToS3), etc.

Contributor Author

ACK, will do as part of that TODO
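
The separation suggested in this thread can be sketched roughly as follows. This is only an illustration under loud assumptions: plain Scala iterators and functions stand in for a stream Source and Flow so the snippet runs without dependencies, AcsSnapshot and the write callback are hypothetical, and a Long replaces CantonTimestamp. Only the names Bulk, acsFromScan and acsToS3 come from the reviewer's comment.

```scala
// Hypothetical stand-in for one ACS snapshot; a Long replaces CantonTimestamp.
final case class AcsSnapshot(migrationId: Long, timestamp: Long, contracts: Vector[String])

object Bulk {
  // Stand-in for a Source: the real one would read snapshots from the store;
  // here it simply wraps data that is already in memory.
  def acsFromScan(snapshots: Vector[AcsSnapshot]): Iterator[AcsSnapshot] =
    snapshots.iterator

  // Stand-in for a Flow: writes each snapshot through the supplied callback
  // (hypothetical; an S3 client would sit behind it) and then emits the
  // (migrationId, timestamp) of what was written.
  def acsToS3(write: (String, String) => Unit): Iterator[AcsSnapshot] => Iterator[(Long, Long)] =
    _.map { snapshot =>
      val key = s"${snapshot.migrationId}/${snapshot.timestamp}"
      write(key, snapshot.contracts.mkString("\n"))
      (snapshot.migrationId, snapshot.timestamp)
    }
}
```

Because acsToS3 only depends on its input and the write callback, a test can feed it an in-memory Iterator of snapshots and a callback that records keys, without a database or a bucket. In the real stream code the same composition would read acsFromScan(...).via(acsToS3(...)), as the reviewer describes.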

Contributor

@ray-roestenburg-da ray-roestenburg-da left a comment

Thanks! definitely going in the right direction, made some comments

…am-2

@isegall-da isegall-da enabled auto-merge (squash) January 23, 2026 21:22
@isegall-da isegall-da merged commit e7a9f36 into main Jan 23, 2026
62 checks passed
@isegall-da isegall-da deleted the isegall/acs-stream-2 branch January 23, 2026 21:26
Development

Successfully merging this pull request may close these issues.

support dumping all ACS snapshots to S3

3 participants