Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import org.lfdecentralizedtrust.splice.http.UrlValidator
import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig
import org.lfdecentralizedtrust.splice.scan.config.{
BftSequencerConfig,
MediatorVerdictIngestionConfig,
ScanAppBackendConfig,
ScanAppClientConfig,
ScanCacheConfig,
MediatorVerdictIngestionConfig,
ScanSynchronizerConfig,
CacheConfig as SpliceCacheConfig,
}
Expand Down Expand Up @@ -82,6 +82,7 @@ import com.digitalasset.canton.synchronizer.sequencer.config.{
}
import com.digitalasset.canton.topology.PartyId
import com.digitalasset.daml.lf.data.Ref.PackageVersion
import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher

case class SpliceConfig(
override val name: Option[String] = None,
Expand Down Expand Up @@ -412,6 +413,9 @@ object SpliceConfig {
deriveReader[CircuitBreakerConfig]
implicit val circuitBreakersConfig: ConfigReader[CircuitBreakersConfig] =
deriveReader[CircuitBreakersConfig]
implicit val contractFetchLedgerFallbackConfigReader
: ConfigReader[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] =
deriveReader[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig]
implicit val spliceParametersConfig: ConfigReader[SpliceParametersConfig] =
deriveReader[SpliceParametersConfig]
implicit val rateLimitersConfig: ConfigReader[RateLimitersConfig] =
Expand Down Expand Up @@ -838,6 +842,9 @@ object SpliceConfig {
deriveWriter[CircuitBreakerConfig]
implicit val circuitBreakersConfig: ConfigWriter[CircuitBreakersConfig] =
deriveWriter[CircuitBreakersConfig]
implicit val contractFetchLedgerFallbackConfigWriter
: ConfigWriter[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] =
deriveWriter[ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig]
implicit val spliceParametersConfig: ConfigWriter[SpliceParametersConfig] =
deriveWriter[SpliceParametersConfig]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package org.lfdecentralizedtrust.splice.integration.tests

import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.allocationv1.*
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.holdingv1.InstrumentId
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment
import org.lfdecentralizedtrust.splice.util.{
ChoiceContextWithDisclosures,
TriggerTestUtil,
WalletTestUtil,
}

import java.time.Instant
import java.util.{Optional, UUID}

class TokenStandardFetchFallbackIntegrationTest
extends IntegrationTestWithSharedEnvironment
with WalletTestUtil
with WalletTxLogTestUtil
with TriggerTestUtil
with HasActorSystem
with HasExecutionContext {

override def environmentDefinition: EnvironmentDefinition = {
EnvironmentDefinition.simpleTopology1Sv(this.getClass.getSimpleName)
}

"Token Standard" should {

"TransferInstruction context can be fetched from Scan even if it's not yet ingested into the store" in {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test wants the fallback to be enabled and the validity to be long enough, whereas this assertion wants to see an archival immediately, which is in opposition to what this test wants, so I had to move this into a different file

implicit env =>
pauseScanIngestionWithin(sv1ScanBackend) {
onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
aliceWalletClient.tap(100)

val response = actAndCheck(
"Alice creates transfer offer",
aliceWalletClient.createTokenStandardTransfer(
bobUserParty,
10,
s"Transfer",
CantonTimestamp.now().plusSeconds(3600L),
UUID.randomUUID().toString,
),
)(
"Alice and Bob see it",
_ => {
Seq(aliceWalletClient, bobWalletClient).foreach(
_.listTokenStandardTransfers() should have size 1
)
},
)._1

val cid = response.output match {
case members.TransferInstructionPending(value) =>
new TransferInstruction.ContractId(value.transferInstructionCid)
case _ => fail("The transfers were expected to be pending.")
}

clue("SV-1's Scan sees it (still, even though ingestion is paused)") {
eventuallySucceeds() {
sv1ScanBackend.getTransferInstructionAcceptContext(cid)
}
}
}
}

"AmuletAllocations context can be fetched from Scan even if it's not yet ingested into the store" in {
implicit env =>
pauseScanIngestionWithin(sv1ScanBackend) {
val aliceParty = onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
val bobParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
// // Allocate venue on separate participant node, we still go through the validator API instead of parties.enable
// // so we can use the standard wallet client APIs but give the party a more useful name than splitwell.
// val venuePartyHint = s"venue-party-${Random.nextInt()}"
// val venueParty = splitwellValidatorBackend.onboardUser(
// splitwellWalletClient.config.ledgerApiUser,
// Some(
// PartyId.tryFromProtoPrimitive(
// s"$venuePartyHint::${splitwellValidatorBackend.participantClient.id.namespace.toProtoPrimitive}"
// )
// ),
// )

aliceWalletClient.tap(100)
val referenceId = UUID.randomUUID().toString

val (_, allocation) = actAndCheck(
"Alice creates an Allocation",
aliceWalletClient.allocateAmulet(
new AllocationSpecification(
new SettlementInfo(
dsoParty.toProtoPrimitive,
new Reference(referenceId, Optional.empty),
Instant.now,
Instant.now.plusSeconds(3600L),
Instant.now.plusSeconds(2 * 3600L),
ChoiceContextWithDisclosures.emptyMetadata,
),
UUID.randomUUID().toString,
new TransferLeg(
aliceParty.toProtoPrimitive,
bobParty.toProtoPrimitive,
BigDecimal(10).bigDecimal,
new InstrumentId(dsoParty.toProtoPrimitive, "Amulet"),
ChoiceContextWithDisclosures.emptyMetadata,
),
)
),
)(
"Alice sees the Allocation",
_ => {
val allocation =
aliceWalletClient
.listAmuletAllocations()
.loneElement
allocation.payload.allocation.settlement.settlementRef.id should be(referenceId)
allocation
},
)

clue("SV-1's Scan sees it (still, even though ingestion is paused)") {
eventuallySucceeds() {
sv1ScanBackend.getAllocationTransferContext(
allocation.contractId.toInterface(Allocation.INTERFACE)
)
}
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
import org.lfdecentralizedtrust.splice.config.ConfigTransforms.{
ConfigurableApp,
updateAllScanAppConfigs_,
updateAllSvAppFoundDsoConfigs_,
updateAutomationConfig,
}
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransactionHistoryResponseItem.TransactionType as HttpTransactionType
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
import org.lfdecentralizedtrust.splice.http.v0.definitions.{
AbortTransferInstruction,
ReceiverAmount,
Transfer,
}
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment
import org.lfdecentralizedtrust.splice.integration.tests.WalletTxLogTestUtil
import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher
import org.lfdecentralizedtrust.splice.util.{DisclosedContracts, WalletTestUtil}
import org.lfdecentralizedtrust.splice.wallet.automation.CollectRewardsAndMergeAmuletsTrigger
import org.lfdecentralizedtrust.splice.wallet.store.{
Expand Down Expand Up @@ -56,6 +57,17 @@ class TokenStandardTransferIntegrationTest
_.copy(zeroTransferFees = true)
)(config)
)
.addConfigTransforms((_, config) =>
updateAllScanAppConfigs_(scanConfig =>
scanConfig.copy(parameters =
scanConfig.parameters.copy(contractFetchLedgerFallbackConfig =
ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig(
enabled = false // expiry test doesn't see the archival otherwise
)
)
)
)(config)
)
}

"Token Standard Transfers should" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package org.lfdecentralizedtrust.splice.util

import com.digitalasset.canton.{BaseTest, ScalaFuturesWithPatience}
import com.typesafe.scalalogging.LazyLogging
import org.lfdecentralizedtrust.splice.automation.Trigger
import org.lfdecentralizedtrust.splice.automation.{Trigger, UpdateIngestionService}
import org.lfdecentralizedtrust.splice.console.ScanAppBackendReference
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition.sv1Backend
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.SpliceTestConsoleEnvironment
import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger
Expand Down Expand Up @@ -34,6 +35,17 @@ trait TriggerTestUtil { self: BaseTest =>
advanceOpenMiningRoundTrigger.runOnce().futureValue should be(true)
}
}

def pauseScanIngestionWithin[T](scan: ScanAppBackendReference)(codeBlock: => T): T = {
try {
logger.info(s"Pausing ingestion for ${scan.name}")
scan.automation.services[UpdateIngestionService].foreach(_.pause().futureValue)
codeBlock
} finally {
logger.info(s"Resuming ingestion for ${scan.name}")
scan.automation.services[UpdateIngestionService].foreach(_.resume())
}
}
}

object TriggerTestUtil extends ScalaFuturesWithPatience with LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ abstract class AutomationService(
}

/** Returns all triggers of the given class */
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = {
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = services[T](tag)

final def services[T](implicit tag: ClassTag[T]): Seq[T] = {
backgroundServices
.get()
.collect { case trigger: T =>
trigger
.collect { case service: T =>
service
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore
import com.digitalasset.canton.logging.NamedLoggerFactory
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.tracing.TraceContext
import com.google.common.annotations.VisibleForTesting
import io.opentelemetry.api.trace.Tracer
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.IngestionSink.IngestionStart

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}

/** Ingestion for ACS and transfer stores.
* We ingest them independently but we ensure that the acs store
Expand Down Expand Up @@ -97,13 +98,18 @@ class UpdateIngestionService(
private def process(
msgs: Seq[GetTreeUpdatesResponse]
)(implicit traceContext: TraceContext): Future[Unit] = {
NonEmptyList.fromFoldable(msgs) match {
case Some(batch) =>
logger.debug(s"Processing batch of ${batch.size} elements")
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
case None =>
logger.error("Received empty batch of updates to ingest. This is never supposed to happen.")
Future.unit
// if paused, this step will backpressure the source
waitForResumePromise.future.flatMap { _ =>
NonEmptyList.fromFoldable(msgs) match {
case Some(batch) =>
logger.debug(s"Processing batch of ${batch.size} elements")
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
case None =>
logger.error(
"Received empty batch of updates to ingest. This is never supposed to happen."
)
Future.unit
}
}
}

Expand All @@ -124,4 +130,37 @@ class UpdateIngestionService(

// Kick-off the ingestion
startIngestion()

@SuppressWarnings(Array("org.wartremover.warts.Var"))
@volatile
private var waitForResumePromise = Promise.successful(())

/** Note that any in-flight events being processed when `pause` is called will still be processed.
*/
@VisibleForTesting
def pause(): Future[Unit] = blocking {
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
logger.info("Pausing UpdateIngestionService.")
blocking {
synchronized {
if (waitForResumePromise.isCompleted) {
waitForResumePromise = Promise()
}
Future.successful(())
}
}
}
}

@VisibleForTesting
def resume(): Unit = blocking {
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
logger.info("Resuming UpdateIngestionService.")
blocking {
synchronized {
val _ = waitForResumePromise.trySuccess(())
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import com.digitalasset.canton.config.{
NonNegativeFiniteDuration,
WatchdogConfig,
}
import org.lfdecentralizedtrust.splice.store.ChoiceContextContractFetcher
import org.lfdecentralizedtrust.splice.util.SpliceRateLimitConfig

final case class SpliceParametersConfig(
batching: BatchingConfig = BatchingConfig(),
caching: CachingConfigs = CachingConfigs(),
contractFetchLedgerFallbackConfig: ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig =
ChoiceContextContractFetcher.StoreContractFetcherWithLedgerFallbackConfig(),
// Do not define any defaults on the class containing the `SpliceParametersConfig` as they'll be overwritten.
// Do it instead on the app.conf file in `cluster/images/${the_app}/app.conf`
customTimeouts: Map[String, NonNegativeFiniteDuration] = Map.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package org.lfdecentralizedtrust.splice.environment
import com.daml.ledger.api.v2 as lapi
import com.daml.ledger.api.v2.admin.identity_provider_config_service.IdentityProviderConfig
import com.daml.ledger.api.v2.admin.{ObjectMetaOuterClass, UserManagementServiceOuterClass}
import com.daml.ledger.api.v2.event
import com.daml.ledger.api.v2.package_reference.PackageReference
import com.daml.ledger.javaapi.data.codegen.{Created, Exercised, HasCommands, Update}
import com.daml.ledger.javaapi.data.codegen.{ContractId, Created, Exercised, HasCommands, Update}
import com.daml.ledger.javaapi.data.{Command, CreatedEvent, ExercisedEvent, Transaction, User}
import com.digitalasset.base.error.ErrorResource
import com.digitalasset.base.error.utils.ErrorDetails
Expand Down Expand Up @@ -149,6 +150,13 @@ class BaseLedgerConnection(
)
] = activeContracts(filter.toEventFormat, offset)

def getContract(
contractId: ContractId[?],
queryingParties: Seq[PartyId],
)(implicit tc: TraceContext): Future[Option[event.CreatedEvent]] = {
client.getContract(contractId, queryingParties)
}

def getConnectedDomains(party: PartyId)(implicit
tc: TraceContext
): Future[Map[SynchronizerAlias, SynchronizerId]] =
Expand Down
Loading
Loading