Skip to content

Commit 41f712f

Browse files
committed
Revert "Switch to new ACS export/import endpoints (#1021)"
[ci] This reverts commit a9aa5b8. It blows up on cilr because it is too slow. Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent 63fc1ef commit 41f712f

File tree

11 files changed

+104
-161
lines changed

11 files changed

+104
-161
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/RecoverExternalPartyIntegrationTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class RecoverExternalPartyIntegrationTest
170170
}
171171
val acsSnapshotFile = Files.createTempFile("acs", ".snapshot")
172172
Files.write(acsSnapshotFile, acsSnapshot.toByteArray())
173-
bobValidatorBackend.participantClient.repair.import_acs(acsSnapshotFile.toString)
173+
bobValidatorBackend.participantClient.repair.import_acs_old(acsSnapshotFile.toString)
174174
bobValidatorBackend.participantClient.synchronizers.reconnect_all()
175175
}
176176

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala

Lines changed: 14 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,9 @@ import com.digitalasset.canton.admin.api.client.data.{
1818
ParticipantStatus,
1919
PruningSchedule,
2020
}
21-
import com.digitalasset.canton.participant.admin.data.ContractIdImportMode
22-
import com.digitalasset.canton.admin.participant.v30.{
23-
ExportAcsResponse,
24-
ExportAcsAtTimestampResponse,
25-
PruningServiceGrpc,
26-
}
21+
import com.digitalasset.canton.admin.participant.v30.{ExportAcsOldResponse, PruningServiceGrpc}
2722
import com.digitalasset.canton.admin.participant.v30.PruningServiceGrpc.PruningServiceStub
28-
import com.digitalasset.canton.config.RequireTypes.{NonNegativeLong, PositiveInt}
23+
import com.digitalasset.canton.config.RequireTypes.PositiveInt
2924
import com.digitalasset.canton.config.{ApiLoggingConfig, ClientConfig, PositiveDurationSeconds}
3025
import com.digitalasset.canton.discard.Implicits.DiscardOps
3126
import com.digitalasset.canton.logging.NamedLoggerFactory
@@ -288,79 +283,27 @@ class ParticipantAdminConnection(
288283
)
289284
}
290285

291-
def downloadAcsSnapshotForPartyMigration(
286+
def downloadAcsSnapshot(
292287
parties: Set[PartyId],
293-
filterSynchronizerId: SynchronizerId,
294-
timestamp: Instant,
288+
filterSynchronizerId: Option[SynchronizerId] = None,
289+
timestamp: Option[Instant] = None,
290+
force: Boolean = false,
295291
)(implicit traceContext: TraceContext): Future[ByteString] = {
296292
logger.debug(
297293
show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at timestamp $timestamp"
298294
)
299295
val requestComplete = Promise[ByteString]()
300296
// TODO(DACH-NY/canton-network-node#3298) just concatenate the byteString here. Make it scale to 2M contracts.
301-
val observer =
302-
new GrpcByteChunksToByteArrayObserver[ExportAcsAtTimestampResponse](requestComplete)
297+
val observer = new GrpcByteChunksToByteArrayObserver[ExportAcsOldResponse](requestComplete)
303298
runCmd(
304-
ParticipantAdminCommands.PartyManagement.ExportAcsAtTimestamp(
299+
ParticipantAdminCommands.ParticipantRepairManagement.ExportAcsOld(
305300
parties = parties,
301+
partiesOffboarding = false,
306302
filterSynchronizerId,
307303
timestamp,
308304
observer,
309-
)
310-
).discard
311-
requestComplete.future
312-
}
313-
314-
def downloadAcsSnapshotForSynchronizerMigration(
315-
parties: Set[PartyId],
316-
synchronizerId: SynchronizerId,
317-
timestamp: Instant,
318-
disasterRecovery: Boolean,
319-
)(implicit traceContext: TraceContext): Future[ByteString] = {
320-
// ExportAcsAtTimestamp only works if the timestamp corresponds to a PartyToParticipant change so this
321-
// is required for synchronizer migrations and disaster recovery. Without the force flag,
322-
// this will fail until we have processed a timestamp >= requested timestamp. On migrations this is guaranteed
323-
// to happen for all nodes due to time proofs. On disaster recovery, we cannot guarantee this so we
324-
// use force=true which will not wait until a timestamp >= requested timestamp has been processed.
325-
getHighestOffsetByTimestamp(synchronizerId, timestamp, force = disasterRecovery).flatMap {
326-
offset =>
327-
downloadAcsSnapshotAtOffset(parties, synchronizerId, offset.unwrap)
328-
}
329-
}
330-
331-
def getHighestOffsetByTimestamp(
332-
synchronizerId: SynchronizerId,
333-
timestamp: Instant,
334-
force: Boolean,
335-
)(implicit tc: TraceContext): Future[NonNegativeLong] = {
336-
runCmd(
337-
ParticipantAdminCommands.PartyManagement
338-
.GetHighestOffsetByTimestamp(synchronizerId, timestamp, force)
339-
).map { offset =>
340-
logger.debug(s"Translated $timestamp on $synchronizerId to $offset with force=$force")
341-
offset
342-
}
343-
}
344-
345-
def downloadAcsSnapshotAtOffset(
346-
parties: Set[PartyId],
347-
filterSynchronizerId: SynchronizerId,
348-
offset: Long,
349-
)(implicit traceContext: TraceContext): Future[ByteString] = {
350-
logger.debug(
351-
show"Downloading ACS snapshot from domain $filterSynchronizerId, for parties $parties at offset $offset"
352-
)
353-
val requestComplete = Promise[ByteString]()
354-
// TODO(#3298) just concatenate the byteString here. Make it scale to 2M contracts.
355-
val observer =
356-
new GrpcByteChunksToByteArrayObserver[ExportAcsResponse](requestComplete)
357-
runCmd(
358-
ParticipantAdminCommands.PartyManagement.ExportAcs(
359-
parties = parties,
360-
Some(filterSynchronizerId),
361-
offset,
362-
observer,
363-
contractSynchronizerRenames = Map.empty,
305+
Map.empty,
306+
force,
364307
)
365308
).discard
366309
requestComplete.future
@@ -374,10 +317,10 @@ class ParticipantAdminConnection(
374317
"Imports the acs in the participantl",
375318
runCmd(
376319
ParticipantAdminCommands.ParticipantRepairManagement
377-
.ImportAcs(
320+
.ImportAcsOld(
378321
acsBytes,
379-
workflowIdPrefix = IMPORT_ACS_WORKFLOW_ID_PREFIX,
380-
contractIdImportMode = ContractIdImportMode.Validation,
322+
IMPORT_ACS_WORKFLOW_ID_PREFIX,
323+
allowContractIdSuffixRecomputation = false,
381324
)
382325
).map(_ => ()),
383326
logger,

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/AcsExporter.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ class AcsExporter(
3939
def exportAcsAtTimestamp(
4040
domain: SynchronizerId,
4141
timestamp: Instant,
42-
disasterRecovery: Boolean,
42+
force: Boolean,
4343
parties: PartyId*
4444
)(implicit
4545
tc: TraceContext
4646
): Future[ByteString] = {
47-
participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
47+
participantAdminConnection.downloadAcsSnapshot(
4848
parties = parties.toSet,
49-
synchronizerId = domain,
50-
timestamp = timestamp,
51-
disasterRecovery = disasterRecovery,
49+
filterSynchronizerId = Some(domain),
50+
timestamp = Some(timestamp),
51+
force = force,
5252
)
5353
}
5454

@@ -90,11 +90,11 @@ class AcsExporter(
9090
)
9191
acsSnapshotTimestamp = domainParamsStateTopology.base.validFrom
9292
snapshot <- EitherT.liftF[Future, AcsExportFailure, ByteString](
93-
participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
93+
participantAdminConnection.downloadAcsSnapshot(
9494
parties = parties.toSet,
95-
synchronizerId = domain,
96-
timestamp = acsSnapshotTimestamp,
97-
disasterRecovery = false,
95+
filterSynchronizerId = Some(domain),
96+
timestamp = Some(acsSnapshotTimestamp),
97+
force = true,
9898
)
9999
)
100100
} yield {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ class ScanApp(
249249
config.spliceInstanceNames,
250250
participantAdminConnection,
251251
sequencerAdminConnection,
252-
automation,
252+
store,
253253
acsSnapshotStore,
254254
dsoAnsResolver,
255255
config.miningRoundsCacheTimeToLiveOverride,

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 16 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{
4444
import org.lfdecentralizedtrust.splice.http.v0.scan.ScanResource
4545
import org.lfdecentralizedtrust.splice.http.v0.{definitions, scan as v0}
4646
import org.lfdecentralizedtrust.splice.scan.store.{AcsSnapshotStore, ScanStore, TxLogEntry}
47-
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion
4847
import org.lfdecentralizedtrust.splice.util.{
4948
Codec,
5049
Contract,
@@ -54,10 +53,9 @@ import org.lfdecentralizedtrust.splice.util.{
5453
}
5554
import org.lfdecentralizedtrust.splice.util.PrettyInstances.*
5655
import com.digitalasset.canton.logging.NamedLoggerFactory
57-
import com.digitalasset.canton.participant.admin.data.ActiveContract
56+
import com.digitalasset.canton.participant.admin.data.ActiveContractOld as ActiveContract
5857
import com.digitalasset.canton.topology.{Member, PartyId, SynchronizerId}
5958
import com.digitalasset.canton.tracing.TraceContext
60-
import com.digitalasset.canton.util.{ByteStringUtil, GrpcStreamingUtils, ResourceUtil}
6159
import com.digitalasset.canton.util.ShowUtil.*
6260
import com.google.protobuf.ByteString
6361
import io.grpc.Status
@@ -67,7 +65,6 @@ import scala.concurrent.{ExecutionContextExecutor, Future}
6765
import scala.jdk.CollectionConverters.*
6866
import scala.jdk.OptionConverters.*
6967
import scala.util.{Try, Using}
70-
import java.io.ByteArrayInputStream
7168
import java.util.Base64
7269
import java.util.zip.GZIPOutputStream
7370
import java.time.{Instant, OffsetDateTime, ZoneOffset}
@@ -101,7 +98,7 @@ class HttpScanHandler(
10198
spliceInstanceNames: SpliceInstanceNamesConfig,
10299
participantAdminConnection: ParticipantAdminConnection,
103100
sequencerAdminConnection: SequencerAdminConnection,
104-
protected val storeWithIngestion: AppStoreWithIngestion[ScanStore],
101+
protected val store: ScanStore,
105102
snapshotStore: AcsSnapshotStore,
106103
dsoAnsResolver: DsoAnsResolver,
107104
miningRoundsCacheTimeToLiveOverride: Option[NonNegativeFiniteDuration],
@@ -118,7 +115,6 @@ class HttpScanHandler(
118115
with HttpValidatorLicensesHandler
119116
with HttpFeatureSupportHandler {
120117

121-
private val store = storeWithIngestion.store
122118
override protected val workflowId: String = this.getClass.getSimpleName
123119
override protected val votesStore: VotesStore = store
124120
override protected val validatorLicensesStore: AppStore = store
@@ -1151,45 +1147,25 @@ class HttpScanHandler(
11511147
/** Filter the given ACS snapshot to contracts the given party is a stakeholder on */
11521148
// TODO(#828) Move this logic inside a Canton gRPC API.
11531149
private def filterAcsSnapshot(input: ByteString, stakeholder: PartyId): ByteString = {
1154-
val decompressedBytes =
1155-
ByteStringUtil
1156-
.decompressGzip(input, None)
1157-
.valueOr(err =>
1158-
throw Status.INVALID_ARGUMENT
1159-
.withDescription(s"Failed to decompress bytes: $err")
1160-
.asRuntimeException
1161-
)
1162-
val contracts = ResourceUtil.withResource(
1163-
new ByteArrayInputStream(decompressedBytes.toByteArray)
1164-
) { inputSource =>
1165-
GrpcStreamingUtils
1166-
.parseDelimitedFromTrusted[ActiveContract](
1167-
inputSource,
1168-
ActiveContract,
1169-
)
1170-
.valueOr(err =>
1171-
throw Status.INVALID_ARGUMENT
1172-
.withDescription(s"Failed to parse contracts in acs snapshot: $err")
1173-
.asRuntimeException
1174-
)
1175-
}
1150+
val contracts = ActiveContract
1151+
.loadFromByteString(input)
1152+
.valueOr(error =>
1153+
throw Status.INTERNAL
1154+
.withDescription(s"Failed to read ACS snapshot: ${error}")
1155+
.asRuntimeException()
1156+
)
11761157
val output = ByteString.newOutput
11771158
Using.resource(new GZIPOutputStream(output)) { outputStream =>
1178-
contracts
1179-
.filter(c =>
1180-
c.contract.getCreatedEvent.signatories.contains(
1181-
stakeholder.toLf
1182-
) || c.contract.getCreatedEvent.observers.contains(stakeholder.toLf)
1183-
)
1184-
.foreach { c =>
1159+
contracts.filter(c => c.contract.metadata.stakeholders.contains(stakeholder.toLf)).foreach {
1160+
c =>
11851161
c.writeDelimitedTo(outputStream) match {
11861162
case Left(error) =>
11871163
throw Status.INTERNAL
11881164
.withDescription(s"Failed to write ACS snapshot: ${error}")
11891165
.asRuntimeException()
11901166
case Right(_) => outputStream.flush()
11911167
}
1192-
}
1168+
}
11931169
}
11941170
output.toByteString
11951171
}
@@ -1204,20 +1180,6 @@ class HttpScanHandler(
12041180
withSpan(s"$workflowId.getAcsSnapshot") { _ => _ =>
12051181
val partyId = PartyId.tryFromProtoPrimitive(party)
12061182
for {
1207-
synchronizerId <- store
1208-
.lookupAmuletRules()
1209-
.map(
1210-
_.getOrElse(
1211-
throw io.grpc.Status.FAILED_PRECONDITION
1212-
.withDescription("No amulet rules.")
1213-
.asRuntimeException()
1214-
).state.fold(
1215-
identity,
1216-
throw io.grpc.Status.FAILED_PRECONDITION
1217-
.withDescription("Amulet rules are in flight.")
1218-
.asRuntimeException(),
1219-
)
1220-
)
12211183
// The DSO party is a stakeholder on all "important" contracts, in particular, all amulet holdings and ANS entries.
12221184
// This means the SV participants ingest data for that party and we can take a snapshot for that party.
12231185
// To make sure the snapshot is the same regardless of which SV is queried, we filter it down to
@@ -1226,24 +1188,10 @@ class HttpScanHandler(
12261188
// that users backup their own ACS.
12271189
// As the DSO party is hosted on all SVs, an arbitrary scan instance can be chosen for the ACS snapshot.
12281190
// BFT reads are usually not required since ACS commitments act as a check that the ACS was correct.
1229-
acsSnapshot <- recordTime match {
1230-
case None =>
1231-
storeWithIngestion.connection.ledgerEnd().flatMap { offset =>
1232-
participantAdminConnection.downloadAcsSnapshotAtOffset(
1233-
Set(partyId),
1234-
offset = offset,
1235-
filterSynchronizerId = synchronizerId,
1236-
)
1237-
}
1238-
case Some(time) =>
1239-
// To support more timestamp we use forSynchronizerMigration instead of forPartyMigration
1240-
participantAdminConnection.downloadAcsSnapshotForSynchronizerMigration(
1241-
Set(partyId),
1242-
timestamp = time.toInstant,
1243-
synchronizerId = synchronizerId,
1244-
disasterRecovery = false,
1245-
)
1246-
}
1191+
acsSnapshot <- participantAdminConnection.downloadAcsSnapshot(
1192+
Set(partyId),
1193+
timestamp = recordTime.map(_.toInstant),
1194+
)
12471195
} yield {
12481196
val filteredAcsSnapshot =
12491197
filterAcsSnapshot(acsSnapshot, store.key.dsoParty)

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ class SvApp(
516516
localSynchronizerNode,
517517
retryProvider,
518518
new DsoPartyMigration(
519+
svAutomation,
519520
dsoAutomation,
520521
participantAdminConnection,
521522
retryProvider,

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ class HttpSvAdminHandler(
560560
.getDomainDataSnapshot(
561561
Instant.parse(timestamp),
562562
partyId.map(Codec.tryDecode(Codec.Party)(_)),
563-
disasterRecovery = true,
563+
force.getOrElse(false),
564564
)
565565
.map { response =>
566566
val responseHttp = response.toHttp

apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class DomainDataSnapshotGenerator(
4545
def getDomainDataSnapshot(
4646
timestamp: Instant,
4747
partyId: Option[PartyId],
48-
disasterRecovery: Boolean,
48+
force: Boolean,
4949
)(implicit
5050
ec: ExecutionContext,
5151
tc: TraceContext,
@@ -57,7 +57,7 @@ class DomainDataSnapshotGenerator(
5757
.exportAcsAtTimestamp(
5858
decentralizedSynchronizer,
5959
timestamp,
60-
disasterRecovery,
60+
force,
6161
partyId.fold(Seq(dsoStore.key.dsoParty, dsoStore.key.svParty))(Seq(_))*
6262
)
6363
dars <- darExporter.exportAllDars()

0 commit comments

Comments (0)