Skip to content

Commit 9dd6fb3

Browse files
committed
Switch to new ACS export/import endpoints
[ci] This breaks cross-version compatibility in a few ways: 1. Sponsor SV needs to be on same version as the candidate. 2. For recover from key scan needs to be on the same version as the validator. 3. A HSM export needs to happen on the same version as the import. 1 and 2 don't seem problematic to me. 3 is a bit more problematic. In particular, we need to avoid merging this while we might still want to cut a new 3.x release that we upgrade testnet or mainnet to directly and even afterwards there may be stragglers. We can either tell people they need to upgrade to 0.4.1 first to complete the HDM which sounds fairly reasonable or we opt for the safer option of waiting for 30 days after the HDM is complete because then they would be pruned away anyway. Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent 723bfaf commit 9dd6fb3

File tree

9 files changed

+111
-44
lines changed

9 files changed

+111
-44
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ import com.digitalasset.canton.admin.api.client.data.{
1818
ParticipantStatus,
1919
PruningSchedule,
2020
}
21-
import com.digitalasset.canton.admin.participant.v30.{ExportAcsOldResponse, PruningServiceGrpc}
21+
import com.digitalasset.canton.participant.admin.data.ContractIdImportMode
22+
import com.digitalasset.canton.admin.participant.v30.{
23+
ExportAcsResponse,
24+
ExportAcsAtTimestampResponse,
25+
PruningServiceGrpc,
26+
}
2227
import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc.PruningServiceStub
2328
import com.digitalasset.canton.config.RequireTypes.PositiveInt
2429
import com.digitalasset.canton.config.{ApiLoggingConfig, ClientConfig, PositiveDurationSeconds}
@@ -285,25 +290,46 @@ class ParticipantAdminConnection(
285290

286291
def downloadAcsSnapshot(
287292
parties: Set[PartyId],
288-
filterSynchronizerId: Option[SynchronizerId] = None,
289-
timestamp: Option[Instant] = None,
290-
force: Boolean = false,
293+
filterSynchronizerId: SynchronizerId,
294+
timestamp: Instant,
291295
)(implicit traceContext: TraceContext): Future[ByteString] = {
292296
logger.debug(
293297
show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at timestamp $timestamp"
294298
)
295299
val requestComplete = Promise[ByteString]()
296300
// TODO(#3298) just concatenate the byteString here. Make it scale to 2M contracts.
297-
val observer = new GrpcByteChunksToByteArrayObserver[ExportAcsOldResponse](requestComplete)
301+
val observer =
302+
new GrpcByteChunksToByteArrayObserver[ExportAcsAtTimestampResponse](requestComplete)
298303
runCmd(
299-
ParticipantAdminCommands.ParticipantRepairManagement.ExportAcsOld(
304+
ParticipantAdminCommands.PartyManagement.ExportAcsAtTimestamp(
300305
parties = parties,
301-
partiesOffboarding = false,
302306
filterSynchronizerId,
303307
timestamp,
304308
observer,
305-
Map.empty,
306-
force,
309+
)
310+
).discard
311+
requestComplete.future
312+
}
313+
314+
def downloadAcsSnapshotAtOffset(
315+
parties: Set[PartyId],
316+
filterSynchronizerId: SynchronizerId,
317+
offset: Long,
318+
)(implicit traceContext: TraceContext): Future[ByteString] = {
319+
logger.debug(
320+
show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at offset $offset"
321+
)
322+
val requestComplete = Promise[ByteString]()
323+
// TODO(#3298) just concatenate the byteString here. Make it scale to 2M contracts.
324+
val observer =
325+
new GrpcByteChunksToByteArrayObserver[ExportAcsResponse](requestComplete)
326+
runCmd(
327+
ParticipantAdminCommands.PartyManagement.ExportAcs(
328+
parties = parties,
329+
Some(filterSynchronizerId),
330+
offset,
331+
observer,
332+
contractSynchronizerRenames = Map.empty,
307333
)
308334
).discard
309335
requestComplete.future
@@ -317,10 +343,10 @@ class ParticipantAdminConnection(
317343
"Imports the acs in the participantl",
318344
runCmd(
319345
ParticipantAdminCommands.ParticipantRepairManagement
320-
.ImportAcsOld(
346+
.ImportAcs(
321347
acsBytes,
322-
IMPORT_ACS_WORKFLOW_ID_PREFIX,
323-
allowContractIdSuffixRecomputation = false,
348+
workflowIdPrefix = IMPORT_ACS_WORKFLOW_ID_PREFIX,
349+
contractIdImportMode = ContractIdImportMode.Validation,
324350
)
325351
).map(_ => ()),
326352
logger,

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/AcsExporter.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,14 @@ class AcsExporter(
3939
def exportAcsAtTimestamp(
4040
domain: SynchronizerId,
4141
timestamp: Instant,
42-
force: Boolean,
4342
parties: PartyId*
4443
)(implicit
4544
tc: TraceContext
4645
): Future[ByteString] = {
4746
participantAdminConnection.downloadAcsSnapshot(
4847
parties = parties.toSet,
49-
filterSynchronizerId = Some(domain),
50-
timestamp = Some(timestamp),
51-
force = force,
48+
filterSynchronizerId = domain,
49+
timestamp = timestamp,
5250
)
5351
}
5452

@@ -92,9 +90,8 @@ class AcsExporter(
9290
snapshot <- EitherT.liftF[Future, AcsExportFailure, ByteString](
9391
participantAdminConnection.downloadAcsSnapshot(
9492
parties = parties.toSet,
95-
filterSynchronizerId = Some(domain),
96-
timestamp = Some(acsSnapshotTimestamp),
97-
force = true,
93+
filterSynchronizerId = domain,
94+
timestamp = acsSnapshotTimestamp,
9895
)
9996
)
10097
} yield {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ class ScanApp(
247247
config.spliceInstanceNames,
248248
participantAdminConnection,
249249
sequencerAdminConnection,
250-
store,
250+
automation,
251251
acsSnapshotStore,
252252
dsoAnsResolver,
253253
config.miningRoundsCacheTimeToLiveOverride,

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 66 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{
4141
import org.lfdecentralizedtrust.splice.http.v0.scan.ScanResource
4242
import org.lfdecentralizedtrust.splice.http.v0.{definitions, scan as v0}
4343
import org.lfdecentralizedtrust.splice.scan.store.{AcsSnapshotStore, ScanStore, TxLogEntry}
44+
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion
4445
import org.lfdecentralizedtrust.splice.util.{
4546
Codec,
4647
Contract,
@@ -50,9 +51,10 @@ import org.lfdecentralizedtrust.splice.util.{
5051
}
5152
import org.lfdecentralizedtrust.splice.util.PrettyInstances.*
5253
import com.digitalasset.canton.logging.NamedLoggerFactory
53-
import com.digitalasset.canton.participant.admin.data.ActiveContractOld as ActiveContract
54+
import com.digitalasset.canton.participant.admin.data.ActiveContract
5455
import com.digitalasset.canton.topology.{Member, PartyId, SynchronizerId}
5556
import com.digitalasset.canton.tracing.TraceContext
57+
import com.digitalasset.canton.util.{ByteStringUtil, GrpcStreamingUtils, ResourceUtil}
5658
import com.digitalasset.canton.util.ShowUtil.*
5759
import com.google.protobuf.ByteString
5860
import io.grpc.Status
@@ -62,6 +64,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
6264
import scala.jdk.CollectionConverters.*
6365
import scala.jdk.OptionConverters.*
6466
import scala.util.{Try, Using}
67+
import java.io.ByteArrayInputStream
6568
import java.util.Base64
6669
import java.util.zip.GZIPOutputStream
6770
import java.time.{Instant, OffsetDateTime, ZoneOffset}
@@ -93,7 +96,7 @@ class HttpScanHandler(
9396
spliceInstanceNames: SpliceInstanceNamesConfig,
9497
participantAdminConnection: ParticipantAdminConnection,
9598
sequencerAdminConnection: SequencerAdminConnection,
96-
protected val store: ScanStore,
99+
protected val storeWithIngestion: AppStoreWithIngestion[ScanStore],
97100
snapshotStore: AcsSnapshotStore,
98101
dsoAnsResolver: DsoAnsResolver,
99102
miningRoundsCacheTimeToLiveOverride: Option[NonNegativeFiniteDuration],
@@ -110,6 +113,7 @@ class HttpScanHandler(
110113
with HttpValidatorLicensesHandler
111114
with HttpFeatureSupportHandler {
112115

116+
private val store = storeWithIngestion.store
113117
override protected val workflowId: String = this.getClass.getSimpleName
114118
override protected val votesStore: VotesStore = store
115119
override protected val validatorLicensesStore: AppStore = store
@@ -1100,25 +1104,45 @@ class HttpScanHandler(
11001104
/** Filter the given ACS snapshot to contracts the given party is a stakeholder on */
11011105
// TODO(#9340) Move this logic inside a Canton gRPC API.
11021106
private def filterAcsSnapshot(input: ByteString, stakeholder: PartyId): ByteString = {
1103-
val contracts = ActiveContract
1104-
.loadFromByteString(input)
1105-
.valueOr(error =>
1106-
throw Status.INTERNAL
1107-
.withDescription(s"Failed to read ACS snapshot: ${error}")
1108-
.asRuntimeException()
1109-
)
1107+
val decompressedBytes =
1108+
ByteStringUtil
1109+
.decompressGzip(input, None)
1110+
.valueOr(err =>
1111+
throw Status.INVALID_ARGUMENT
1112+
.withDescription(s"Failed to decompress bytes: $err")
1113+
.asRuntimeException
1114+
)
1115+
val contracts = ResourceUtil.withResource(
1116+
new ByteArrayInputStream(decompressedBytes.toByteArray)
1117+
) { inputSource =>
1118+
GrpcStreamingUtils
1119+
.parseDelimitedFromTrusted[ActiveContract](
1120+
inputSource,
1121+
ActiveContract,
1122+
)
1123+
.valueOr(err =>
1124+
throw Status.INVALID_ARGUMENT
1125+
.withDescription(s"Failed to parse contracts in acs snapshot: $err")
1126+
.asRuntimeException
1127+
)
1128+
}
11101129
val output = ByteString.newOutput
11111130
Using.resource(new GZIPOutputStream(output)) { outputStream =>
1112-
contracts.filter(c => c.contract.metadata.stakeholders.contains(stakeholder.toLf)).foreach {
1113-
c =>
1131+
contracts
1132+
.filter(c =>
1133+
c.contract.getCreatedEvent.signatories.contains(
1134+
stakeholder.toLf
1135+
) || c.contract.getCreatedEvent.observers.contains(stakeholder.toLf)
1136+
)
1137+
.foreach { c =>
11141138
c.writeDelimitedTo(outputStream) match {
11151139
case Left(error) =>
11161140
throw Status.INTERNAL
11171141
.withDescription(s"Failed to write ACS snapshot: ${error}")
11181142
.asRuntimeException()
11191143
case Right(_) => outputStream.flush()
11201144
}
1121-
}
1145+
}
11221146
}
11231147
output.toByteString
11241148
}
@@ -1133,6 +1157,20 @@ class HttpScanHandler(
11331157
withSpan(s"$workflowId.getAcsSnapshot") { _ => _ =>
11341158
val partyId = PartyId.tryFromProtoPrimitive(party)
11351159
for {
1160+
synchronizerId <- store
1161+
.lookupAmuletRules()
1162+
.map(
1163+
_.getOrElse(
1164+
throw io.grpc.Status.FAILED_PRECONDITION
1165+
.withDescription("No amulet rules.")
1166+
.asRuntimeException()
1167+
).state.fold(
1168+
identity,
1169+
throw io.grpc.Status.FAILED_PRECONDITION
1170+
.withDescription("Amulet rules are in flight.")
1171+
.asRuntimeException(),
1172+
)
1173+
)
11361174
// The DSO party is a stakeholder on all "important" contracts, in particular, all amulet holdings and ANS entries.
11371175
// This means the SV participants ingest data for that party and we can take a snapshot for that party.
11381176
// To make sure the snapshot is the same regardless of which SV is queried, we filter it down to
@@ -1141,10 +1179,22 @@ class HttpScanHandler(
11411179
// that users backup their own ACS.
11421180
// As the DSO party is hosted on all SVs, an arbitrary scan instance can be chosen for the ACS snapshot.
11431181
// BFT reads are usually not required since ACS commitments act as a check that the ACS was correct.
1144-
acsSnapshot <- participantAdminConnection.downloadAcsSnapshot(
1145-
Set(partyId),
1146-
timestamp = recordTime.map(_.toInstant),
1147-
)
1182+
acsSnapshot <- recordTime match {
1183+
case None =>
1184+
storeWithIngestion.connection.ledgerEnd().flatMap { offset =>
1185+
participantAdminConnection.downloadAcsSnapshotAtOffset(
1186+
Set(partyId),
1187+
offset = offset,
1188+
filterSynchronizerId = synchronizerId,
1189+
)
1190+
}
1191+
case Some(time) =>
1192+
participantAdminConnection.downloadAcsSnapshot(
1193+
Set(partyId),
1194+
timestamp = time.toInstant,
1195+
filterSynchronizerId = synchronizerId,
1196+
)
1197+
}
11481198
} yield {
11491199
val filteredAcsSnapshot =
11501200
filterAcsSnapshot(acsSnapshot, store.key.dsoParty)

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,6 @@ class HttpSvAdminHandler(
560560
.getDomainDataSnapshot(
561561
Instant.parse(timestamp),
562562
partyId.map(Codec.tryDecode(Codec.Party)(_)),
563-
force.getOrElse(false),
564563
)
565564
.map { response =>
566565
val responseHttp = response.toHttp

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ class DomainDataSnapshotGenerator(
4545
def getDomainDataSnapshot(
4646
timestamp: Instant,
4747
partyId: Option[PartyId],
48-
force: Boolean,
4948
)(implicit
5049
ec: ExecutionContext,
5150
tc: TraceContext,
@@ -57,7 +56,6 @@ class DomainDataSnapshotGenerator(
5756
.exportAcsAtTimestamp(
5857
decentralizedSynchronizer,
5958
timestamp,
60-
force,
6159
partyId.fold(Seq(dsoStore.key.dsoParty, dsoStore.key.svParty))(Seq(_))*
6260
)
6361
dars <- darExporter.exportAllDars()

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sponsor/DsoPartyMigration.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ class DsoPartyMigration(
101101
participantAdminConnection
102102
.downloadAcsSnapshot(
103103
Set(dsoParty),
104-
filterSynchronizerId = Some(decentralizedSynchronizer),
105-
timestamp = Some(authorizedAt),
104+
filterSynchronizerId = decentralizedSynchronizer,
105+
timestamp = authorizedAt,
106106
)
107107
.recoverWith { case ex: StatusRuntimeException =>
108108
val errorDetails = ErrorDetails.from(ex: StatusRuntimeException)

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/admin/http/HttpValidatorAdminHandler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ class HttpValidatorAdminHandler(
176176
synchronizerId,
177177
// TODO(#9731): get migration id from scan instead of configuring here
178178
migrationId getOrElse (config.domainMigrationId + 1),
179-
force.getOrElse(false),
180179
)
181180
.map { response =>
182181
v0.ValidatorAdminResource.GetValidatorDomainDataSnapshotResponse.OK(

apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDumpGenerator.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ class DomainMigrationDumpGenerator(
7979
timestamp: Instant,
8080
domain: SynchronizerId,
8181
migrationId: Long,
82-
force: Boolean,
8382
)(implicit
8483
ec: ExecutionContext,
8584
tc: TraceContext,
@@ -97,7 +96,6 @@ class DomainMigrationDumpGenerator(
9796
acsSnapshot <- acsExporter.exportAcsAtTimestamp(
9897
domain,
9998
timestamp,
100-
force,
10199
parties*
102100
)
103101
dars <- darExporter.exportAllDars()

0 commit comments

Comments
 (0)