Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import org.lfdecentralizedtrust.splice.http.UrlValidator
import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.BftScanClientConfig
import org.lfdecentralizedtrust.splice.scan.config.{
BftSequencerConfig,
MediatorVerdictIngestionConfig,
ScanAppBackendConfig,
ScanAppClientConfig,
ScanCacheConfig,
MediatorVerdictIngestionConfig,
ScanSynchronizerConfig,
CacheConfig as SpliceCacheConfig,
}
Expand Down Expand Up @@ -82,6 +82,7 @@ import com.digitalasset.canton.synchronizer.sequencer.config.{
}
import com.digitalasset.canton.topology.PartyId
import com.digitalasset.daml.lf.data.Ref.PackageVersion
import org.lfdecentralizedtrust.splice.store.ContractFetcher

case class SpliceConfig(
override val name: Option[String] = None,
Expand Down Expand Up @@ -412,6 +413,9 @@ object SpliceConfig {
deriveReader[CircuitBreakerConfig]
implicit val circuitBreakersConfig: ConfigReader[CircuitBreakersConfig] =
deriveReader[CircuitBreakersConfig]
implicit val contractFetchLedgerFallbackConfigReader
: ConfigReader[ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] =
deriveReader[ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig]
implicit val spliceParametersConfig: ConfigReader[SpliceParametersConfig] =
deriveReader[SpliceParametersConfig]
implicit val rateLimitersConfig: ConfigReader[RateLimitersConfig] =
Expand Down Expand Up @@ -838,6 +842,9 @@ object SpliceConfig {
deriveWriter[CircuitBreakerConfig]
implicit val circuitBreakersConfig: ConfigWriter[CircuitBreakersConfig] =
deriveWriter[CircuitBreakersConfig]
implicit val contractFetchLedgerFallbackConfigWriter
: ConfigWriter[ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig] =
deriveWriter[ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig]
implicit val spliceParametersConfig: ConfigWriter[SpliceParametersConfig] =
deriveWriter[SpliceParametersConfig]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.lfdecentralizedtrust.splice.integration.tests

import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
import org.lfdecentralizedtrust.splice.config.ConfigTransforms.updateAllScanAppConfigs_
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment
import org.lfdecentralizedtrust.splice.store.ContractFetcher
import org.lfdecentralizedtrust.splice.util.{TriggerTestUtil, WalletTestUtil}

import java.util.UUID

class TokenStandardFetchFallbackIntegrationTest
extends IntegrationTestWithSharedEnvironment
with WalletTestUtil
with WalletTxLogTestUtil
with TriggerTestUtil
with HasActorSystem
with HasExecutionContext {

override def environmentDefinition: EnvironmentDefinition = {
EnvironmentDefinition
.simpleTopology1Sv(this.getClass.getSimpleName)
.addConfigTransforms((_, config) =>
updateAllScanAppConfigs_(scanConfig =>
scanConfig.copy(parameters =
scanConfig.parameters.copy(contractFetchLedgerFallbackConfig =
ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig(
enabled = true
)
)
)
)(config)
)
}

"Token Standard Transfers should" should {

"TransferInstruction context can be fetched from Scan even if it's not yet ingested into the store" in {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test wants the fallback to be enabled and the validity to be long enough, whereas this assertion wants to see an archival immediately, which is in opposition to what this test wants, so I had to move this into a different file

implicit env =>
pauseScanIngestionWithin(sv1ScanBackend) {
onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
aliceWalletClient.tap(100)

val response = actAndCheck(
"Alice creates transfer offer",
aliceWalletClient.createTokenStandardTransfer(
bobUserParty,
10,
s"Transfer",
CantonTimestamp.now().plusSeconds(3600L),
UUID.randomUUID().toString,
),
)(
"Alice and Bob see it",
_ => {
Seq(aliceWalletClient, bobWalletClient).foreach(
_.listTokenStandardTransfers() should have size 1
)
},
)._1

val cid = response.output match {
case members.TransferInstructionPending(value) =>
new TransferInstruction.ContractId(value.transferInstructionCid)
case _ => fail("The transfers were expected to be pending.")
}

clue("SV-1's Scan sees it (still, even though ingestion is paused)") {
eventuallySucceeds() {
sv1ScanBackend.getTransferInstructionAcceptContext(cid)
}
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package org.lfdecentralizedtrust.splice.util

import com.digitalasset.canton.{BaseTest, ScalaFuturesWithPatience}
import com.typesafe.scalalogging.LazyLogging
import org.lfdecentralizedtrust.splice.automation.Trigger
import org.lfdecentralizedtrust.splice.automation.{Trigger, UpdateIngestionService}
import org.lfdecentralizedtrust.splice.console.ScanAppBackendReference
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition.sv1Backend
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.SpliceTestConsoleEnvironment
import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger
Expand Down Expand Up @@ -34,6 +35,17 @@ trait TriggerTestUtil { self: BaseTest =>
advanceOpenMiningRoundTrigger.runOnce().futureValue should be(true)
}
}

def pauseScanIngestionWithin[T](scan: ScanAppBackendReference)(codeBlock: => T): T = {
try {
logger.info(s"Pausing ingestion for ${scan.name}")
scan.automation.services[UpdateIngestionService].foreach(_.pause().futureValue)
codeBlock
} finally {
logger.info(s"Resuming ingestion for ${scan.name}")
scan.automation.services[UpdateIngestionService].foreach(_.resume())
}
}
}

object TriggerTestUtil extends ScalaFuturesWithPatience with LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ abstract class AutomationService(
}

/** Returns all triggers of the given class */
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = {
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = services[T](tag)

final def services[T](implicit tag: ClassTag[T]): Seq[T] = {
backgroundServices
.get()
.collect { case trigger: T =>
trigger
.collect { case service: T =>
service
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.IngestionSink.IngestionStart

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}

/** Ingestion for ACS and transfer stores.
* We ingest them independently but we ensure that the acs store
Expand Down Expand Up @@ -97,13 +97,18 @@ class UpdateIngestionService(
private def process(
msgs: Seq[GetTreeUpdatesResponse]
)(implicit traceContext: TraceContext): Future[Unit] = {
NonEmptyList.fromFoldable(msgs) match {
case Some(batch) =>
logger.debug(s"Processing batch of ${batch.size} elements")
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
case None =>
logger.error("Received empty batch of updates to ingest. This is never supposed to happen.")
Future.unit
// if paused, this step will backpressure the source
waitForResumePromise.future.flatMap { _ =>
NonEmptyList.fromFoldable(msgs) match {
case Some(batch) =>
logger.debug(s"Processing batch of ${batch.size} elements")
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
case None =>
logger.error(
"Received empty batch of updates to ingest. This is never supposed to happen."
)
Future.unit
}
}
}

Expand All @@ -124,4 +129,36 @@ class UpdateIngestionService(

// Kick-off the ingestion
startIngestion()

@SuppressWarnings(Array("org.wartremover.warts.Var"))
@volatile
private var waitForResumePromise = Promise.successful(())

/** Note that any in-flight events being processed when `pause` is called will still be processed.
* For test purposes.
*/
def pause(): Future[Unit] = blocking {
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
logger.info("Pausing UpdateIngestionService.")
blocking {
synchronized {
if (waitForResumePromise.isCompleted) {
waitForResumePromise = Promise()
}
Future.successful(())
}
}
}
}

def resume(): Unit = blocking {
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
logger.info("Resuming UpdateIngestionService.")
blocking {
synchronized {
val _ = waitForResumePromise.trySuccess(())
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import com.digitalasset.canton.config.{
NonNegativeFiniteDuration,
WatchdogConfig,
}
import org.lfdecentralizedtrust.splice.store.ContractFetcher
import org.lfdecentralizedtrust.splice.util.SpliceRateLimitConfig

final case class SpliceParametersConfig(
batching: BatchingConfig = BatchingConfig(),
caching: CachingConfigs = CachingConfigs(),
contractFetchLedgerFallbackConfig: ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig =
ContractFetcher.StoreContractFetcherWithLedgerFallbackConfig(),
// Do not define any defaults on the class containing the `SpliceParametersConfig` as they'll be overwritten.
// Do it instead on the app.conf file in `cluster/images/${the_app}/app.conf`
customTimeouts: Map[String, NonNegativeFiniteDuration] = Map.empty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package org.lfdecentralizedtrust.splice.environment
import com.daml.ledger.api.v2 as lapi
import com.daml.ledger.api.v2.admin.identity_provider_config_service.IdentityProviderConfig
import com.daml.ledger.api.v2.admin.{ObjectMetaOuterClass, UserManagementServiceOuterClass}
import com.daml.ledger.api.v2.event
import com.daml.ledger.api.v2.package_reference.PackageReference
import com.daml.ledger.javaapi.data.codegen.{Created, Exercised, HasCommands, Update}
import com.daml.ledger.javaapi.data.codegen.{ContractId, Created, Exercised, HasCommands, Update}
import com.daml.ledger.javaapi.data.{Command, CreatedEvent, ExercisedEvent, Transaction, User}
import com.digitalasset.base.error.ErrorResource
import com.digitalasset.base.error.utils.ErrorDetails
Expand Down Expand Up @@ -149,6 +150,13 @@ class BaseLedgerConnection(
)
] = activeContracts(filter.toEventFormat, offset)

def getContract(
contractId: ContractId[?],
queryingParties: Seq[PartyId],
)(implicit tc: TraceContext): Future[Option[event.CreatedEvent]] = {
client.getContract(contractId, queryingParties)
}

def getConnectedDomains(party: PartyId)(implicit
tc: TraceContext
): Future[Map[SynchronizerAlias, SynchronizerId]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.daml.ledger.api.v2.admin.party_management_service.{
}
import com.daml.ledger.api.v2.interactive.interactive_submission_service.InteractiveSubmissionServiceGrpc
import com.daml.ledger.api.v2.command_service.CommandServiceGrpc
import com.daml.ledger.api.v2.event.CreatedEvent
import com.daml.ledger.api.v2.offset_checkpoint.OffsetCheckpoint.toJavaProto
import com.daml.ledger.api.v2.package_reference.PackageReference
import com.daml.ledger.api.v2.package_service.{ListPackagesRequest, PackageServiceGrpc}
Expand Down Expand Up @@ -153,6 +154,8 @@ private[environment] class LedgerClient(
lapi.command_completion_service.CommandCompletionServiceGrpc.stub(channel)
private val stateServiceStub: lapi.state_service.StateServiceGrpc.StateServiceStub =
lapi.state_service.StateServiceGrpc.stub(channel)
private val contractServiceStub: lapi.contract_service.ContractServiceGrpc.ContractServiceStub =
lapi.contract_service.ContractServiceGrpc.stub(channel)
private val identityProviderConfigServiceStub
: identity_provider_config_service.IdentityProviderConfigServiceGrpc.IdentityProviderConfigServiceStub =
identity_provider_config_service.IdentityProviderConfigServiceGrpc.stub(channel)
Expand Down Expand Up @@ -195,6 +198,20 @@ private[environment] class LedgerClient(
.serverStreaming(request, stub.getActiveContracts)
)

def getContract(contractId: ContractId[?], queryingParties: Seq[PartyId])(implicit
tc: TraceContext
): Future[Option[CreatedEvent]] = {
for {
stub <- withCredentialsAndTraceContext(contractServiceStub)
contract <- stub.getContract(
new lapi.contract_service.GetContractRequest(
contractId.contractId,
queryingParties.map(_.toProtoPrimitive),
)
)
} yield contract.createdEvent
}

def updates(
request: GetUpdatesRequest
)(implicit tc: TraceContext): Source[LedgerClient.GetTreeUpdatesResponse, NotUsed] = {
Expand Down
Loading
Loading