Skip to content

Commit 0fe2c8d

Browse files
[main] Make migration export scale to large sizes (#2875)
* [main] Make migration export scale to large sizes forward port from #2863 fixes #2858 --------- Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent df2a3e2 commit 0fe2c8d

File tree

28 files changed

+316
-108
lines changed

28 files changed

+316
-108
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DisasterRecoveryIntegrationTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ class DisasterRecoveryIntegrationTest
628628

629629
val fullDumpFile = migrationDumpFilePath(validator.name)
630630
clearOrCreate(fullDumpFile)
631-
fullDumpFile.write(dump.asJson.spaces2)
631+
fullDumpFile.write(dump.toHttp(outputDirectory = None).asJson.spaces2)
632632
}
633633

634634
private def writeMigrationDumpFile(
@@ -651,7 +651,7 @@ class DisasterRecoveryIntegrationTest
651651
dump.participantUsers,
652652
createdAt = dump.createdAt,
653653
)
654-
fullDumpFile.write(fullDump.asJson.spaces2)
654+
fullDumpFile.write(fullDump.toHttp(outputDirectory = None).asJson.spaces2)
655655
}
656656

657657
private def clearOrCreate(f: File) = {

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvFrontendIntegrationTest.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1120,9 +1120,14 @@ class SvFrontendIntegrationTest
11201120
def changeAction(actionName: String)(implicit webDriver: WebDriverType) = {
11211121
eventually() { find(id("display-actions")) should not be empty }
11221122
val dropDownAction = new Select(webDriver.findElement(By.id("display-actions")))
1123+
val existingAction: String = dropDownAction.getFirstSelectedOption().getAttribute("value")
11231124
dropDownAction.selectByValue(actionName)
11241125

1125-
if (actionName != "SRARC_OffboardSv") {
1126+
if (actionName != "SRARC_OffboardSv" && existingAction != actionName) {
1127+
logger.debug(
1128+
s"Changed action from $existingAction to $actionName, waiting for confirmation dialog"
1129+
)
1130+
waitForQuery(id("action-change-dialog-proceed"))
11261131
val proceedButton = webDriver.findElement(By.id("action-change-dialog-proceed"))
11271132
proceedButton.click()
11281133
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc.PruningS
2121
import com.digitalasset.canton.admin.participant.v30.{ExportAcsOldResponse, PruningServiceGrpc}
2222
import com.digitalasset.canton.config.RequireTypes.PositiveInt
2323
import com.digitalasset.canton.config.{ApiLoggingConfig, ClientConfig, PositiveDurationSeconds}
24-
import com.digitalasset.canton.discard.Implicits.DiscardOps
2524
import com.digitalasset.canton.logging.NamedLoggerFactory
2625
import com.digitalasset.canton.participant.synchronizer.SynchronizerConnectionConfig
2726
import com.digitalasset.canton.sequencing.{
@@ -63,7 +62,8 @@ import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.{
6362
}
6463

6564
import java.time.Instant
66-
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise}
65+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
66+
import scala.jdk.CollectionConverters.*
6767

6868
/** Connection to the subset of the Canton admin API that we rely
6969
* on in our own applications.
@@ -312,13 +312,11 @@ class ParticipantAdminConnection(
312312
filterSynchronizerId: Option[SynchronizerId] = None,
313313
timestamp: Option[Instant] = None,
314314
force: Boolean = false,
315-
)(implicit traceContext: TraceContext): Future[ByteString] = {
315+
)(implicit traceContext: TraceContext): Future[Seq[ByteString]] = {
316316
logger.debug(
317317
show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at timestamp $timestamp"
318318
)
319-
val requestComplete = Promise[ByteString]()
320-
// TODO(DACH-NY/canton-network-node#3298) just concatenate the byteString here. Make it scale to 2M contracts.
321-
val observer = new GrpcByteChunksToByteArrayObserver[ExportAcsOldResponse](requestComplete)
319+
val observer = new SeqAccumulatingObserver[ExportAcsOldResponse]
322320
runCmd(
323321
ParticipantAdminCommands.ParticipantRepairManagement.ExportAcsOld(
324322
parties = parties,
@@ -328,20 +326,36 @@ class ParticipantAdminConnection(
328326
observer,
329327
force,
330328
)
331-
).discard
332-
requestComplete.future
329+
).flatMap(_ => observer.resultFuture).map(_.map(_.chunk))
333330
}
334331

335-
def uploadAcsSnapshot(acsBytes: ByteString)(implicit
332+
def downloadAcsSnapshotNonChunked(
333+
parties: Set[PartyId],
334+
filterSynchronizerId: Option[SynchronizerId] = None,
335+
timestamp: Option[Instant] = None,
336+
force: Boolean = false,
337+
)(implicit traceContext: TraceContext): Future[ByteString] =
338+
downloadAcsSnapshot(parties, filterSynchronizerId, timestamp, force).map(chunks =>
339+
ByteString.copyFrom(chunks.asJava)
340+
)
341+
342+
def uploadAcsSnapshot(acsBytes: Seq[ByteString])(implicit
336343
traceContext: TraceContext
337344
): Future[Unit] = {
345+
val chunkedAcsBytes: Seq[ByteString] = acsBytes match {
346+
case Seq(bytes) =>
347+
// Caller has not chunked the bytes, this is possible for SVs that try to onboard or for validator recovery.
348+
// The chuning logic here matches what GrpcStreamingUtils.streamToServer does
349+
bytes.toByteArray.grouped(1024 * 1024 * 2).map(ByteString.copyFrom(_)).toSeq
350+
case _ => acsBytes
351+
}
338352
retryProvider.retryForClientCalls(
339353
"import_acs",
340354
"Imports the acs in the participantl",
341355
runCmd(
342356
ParticipantAdminCommands.ParticipantRepairManagement
343357
.ImportAcsOld(
344-
acsBytes,
358+
chunkedAcsBytes,
345359
IMPORT_ACS_WORKFLOW_ID_PREFIX,
346360
allowContractIdSuffixRecomputation = false,
347361
),
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.environment
5+
6+
import scala.collection.mutable.ListBuffer
7+
import scala.concurrent.{Future, Promise}
8+
9+
class SeqAccumulatingObserver[T] extends io.grpc.stub.StreamObserver[T] {
10+
private val promise: Promise[List[T]] = Promise[List[T]]()
11+
private val buffer: ListBuffer[T] = ListBuffer.empty[T]
12+
13+
def resultFuture: Future[Seq[T]] = promise.future
14+
15+
override def onNext(value: T): Unit = {
16+
buffer.append(value)
17+
}
18+
19+
override def onError(t: Throwable): Unit = {
20+
promise.failure(t)
21+
}
22+
23+
override def onCompleted(): Unit = {
24+
promise.success(buffer.toList)
25+
}
26+
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SequencerAdminConnection.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,16 @@ class SequencerAdminConnection(
6969

7070
def getGenesisState(timestamp: CantonTimestamp)(implicit
7171
traceContext: TraceContext
72-
): Future[ByteString] = {
73-
val responseObserver = new ByteStringStreamObserver[GenesisStateV2Response](_.chunk)
72+
): Future[Seq[ByteString]] = {
73+
val responseObserver = new SeqAccumulatingObserver[GenesisStateV2Response]()
7474
runCmd(
7575
TopologyAdminCommands.Read
7676
.GenesisStateV2(
7777
timestamp = Some(timestamp),
7878
synchronizerStore = None,
7979
observer = responseObserver,
8080
)
81-
).flatMap(_ => responseObserver.resultBytes)
81+
).flatMap(_ => responseObserver.resultFuture.map(_.map(_.chunk)))
8282
}
8383

8484
def getOnboardingState(sequencerId: SequencerId)(implicit
@@ -101,7 +101,7 @@ class SequencerAdminConnection(
101101
topologySnapshot.result.foreach(_.writeDelimitedTo(domainParameters.protocolVersion, builder))
102102
runCmd(
103103
SequencerAdminCommands.InitializeFromGenesisStateV2(
104-
builder.toByteString,
104+
Seq(builder.toByteString),
105105
domainParameters,
106106
)
107107
)
@@ -110,7 +110,7 @@ class SequencerAdminConnection(
110110
/** This is used for initializing the sequencer after hard domain migrations.
111111
*/
112112
def initializeFromGenesisState(
113-
genesisState: ByteString,
113+
genesisState: Seq[ByteString],
114114
domainParameters: StaticSynchronizerParameters,
115115
)(implicit traceContext: TraceContext): Future[InitializeSequencerResponse] =
116116
runCmd(

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/AcsExporter.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class AcsExporter(
3939
parties: PartyId*
4040
)(implicit
4141
tc: TraceContext
42-
): Future[ByteString] = {
42+
): Future[Seq[ByteString]] = {
4343
participantAdminConnection.downloadAcsSnapshot(
4444
parties = parties.toSet,
4545
filterSynchronizerId = Some(domain),
@@ -51,7 +51,7 @@ class AcsExporter(
5151
def safeExportParticipantPartiesAcsFromPausedDomain(domain: SynchronizerId)(implicit
5252
tc: TraceContext,
5353
ec: ExecutionContext,
54-
): EitherT[Future, AcsExportFailure, (ByteString, Instant)] = {
54+
): EitherT[Future, AcsExportFailure, (Seq[ByteString], Instant)] = {
5555
EitherT {
5656
for {
5757
participantId <- participantAdminConnection.getId()
@@ -70,7 +70,7 @@ class AcsExporter(
7070
private def safeExportAcsFromPausedDomain(domain: SynchronizerId, parties: PartyId*)(implicit
7171
tc: TraceContext,
7272
ec: ExecutionContext,
73-
): EitherT[Future, AcsExportFailure, (ByteString, Instant)] = {
73+
): EitherT[Future, AcsExportFailure, (Seq[ByteString], Instant)] = {
7474
for {
7575
paramsState <- domainStateTopology
7676
.firstAuthorizedStateForTheLatestSynchronizerParametersState(domain)
@@ -83,7 +83,7 @@ class AcsExporter(
8383
_ <- EitherT.liftF[Future, AcsExportFailure, Unit](
8484
waitUntilSynchronizerTime(domain, paramsState.acsExportWaitTimestamp)
8585
)
86-
snapshot <- EitherT.liftF[Future, AcsExportFailure, ByteString](
86+
snapshot <- EitherT.liftF[Future, AcsExportFailure, Seq[ByteString]](
8787
participantAdminConnection.downloadAcsSnapshot(
8888
parties = parties.toSet,
8989
filterSynchronizerId = Some(domain),

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainDataRestorer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class DomainDataRestorer(
3333
synchronizerId: SynchronizerId,
3434
sequencerConnections: SequencerConnections,
3535
dars: Seq[Dar],
36-
acsSnapshot: ByteString,
36+
acsSnapshot: Seq[ByteString],
3737
)(implicit
3838
tc: TraceContext
3939
): Future[Unit] = {
@@ -93,7 +93,7 @@ class DomainDataRestorer(
9393
}
9494
}
9595

96-
private def importAcs(acs: ByteString)(implicit tc: TraceContext) = {
96+
private def importAcs(acs: Seq[ByteString])(implicit tc: TraceContext) = {
9797
participantAdminConnection.uploadAcsSnapshot(
9898
acs
9999
)
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package org.lfdecentralizedtrust.splice.migration
5+
6+
import com.google.protobuf.ByteString
7+
import java.io.*
8+
import java.time.Instant
9+
import java.util.Base64
10+
import scala.collection.mutable.ListBuffer
11+
import scala.jdk.CollectionConverters.*
12+
import scala.util.Using
13+
14+
object DomainMigrationEncoding {
15+
private val base64Decoder = Base64.getDecoder()
16+
17+
def encode(
18+
outputDirectory: Option[String],
19+
acsTimestamp: Instant,
20+
name: String,
21+
content: Seq[ByteString],
22+
): String = {
23+
outputDirectory match {
24+
case None =>
25+
Base64.getEncoder.encodeToString(ByteString.copyFrom(content.asJava).toByteArray)
26+
case Some(dir) =>
27+
val file = s"$dir/${acsTimestamp}-$name"
28+
Using.resource(
29+
new DataOutputStream(
30+
new BufferedOutputStream(
31+
new FileOutputStream(file)
32+
)
33+
)
34+
) { dos =>
35+
writeChunks(dos, content)
36+
}
37+
file
38+
}
39+
}
40+
41+
def decode(separateFiles: Option[Boolean], content: String): Seq[ByteString] = {
42+
if (separateFiles.getOrElse(false)) {
43+
Using.resource(
44+
new DataInputStream(
45+
new BufferedInputStream(
46+
new FileInputStream(content)
47+
)
48+
)
49+
)(readAllChunks)
50+
} else {
51+
Seq(ByteString.copyFrom(base64Decoder.decode(content)))
52+
}
53+
}
54+
55+
@SuppressWarnings(Array("org.wartremover.warts.Var", "org.wartremover.warts.While"))
56+
private def readAllChunks(
57+
dis: DataInputStream
58+
): Seq[ByteString] = {
59+
val acc = ListBuffer.empty[ByteString]
60+
var eof = false
61+
while (!eof) {
62+
try {
63+
val length = dis.readInt()
64+
val chunk = new Array[Byte](length)
65+
dis.readFully(chunk)
66+
acc.addOne(ByteString.copyFrom(chunk))
67+
} catch {
68+
case _: EOFException =>
69+
eof = true
70+
}
71+
}
72+
acc.toSeq
73+
}
74+
75+
private def writeChunks(dos: DataOutputStream, chunks: Seq[ByteString]): Unit = {
76+
chunks.foreach { chunk =>
77+
dos.writeInt(chunk.size)
78+
dos.write(chunk.toByteArray)
79+
}
80+
}
81+
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationTrigger.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,11 @@ import java.nio.file.Path
2525
import java.time.Instant
2626
import scala.concurrent.{ExecutionContext, Future}
2727

28-
abstract class DomainMigrationTrigger[T: Codec](implicit
28+
abstract class DomainMigrationTrigger[T](implicit
2929
ec: ExecutionContext,
3030
mat: Materializer,
3131
tracer: Tracer,
32+
codec: Codec[T],
3233
) extends ScheduledTaskTrigger[DomainMigrationTrigger.Task] {
3334
protected val participantAdminConnection: ParticipantAdminConnection
3435
protected val sequencerAdminConnection: Option[SequencerAdminConnection]

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/setup/ParticipantPartyMigrator.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ class ParticipantPartyMigrator(
349349
_ <- MonadUtil.sequentialTraverse(partyIds) { partyId =>
350350
for {
351351
acsSnapshot <- getAcsSnapshot(partyId)
352-
_ <- participantAdminConnection.uploadAcsSnapshot(acsSnapshot)
352+
_ <- participantAdminConnection.uploadAcsSnapshot(Seq(acsSnapshot))
353353
} yield ()
354354
}
355355
_ <- participantAdminConnection.reconnectAllDomains()

0 commit comments

Comments
 (0)