diff --git a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala index fd9535709b..c496b781b9 100644 --- a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala +++ b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceConfig.scala @@ -13,10 +13,10 @@ import org.lfdecentralizedtrust.splice.http.UrlValidator import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig import org.lfdecentralizedtrust.splice.scan.config.{ BftSequencerConfig, + MediatorVerdictIngestionConfig, ScanAppBackendConfig, ScanAppClientConfig, ScanCacheConfig, - MediatorVerdictIngestionConfig, ScanSynchronizerConfig, CacheConfig as SpliceCacheConfig, } @@ -82,6 +82,7 @@ import com.digitalasset.canton.synchronizer.sequencer.config.{ } import com.digitalasset.canton.topology.PartyId import com.digitalasset.daml.lf.data.Ref.PackageVersion +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher case class SpliceConfig( override val name: Option[String] = None, @@ -412,6 +413,9 @@ object SpliceConfig { deriveReader[CircuitBreakerConfig] implicit val circuitBreakersConfig: ConfigReader[CircuitBreakersConfig] = deriveReader[CircuitBreakersConfig] + implicit val contractFetchLedgerFallbackConfigReader + : ConfigReader[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] = + deriveReader[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] implicit val spliceParametersConfig: ConfigReader[SpliceParametersConfig] = deriveReader[SpliceParametersConfig] implicit val rateLimitersConfig: ConfigReader[RateLimitersConfig] = @@ -838,6 +842,9 @@ object SpliceConfig { deriveWriter[CircuitBreakerConfig] implicit val circuitBreakersConfig: ConfigWriter[CircuitBreakersConfig] = deriveWriter[CircuitBreakersConfig] + implicit val contractFetchLedgerFallbackConfigWriter + : ConfigWriter[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] = + deriveWriter[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] implicit val spliceParametersConfig: ConfigWriter[SpliceParametersConfig] = deriveWriter[SpliceParametersConfig] diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardFetchFallbackIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardFetchFallbackIntegrationTest.scala new file mode 100644 index 0000000000..aaf8f337ce --- /dev/null +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardFetchFallbackIntegrationTest.scala @@ -0,0 +1,139 @@ +package org.lfdecentralizedtrust.splice.integration.tests + +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.{HasActorSystem, HasExecutionContext} +import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.allocationv1.* +import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.holdingv1.InstrumentId +import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction +import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members +import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition +import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment +import org.lfdecentralizedtrust.splice.util.{ + ChoiceContextWithDisclosures, + TriggerTestUtil, + WalletTestUtil, +} + +import java.time.Instant +import java.util.{Optional, UUID} + +class TokenStandardFetchFallbackIntegrationTest + extends IntegrationTestWithSharedEnvironment + with WalletTestUtil + with WalletTxLogTestUtil + with TriggerTestUtil + with HasActorSystem + with HasExecutionContext { + + override def environmentDefinition: EnvironmentDefinition = { + EnvironmentDefinition.simpleTopology1Sv(this.getClass.getSimpleName) + } + + "Token Standard" should { + + "TransferInstruction context can be fetched from Scan even if it's not yet ingested into the store" in { + implicit env => + pauseScanIngestionWithin(sv1ScanBackend) { + onboardWalletUser(aliceWalletClient, aliceValidatorBackend) + val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend) + aliceWalletClient.tap(100) + + val response = actAndCheck( + "Alice creates transfer offer", + aliceWalletClient.createTokenStandardTransfer( + bobUserParty, + 10, + s"Transfer", + CantonTimestamp.now().plusSeconds(3600L), + UUID.randomUUID().toString, + ), + )( + "Alice and Bob see it", + _ => { + Seq(aliceWalletClient, bobWalletClient).foreach( + _.listTokenStandardTransfers() should have size 1 + ) + }, + )._1 + + val cid = response.output match { + case members.TransferInstructionPending(value) => + new TransferInstruction.ContractId(value.transferInstructionCid) + case _ => fail("The transfers were expected to be pending.") + } + + clue("SV-1's Scan sees it (still, even though ingestion is paused)") { + eventuallySucceeds() { + sv1ScanBackend.getTransferInstructionAcceptContext(cid) + } + } + } + } + + "AmuletAllocations context can be fetched from Scan even if it's not yet ingested into the store" in { + implicit env => + pauseScanIngestionWithin(sv1ScanBackend) { + val aliceParty = onboardWalletUser(aliceWalletClient, aliceValidatorBackend) + val bobParty = onboardWalletUser(bobWalletClient, bobValidatorBackend) +// // Allocate venue on separate participant node, we still go through the validator API instead of parties.enable +// // so we can use the standard wallet client APIs but give the party a more useful name than splitwell. +// val venuePartyHint = s"venue-party-${Random.nextInt()}" +// val venueParty = splitwellValidatorBackend.onboardUser( +// splitwellWalletClient.config.ledgerApiUser, +// Some( +// PartyId.tryFromProtoPrimitive( +// s"$venuePartyHint::${splitwellValidatorBackend.participantClient.id.namespace.toProtoPrimitive}" +// ) +// ), +// ) + + aliceWalletClient.tap(100) + val referenceId = UUID.randomUUID().toString + + val (_, allocation) = actAndCheck( + "Alice creates an Allocation", + aliceWalletClient.allocateAmulet( + new AllocationSpecification( + new SettlementInfo( + dsoParty.toProtoPrimitive, + new Reference(referenceId, Optional.empty), + Instant.now, + Instant.now.plusSeconds(3600L), + Instant.now.plusSeconds(2 * 3600L), + ChoiceContextWithDisclosures.emptyMetadata, + ), + UUID.randomUUID().toString, + new TransferLeg( + aliceParty.toProtoPrimitive, + bobParty.toProtoPrimitive, + BigDecimal(10).bigDecimal, + new InstrumentId(dsoParty.toProtoPrimitive, "Amulet"), + ChoiceContextWithDisclosures.emptyMetadata, + ), + ) + ), + )( + "Alice sees the Allocation", + _ => { + val allocation = + aliceWalletClient + .listAmuletAllocations() + .loneElement + allocation.payload.allocation.settlement.settlementRef.id should be(referenceId) + allocation + }, + ) + + clue("SV-1's Scan sees it (still, even though ingestion is paused)") { + eventuallySucceeds() { + sv1ScanBackend.getAllocationTransferContext( + allocation.contractId.toInterface(Allocation.INTERFACE) + ) + } + } + } + } + + } + +} diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardTransferIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardTransferIntegrationTest.scala index a4984b4539..a90c0b5f42 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardTransferIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardTransferIntegrationTest.scala @@ -8,11 +8,12 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1 import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction import org.lfdecentralizedtrust.splice.config.ConfigTransforms.{ ConfigurableApp, + updateAllScanAppConfigs_, updateAllSvAppFoundDsoConfigs_, updateAutomationConfig, } -import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members import org.lfdecentralizedtrust.splice.http.v0.definitions.TransactionHistoryResponseItem.TransactionType as HttpTransactionType +import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members import org.lfdecentralizedtrust.splice.http.v0.definitions.{ AbortTransferInstruction, ReceiverAmount, @@ -20,7 +21,7 @@ import org.lfdecentralizedtrust.splice.http.v0.definitions.{ } import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment -import org.lfdecentralizedtrust.splice.integration.tests.WalletTxLogTestUtil +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher import org.lfdecentralizedtrust.splice.util.{DisclosedContracts, WalletTestUtil} import org.lfdecentralizedtrust.splice.wallet.automation.CollectRewardsAndMergeAmuletsTrigger import org.lfdecentralizedtrust.splice.wallet.store.{ @@ -56,6 +57,17 @@ class TokenStandardTransferIntegrationTest _.copy(zeroTransferFees = true) )(config) ) + .addConfigTransforms((_, config) => + updateAllScanAppConfigs_(scanConfig => + scanConfig.copy(parameters = + scanConfig.parameters.copy(contractFetchLedgerFallbackConfig = + ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig( + enabled = false // expiry test doesn't see the archival otherwise + ) + ) + ) + )(config) + ) } "Token Standard Transfers should" should { diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TriggerTestUtil.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TriggerTestUtil.scala index 825dbdb508..2b9b520650 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TriggerTestUtil.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TriggerTestUtil.scala @@ -2,7 +2,8 @@ package org.lfdecentralizedtrust.splice.util import com.digitalasset.canton.{BaseTest, ScalaFuturesWithPatience} import com.typesafe.scalalogging.LazyLogging -import org.lfdecentralizedtrust.splice.automation.Trigger +import org.lfdecentralizedtrust.splice.automation.{Trigger, UpdateIngestionService} +import org.lfdecentralizedtrust.splice.console.ScanAppBackendReference import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition.sv1Backend import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.SpliceTestConsoleEnvironment import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger @@ -34,6 +35,17 @@ trait TriggerTestUtil { self: BaseTest => advanceOpenMiningRoundTrigger.runOnce().futureValue should be(true) } } + + def pauseScanIngestionWithin[T](scan: ScanAppBackendReference)(codeBlock: => T): T = { + try { + logger.info(s"Pausing ingestion for ${scan.name}") + scan.automation.services[UpdateIngestionService].foreach(_.pause().futureValue) + codeBlock + } finally { + logger.info(s"Resuming ingestion for ${scan.name}") + scan.automation.services[UpdateIngestionService].foreach(_.resume()) + } + } } object TriggerTestUtil extends ScalaFuturesWithPatience with LazyLogging { diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala index 7cece6d441..7e8740a8a1 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala @@ -83,11 +83,13 @@ abstract class AutomationService( } /** Returns all triggers of the given class */ - final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = { + final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = services[T](tag) + + final def services[T](implicit tag: ClassTag[T]): Seq[T] = { backgroundServices .get() - .collect { case trigger: T => - trigger + .collect { case service: T => + service } } diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/UpdateIngestionService.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/UpdateIngestionService.scala index 2e07081315..f165a390c3 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/UpdateIngestionService.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/UpdateIngestionService.scala @@ -16,10 +16,11 @@ import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore import com.digitalasset.canton.logging.NamedLoggerFactory import com.digitalasset.canton.time.Clock import com.digitalasset.canton.tracing.TraceContext +import com.google.common.annotations.VisibleForTesting import io.opentelemetry.api.trace.Tracer import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.IngestionSink.IngestionStart -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise, blocking} /** Ingestion for ACS and transfer stores. * We ingest them independently but we ensure that the acs store @@ -97,13 +98,18 @@ class UpdateIngestionService( private def process( msgs: Seq[GetTreeUpdatesResponse] )(implicit traceContext: TraceContext): Future[Unit] = { - NonEmptyList.fromFoldable(msgs) match { - case Some(batch) => - logger.debug(s"Processing batch of ${batch.size} elements") - ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint)) - case None => - logger.error("Received empty batch of updates to ingest. This is never supposed to happen.") - Future.unit + // if paused, this step will backpressure the source + waitForResumePromise.future.flatMap { _ => + NonEmptyList.fromFoldable(msgs) match { + case Some(batch) => + logger.debug(s"Processing batch of ${batch.size} elements") + ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint)) + case None => + logger.error( + "Received empty batch of updates to ingest. This is never supposed to happen." + ) + Future.unit + } } } @@ -124,4 +130,37 @@ class UpdateIngestionService( // Kick-off the ingestion startIngestion() + + @SuppressWarnings(Array("org.wartremover.warts.Var")) + @volatile + private var waitForResumePromise = Promise.successful(()) + + /** Note that any in-flight events being processed when `pause` is called will still be processed. + */ + @VisibleForTesting + def pause(): Future[Unit] = blocking { + withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ => + logger.info("Pausing UpdateIngestionService.") + blocking { + synchronized { + if (waitForResumePromise.isCompleted) { + waitForResumePromise = Promise() + } + Future.successful(()) + } + } + } + } + + @VisibleForTesting + def resume(): Unit = blocking { + withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ => + logger.info("Resuming UpdateIngestionService.") + blocking { + synchronized { + val _ = waitForResumePromise.trySuccess(()) + } + } + } + } } diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceParametersConfig.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceParametersConfig.scala index 665ee9d510..31cb77f2af 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceParametersConfig.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/config/SpliceParametersConfig.scala @@ -10,11 +10,14 @@ import com.digitalasset.canton.config.{ NonNegativeFiniteDuration, WatchdogConfig, } +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher import org.lfdecentralizedtrust.splice.util.SpliceRateLimitConfig final case class SpliceParametersConfig( batching: BatchingConfig = BatchingConfig(), caching: CachingConfigs = CachingConfigs(), + contractFetchLedgerFallbackConfig: ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig = + ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig(), // Do not define any defaults on the class containing the `SpliceParametersConfig` as they'll be overwritten. // Do it instead on the app.conf file in `cluster/images/${the_app}/app.conf` customTimeouts: Map[String, NonNegativeFiniteDuration] = Map.empty, diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SpliceLedgerConnection.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SpliceLedgerConnection.scala index bc85cd81d1..d8f6159508 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SpliceLedgerConnection.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/SpliceLedgerConnection.scala @@ -6,8 +6,9 @@ package org.lfdecentralizedtrust.splice.environment import com.daml.ledger.api.v2 as lapi import com.daml.ledger.api.v2.admin.identity_provider_config_service.IdentityProviderConfig import com.daml.ledger.api.v2.admin.{ObjectMetaOuterClass, UserManagementServiceOuterClass} +import com.daml.ledger.api.v2.event import com.daml.ledger.api.v2.package_reference.PackageReference -import com.daml.ledger.javaapi.data.codegen.{Created, Exercised, HasCommands, Update} +import com.daml.ledger.javaapi.data.codegen.{ContractId, Created, Exercised, HasCommands, Update} import com.daml.ledger.javaapi.data.{Command, CreatedEvent, ExercisedEvent, Transaction, User} import com.digitalasset.base.error.ErrorResource import com.digitalasset.base.error.utils.ErrorDetails @@ -149,6 +150,13 @@ class BaseLedgerConnection( ) ] = activeContracts(filter.toEventFormat, offset) + def getContract( + contractId: ContractId[?], + queryingParties: Seq[PartyId], + )(implicit tc: TraceContext): Future[Option[event.CreatedEvent]] = { + client.getContract(contractId, queryingParties) + } + def getConnectedDomains(party: PartyId)(implicit tc: TraceContext ): Future[Map[SynchronizerAlias, SynchronizerId]] = diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ledger/api/LedgerClient.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ledger/api/LedgerClient.scala index 9436df9f57..eabf4ba332 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ledger/api/LedgerClient.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ledger/api/LedgerClient.scala @@ -19,6 +19,7 @@ import com.daml.ledger.api.v2.admin.party_management_service.{ } import com.daml.ledger.api.v2.interactive.interactive_submission_service.InteractiveSubmissionServiceGrpc import com.daml.ledger.api.v2.command_service.CommandServiceGrpc +import com.daml.ledger.api.v2.event.CreatedEvent import com.daml.ledger.api.v2.offset_checkpoint.OffsetCheckpoint.toJavaProto import com.daml.ledger.api.v2.package_reference.PackageReference import com.daml.ledger.api.v2.package_service.{ListPackagesRequest, PackageServiceGrpc} @@ -153,6 +154,8 @@ private[environment] class LedgerClient( lapi.command_completion_service.CommandCompletionServiceGrpc.stub(channel) private val stateServiceStub: lapi.state_service.StateServiceGrpc.StateServiceStub = lapi.state_service.StateServiceGrpc.stub(channel) + private val contractServiceStub: lapi.contract_service.ContractServiceGrpc.ContractServiceStub = + lapi.contract_service.ContractServiceGrpc.stub(channel) private val identityProviderConfigServiceStub : identity_provider_config_service.IdentityProviderConfigServiceGrpc.IdentityProviderConfigServiceStub = identity_provider_config_service.IdentityProviderConfigServiceGrpc.stub(channel) @@ -195,6 +198,23 @@ private[environment] class LedgerClient( .serverStreaming(request, stub.getActiveContracts) ) + def getContract(contractId: ContractId[?], queryingParties: Seq[PartyId])(implicit + tc: TraceContext + ): Future[Option[CreatedEvent]] = { + (for { + stub <- withCredentialsAndTraceContext(contractServiceStub) + contract <- stub.getContract( + new lapi.contract_service.GetContractRequest( + contractId.contractId, + queryingParties.map(_.toProtoPrimitive), + ) + ) + } yield contract.createdEvent).recover { + case e: StatusRuntimeException if e.getStatus.getCode == io.grpc.Status.Code.NOT_FOUND => + None + } + } + def updates( request: GetUpdatesRequest )(implicit tc: TraceContext): Source[LedgerClient.GetTreeUpdatesResponse, NotUsed] = { diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ChoiceContextContractFetcher.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ChoiceContextContractFetcher.scala new file mode 100644 index 0000000000..9f7aaf73e1 --- /dev/null +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ChoiceContextContractFetcher.scala @@ -0,0 +1,121 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.store + +import cats.data.OptionT +import com.daml.ledger.api.v2.event.CreatedEvent.toJavaProto +import com.daml.ledger.javaapi.data.CreatedEvent +import com.daml.ledger.javaapi.data.codegen.ContractId +import com.digitalasset.canton.config.NonNegativeFiniteDuration +import com.digitalasset.canton.data.CantonTimestamp +import com.digitalasset.canton.logging.NamedLoggerFactory +import com.digitalasset.canton.time.Clock +import com.digitalasset.canton.tracing.TraceContext +import org.lfdecentralizedtrust.splice.environment.BaseLedgerConnection +import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractCompanion +import org.lfdecentralizedtrust.splice.util.Contract + +import scala.concurrent.{ExecutionContext, Future} + +/** The RecordOrderPublisher might cause that some contracts are visible by validators' stores, + * but not yet by SVs'. Then, when a validator requires some data from Scan that is not yet there, + * it slows the workflow down. + * This is particularly relevant in the token standard, where the TransferInstruction and the amuletallocation + * might appear in the validators before Scans, and when they try to accept them, Scan fails to provide the necessary + * ChoiceContext. + * The purpose of this class then is to fallback to a direct ledger call when the store says a contract does not exist. + * + * This is not general purpose (see limitations below), but covers the case for the Token Standard contracts. + */ +trait ChoiceContextContractFetcher { + + def lookupContractById[C, TCid <: ContractId[?], T]( + companion: C + )(id: ContractId[?])(implicit + companionClass: ContractCompanion[C, TCid, T], + traceContext: TraceContext, + ): Future[Option[Contract[TCid, T]]] + +} + +object ChoiceContextContractFetcher { + + private class StoreChoiceContextContractFetcher(store: AppStore)(implicit ec: ExecutionContext) + extends ChoiceContextContractFetcher { + override def lookupContractById[C, TCid <: ContractId[?], T]( + companion: C + )(id: ContractId[?])(implicit + companionClass: ContractCompanion[C, TCid, T], + traceContext: TraceContext, + ): Future[Option[Contract[TCid, T]]] = + store.multiDomainAcsStore.lookupContractById(companion)(id).map(_.map(_.contract)) + } + + private class StoreChoiceContextContractFetcherWithLedgerFallback( + store: AppStore, + fallbackLedgerClient: BaseLedgerConnection, + clock: Clock, + loggerFactory: NamedLoggerFactory, + getContractValidity: NonNegativeFiniteDuration, + )(implicit ec: ExecutionContext) + extends ChoiceContextContractFetcher { + private val logger = loggerFactory.getLogger(this.getClass) + + override def lookupContractById[C, TCid <: ContractId[?], T]( + companion: C + )(id: ContractId[?])(implicit + companionClass: ContractCompanion[C, TCid, T], + traceContext: TraceContext, + ): Future[Option[Contract[TCid, T]]] = + OptionT(store.multiDomainAcsStore.lookupContractById(companion)(id)) + .map(_.contract) + .orElse( + OptionT(fallbackLedgerClient.getContract(id, Seq(store.multiDomainAcsStore.storeParty))) + // `getContract` will return archived contracts (and thus missing from the store) until they have been pruned. + // Thus, we verify that it was created not too long ago, + // such that it means that what happened is the store didn't see it yet, but the ledger did. + // (as opposed to the contract actually being archived) + // This will give a false-positive for a contract that was quickly archived after creation. + .filter { event => + val createdAt = + event.createdAt.flatMap(CantonTimestamp.fromProtoTimestamp(_).toOption) + val ignoreBefore = clock.now.minus(getContractValidity.asJava) + createdAt.exists(_ > ignoreBefore) + } + .subflatMap { createdEvent => + val javaCreatedEvent = CreatedEvent.fromProto(toJavaProto(createdEvent)) + logger.debug(s"Falling back to ledger for contract $javaCreatedEvent") + companionClass + .fromCreatedEvent(companion)(javaCreatedEvent) + } + ) + .value + } + + case class StoreContractFetcherWithLedgerFallbackConfig( + enabled: Boolean = true, + // RecordOrderPublisher + (created_at VS record_time skew) + getContractValidity: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(120L), + ) + def createStoreWithLedgerFallback( + config: StoreContractFetcherWithLedgerFallbackConfig, + store: AppStore, + fallbackLedgerClient: BaseLedgerConnection, + clock: Clock, + loggerFactory: NamedLoggerFactory, + )(implicit ec: ExecutionContext): ChoiceContextContractFetcher = { + if (config.enabled) { + new StoreChoiceContextContractFetcherWithLedgerFallback( + store, + fallbackLedgerClient, + clock, + loggerFactory, + config.getContractValidity, + ) + } else { + new StoreChoiceContextContractFetcher(store) + } + } + +} diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStore.scala index a189655cb8..3869d959eb 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStore.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStore.scala @@ -52,11 +52,11 @@ import scala.jdk.CollectionConverters.* trait MultiDomainAcsStore extends HasIngestionSink with AutoCloseable with NamedLogging { protected def storeName: String - protected def storeParty: String + def storeParty: PartyId protected implicit lazy val mc: MetricsContext = MetricsContext( "store_name" -> storeName, - "store_party" -> storeParty, + "store_party" -> storeParty.toString, // using .toString for historical reasons ) protected def metricsFactory: LabeledMetricsFactory diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala index 0e99e97ac2..fbcbd23dbb 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala @@ -115,7 +115,7 @@ final class DbMultiDomainAcsStore[TXE]( import profile.api.jdbcActionExtensionMethods override lazy val storeName = acsStoreDescriptor.name - override lazy val storeParty = acsStoreDescriptor.party.toString + override lazy val storeParty = acsStoreDescriptor.party override protected def metricsFactory: LabeledMetricsFactory = retryProvider.metricsFactory override lazy val metrics = new StoreMetrics(metricsFactory)(mc) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala index 86d267cef7..30062fad64 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala @@ -46,7 +46,11 @@ import org.lfdecentralizedtrust.splice.scan.store.db.{ ScanAggregatesReaderContext, } import org.lfdecentralizedtrust.splice.scan.dso.DsoAnsResolver -import org.lfdecentralizedtrust.splice.store.{PageLimit, UpdateHistory} +import org.lfdecentralizedtrust.splice.store.{ + ChoiceContextContractFetcher, + PageLimit, + UpdateHistory, +} import org.lfdecentralizedtrust.splice.util.HasHealth import com.digitalasset.canton.concurrent.FutureSupervisor import com.digitalasset.canton.config.CantonRequireTypes.InstanceName @@ -307,14 +311,23 @@ class ScanApp( bftSequencersWithAdminConnections, initialRound, ) + contractFetcher = ChoiceContextContractFetcher.createStoreWithLedgerFallback( + config.parameters.contractFetchLedgerFallbackConfig, + store, + appInitConnection, + clock, + loggerFactory, + ) tokenStandardTransferInstructionHandler = new HttpTokenStandardTransferInstructionHandler( store, + contractFetcher, clock, loggerFactory, ) tokenStandardAllocationHandler = new HttpTokenStandardAllocationHandler( store, + contractFetcher, clock, loggerFactory, ) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardAllocationHandler.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardAllocationHandler.scala index edf8ec9573..302e9ee1ae 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardAllocationHandler.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardAllocationHandler.scala @@ -13,6 +13,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1 import org.lfdecentralizedtrust.splice.environment.DarResources import org.lfdecentralizedtrust.splice.scan.store.ScanStore import org.lfdecentralizedtrust.splice.scan.util +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher import org.lfdecentralizedtrust.splice.util.Contract import org.lfdecentralizedtrust.tokenstandard.allocation.v1 import org.lfdecentralizedtrust.tokenstandard.allocation.v1.definitions.GetChoiceContextRequest @@ -24,6 +25,7 @@ import scala.jdk.CollectionConverters.* class HttpTokenStandardAllocationHandler( store: ScanStore, + contractFetcher: ChoiceContextContractFetcher, clock: Clock, protected val loggerFactory: NamedLoggerFactory, )(implicit @@ -102,7 +104,7 @@ class HttpTokenStandardAllocationHandler( tc: TraceContext ): Future[definitions.ChoiceContext] = { for { - amuletAlloc <- store.multiDomainAcsStore + amuletAlloc <- contractFetcher .lookupContractById(amuletallocation.AmuletAllocation.COMPANION)( new amuletallocation.AmuletAllocation.ContractId( allocationId @@ -121,13 +123,14 @@ class HttpTokenStandardAllocationHandler( ChoiceContextBuilder, ]( s"AmuletAllocation '$allocationId'", - amuletAlloc.contract.payload.lockedAmulet, - amuletAlloc.contract.payload.allocation.settlement.settleBefore, + amuletAlloc.payload.lockedAmulet, + amuletAlloc.payload.allocation.settlement.settleBefore, requireLockedAmulet, Option.when(canBeFeatured)( PartyId.tryFromProtoPrimitive(amuletAlloc.payload.allocation.settlement.executor) ), store, + contractFetcher, clock, new ChoiceContextBuilder(_, excludeDebugFields), ) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardTransferInstructionHandler.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardTransferInstructionHandler.scala index 9f7516889d..484a369e18 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardTransferInstructionHandler.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpTokenStandardTransferInstructionHandler.scala @@ -9,22 +9,26 @@ import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.{Spanning, TraceContext} import io.opentelemetry.api.trace.Tracer import org.lfdecentralizedtrust.splice.codegen.java.splice -import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1 +import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{ + metadatav1, + transferinstructionv1, +} import org.lfdecentralizedtrust.splice.environment.DarResources -import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1 -import v1.{Resource, definitions} import org.lfdecentralizedtrust.splice.scan.store.ScanStore import org.lfdecentralizedtrust.splice.scan.util +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher import org.lfdecentralizedtrust.splice.util.{AmuletConfigSchedule, Contract} -import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1 +import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1 +import org.lfdecentralizedtrust.tokenstandard.transferinstruction.v1.{Resource, definitions} import java.time.ZoneOffset import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} import scala.jdk.CollectionConverters.* +import scala.util.{Failure, Success, Try} class HttpTokenStandardTransferInstructionHandler( store: ScanStore, + contractFetcher: ChoiceContextContractFetcher, clock: Clock, protected val loggerFactory: NamedLoggerFactory, )(implicit @@ -198,8 +202,10 @@ class HttpTokenStandardTransferInstructionHandler( tc: TraceContext ): Future[definitions.ChoiceContext] = { for { - amuletInstr <- store.multiDomainAcsStore - .lookupContractById(splice.amulettransferinstruction.AmuletTransferInstruction.COMPANION)( + amuletInstr <- contractFetcher + .lookupContractById( + splice.amulettransferinstruction.AmuletTransferInstruction.COMPANION + )( new splice.amulettransferinstruction.AmuletTransferInstruction.ContractId( transferInstructionId ) @@ -217,11 +223,12 @@ class HttpTokenStandardTransferInstructionHandler( ChoiceContextBuilder, ]( s"AmuletTransferInstruction '$transferInstructionId'", - amuletInstr.contract.payload.lockedAmulet, - amuletInstr.contract.payload.transfer.executeBefore, + amuletInstr.payload.lockedAmulet, + amuletInstr.payload.transfer.executeBefore, requireLockedAmulet, None, store, + contractFetcher, clock, new ChoiceContextBuilder(_, excludeDebugFields), ) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/util/ChoiceContextBuilder.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/util/ChoiceContextBuilder.scala index 3afe949ee5..82f55d3cf7 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/util/ChoiceContextBuilder.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/util/ChoiceContextBuilder.scala @@ -7,12 +7,12 @@ import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.time.Clock import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.TraceContext - import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1 import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1.AnyContract import org.lfdecentralizedtrust.splice.codegen.java.splice.round import org.lfdecentralizedtrust.splice.scan.store.ScanStore +import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher import org.lfdecentralizedtrust.splice.util.{AmuletConfigSchedule, Contract, ContractWithState} import java.time.Instant @@ -140,6 +140,7 @@ object ChoiceContextBuilder { requireLockedAmulet: Boolean, featuredProvider: Option[PartyId], store: ScanStore, + fetcher: ChoiceContextContractFetcher, clock: Clock, newBuilder: String => Builder, )(implicit @@ -147,7 +148,7 @@ object ChoiceContextBuilder { tc: TraceContext, ): Future[ChoiceContext] = { for { - optLockedAmulet <- store.multiDomainAcsStore.lookupContractById( + optLockedAmulet <- fetcher.lookupContractById( amulet.LockedAmulet.COMPANION )(lockedAmuletId) (choiceContextBuilder, _) <- getAmuletRulesTransferContext[ @@ -182,7 +183,7 @@ object ChoiceContextBuilder { .build() } } else { - optLockedAmulet.foreach(co => choiceContextBuilder.disclose(co.contract)) + optLockedAmulet.foreach(contract => choiceContextBuilder.disclose(contract)) choiceContextBuilder // the choice implementation should only attempt to expire the lock if it exists .addBool("expire-lock", optLockedAmulet.isDefined) diff --git a/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/contract_service.proto b/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/contract_service.proto new file mode 100644 index 0000000000..0b8fc7522d --- /dev/null +++ b/canton/community/ledger-api/src/main/protobuf/com/daml/ledger/api/v2/contract_service.proto @@ -0,0 +1,52 @@ +// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package com.daml.ledger.api.v2; + +import "com/daml/ledger/api/v2/event.proto"; + +option csharp_namespace = "Com.Daml.Ledger.Api.V2"; +option java_outer_classname = "ContractServiceOuterClass"; +option java_package = "com.daml.ledger.api.v2"; + +// This service is experimental / alpha, therefore no backwards compatibility is guaranteed. +service ContractService { + // Looking up contract data by contract ID. + // This endpoint is experimental / alpha, therefore no backwards compatibility is guaranteed. + // This endpoint must not be used to look up contracts which entered the participant via party replication + // or repair service. + // If there is no contract exist with the contract ID, or there is no intersection with the querying_parties, + // an CONTRACT_PAYLOAD_NOT_FOUND error will be raised. + rpc GetContract(GetContractRequest) returns (GetContractResponse); +} + +message GetContractRequest { + // The ID of the contract. + // Must be a valid LedgerString (as described in ``value.proto``). + // Required + string contract_id = 1; + + // The list of querying parties + // The stakeholders of the referenced contract must have an intersection with any of these parties + // to return the result. + // Optional, if no querying_parties specified, all possible contracts could be returned. + repeated string querying_parties = 2; +} + +message GetContractResponse { + // The representative_package_id will be always set to the contract package ID, therefore this endpoint should + // not be used to lookup contract which entered the participant via party replication or repair service. + // The witnesses field will contain only the querying_parties which are also stakeholders of the contract as well. + // The following fields of the created event cannot be populated, so those should not be used / parsed: + // + // - offset + // - node_id + // - created_event_blob + // - interface_views + // - acs_delta + // + // Required + CreatedEvent created_event = 1; +} diff --git a/test-full-class-names.log b/test-full-class-names.log index 2179905b69..b8fd08a31a 100644 --- a/test-full-class-names.log +++ b/test-full-class-names.log @@ -45,6 +45,7 @@ org.lfdecentralizedtrust.splice.integration.tests.SvReconcileSynchronizerConfigI org.lfdecentralizedtrust.splice.integration.tests.SvStateManagementIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.TokenStandardAllocationIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.TokenStandardCliIntegrationTest +org.lfdecentralizedtrust.splice.integration.tests.TokenStandardFetchFallbackIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.TokenStandardTransferIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.UnclaimedActivityRecordIntegrationTest org.lfdecentralizedtrust.splice.integration.tests.UnclaimedSvRewardsScriptIntegrationTest