Skip to content

Commit 403f5de

Browse files
committed
[ci] add test
Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent bc76315 commit 403f5de

File tree

7 files changed

+117
-20
lines changed

7 files changed

+117
-20
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/TokenStandardTransferIntegrationTest.scala

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.metadatav1
88
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.transferinstructionv1.TransferInstruction
99
import org.lfdecentralizedtrust.splice.config.ConfigTransforms.{
1010
ConfigurableApp,
11+
updateAllScanAppConfigs_,
1112
updateAllSvAppFoundDsoConfigs_,
1213
updateAutomationConfig,
1314
}
14-
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
1515
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransactionHistoryResponseItem.TransactionType as HttpTransactionType
16+
import org.lfdecentralizedtrust.splice.http.v0.definitions.TransferInstructionResultOutput.members
1617
import org.lfdecentralizedtrust.splice.http.v0.definitions.{
1718
AbortTransferInstruction,
1819
ReceiverAmount,
1920
Transfer,
2021
}
2122
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition
2223
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment
23-
import org.lfdecentralizedtrust.splice.integration.tests.WalletTxLogTestUtil
24-
import org.lfdecentralizedtrust.splice.util.{DisclosedContracts, WalletTestUtil}
24+
import org.lfdecentralizedtrust.splice.util.{DisclosedContracts, TriggerTestUtil, WalletTestUtil}
2525
import org.lfdecentralizedtrust.splice.wallet.automation.CollectRewardsAndMergeAmuletsTrigger
2626
import org.lfdecentralizedtrust.splice.wallet.store.{
2727
BalanceChangeTxLogEntry,
@@ -39,6 +39,7 @@ class TokenStandardTransferIntegrationTest
3939
extends IntegrationTestWithSharedEnvironment
4040
with WalletTestUtil
4141
with WalletTxLogTestUtil
42+
with TriggerTestUtil
4243
with HasActorSystem
4344
with HasExecutionContext {
4445

@@ -56,10 +57,56 @@ class TokenStandardTransferIntegrationTest
5657
_.copy(zeroTransferFees = true)
5758
)(config)
5859
)
60+
.addConfigTransforms((_, config) =>
61+
updateAllScanAppConfigs_(scanConfig =>
62+
scanConfig.copy(parameters =
63+
scanConfig.parameters.copy(contractFetchLedgerFallbackEnabled = true)
64+
)
65+
)(config)
66+
)
5967
}
6068

6169
"Token Standard Transfers should" should {
6270

71+
"TransferInstruction context can be fetched from Scan even if it's not yet ingested into the store" in {
72+
implicit env =>
73+
pauseScanIngestionWithin(sv1ScanBackend) {
74+
onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
75+
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)
76+
aliceWalletClient.tap(100)
77+
78+
val response = actAndCheck(
79+
"Alice creates transfer offer",
80+
aliceWalletClient.createTokenStandardTransfer(
81+
bobUserParty,
82+
10,
83+
s"Transfer",
84+
CantonTimestamp.now().plusSeconds(3600L),
85+
UUID.randomUUID().toString,
86+
),
87+
)(
88+
"Alice and Bob see it",
89+
_ => {
90+
Seq(aliceWalletClient, bobWalletClient).foreach(
91+
_.listTokenStandardTransfers() should have size 1
92+
)
93+
},
94+
)._1
95+
96+
val cid = response.output match {
97+
case members.TransferInstructionPending(value) =>
98+
new TransferInstruction.ContractId(value.transferInstructionCid)
99+
case _ => fail("The transfers were expected to be pending.")
100+
}
101+
102+
clue("SV-1's Scan sees it (stiil, even though ingestion is paused)") {
103+
eventuallySucceeds() {
104+
sv1ScanBackend.getTransferInstructionAcceptContext(cid)
105+
}
106+
}
107+
}
108+
}
109+
63110
"support create, list, accept, reject and withdraw" in { implicit env =>
64111
val aliceUserParty = onboardWalletUser(aliceWalletClient, aliceValidatorBackend)
65112
val bobUserParty = onboardWalletUser(bobWalletClient, bobValidatorBackend)

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/TriggerTestUtil.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package org.lfdecentralizedtrust.splice.util
22

33
import com.digitalasset.canton.{BaseTest, ScalaFuturesWithPatience}
44
import com.typesafe.scalalogging.LazyLogging
5-
import org.lfdecentralizedtrust.splice.automation.Trigger
5+
import org.lfdecentralizedtrust.splice.automation.{Trigger, UpdateIngestionService}
6+
import org.lfdecentralizedtrust.splice.console.ScanAppBackendReference
67
import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition.sv1Backend
78
import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.SpliceTestConsoleEnvironment
89
import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger
@@ -34,6 +35,17 @@ trait TriggerTestUtil { self: BaseTest =>
3435
advanceOpenMiningRoundTrigger.runOnce().futureValue should be(true)
3536
}
3637
}
38+
39+
def pauseScanIngestionWithin[T](scan: ScanAppBackendReference)(codeBlock: => T): T = {
40+
try {
41+
logger.info(s"Pausing ingestion for ${scan.name}")
42+
scan.automation.services[UpdateIngestionService].foreach(_.pause().futureValue)
43+
codeBlock
44+
} finally {
45+
logger.info(s"Resuming ingestion for ${scan.name}")
46+
scan.automation.services[UpdateIngestionService].foreach(_.resume())
47+
}
48+
}
3749
}
3850

3951
object TriggerTestUtil extends ScalaFuturesWithPatience with LazyLogging {

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@ abstract class AutomationService(
8383
}
8484

8585
/** Returns all triggers of the given class */
86-
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = {
86+
final def triggers[T <: Trigger](implicit tag: ClassTag[T]): Seq[T] = services[T](tag)
87+
88+
final def services[T](implicit tag: ClassTag[T]): Seq[T] = {
8789
backgroundServices
8890
.get()
89-
.collect { case trigger: T =>
90-
trigger
91+
.collect { case service: T =>
92+
service
9193
}
9294
}
9395

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/UpdateIngestionService.scala

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import com.digitalasset.canton.tracing.TraceContext
1919
import io.opentelemetry.api.trace.Tracer
2020
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.IngestionSink.IngestionStart
2121

22-
import scala.concurrent.{ExecutionContext, Future}
22+
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
2323

2424
/** Ingestion for ACS and transfer stores.
2525
* We ingest them independently but we ensure that the acs store
@@ -97,13 +97,18 @@ class UpdateIngestionService(
9797
private def process(
9898
msgs: Seq[GetTreeUpdatesResponse]
9999
)(implicit traceContext: TraceContext): Future[Unit] = {
100-
NonEmptyList.fromFoldable(msgs) match {
101-
case Some(batch) =>
102-
logger.debug(s"Processing batch of ${batch.size} elements")
103-
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
104-
case None =>
105-
logger.error("Received empty batch of updates to ingest. This is never supposed to happen.")
106-
Future.unit
100+
// if paused, this step will backpressure the source
101+
waitForResumePromise.future.flatMap { _ =>
102+
NonEmptyList.fromFoldable(msgs) match {
103+
case Some(batch) =>
104+
logger.debug(s"Processing batch of ${batch.size} elements")
105+
ingestionSink.ingestUpdateBatch(batch.map(_.updateOrCheckpoint))
106+
case None =>
107+
logger.error(
108+
"Received empty batch of updates to ingest. This is never supposed to happen."
109+
)
110+
Future.unit
111+
}
107112
}
108113
}
109114

@@ -124,4 +129,36 @@ class UpdateIngestionService(
124129

125130
// Kick-off the ingestion
126131
startIngestion()
132+
133+
@SuppressWarnings(Array("org.wartremover.warts.Var"))
134+
@volatile
135+
private var waitForResumePromise = Promise.successful(())
136+
137+
/** Note that any in-flight events being processed when `pause` is called will still be processed.
138+
* For test purposes.
139+
*/
140+
def pause(): Future[Unit] = blocking {
141+
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
142+
logger.info("Pausing UpdateIngestionService.")
143+
blocking {
144+
synchronized {
145+
if (waitForResumePromise.isCompleted) {
146+
waitForResumePromise = Promise()
147+
}
148+
Future.successful(())
149+
}
150+
}
151+
}
152+
}
153+
154+
def resume(): Unit = blocking {
155+
withNewTrace(this.getClass.getSimpleName) { implicit traceContext => _ =>
156+
logger.info("Resuming UpdateIngestionService.")
157+
blocking {
158+
synchronized {
159+
val _ = waitForResumePromise.trySuccess(())
160+
}
161+
}
162+
}
163+
}
127164
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/ContractFetcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ object ContractFetcher {
5252
OptionT(store.multiDomainAcsStore.lookupContractById(companion)(id))
5353
.map(_.contract)
5454
.orElse(
55-
OptionT(fallbackLedgerClient.getContract(id, Seq(store.multiDomainAcsStore.storePartyId)))
55+
OptionT(fallbackLedgerClient.getContract(id, Seq(store.multiDomainAcsStore.storeParty)))
5656
.subflatMap { createdEvent =>
5757
companionClass
5858
.fromCreatedEvent(companion)(CreatedEvent.fromProto(toJavaProto(createdEvent)))

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStore.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,11 @@ import scala.jdk.CollectionConverters.*
5252

5353
trait MultiDomainAcsStore extends HasIngestionSink with AutoCloseable with NamedLogging {
5454
protected def storeName: String
55-
protected def storeParty: String
56-
def storePartyId = PartyId.tryFromProtoPrimitive(storeParty)
55+
def storeParty: PartyId
5756

5857
protected implicit lazy val mc: MetricsContext = MetricsContext(
5958
"store_name" -> storeName,
60-
"store_party" -> storeParty,
59+
"store_party" -> storeParty.toString, // using .toString for historical reasons
6160
)
6261
protected def metricsFactory: LabeledMetricsFactory
6362

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ final class DbMultiDomainAcsStore[TXE](
115115
import profile.api.jdbcActionExtensionMethods
116116

117117
override lazy val storeName = acsStoreDescriptor.name
118-
override lazy val storeParty = acsStoreDescriptor.party.toString
118+
override lazy val storeParty = acsStoreDescriptor.party
119119

120120
override protected def metricsFactory: LabeledMetricsFactory = retryProvider.metricsFactory
121121
override lazy val metrics = new StoreMetrics(metricsFactory)(mc)

0 commit comments

Comments
 (0)