Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
-- SPDX-License-Identifier: Apache-2.0

-- Index on round column for efficient lookups by round number.
create index scan_rewards_reference_store_active_round
on scan_rewards_reference_store_active (store_id, migration_id, round)
where round is not null;

create index scan_rewards_reference_store_archived_round
on scan_rewards_reference_store_archived (store_id, migration_id, round)
where round is not null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always nervous when we're creating indexes on non-empty tables in flyway migrations, because migrations are blocking application startup. AFAICT, this table should be sufficiently small that the index creation completes in a second, so it's fine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted, thanks!

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.tracing.TraceContext
import com.github.blemale.scaffeine.Scaffeine
import org.lfdecentralizedtrust.splice.codegen.java.splice.round.OpenMiningRound
import org.lfdecentralizedtrust.splice.store.{Limit, MultiDomainAcsStore, SynchronizerStore}
import org.lfdecentralizedtrust.splice.util.Contract

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -50,6 +52,13 @@ class CachingScanRewardsReferenceStore private[splice] (
)(implicit tc: TraceContext): Future[Set[String]] =
featuredAppPartiesCache.get(asOf)

override def lookupOpenMiningRoundByNumber(
roundNumber: Long
)(implicit
tc: TraceContext
): Future[Option[Contract[OpenMiningRound.ContractId, OpenMiningRound]]] =
store.lookupOpenMiningRoundByNumber(roundNumber)

override val storeName: String = store.storeName
override def defaultLimit: Limit = store.defaultLimit
override lazy val acsContractFilter = store.acsContractFilter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
import com.digitalasset.canton.tracing.TraceContext
import org.lfdecentralizedtrust.splice.codegen.java.splice
import org.lfdecentralizedtrust.splice.codegen.java.splice.round.OpenMiningRound
import org.lfdecentralizedtrust.splice.config.IngestionConfig
import org.lfdecentralizedtrust.splice.environment.RetryProvider
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.scan.store.db.ScanRewardsReferenceTables.ScanRewardsReferenceStoreRowData
import org.lfdecentralizedtrust.splice.store.{AppStore, Limit, MultiDomainAcsStore}
import org.lfdecentralizedtrust.splice.store.db.AcsInterfaceViewRowData
import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder
import org.lfdecentralizedtrust.splice.util.{Contract, TemplateJsonDecoder}

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -60,6 +61,16 @@ trait ScanRewardsReferenceStore extends AppStore {
asOf: CantonTimestamp
)(implicit tc: TraceContext): Future[Set[String]]

/** Look up an OpenMiningRound contract by its round number.
* Checks both the active ACS table and the archive table,
* since the round may have already been closed by the time the trigger runs.
*/
def lookupOpenMiningRoundByNumber(
roundNumber: Long
)(implicit
tc: TraceContext
): Future[Option[Contract[OpenMiningRound.ContractId, OpenMiningRound]]]

override lazy val acsContractFilter: MultiDomainAcsStore.ContractFilter[
ScanRewardsReferenceStoreRowData,
AcsInterfaceViewRowData.NoInterfacesIngested,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,21 @@ import org.lfdecentralizedtrust.splice.scan.store.ScanRewardsReferenceStore
import org.lfdecentralizedtrust.splice.store.{Limit, TcsStore}
import org.lfdecentralizedtrust.splice.store.db.{
AcsArchiveConfig,
AcsQueries,
AcsTables,
DbAppStore,
DbTcsStore,
StoreDescriptor,
}
import org.lfdecentralizedtrust.splice.util.{ContractWithState, TemplateJsonDecoder}
import org.lfdecentralizedtrust.splice.store.db.AcsQueries.SelectFromAcsTableResult
import org.lfdecentralizedtrust.splice.util.{
Contract,
ContractWithState,
PackageQualifiedName,
TemplateJsonDecoder,
}
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -62,7 +72,9 @@ class DbScanRewardsReferenceStore(
)
),
)
with ScanRewardsReferenceStore {
with ScanRewardsReferenceStore
with AcsTables
with AcsQueries {

override def waitUntilInitialized: Future[Unit] = multiDomainAcsStore.waitUntilAcsIngested()

Expand Down Expand Up @@ -137,4 +149,47 @@ class DbScanRewardsReferenceStore(
tc: TraceContext
): Future[Seq[ContractWithState[OpenMiningRound.ContractId, OpenMiningRound]]] =
tcsStore.listAllContractsAsOf(OpenMiningRound.COMPANION, asOf)

override def lookupOpenMiningRoundByNumber(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should at least make use of acsStore.waitUntilAcsIngested.
It may be even better to wait if the round being looked up has not been ingested yet? we could compare max round in active table to assess that, and use acsStore.waitUntilRecordTimeReached?

If doing wait here is not possible, then it may be better to return something other than None?

If None is an acceptable response in such cases, (because the trigger would try again after a while?), then it should be made clear in comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scenario would happen when scan has restarted and is catching up, The CalculateRewardsV2 would come from ScanStore which may be ahead of DbScanRewardsReferenceStore.
Currently it seems that the only thing the trigger would do when receiving None is retry?
it won't give up right?
so doing retry at the trigger level may be fine

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely add acsStore.waitUntilAcsIngested, the call might throw an exception otherwise ("using a storeId before it was initialized").

IMO it's fine to have ACS stores return data for the current state of the store (i.e., return None if the mining round has not been ingested yet), and let callers handle the fact that different stores might be lagging behind by arbitrary amounts of time.

As for triggers, after retrieveTasks() returns a task, the trigger will retry the task a few times, but if it keeps failing the trigger will eventually give up and retrieve the next task.

As long as you make sure retrieveTasks() is implemented such that it properly resumes from the persisted state, you should be fine.

I would still make sure we are not failing tasks just because some store is not caught up. Tasks that fail even after retries produce log warnings, and we alert on those in production. So either have retrieveTasks() only return tasks that have a reasonable chance of succeeding, or successfully complete a task if some data is missing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: Thanks for all the feedback here, I've added waitUntilAcsIngested as recommended

roundNumber: Long
)(implicit
tc: TraceContext
): Future[Option[Contract[OpenMiningRound.ContractId, OpenMiningRound]]] =
waitUntilInitialized.flatMap { _ =>
lookupOpenMiningRoundByNumberQuery(roundNumber)
}

private def lookupOpenMiningRoundByNumberQuery(
roundNumber: Long
)(implicit
tc: TraceContext
): Future[Option[Contract[OpenMiningRound.ContractId, OpenMiningRound]]] = {
val storeId = multiDomainAcsStore.acsStoreId
val migrationId = multiDomainAcsStore.domainMigrationId
val pqn = PackageQualifiedName.fromJavaCodegenCompanion(OpenMiningRound.COMPANION)
val columns = SelectFromAcsTableResult.sqlColumnsCommaSeparated()
val query =
sql"""(
select #$columns
from #${ScanRewardsReferenceTables.acsTableName} acs
where acs.store_id = $storeId
and acs.migration_id = $migrationId
and acs.package_name = ${pqn.packageName}
and acs.template_id_qualified_name = ${pqn.qualifiedName}
and acs.round = $roundNumber
) union all (
select #$columns
from #${ScanRewardsReferenceTables.archiveTableName} acs
where acs.store_id = $storeId
and acs.migration_id = $migrationId
and acs.package_name = ${pqn.packageName}
and acs.template_id_qualified_name = ${pqn.qualifiedName}
and acs.round = $roundNumber
) limit 1""".as[SelectFromAcsTableResult]
for {
result <- futureUnlessShutdownToFuture(
storage.query(query, "lookupOpenMiningRoundByNumber")
)
} yield result.headOption.map(contractFromRow(OpenMiningRound.COMPANION)(_))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,48 @@ class DbScanRewardsReferenceStoreTest
} yield succeed
}

"lookupOpenMiningRoundByNumber returns the correct contract" in {
val store = mkStore()
val omr3 = openMiningRound(dsoParty, round = 3, amuletPrice = 1.0)
.copy(createdAt = CantonTimestamp.ofEpochSecond(100).toInstant)
val omr4 = openMiningRound(dsoParty, round = 4, amuletPrice = 1.5)
.copy(createdAt = CantonTimestamp.ofEpochSecond(200).toInstant)
val omr5 = openMiningRound(dsoParty, round = 5, amuletPrice = 2.0)
.copy(createdAt = CantonTimestamp.ofEpochSecond(300).toInstant)
for {
_ <- initWithAcs()(store.multiDomainAcsStore)
_ <- sync1.create(omr3, recordTime = CantonTimestamp.ofEpochSecond(100).toInstant)(
store.multiDomainAcsStore
)
_ <- sync1.create(omr4, recordTime = CantonTimestamp.ofEpochSecond(200).toInstant)(
store.multiDomainAcsStore
)
_ <- sync1.create(omr5, recordTime = CantonTimestamp.ofEpochSecond(300).toInstant)(
store.multiDomainAcsStore
)
// Archive round 3 — it should still be found in the archive table
_ <- sync1.archive(omr3, recordTime = CantonTimestamp.ofEpochSecond(350).toInstant)(
store.multiDomainAcsStore
)

// Round 3: archived — found via archive table
result3 <- store.lookupOpenMiningRoundByNumber(3)
_ = result3 shouldBe Some(omr3)

// Round 4: still active — found via active table
result4 <- store.lookupOpenMiningRoundByNumber(4)
_ = result4 shouldBe Some(omr4)

// Round 5: still active
result5 <- store.lookupOpenMiningRoundByNumber(5)
_ = result5 shouldBe Some(omr5)

// Round 99: never existed
resultMissing <- store.lookupOpenMiningRoundByNumber(99)
_ = resultMissing shouldBe None
} yield succeed
}

"lookupActiveOpenMiningRounds" in {
val store = mkStore()
// Timeline (ingestion start = 250, earliest archived_at):
Expand Down
4 changes: 2 additions & 2 deletions project/BuildCommon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ object BuildCommon {
"splice-util-token-standard-wallet-test-daml/clean",
"splice-util-batched-markers-daml/clean",
"splice-util-batched-markers-test-daml/clean",
"splice-featured-app-api-v1-daml/clean",
"splice-featured-app-api-v2-daml/clean",
"splice-api-featured-app-v1-daml/clean",
"splice-api-featured-app-v2-daml/clean",
).map(";" + _).mkString(""),
) ++
addCommandAlias("splice-clean", "; clean-splice") ++
Expand Down
Loading