Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,32 +380,6 @@ class ParticipantAdminConnection(
ByteString.copyFrom(chunks.asJava)
)

def uploadAcsSnapshotLegacy(acsBytes: Seq[ByteString])(implicit
traceContext: TraceContext
): Future[Unit] = {
val chunkedAcsBytes: Seq[ByteString] = acsBytes match {
case Seq(bytes) =>
// Caller has not chunked the bytes, this is possible for SVs that try to onboard or for validator recovery.
// The chuning logic here matches what GrpcStreamingUtils.streamToServer does
bytes.toByteArray.grouped(1024 * 1024 * 2).map(ByteString.copyFrom(_)).toSeq
case _ => acsBytes
}
retryProvider.retryForClientCalls(
"import_acs",
"Imports the acs in the participantl",
runCmd(
ParticipantAdminCommands.ParticipantRepairManagement
.ImportAcsOld(
chunkedAcsBytes,
IMPORT_ACS_WORKFLOW_ID_PREFIX,
allowContractIdSuffixRecomputation = false,
),
timeoutOverride = Some(GrpcAdminCommand.DefaultUnboundedTimeout),
).map(_ => ()),
logger,
)
}

def uploadAcsSnapshot(acsBytes: Seq[ByteString])(implicit
traceContext: TraceContext
): Future[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class DomainDataRestorer(
sequencerConnections: SequencerConnections,
dars: Seq[Dar],
acsSnapshot: Seq[ByteString],
legacyAcsImport: Boolean,
)(implicit
tc: TraceContext
): Future[Unit] = {
Expand Down Expand Up @@ -66,7 +65,9 @@ class DomainDataRestorer(
RetryFor.ClientCalls,
)
_ = logger.info("Importing the ACS")
_ <- importAcs(acsSnapshot, legacyAcsImport)
_ <- participantAdminConnection.uploadAcsSnapshot(
acsSnapshot
)
_ = logger.info("Imported the ACS")
_ <- participantAdminConnection.modifySynchronizerConnectionConfigAndReconnect(
synchronizerAlias,
Expand Down Expand Up @@ -97,20 +98,6 @@ class DomainDataRestorer(
}
}

private def importAcs(acs: Seq[ByteString], legacyAcsImport: Boolean)(implicit
tc: TraceContext
) = {
if (legacyAcsImport) {
participantAdminConnection.uploadAcsSnapshotLegacy(
acs
)
} else {
participantAdminConnection.uploadAcsSnapshot(
acs
)
}
}

private def importDars(dars: Seq[Dar])(implicit tc: TraceContext) = {
val packages = dars
.map { dar =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ final case class DomainDataSnapshot(
dars: Seq[Dar],
// true if we exported for a proper migration, false for DR.
synchronizerWasPaused: Boolean,
acsFormat: http.DomainDataSnapshot.AcsFormat,
) extends PrettyPrinting {
// if output directory is specified we use the new format, otherwise the old one.
// Only the DR endpoint should use the old one.
Expand All @@ -43,7 +42,7 @@ final case class DomainDataSnapshot(
}.toVector,
synchronizerWasPaused = Some(synchronizerWasPaused),
separatePayloadFiles = Some(outputDirectory.isDefined),
acsFormat = Some(acsFormat),
acsFormat = Some(http.DomainDataSnapshot.AcsFormat.LedgerApi),
)
}

Expand Down Expand Up @@ -85,16 +84,20 @@ object DomainDataSnapshot {
Dar(dar.hash, ByteString.copyFrom(decoded))
}
val acsTimestamp = Instant.parse(src.acsTimestamp)
Right(
DomainDataSnapshot(
src.genesisState.map(DomainMigrationEncoding.decode(src.separatePayloadFiles, _)),
DomainMigrationEncoding.decode(src.separatePayloadFiles, src.acsSnapshot),
acsTimestamp,
dars,
src.synchronizerWasPaused.getOrElse(false),
acsFormat = src.acsFormat.getOrElse(http.DomainDataSnapshot.AcsFormat.AdminApi),
)
)
src.acsFormat.getOrElse(http.DomainDataSnapshot.AcsFormat.AdminApi) match {
case http.DomainDataSnapshot.AcsFormat.members.AdminApi =>
Left("Admin API ACS Format is unsupported since splice 0.5")
case http.DomainDataSnapshot.AcsFormat.members.LedgerApi =>
Right(
DomainDataSnapshot(
src.genesisState.map(DomainMigrationEncoding.decode(src.separatePayloadFiles, _)),
DomainMigrationEncoding.decode(src.separatePayloadFiles, src.acsSnapshot),
acsTimestamp,
dars,
src.synchronizerWasPaused.getOrElse(false),
)
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.lfdecentralizedtrust.splice.environment.{
RetryProvider,
SequencerAdminConnection,
}
import org.lfdecentralizedtrust.splice.http.v0.definitions as http
import org.lfdecentralizedtrust.splice.migration.{
AcsExporter,
DarExporter,
Expand Down Expand Up @@ -74,7 +73,6 @@ class DomainDataSnapshotGenerator(
acsTimestamp = timestamp,
dars,
synchronizerWasPaused = false,
acsFormat = http.DomainDataSnapshot.AcsFormat.LedgerApi,
)

private def exportGenesisState(
Expand Down Expand Up @@ -165,7 +163,6 @@ class DomainDataSnapshotGenerator(
acsTimestamp,
dars,
synchronizerWasPaused = true,
acsFormat = http.DomainDataSnapshot.AcsFormat.LedgerApi,
)
logger.info(show"Finished generating $result")
result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,6 @@ class DomainMigrationInitializer(
SequencerConnections.single(localSynchronizerNode.sequencerConnection),
domainMigrationDump.domainDataSnapshot.dars,
domainMigrationDump.domainDataSnapshot.acsSnapshot,
legacyAcsImport =
domainMigrationDump.domainDataSnapshot.acsFormat == http.DomainDataSnapshot.AcsFormat.AdminApi,
)
_ = logger.info("resumed domain")
} yield {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import org.lfdecentralizedtrust.splice.automation.{
import org.lfdecentralizedtrust.splice.config.{NetworkAppClientConfig, SharedSpliceAppParameters}
import org.lfdecentralizedtrust.splice.environment.*
import org.lfdecentralizedtrust.splice.environment.ledger.api.DedupDuration
import org.lfdecentralizedtrust.splice.http.v0.definitions as http
import org.lfdecentralizedtrust.splice.http.v0.status.wallet.WalletResource as StatusWalletResource
import org.lfdecentralizedtrust.splice.http.v0.external.ans.AnsResource
import org.lfdecentralizedtrust.splice.http.v0.external.wallet.WalletResource as ExternalWalletResource
Expand Down Expand Up @@ -271,8 +270,6 @@ class ValidatorApp(
sequencerConnections,
migrationDump.dars,
migrationDump.acsSnapshot,
legacyAcsImport =
migrationDump.acsFormat == http.DomainMigrationDump.AcsFormat.AdminApi,
)
}
_ <- appInitStep("Restoring participant users data") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ final case class DomainMigrationDump(
createdAt: Instant,
// true if we exported for a proper migration, false for DR.
synchronizerWasPaused: Boolean,
acsFormat: http.DomainMigrationDump.AcsFormat,
) extends PrettyPrinting {
override def pretty: Pretty[DomainMigrationDump.this.type] =
Pretty.prettyNode(
Expand Down Expand Up @@ -63,7 +62,7 @@ final case class DomainMigrationDump(
createdAt = createdAt.toString,
synchronizerWasPaused = Some(synchronizerWasPaused),
separatePayloadFiles = Some(outputDirectory.isDefined),
acsFormat = Some(acsFormat),
acsFormat = Some(http.DomainMigrationDump.AcsFormat.LedgerApi),
)
}

Expand Down Expand Up @@ -94,6 +93,12 @@ object DomainMigrationDump {
Dar(dar.hash, ByteString.copyFrom(decoded))
}
createdAt = Instant.parse(response.createdAt)
_ <- response.acsFormat.getOrElse(http.DomainMigrationDump.AcsFormat.AdminApi) match {
case http.DomainMigrationDump.AcsFormat.members.AdminApi =>
Left("Admin API ACS Format is unsupported since splice 0.5")
case http.DomainMigrationDump.AcsFormat.members.LedgerApi =>
Right(())
}
} yield DomainMigrationDump(
domainId = domainId,
migrationId = migrationId,
Expand All @@ -104,6 +109,5 @@ object DomainMigrationDump {
dars = dars,
createdAt = createdAt,
synchronizerWasPaused = response.synchronizerWasPaused.getOrElse(false),
acsFormat = response.acsFormat.getOrElse(http.DomainMigrationDump.AcsFormat.AdminApi),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.lfdecentralizedtrust.splice.environment.{
RetryProvider,
SpliceLedgerConnection,
}
import org.lfdecentralizedtrust.splice.http.v0.definitions as http
import org.lfdecentralizedtrust.splice.identities.NodeIdentitiesStore
import org.lfdecentralizedtrust.splice.migration.AcsExporter.AcsExportForParties.AllParticipantParties
import org.lfdecentralizedtrust.splice.migration.{
Expand Down Expand Up @@ -76,7 +75,6 @@ class DomainMigrationDumpGenerator(
dars = dars,
createdAt = createdAt,
synchronizerWasPaused = true,
acsFormat = http.DomainMigrationDump.AcsFormat.LedgerApi,
)
logger.info(
show"Finished generating $result"
Expand Down Expand Up @@ -116,7 +114,6 @@ class DomainMigrationDumpGenerator(
dars = dars,
createdAt = Instant.now(),
synchronizerWasPaused = false,
acsFormat = http.DomainMigrationDump.AcsFormat.LedgerApi,
)
}
}
Expand Down
Loading