diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/plugins/UpdateHistorySanityCheckPlugin.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/plugins/UpdateHistorySanityCheckPlugin.scala index d2d785c133..e4d6dd2571 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/plugins/UpdateHistorySanityCheckPlugin.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/plugins/UpdateHistorySanityCheckPlugin.scala @@ -270,7 +270,7 @@ class UpdateHistorySanityCheckPlugin( interval = Span(100, Millis), ) eventually { - scan.automation.store.updateHistory + scan.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.Complete) }( diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala index 374e289e11..1d9a9a0076 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala @@ -1002,14 +1002,14 @@ class DecentralizedSynchronizerMigrationIntegrationTest withClueAndLog("Backfilled history includes ACS import") { eventually() { - sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory + sv1ScanLocalBackend.appState.automation.updateHistory.sourceHistory .migrationInfo(1L) .futureValue .exists(_.complete) should be(true) } val backfilledUpdates = - sv1ScanLocalBackend.appState.store.updateHistory + sv1ScanLocalBackend.appState.automation.updateHistory .getAllUpdates(None, PageLimit.tryCreate(1000)) .futureValue backfilledUpdates.collect { diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala index 9ae9731349..fc507962fd 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanHistoryBackfillingIntegrationTest.scala @@ -279,8 +279,8 @@ class ScanHistoryBackfillingIntegrationTest )( "History marked as free of corrupt snapshots", _ => { - sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true - sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true + sv1ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true + sv2ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true }, ) @@ -307,7 +307,7 @@ class ScanHistoryBackfillingIntegrationTest )( "Backfilling is complete only on the founding SV", _ => { - sv1ScanBackend.appState.store.updateHistory + sv1ScanBackend.appState.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.Complete) // Update history is complete at this point, but the status endpoint only reports @@ -315,7 +315,7 @@ class ScanHistoryBackfillingIntegrationTest sv1ScanBackend.getBackfillingStatus().complete shouldBe false readUpdateHistoryFromScan(sv1ScanBackend) should not be empty - 
sv2ScanBackend.appState.store.updateHistory + sv2ScanBackend.appState.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.InProgress(false, false)) sv2ScanBackend.getBackfillingStatus().complete shouldBe false @@ -354,12 +354,12 @@ class ScanHistoryBackfillingIntegrationTest )( "All backfilling is complete", _ => { - sv1ScanBackend.appState.store.updateHistory + sv1ScanBackend.appState.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.Complete) // Update history is complete, TxLog is not sv1ScanBackend.getBackfillingStatus().complete shouldBe false - sv2ScanBackend.appState.store.updateHistory + sv2ScanBackend.appState.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.Complete) // Update history is complete, TxLog is not @@ -444,7 +444,7 @@ class ScanHistoryBackfillingIntegrationTest clue("Compare scan history with participant update stream") { compareHistory( sv1Backend.participantClient, - sv1ScanBackend.appState.store.updateHistory, + sv1ScanBackend.appState.automation.updateHistory, ledgerBeginSv1, ) } @@ -554,7 +554,7 @@ class ScanHistoryBackfillingIntegrationTest private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = { // Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled - scanBackend.appState.store.updateHistory + scanBackend.appState.automation.updateHistory .getAllUpdates(None, PageLimit.tryCreate(1000)) .futureValue } diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTimeBasedIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTimeBasedIntegrationTest.scala index bf27641576..aca58ef070 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTimeBasedIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTimeBasedIntegrationTest.scala @@ -439,7 +439,7 @@ class ScanTimeBasedIntegrationTest "Wait for backfilling to complete, as the ACS snapshot trigger is paused until then" ) { eventually() { - sv1ScanBackend.automation.store.updateHistory + sv1ScanBackend.automation.updateHistory .getBackfillingState() .futureValue should be(BackfillingState.Complete) advanceTime(sv1ScanBackend.config.automation.pollingInterval.asJava) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTotalSupplyBigQueryIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTotalSupplyBigQueryIntegrationTest.scala index f5f04bacef..94ce14cfef 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTotalSupplyBigQueryIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/ScanTotalSupplyBigQueryIntegrationTest.scala @@ -325,7 +325,7 @@ class ScanTotalSupplyBigQueryIntegrationTest case db: DbStorage => db case s => fail(s"non-DB storage configured, unsupported for BigQuery: ${s.getClass}") } - val sourceHistoryId = sv1ScanBackend.appState.store.updateHistory.historyId + val sourceHistoryId = sv1ScanBackend.appState.automation.updateHistory.historyId copyTableToBigQuery( "update_history_creates", diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala 
b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala index d009316ea4..d8c814fca8 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/UpdateHistoryIntegrationTest.scala @@ -154,7 +154,7 @@ class UpdateHistoryIntegrationTest eventually() { compareHistory( sv1Backend.participantClient, - sv1ScanBackend.appState.store.updateHistory, + sv1ScanBackend.appState.automation.updateHistory, ledgerBeginSv1, ) } @@ -179,30 +179,16 @@ class UpdateHistoryIntegrationTest .lookupUserWallet(aliceWalletClient.config.ledgerApiUser) .futureValue .getOrElse(throw new RuntimeException("Alice wallet should exist")) - .store + .automation .updateHistory, ledgerBeginAlice, true, ) } - eventually() { - compareHistory( - sv1Backend.participantClient, - sv1Backend.appState.svStore.updateHistory, - ledgerBeginSv1, - ) - } - eventually() { - compareHistory( - sv1Backend.participantClient, - sv1Backend.appState.dsoStore.updateHistory, - ledgerBeginSv1, - ) - } eventually() { compareHistory( aliceValidatorBackend.participantClient, - aliceValidatorBackend.appState.store.updateHistory, + aliceValidatorBackend.appState.automation.updateHistory, ledgerBeginAlice, ) } diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogIntegrationTest.scala index ca849bfa65..df519e03b8 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/WalletTxLogIntegrationTest.scala @@ -7,7 +7,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.wallet.subscriptions import org.lfdecentralizedtrust.splice.config.ConfigTransforms import org.lfdecentralizedtrust.splice.integration.EnvironmentDefinition import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.IntegrationTestWithSharedEnvironment -import org.lfdecentralizedtrust.splice.store.Limit +import org.lfdecentralizedtrust.splice.store.{Limit, PageLimit} import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AnsSubscriptionRenewalPaymentTrigger import org.lfdecentralizedtrust.splice.sv.config.InitialAnsConfig import org.lfdecentralizedtrust.splice.util.{ @@ -1226,12 +1226,27 @@ class WalletTxLogIntegrationTest logEntry.senderHoldingFees should beWithin(0, smallAmount) logEntry.amuletPrice shouldBe amuletPrice } + val expectedTxLogEntries = Seq(renewTxLog, creationTxLog, tapTxLog) checkTxHistory( bobValidatorWalletClient, - Seq(renewTxLog, creationTxLog, tapTxLog), + expectedTxLogEntries, trafficTopups = IgnoreTopupsDevNet, ) + clue("Check UpdateHistory works for external parties") { + inside( + bobValidatorBackend.appState.walletManager + .valueOrFail("WalletManager is expected to be defined") + .externalPartyWalletManager + .lookupExternalPartyWallet(onboarding.party) + .valueOrFail(s"Expected ${onboarding.party} to have an external party wallet") + .updateHistory + .getAllUpdates(None, PageLimit.Max) + .futureValue + ) { history => + history.size should be >= expectedTxLogEntries.size + } + } } "handle failed automation (direct transfer)" in { implicit env => diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala 
b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala index 7f6c821f3b..d3c9ff3ef3 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/util/UpdateHistoryTestUtil.scala @@ -151,7 +151,7 @@ trait UpdateHistoryTestUtil extends TestCommon { scanBackend: ScanAppBackendReference, scanClient: ScanAppClientReference, ): Assertion = { - val historyFromStore = scanBackend.appState.store.updateHistory + val historyFromStore = scanBackend.appState.automation.updateHistory .getAllUpdates( None, PageLimit.tryCreate(1000), diff --git a/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V051__truncate_sv_splitwell_history.sql b/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V051__truncate_sv_splitwell_history.sql new file mode 100644 index 0000000000..13e769312c --- /dev/null +++ b/apps/common/src/main/resources/db/migration/canton-network/postgres/stable/V051__truncate_sv_splitwell_history.sql @@ -0,0 +1,26 @@ +-- If the only store descriptors present belong to the SV app or Splitwell, the update_history tables can safely be truncated. + +DO $$ +DECLARE + descriptors TEXT[]; +BEGIN + +-- Ordered array equality ensures that exactly the listed descriptors are present (no more, no less), i.e. the database belongs to the SV app or to Splitwell. +select array_agg(store_name order by store_name) into descriptors +from update_history_descriptors; + +IF (descriptors = '{"DbSvDsoStore", "DbSvSvStore"}' OR descriptors = '{"DbSplitwellStore"}') THEN + RAISE NOTICE 'Truncating update history tables as only SV/Splitwell app descriptors are present. Descriptors: %', descriptors::text; + EXECUTE 'TRUNCATE TABLE update_history_assignments CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_unassignments CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_backfilling CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_creates CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_exercises CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_transactions CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_last_ingested_offsets CASCADE'; + EXECUTE 'TRUNCATE TABLE update_history_descriptors CASCADE'; +ELSE + RAISE NOTICE 'This is not the SV or Splitwell app, NOT truncating update history tables. Descriptors: %', descriptors::text; +END IF; + +END $$; diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala index feca353888..7cece6d441 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/AutomationService.scala @@ -23,7 +23,7 @@ import scala.reflect.ClassTag /** Shared base class for running ingestion and task-handler automation in applications.
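+ * automationConfig is protected rather than private so that subclasses (e.g. SpliceAppAutomationService.registerUpdateHistoryIngestion below) can reuse it when registering additional ingestion services.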
*/ abstract class AutomationService( - private val automationConfig: AutomationConfig, + protected val automationConfig: AutomationConfig, clock: Clock, domainTimeSync: DomainTimeSynchronization, domainUnpausedSync: DomainUnpausedSynchronization, diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SpliceAppAutomationService.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SpliceAppAutomationService.scala index a0b9decee6..6051e8b677 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SpliceAppAutomationService.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SpliceAppAutomationService.scala @@ -15,6 +15,7 @@ import org.lfdecentralizedtrust.splice.store.{ AppStoreWithIngestion, DomainTimeSynchronization, DomainUnpausedSynchronization, + UpdateHistory, } import com.digitalasset.canton.time.{Clock, WallClock} import com.digitalasset.canton.tracing.TraceContext @@ -37,7 +38,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore]( ledgerClient: SpliceLedgerClient, retryProvider: RetryProvider, ingestFromParticipantBegin: Boolean, - ingestUpdateHistoryFromParticipantBegin: Boolean, parametersConfig: SpliceParametersConfig, )(implicit ec: ExecutionContext, @@ -97,6 +97,24 @@ abstract class SpliceAppAutomationService[Store <: AppStore]( case SpliceLedgerConnectionPriority.AmuletExpiry => amuletExpiryConnection } + final protected def registerUpdateHistoryIngestion( + updateHistory: UpdateHistory, + ingestUpdateHistoryFromParticipantBegin: Boolean, + ): Unit = { + registerService( + new UpdateIngestionService( + updateHistory.getClass.getSimpleName, + updateHistory.ingestionSink, + connection(SpliceLedgerConnectionPriority.High), + automationConfig, + backoffClock = triggerContext.pollingClock, + triggerContext.retryProvider, + triggerContext.loggerFactory, + ingestUpdateHistoryFromParticipantBegin, + ) + ) + } + private def completionOffsetCallback(offset: Long): Future[Unit] = store.multiDomainAcsStore.signalWhenIngestedOrShutdown(offset)(TraceContext.empty) @@ -113,19 +131,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore]( ) ) - registerService( - new UpdateIngestionService( - store.updateHistory.getClass.getSimpleName, - store.updateHistory.ingestionSink, - connection(SpliceLedgerConnectionPriority.High), - automationConfig, - backoffClock = triggerContext.pollingClock, - triggerContext.retryProvider, - triggerContext.loggerFactory, - ingestUpdateHistoryFromParticipantBegin, - ) - ) - registerTrigger( new DomainIngestionService( store.domains.ingestionSink, diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTrigger.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTrigger.scala index 273d6b6333..543ccbe01d 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTrigger.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/SqlIndexInitializationTrigger.scala @@ -8,7 +8,7 @@ import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, *} import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} -import com.digitalasset.canton.resource.{DbStorage, Storage} +import com.digitalasset.canton.resource.DbStorage import 
com.digitalasset.canton.tracing.TraceContext import io.opentelemetry.api.trace.Tracer import org.apache.pekko.stream.Materializer @@ -150,23 +150,19 @@ class SqlIndexInitializationTrigger( object SqlIndexInitializationTrigger { def apply( - storage: Storage, + storage: DbStorage, triggerContext: TriggerContext, indexActions: List[IndexAction] = defaultIndexActions, )(implicit ec: ExecutionContextExecutor, tracer: Tracer, mat: Materializer, - ): SqlIndexInitializationTrigger = storage match { - case dbStorage: DbStorage => - new SqlIndexInitializationTrigger( - dbStorage, - triggerContext, - indexActions, - ) - case storageType => - // Same behavior as in `ScanStore.apply` and similar - we only really support DbStorage in our apps. - throw new RuntimeException(s"Unsupported storage type $storageType") + ): SqlIndexInitializationTrigger = { + new SqlIndexInitializationTrigger( + storage, + triggerContext, + indexActions, + ) } sealed trait IndexStatus diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/TxLogBackfillingTrigger.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/TxLogBackfillingTrigger.scala index e09ea06fdd..391d8c850f 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/TxLogBackfillingTrigger.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/automation/TxLogBackfillingTrigger.scala @@ -10,6 +10,7 @@ import org.lfdecentralizedtrust.splice.store.{ HistoryMetrics, TxLogAppStore, TxLogBackfilling, + UpdateHistory, } import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} import com.digitalasset.canton.topology.PartyId @@ -23,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future} class TxLogBackfillingTrigger[TXE]( store: TxLogAppStore[TXE], + updateHistory: UpdateHistory, batchSize: Int, override protected val context: TriggerContext, )(implicit @@ -31,13 +33,13 @@ class TxLogBackfillingTrigger[TXE]( mat: Materializer, ) extends PollingParallelTaskExecutionTrigger[TxLogBackfillingTrigger.Task] { - private def party: PartyId = store.updateHistory.updateStreamParty + private def party: PartyId = updateHistory.updateStreamParty override protected def extraMetricLabels = Seq( "party" -> party.toProtoPrimitive ) - private val currentMigrationId = store.updateHistory.domainMigrationInfo.currentMigrationId + private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId private val historyMetrics = new HistoryMetrics(context.metricsFactory)( MetricsContext.Empty @@ -48,7 +50,7 @@ class TxLogBackfillingTrigger[TXE]( ) private val backfilling = new TxLogBackfilling( store.multiDomainAcsStore, - store.updateHistory, + updateHistory, batchSize, context.loggerFactory, ) @@ -56,7 +58,7 @@ class TxLogBackfillingTrigger[TXE]( override def retrieveTasks()(implicit tc: TraceContext ): Future[Seq[TxLogBackfillingTrigger.Task]] = { - if (!store.updateHistory.isReady) { + if (!updateHistory.isReady) { logger.debug("UpdateHistory is not yet ready") Future.successful(Seq.empty) } else if (!store.multiDomainAcsStore.destinationHistory.isReady) { @@ -64,7 +66,7 @@ class TxLogBackfillingTrigger[TXE]( Future.successful(Seq.empty) } else { for { - sourceState <- store.updateHistory.getBackfillingState() + sourceState <- updateHistory.getBackfillingState() destinationState <- store.multiDomainAcsStore.getTxLogBackfillingState() } yield { sourceState match { diff --git 
a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/NodeBootstrapBase.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/NodeBootstrapBase.scala index 2666d8d92f..8906f81f35 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/NodeBootstrapBase.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/NodeBootstrapBase.scala @@ -14,7 +14,7 @@ import com.digitalasset.canton.crypto.Crypto import com.digitalasset.canton.environment.{CantonNode, CantonNodeBootstrap, CantonNodeParameters} import com.digitalasset.canton.lifecycle.{HasCloseContext, LifeCycle} import com.digitalasset.canton.logging.NamedLoggerFactory -import com.digitalasset.canton.resource.StorageFactory +import com.digitalasset.canton.resource.{DbStorage, StorageFactory} import com.digitalasset.canton.telemetry.ConfiguredOpenTelemetry import com.digitalasset.canton.time.Clock import com.digitalasset.canton.tracing.{NoTracing, TracerProvider} @@ -102,7 +102,10 @@ abstract class NodeBootstrapBase[ nodeMetrics.storageMetrics, parameterConfig.processingTimeouts, loggerFactory, - ) + ) match { + case storage: DbStorage => storage + case storageType => throw new RuntimeException(s"Unsupported storage type $storageType") + } protected val httpAdminService: HttpAdminService = HttpAdminService( nodeConfig.nodeTypeName, diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/AppStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/AppStore.scala index 25ec39bd3a..959bb51931 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/AppStore.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/AppStore.scala @@ -12,17 +12,18 @@ import scala.concurrent.ExecutionContext */ trait AppStore extends NamedLogging with AutoCloseable with StoreErrors { + val storeName: String + implicit protected def ec: ExecutionContext /** Defines which create events are to be ingested into the store. 
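+ * Public (no longer protected) so that callers such as ScanApp can derive the update stream party from the store's ingestion filter.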
*/ - protected def acsContractFilter + def acsContractFilter : MultiDomainAcsStore.ContractFilter[_ <: AcsRowData, _ <: AcsInterfaceViewRowData] def domains: SynchronizerStore def multiDomainAcsStore: MultiDomainAcsStore - def updateHistory: UpdateHistory } trait TxLogAppStore[TXE] extends AppStore { diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala index 60d4f984fc..c19827e87a 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/HistoryMetrics.scala @@ -266,3 +266,9 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit ImportUpdatesBackfilling.completed.close() } } + +object HistoryMetrics { + def apply(metricsFactory: LabeledMetricsFactory, currentMigrationId: Long) = new HistoryMetrics( + metricsFactory + )(MetricsContext("current_migration_id" -> currentMigrationId.toString)) +} diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala index f1cf84e8cf..cee48d3756 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala @@ -41,7 +41,7 @@ import com.digitalasset.canton.config.CantonRequireTypes.String256M import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.CloseContext import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.resource.{DbStorage, Storage} +import com.digitalasset.canton.resource.DbStorage import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId} import com.digitalasset.canton.tracing.TraceContext import slick.dbio.{DBIO, DBIOAction, Effect, NoStream} @@ -93,14 +93,15 @@ class UpdateHistory( override protected val loggerFactory: NamedLoggerFactory, enableissue12777Workaround: Boolean, enableImportUpdateBackfill: Boolean, - val oMetrics: Option[HistoryMetrics] = None, + metrics: HistoryMetrics, )(implicit ec: ExecutionContext, closeContext: CloseContext, ) extends HasIngestionSink with AcsJdbcTypes with AcsQueries - with NamedLogging { + with NamedLogging + with AutoCloseable { override lazy val profile: JdbcProfile = storage.api.jdbcProfile @@ -118,9 +119,13 @@ class UpdateHistory( s.copy(lastIngestedRecordTime = Some(ts)) } (for { - metrics <- oMetrics lastIngestedRecordTime <- newState.lastIngestedRecordTime - } yield metrics.UpdateHistory.latestRecordTime.updateValue(lastIngestedRecordTime)).discard + } yield metrics.UpdateHistory.latestRecordTime.updateValue(lastIngestedRecordTime)( + MetricsContext( + "update_stream_party" -> updateStreamParty.toProtoPrimitive, + "store_name" -> storeName, + ) + )).discard } def waitUntilInitialized: Future[Unit] = state.get().initialized.future @@ -133,6 +138,8 @@ class UpdateHistory( def isReady: Boolean = state.get().historyId.isDefined + override def close(): Unit = metrics.close() + lazy val ingestionSink: MultiDomainAcsStore.IngestionSink = new MultiDomainAcsStore.IngestionSink { override def ingestionFilter: IngestionFilter = IngestionFilter( @@ -337,15 +344,11 @@ class UpdateHistory( None } - val timeIngestion = oMetrics - .map(metrics => - (future: Future[Unit]) => - 
metrics.UpdateHistory.latency - .timeFuture[Unit](future)( - metrics.metricsContextFromUpdate(updateOrCheckpoint, backfilling = false) - ) - ) - .getOrElse(identity[Future[Unit]]) + val timeIngestion = (future: Future[Unit]) => + metrics.UpdateHistory.latency + .timeFuture(future)( + metrics.metricsContextFromUpdate(updateOrCheckpoint, backfilling = false) + ) timeIngestion { // Note: in theory, it's enough if this action is atomic - there should only be a single @@ -408,14 +411,12 @@ class UpdateHistory( .queryAndUpdate(action, "ingestUpdate") .map { ingestedEvents => recordTime.foreach(advanceLastIngestedRecordTime) - oMetrics.foreach { metrics => - metrics.UpdateHistory.eventCount.inc(ingestedEvents.numCreatedEvents)( - MetricsContext("event_type" -> "created") - ) - metrics.UpdateHistory.eventCount.inc(ingestedEvents.numExercisedEvents)( - MetricsContext("event_type" -> "exercised") - ) - } + metrics.UpdateHistory.eventCount.inc(ingestedEvents.numCreatedEvents)( + MetricsContext("event_type" -> "created") + ) + metrics.UpdateHistory.eventCount.inc(ingestedEvents.numExercisedEvents)( + MetricsContext("event_type" -> "exercised") + ) } } } @@ -475,7 +476,7 @@ class UpdateHistory( val safeParticipantOffset = lengthLimited(LegacyOffset.Api.fromLong(reassignment.offset)) val safeUnassignId = lengthLimited(event.unassignId) val safeContractId = lengthLimited(event.contractId.contractId) - oMetrics.foreach(_.UpdateHistory.unassignments.mark()) + metrics.UpdateHistory.unassignments.mark() sqlu""" insert into update_history_unassignments( history_id,update_id,record_time, @@ -521,7 +522,7 @@ class UpdateHistory( val safeCreatedAt = CantonTimestamp.assertFromInstant(event.createdEvent.createdAt) val safeSignatories = event.createdEvent.getSignatories.asScala.toSeq.map(lengthLimited) val safeObservers = event.createdEvent.getObservers.asScala.toSeq.map(lengthLimited) - oMetrics.foreach(_.UpdateHistory.assignments.mark()) + metrics.UpdateHistory.assignments.mark() sqlu""" insert into update_history_assignments( history_id,update_id,record_time, @@ -551,7 +552,7 @@ class UpdateHistory( tree: Transaction, migrationId: Long, ): DBIOAction[IngestedEvents, NoStream, Effect.Read & Effect.Write] = { - oMetrics.foreach(_.UpdateHistory.transactionsTrees.mark()) + metrics.UpdateHistory.transactionsTrees.mark() insertTransactionUpdateRow(tree, migrationId) .flatMap(updateRowId => { // Note: the order of elements in the eventsById map doesn't matter, and is not preserved here. @@ -2337,25 +2338,21 @@ object UpdateHistory { // Since we're interested in the highest known migration id, we don't need to filter by anything // (store ID, participant ID, etc. are not even known at the time we want to call this). 
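+ // Returns None if nothing has been ingested yet, i.e. the update_history_last_ingested_offsets table is empty.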
def getHighestKnownMigrationId( - storage: Storage + storage: DbStorage )(implicit ec: ExecutionContext, closeContext: CloseContext, tc: TraceContext, ): Future[Option[Long]] = { - storage match { - case storage: DbStorage => - for { - queryResult <- storage.query( - sql""" + for { + queryResult <- storage.query( + sql""" select max(migration_id) from update_history_last_ingested_offsets """.as[Option[Long]], - "getHighestKnownMigrationId", - ) - } yield { - queryResult.headOption.flatten - } - case storageType => throw new RuntimeException(s"Unsupported storage type $storageType") + "getHighestKnownMigrationId", + ) + } yield { + queryResult.headOption.flatten } } diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbAppStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbAppStore.scala index 15ebc464ae..015fc867f7 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbAppStore.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbAppStore.scala @@ -10,9 +10,7 @@ import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder import com.digitalasset.canton.concurrent.FutureSupervisor import com.digitalasset.canton.lifecycle.CloseContext import com.digitalasset.canton.resource.DbStorage -import com.digitalasset.canton.topology.ParticipantId import org.lfdecentralizedtrust.splice.config.IngestionConfig -import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement import scala.concurrent.ExecutionContext @@ -24,12 +22,7 @@ abstract class DbTxLogAppStore[TXE]( acsStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor, txLogStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor, domainMigrationInfo: DomainMigrationInfo, - participantId: ParticipantId, - enableissue12777Workaround: Boolean, - enableImportUpdateBackfill: Boolean, - backfillingRequired: BackfillingRequirement, ingestionConfig: IngestionConfig, - oHistoryMetrics: Option[HistoryMetrics] = None, )(implicit override protected val ec: ExecutionContext, templateJsonDecoder: TemplateJsonDecoder, @@ -40,12 +33,7 @@ abstract class DbTxLogAppStore[TXE]( interfaceViewsTableNameOpt = interfaceViewsTableNameOpt, acsStoreDescriptor = acsStoreDescriptor, domainMigrationInfo = domainMigrationInfo, - participantId = participantId, - enableissue12777Workaround = enableissue12777Workaround, - enableImportUpdateBackfill = enableImportUpdateBackfill, - backfillingRequired, - ingestionConfig, - oHistoryMetrics = oHistoryMetrics, + ingestionConfig = ingestionConfig, ) with TxLogAppStore[TXE] { @@ -73,12 +61,7 @@ abstract class DbAppStore( interfaceViewsTableNameOpt: Option[String], acsStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor, domainMigrationInfo: DomainMigrationInfo, - participantId: ParticipantId, - enableissue12777Workaround: Boolean, - enableImportUpdateBackfill: Boolean, - backfillingRequired: BackfillingRequirement, ingestionConfig: IngestionConfig, - oHistoryMetrics: Option[HistoryMetrics] = None, )(implicit protected val ec: ExecutionContext, templateJsonDecoder: TemplateJsonDecoder, @@ -107,6 +90,8 @@ abstract class DbAppStore( handleIngestionSummary, ) + override lazy val storeName: String = multiDomainAcsStore.storeName + override lazy val domains: InMemorySynchronizerStore = new InMemorySynchronizerStore( acsContractFilter.ingestionFilter.primaryParty, @@ -114,20 +99,6 @@ abstract class DbAppStore( retryProvider, ) - override lazy val updateHistory: UpdateHistory = - new UpdateHistory( - 
storage, - domainMigrationInfo, - acsStoreDescriptor.name, - participantId, - acsContractFilter.ingestionFilter.primaryParty, - backfillingRequired, - loggerFactory, - enableissue12777Workaround, - enableImportUpdateBackfill, - oHistoryMetrics, - ) - override def close(): Unit = { multiDomainAcsStore.close() } diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfillingStoreTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfillingStoreTest.scala index 4dd5dd6298..086b0fccf1 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfillingStoreTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfillingStoreTest.scala @@ -567,6 +567,7 @@ class TxLogBackfillingStoreTest loggerFactory, enableissue12777Workaround = true, enableImportUpdateBackfill = true, + HistoryMetrics.apply(NoOpMetricsFactory, migrationId), ) } diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala index d2aec3f2fe..b15109a188 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala @@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{ ReassignmentUpdate, TransactionTreeUpdate, } -import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo} +import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsTables, SplicePostgresTest} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.FutureUnlessShutdown @@ -26,8 +26,9 @@ import org.scalatest.Assertion import scala.concurrent.Future import scala.jdk.CollectionConverters.* - import UpdateHistory.UpdateHistoryResponse +import com.daml.metrics.api.noop.NoOpMetricsFactory +import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo abstract class UpdateHistoryTestBase extends StoreTest @@ -254,6 +255,7 @@ abstract class UpdateHistoryTestBase loggerFactory, enableissue12777Workaround = true, enableImportUpdateBackfill = true, + HistoryMetrics.apply(NoOpMetricsFactory, domainMigrationId), ) } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala index dd6fb8fa6b..8541cf117d 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/ScanApp.scala @@ -46,14 +46,14 @@ import org.lfdecentralizedtrust.splice.scan.store.db.{ ScanAggregatesReaderContext, } import org.lfdecentralizedtrust.splice.scan.dso.DsoAnsResolver -import org.lfdecentralizedtrust.splice.store.PageLimit +import org.lfdecentralizedtrust.splice.store.{PageLimit, UpdateHistory} import org.lfdecentralizedtrust.splice.util.HasHealth import com.digitalasset.canton.concurrent.FutureSupervisor import com.digitalasset.canton.config.CantonRequireTypes.InstanceName import com.digitalasset.canton.config.ProcessingTimeout import com.digitalasset.canton.lifecycle.LifeCycle import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger} -import com.digitalasset.canton.resource.Storage +import 
com.digitalasset.canton.resource.{DbStorage, Storage} import com.digitalasset.canton.time.Clock import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.{TraceContext, TracerProvider} @@ -65,6 +65,7 @@ import org.apache.pekko.http.cors.scaladsl.settings.CorsSettings import scala.concurrent.{ExecutionContextExecutor, Future} import org.apache.pekko.stream.Materializer import org.lfdecentralizedtrust.splice.http.HttpRateLimiter +import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement /** Class representing a Scan app instance. * @@ -74,7 +75,7 @@ class ScanApp( override val name: InstanceName, val config: ScanAppBackendConfig, val amuletAppParameters: SharedSpliceAppParameters, - storage: Storage, + storage: DbStorage, override protected val clock: Clock, val loggerFactory: NamedLoggerFactory, tracerProvider: TracerProvider, @@ -181,14 +182,25 @@ class ScanApp( migrationInfo, participantId, config.cache, - config.updateHistoryBackfillImportUpdatesEnabled, nodeMetrics.dbScanStore, config.automation.ingestion, initialRound.toLong, ) + updateHistory = new UpdateHistory( + storage, + migrationInfo, + store.storeName, + participantId, + store.acsContractFilter.ingestionFilter.primaryParty, + BackfillingRequirement.NeedsBackfilling, + loggerFactory, + enableissue12777Workaround = true, + enableImportUpdateBackfill = config.updateHistoryBackfillImportUpdatesEnabled, + nodeMetrics.dbScanStore.history, + ) acsSnapshotStore = AcsSnapshotStore( storage, - store.updateHistory, + updateHistory, dsoParty, migrationInfo.currentMigrationId, loggerFactory, @@ -207,6 +219,7 @@ class ScanApp( retryProvider, loggerFactory, store, + updateHistory, storage, acsSnapshotStore, config.ingestFromParticipantBegin, @@ -216,10 +229,10 @@ class ScanApp( amuletAppParameters.upgradesConfig, initialRound.toLong, ) - scanVerdictStore = DbScanVerdictStore(storage, store.updateHistory, loggerFactory)(ec) + scanVerdictStore = DbScanVerdictStore(storage, updateHistory, loggerFactory)(ec) scanEventStore = new ScanEventStore( scanVerdictStore, - store.updateHistory, + updateHistory, loggerFactory, )(ec) _ <- appInitStep("Wait until there is an OpenMiningRound contract") { @@ -282,6 +295,7 @@ class ScanApp( participantAdminConnection, sequencerAdminConnection, store, + updateHistory, acsSnapshotStore, scanEventStore, dsoAnsResolver, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala index 92204732e0..3601384270 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala @@ -109,6 +109,7 @@ class HttpScanHandler( participantAdminConnection: ParticipantAdminConnection, sequencerAdminConnection: SequencerAdminConnection, protected val store: ScanStore, + updateHistory: UpdateHistory, snapshotStore: AcsSnapshotStore, eventStore: ScanEventStore, dsoAnsResolver: DsoAnsResolver, @@ -762,7 +763,6 @@ class HttpScanHandler( extracted: TraceContext, ): Future[Vector[definitions.UpdateHistoryItem]] = { implicit val tc: TraceContext = extracted - val updateHistory = store.updateHistory val afterO = after.map { after => val afterRecordTime = parseTimestamp(after.afterRecordTime) ( @@ -869,7 +869,7 @@ class HttpScanHandler( for { eventO <- eventStore.getEventByUpdateId( updateId, - 
store.updateHistory.domainMigrationInfo.currentMigrationId, + updateHistory.domainMigrationInfo.currentMigrationId, ) } yield { eventO match { @@ -911,7 +911,6 @@ class HttpScanHandler( extracted: TraceContext, ): Future[Vector[definitions.EventHistoryItem]] = { implicit val tc: TraceContext = extracted - val updateHistory = store.updateHistory val afterO = after.map { a => val afterRecordTime = parseTimestamp(a.afterRecordTime) (a.afterMigrationId, afterRecordTime) @@ -1263,7 +1262,7 @@ class HttpScanHandler( .sequentialTraverse(txLogEntryMap.view.toList) { case (cid, entry) => // The update history ingests independently so this lookup can return None temporarily. // We just filter out those contracts. - store.updateHistory + updateHistory .lookupContractById(TransferCommand.COMPANION)(cid) .map( _.map(c => @@ -1413,7 +1412,7 @@ class HttpScanHandler( .asRuntimeException(), ) ) - snapshotTime <- snapshotStore.updateHistory + snapshotTime <- updateHistory .getUpdatesBefore( snapshotStore.currentMigrationId, synchronizerId, @@ -1715,7 +1714,7 @@ class HttpScanHandler( ): Future[Either[definitions.ErrorResponse, definitions.UpdateHistoryItem]] = { implicit val tc = extracted for { - tx <- store.updateHistory.getUpdate(updateId) + tx <- updateHistory.getUpdate(updateId) } yield { tx.fold[Either[definitions.ErrorResponse, definitions.UpdateHistoryItem]]( Left( @@ -2013,7 +2012,7 @@ class HttpScanHandler( )(extracted: TraceContext): Future[ScanResource.GetMigrationInfoResponse] = { implicit val tc = extracted withSpan(s"$workflowId.getMigrationInfo") { _ => _ => - val sourceHistory = store.updateHistory.sourceHistory + val sourceHistory = updateHistory.sourceHistory for { infoO <- sourceHistory.migrationInfo(body.migrationId) } yield infoO match { @@ -2046,7 +2045,6 @@ class HttpScanHandler( )(extracted: TraceContext): Future[ScanResource.GetUpdatesBeforeResponse] = { implicit val tc: TraceContext = extracted withSpan(s"$workflowId.getUpdatesBefore") { _ => _ => - val updateHistory = store.updateHistory updateHistory .getUpdatesBefore( migrationId = body.migrationId, @@ -2077,7 +2075,6 @@ class HttpScanHandler( )(extracted: TraceContext): Future[ScanResource.GetImportUpdatesResponse] = { implicit val tc: TraceContext = extracted withSpan(s"$workflowId.getImportUpdates") { _ => _ => - val updateHistory = store.updateHistory updateHistory .getImportUpdates( migrationId = body.migrationId, @@ -2207,7 +2204,7 @@ class HttpScanHandler( implicit val tc = extracted withSpan(s"$workflowId.getBackfillingStatus") { _ => _ => for { - updateHistoryStatus <- store.updateHistory.getBackfillingState() + updateHistoryStatus <- updateHistory.getBackfillingState() txLogStatus <- store.multiDomainAcsStore.getTxLogBackfillingState() updateHistoryComplete = updateHistoryStatus == BackfillingState.Complete txLogComplete = txLogStatus == TxLogBackfillingState.Complete diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala index 8ada2ace18..e16e11ec90 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanAutomationService.scala @@ -17,11 +17,12 @@ import org.lfdecentralizedtrust.splice.scan.config.ScanAppBackendConfig import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, 
DomainUnpausedSynchronization, + UpdateHistory, } import org.lfdecentralizedtrust.splice.scan.store.{AcsSnapshotStore, ScanStore} import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder import com.digitalasset.canton.logging.NamedLoggerFactory -import com.digitalasset.canton.resource.Storage +import com.digitalasset.canton.resource.DbStorage import com.digitalasset.canton.time.Clock import com.digitalasset.canton.topology.PartyId import io.opentelemetry.api.trace.Tracer @@ -36,7 +37,8 @@ class ScanAutomationService( retryProvider: RetryProvider, protected val loggerFactory: NamedLoggerFactory, store: ScanStore, - storage: Storage, + val updateHistory: UpdateHistory, + storage: DbStorage, snapshotStore: AcsSnapshotStore, ingestFromParticipantBegin: Boolean, ingestUpdateHistoryFromParticipantBegin: Boolean, @@ -60,7 +62,6 @@ class ScanAutomationService( ledgerClient, retryProvider, ingestFromParticipantBegin, - ingestUpdateHistoryFromParticipantBegin, config.parameters, ) { override def companion @@ -71,10 +72,17 @@ class ScanAutomationService( registerTrigger( new ScanBackfillAggregatesTrigger(store, triggerContext, initialRound) ) + + registerUpdateHistoryIngestion( + updateHistory, + ingestUpdateHistoryFromParticipantBegin, + ) + if (config.updateHistoryBackfillEnabled) { registerTrigger( new ScanHistoryBackfillingTrigger( store, + updateHistory, svName, ledgerClient, config.updateHistoryBackfillBatchSize, @@ -88,7 +96,7 @@ class ScanAutomationService( registerTrigger( new AcsSnapshotTrigger( snapshotStore, - store.updateHistory, + updateHistory, config.acsSnapshotPeriodHours, // The acs snapshot trigger should not attempt to backfill snapshots unless the backfilling // UpdateHistory is fully enabled and complete. @@ -100,7 +108,7 @@ class ScanAutomationService( registerTrigger( new DeleteCorruptAcsSnapshotTrigger( snapshotStore, - store.updateHistory, + updateHistory, triggerContext, ) ) @@ -109,6 +117,7 @@ class ScanAutomationService( registerTrigger( new TxLogBackfillingTrigger( store, + updateHistory, config.txLogBackfillBatchSize, triggerContext, ) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala index df0c57b111..efaf9d6aba 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/ScanHistoryBackfillingTrigger.scala @@ -31,6 +31,7 @@ import org.lfdecentralizedtrust.splice.store.{ ImportUpdatesBackfilling, PageLimit, TreeUpdateWithMigrationId, + UpdateHistory, } import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder import com.digitalasset.canton.data.CantonTimestamp @@ -46,6 +47,7 @@ import scala.concurrent.{ExecutionContextExecutor, Future, blocking} class ScanHistoryBackfillingTrigger( store: ScanStore, + updateHistory: UpdateHistory, svName: String, ledgerClient: SpliceLedgerClient, batchSize: Int, @@ -61,7 +63,7 @@ class ScanHistoryBackfillingTrigger( mat: Materializer, ) extends PollingParallelTaskExecutionTrigger[ScanHistoryBackfillingTrigger.Task] { - private val currentMigrationId = store.updateHistory.domainMigrationInfo.currentMigrationId + private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId private val historyMetrics = new HistoryMetrics(context.metricsFactory)( MetricsContext( @@ -70,7 +72,7 @@ class 
ScanHistoryBackfillingTrigger( ) /** A cursor for iterating over the beginning of the update history in findHistoryStart, * see [[org.lfdecentralizedtrust.splice.store.UpdateHistory.getUpdates()]]. * We need to store this as we don't want to start over from the beginning every time the trigger runs. */ @SuppressWarnings(Array("org.wartremover.warts.Var")) @@ -88,14 +90,14 @@ class ScanHistoryBackfillingTrigger( override def retrieveTasks()(implicit tc: TraceContext ): Future[Seq[ScanHistoryBackfillingTrigger.Task]] = { - if (!store.updateHistory.isReady) { + if (!updateHistory.isReady) { logger.debug("UpdateHistory is not yet ready") Future.successful(Seq.empty) - } else if (importUpdateBackfillingEnabled && !store.updateHistory.corruptAcsSnapshotsDeleted) { + } else if (importUpdateBackfillingEnabled && !updateHistory.corruptAcsSnapshotsDeleted) { logger.debug("There may be corrupt ACS snapshots that need to be deleted") Future.successful(Seq.empty) } else { - store.updateHistory.getBackfillingState().map { + updateHistory.getBackfillingState().map { case BackfillingState.Complete => historyMetrics.UpdateHistoryBackfilling.completed.updateValue(1) historyMetrics.ImportUpdatesBackfilling.completed.updateValue(1) @@ -149,7 +151,7 @@ class ScanHistoryBackfillingTrigger( result <- initialUpdateO match { case Some(FoundingTransactionTreeUpdate(treeUpdate, _)) => for { - _ <- store.updateHistory + _ <- updateHistory .initializeBackfilling( treeUpdate.migrationId, treeUpdate.update.synchronizerId, @@ -163,8 +165,8 @@ class ScanHistoryBackfillingTrigger( for { // Before deleting updates, we need to delete ACS snapshots that were generated before backfilling was enabled. // This will delete all ACS snapshots for the migration id where the SV node joined the network. - _ <- store.updateHistory.deleteAcsSnapshotsAfter( - historyId = store.updateHistory.historyId, + _ <- updateHistory.deleteAcsSnapshotsAfter( + historyId = updateHistory.historyId, migrationId = treeUpdate.migrationId, recordTime = CantonTimestamp.MinValue, ) @@ -172,12 +174,12 @@ class ScanHistoryBackfillingTrigger( // only with the visibility of the SV party and not the DSO party. // Note that this will also delete the import updates because they have a record time of 0, // which is good because we want to remove them.
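+ // After the deletions, backfilling is initialized below, starting from the joining SV's first transaction.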
- _ <- store.updateHistory.deleteUpdatesBefore( + _ <- updateHistory.deleteUpdatesBefore( synchronizerId = treeUpdate.update.synchronizerId, migrationId = treeUpdate.migrationId, recordTime = treeUpdate.update.update.recordTime, ) - _ <- store.updateHistory + _ <- updateHistory .initializeBackfilling( treeUpdate.migrationId, treeUpdate.update.synchronizerId, @@ -203,7 +205,7 @@ class ScanHistoryBackfillingTrigger( synchronized { val batchSize = 100 for { - updates <- store.updateHistory.getUpdatesWithoutImportUpdates( + updates <- updateHistory.getUpdatesWithoutImportUpdates( findHistoryStartAfter, PageLimit.tryCreate(batchSize), ) @@ -260,7 +262,7 @@ class ScanHistoryBackfillingTrigger( val backfilling = new ScanHistoryBackfilling( connection = connection, - destinationHistory = store.updateHistory.destinationHistory, + destinationHistory = updateHistory.destinationHistory, currentMigrationId = currentMigrationId, batchSize = batchSize, loggerFactory = loggerFactory, diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala index c0a6659305..4380e06141 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/AcsSnapshotStore.scala @@ -20,7 +20,7 @@ import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown} import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting} import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.resource.{DbStorage, Storage} +import com.digitalasset.canton.resource.DbStorage import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLActionBuilderChain import com.digitalasset.canton.topology.PartyId import com.digitalasset.canton.tracing.TraceContext @@ -480,16 +480,12 @@ object AcsSnapshotStore { } def apply( - storage: Storage, + storage: DbStorage, updateHistory: UpdateHistory, dsoParty: PartyId, migrationId: Long, loggerFactory: NamedLoggerFactory, )(implicit ec: ExecutionContext, closeContext: CloseContext): AcsSnapshotStore = - storage match { - case db: DbStorage => - new AcsSnapshotStore(db, updateHistory, dsoParty, migrationId, loggerFactory) - case storageType => throw new RuntimeException(s"Unsupported storage type $storageType") - } + new AcsSnapshotStore(storage, updateHistory, dsoParty, migrationId, loggerFactory) } diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/CachingScanStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/CachingScanStore.scala index 31f734f960..3857897d30 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/CachingScanStore.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/CachingScanStore.scala @@ -59,6 +59,7 @@ class CachingScanStore( with FlagCloseableAsync with RetryProvider.Has { + override val storeName: String = store.storeName override lazy val txLogConfig: TxLogStore.Config[TxLogEntry] = store.txLogConfig override def key: ScanStore.Key = store.key @@ -307,12 +308,14 @@ class CachingScanStore( override def lookupContractByRecordTime[C, TCId <: ContractId[_], T]( companion: C, + updateHistory: UpdateHistory, recordTime: CantonTimestamp, )(implicit companionClass: 
       MultiDomainAcsStore.ContractCompanion[C, TCId, T],
       tc: TraceContext,
   ): Future[Option[Contract[TCId, T]]] =
     store.lookupContractByRecordTime(
       companion,
+      updateHistory,
       recordTime,
     )
@@ -381,8 +384,6 @@ class CachingScanStore(
 
   override def multiDomainAcsStore: MultiDomainAcsStore = store.multiDomainAcsStore
 
-  override def updateHistory: UpdateHistory = store.updateHistory
-
   @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
   private def getCache[Key, Value](
       cacheName: String,
diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanStore.scala
index 94b734351c..5bd35685f1 100644
--- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanStore.scala
+++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/ScanStore.scala
@@ -8,7 +8,7 @@ import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{Member, ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.daml.lf.data.Time.Timestamp
@@ -39,6 +39,7 @@ import org.lfdecentralizedtrust.splice.store.{
   PageLimit,
   SortOrder,
   TxLogAppStore,
+  UpdateHistory,
   VotesStore,
 }
 import org.lfdecentralizedtrust.splice.util.{Contract, ContractWithState, TemplateJsonDecoder}
@@ -284,6 +285,7 @@ trait ScanStore
   def lookupContractByRecordTime[C, TCId <: ContractId[_], T](
       companion: C,
+      updateHistory: UpdateHistory,
       recordTime: CantonTimestamp = CantonTimestamp.MinValue,
   )(implicit
       companionClass: ContractCompanion[C, TCId, T],
@@ -304,7 +306,7 @@ object ScanStore {
   def apply(
       key: ScanStore.Key,
-      storage: Storage,
+      storage: DbStorage,
       isFirstSv: Boolean,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
@@ -312,7 +314,6 @@ object ScanStore {
       domainMigrationInfo: DomainMigrationInfo,
       participantId: ParticipantId,
       cacheConfigs: ScanCacheConfig,
-      enableImportUpdateBackfill: Boolean,
       metrics: DbScanStoreMetrics,
       ingestionConfig: IngestionConfig,
       initialRound: Long,
@@ -321,30 +322,25 @@ object ScanStore {
       templateJsonDecoder: TemplateJsonDecoder,
       close: CloseContext,
   ): ScanStore = {
-    storage match {
-      case db: DbStorage =>
-        new CachingScanStore(
-          loggerFactory,
-          retryProvider,
-          new DbScanStore(
-            key = key,
-            db,
-            isFirstSv,
-            loggerFactory,
-            retryProvider,
-            createScanAggregatesReader,
-            domainMigrationInfo,
-            participantId,
-            enableImportUpdateBackfill,
-            ingestionConfig,
-            metrics,
-            initialRound,
-          ),
-          cacheConfigs,
-          metrics,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new CachingScanStore(
+      loggerFactory,
+      retryProvider,
+      new DbScanStore(
+        key = key,
+        storage,
+        isFirstSv,
+        loggerFactory,
+        retryProvider,
+        createScanAggregatesReader,
+        domainMigrationInfo,
+        participantId,
+        ingestionConfig,
+        metrics,
+        initialRound,
+      ),
+      cacheConfigs,
+      metrics,
+    )
   }
 
   def contractFilter(
diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStore.scala
index e01145f052..c0e64b0235 100644
--- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStore.scala
+++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanStore.scala
@@ -50,7 +50,6 @@ import org.lfdecentralizedtrust.splice.scan.store.{
   VoteRequestTxLogEntry,
 }
 import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractCompanion
-import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
 import org.lfdecentralizedtrust.splice.store.db.{
   AcsQueries,
   AcsTables,
@@ -64,6 +63,7 @@ import org.lfdecentralizedtrust.splice.store.{
   PageLimit,
   SortOrder,
   TxLogStore,
+  UpdateHistory,
 }
 import org.lfdecentralizedtrust.splice.util.{
   Contract,
@@ -75,9 +75,9 @@ import org.lfdecentralizedtrust.splice.util.{
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
 import io.grpc.Status
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.UpdateHistoryQueries.UpdateHistoryQueries
 import org.lfdecentralizedtrust.splice.store.db.AcsQueries.AcsStoreId
+import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
 import org.lfdecentralizedtrust.splice.store.db.TxLogQueries.TxLogStoreId
 
 import java.time.Instant
@@ -97,7 +97,6 @@ class DbScanStore(
     createScanAggregatesReader: DbScanStore => ScanAggregatesReader,
     domainMigrationInfo: DomainMigrationInfo,
     participantId: ParticipantId,
-    enableImportUpdateBackfill: Boolean,
     ingestionConfig: IngestionConfig,
     storeMetrics: DbScanStoreMetrics,
     initialRound: Long,
@@ -131,12 +130,7 @@ class DbScanStore(
         ),
       ),
       domainMigrationInfo,
-      participantId,
-      enableissue12777Workaround = true,
-      enableImportUpdateBackfill = enableImportUpdateBackfill,
-      BackfillingRequirement.NeedsBackfilling,
       ingestionConfig,
-      Some(storeMetrics.history),
     )
     with ScanStore
     with AcsTables
@@ -1060,8 +1054,10 @@ class DbScanStore(
       .toMap
   }
 
+  // TODO (#934): this method probably belongs in UpdateHistory instead
   override def lookupContractByRecordTime[C, TCId <: ContractId[_], T](
       companion: C,
+      updateHistory: UpdateHistory,
       recordTime: CantonTimestamp,
   )(implicit
       companionClass: ContractCompanion[C, TCId, T],
diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanVerdictStore.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanVerdictStore.scala
index 5fd84e1e8c..b8448a4b0e 100644
--- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanVerdictStore.scala
+++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/db/DbScanVerdictStore.scala
@@ -68,15 +68,11 @@ object DbScanVerdictStore {
   }
 
   def apply(
-      storage: com.digitalasset.canton.resource.Storage,
+      storage: com.digitalasset.canton.resource.DbStorage,
       updateHistory: UpdateHistory,
       loggerFactory: NamedLoggerFactory,
   )(implicit ec: ExecutionContext): DbScanVerdictStore =
-    storage match {
-      case db: DbStorage => new DbScanVerdictStore(db, updateHistory, loggerFactory)
-      case other =>
-        throw new RuntimeException(s"Unsupported storage type $other for DbScanVerdictStore")
-    }
+    new DbScanVerdictStore(storage, updateHistory, loggerFactory)
 }
 
 class DbScanVerdictStore(
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
index 7ad8c797d5..4feb5a3346 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
@@ -1,19 +1,18 @@
 package org.lfdecentralizedtrust.splice.scan.store
 
+import com.daml.metrics.api.noop.NoOpMetricsFactory
 import com.digitalasset.canton.HasExecutionContext
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
-import org.lfdecentralizedtrust.splice.store.PageLimit
-import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
+import org.lfdecentralizedtrust.splice.store.{HistoryMetrics, PageLimit, StoreTest, UpdateHistory}
 import org.lfdecentralizedtrust.splice.scan.store.db.DbScanVerdictStore
-import org.lfdecentralizedtrust.splice.store.UpdateHistory
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
-import org.lfdecentralizedtrust.splice.store.StoreTest
 import org.lfdecentralizedtrust.splice.store.db.SplicePostgresTest
 import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
 import io.circe.Json
+import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
+import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 
 import scala.concurrent.Future
@@ -838,6 +837,7 @@ class ScanEventStoreTest extends StoreTest with HasExecutionContext with SpliceP
       loggerFactory,
       enableissue12777Workaround = true,
       enableImportUpdateBackfill = true,
+      HistoryMetrics(NoOpMetricsFactory, migrationId),
     )
     uh.ingestionSink.initialize().map(_ => uh)
   }
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala
index 54571ea137..486ef80f50 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala
@@ -3,12 +3,13 @@ package org.lfdecentralizedtrust.splice.store.db
 import cats.data.NonEmptyVector
 import com.daml.ledger.javaapi.data.Unit as damlUnit
 import com.daml.ledger.javaapi.data.codegen.ContractId
+import com.daml.metrics.api.noop.NoOpMetricsFactory
 import org.lfdecentralizedtrust.splice.environment.DarResources
 import org.lfdecentralizedtrust.splice.environment.ledger.api.TransactionTreeUpdate
-import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
 import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
 import org.lfdecentralizedtrust.splice.store.{
   HardLimit,
+  HistoryMetrics,
   PageLimit,
   StoreErrors,
   StoreTest,
@@ -24,6 +25,7 @@ import com.digitalasset.canton.util.MonadUtil
 import com.digitalasset.canton.{HasActorSystem, HasExecutionContext}
 import io.grpc.StatusRuntimeException
 import org.lfdecentralizedtrust.splice.codegen.java.splice.round as roundCodegen
+import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
 import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.scalatest.Succeeded
 
@@ -1030,6 +1032,7 @@ class AcsSnapshotStoreTest
       loggerFactory,
       enableissue12777Workaround = true,
       enableImportUpdateBackfill = true,
+      HistoryMetrics(NoOpMetricsFactory, migrationId),
     )
     updateHistory.ingestionSink.initialize().map(_ => updateHistory)
   }
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanAggregatorTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanAggregatorTest.scala
index 7bac67005f..e5feeb461e 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanAggregatorTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanAggregatorTest.scala
@@ -978,7 +978,6 @@ class ScanAggregatorTest
         None,
       ),
       participantId = mkParticipantId("ScanAggregatorTest"),
-      enableImportUpdateBackfill = true,
      ingestionConfig = IngestionConfig(),
      new DbScanStoreMetrics(new NoOpMetricsFactory(), loggerFactory, ProcessingTimeout()),
      initialRound = initialRound,
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanStoreTest.scala
index db50e46ec8..4d22faf7f1 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanStoreTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanStoreTest.scala
@@ -1,6 +1,17 @@
 package org.lfdecentralizedtrust.splice.store.db
 
 import com.daml.ledger.javaapi.data.{DamlRecord, Unit as damlUnit}
+import com.daml.metrics.api.noop.NoOpMetricsFactory
+import com.digitalasset.canton.concurrent.FutureSupervisor
+import com.digitalasset.canton.crypto.Fingerprint
+import com.digitalasset.canton.data.CantonTimestamp
+import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
+import com.digitalasset.canton.resource.DbStorage
+import com.digitalasset.canton.topology.*
+import com.digitalasset.canton.tracing.TraceContext
+import com.digitalasset.canton.util.MonadUtil
+import com.digitalasset.canton.{HasActorSystem, HasExecutionContext, SynchronizerAlias}
+import org.lfdecentralizedtrust.splice.codegen.java.da.time.types.RelTime
 import org.lfdecentralizedtrust.splice.codegen.java.splice
 import org.lfdecentralizedtrust.splice.codegen.java.splice.amulet.{
   Amulet,
@@ -12,86 +23,46 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.amuletrules.{
   AmuletRules_BuyMemberTrafficResult,
   AmuletRules_MintResult,
 }
+import org.lfdecentralizedtrust.splice.codegen.java.splice.ans.AnsEntry
 import org.lfdecentralizedtrust.splice.codegen.java.splice.decentralizedsynchronizer.MemberTraffic
+import org.lfdecentralizedtrust.splice.codegen.java.splice.dso.decentralizedsynchronizer as decentralizedsynchronizerCodegen
+import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.{DsoRules, Reason, Vote}
 import org.lfdecentralizedtrust.splice.codegen.java.splice.types.Round
 import org.lfdecentralizedtrust.splice.codegen.java.splice.validatorlicense.FaucetState
 import org.lfdecentralizedtrust.splice.codegen.java.splice.{
   amulet as amuletCodegen,
-  round as roundCodegen,
-}
-import org.lfdecentralizedtrust.splice.codegen.java.splice.ans.AnsEntry
-import org.lfdecentralizedtrust.splice.codegen.java.splice.{
   cometbft as cometbftCodegen,
   dsorules as dsorulesCodegen,
+  round as roundCodegen,
 }
-import org.lfdecentralizedtrust.splice.codegen.java.splice.dso.decentralizedsynchronizer as decentralizedsynchronizerCodegen
-import org.lfdecentralizedtrust.splice.codegen.java.da.time.types.RelTime
-import org.lfdecentralizedtrust.splice.codegen.java.splice.dsorules.{DsoRules, Reason, Vote}
 import org.lfdecentralizedtrust.splice.environment.{DarResources, RetryProvider}
-import org.lfdecentralizedtrust.splice.history.{
-  AmuletExpire,
-  ExternalPartyAmuletRules_CreateTransferCommand,
-  LockedAmuletExpireAmulet,
-  Transfer,
-  TransferCommand_Expire,
-  TransferCommand_Send,
-  TransferCommand_Withdraw,
-}
+import org.lfdecentralizedtrust.splice.history.*
 import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
 import org.lfdecentralizedtrust.splice.scan.admin.api.client.commands.HttpScanAppClient
-import org.lfdecentralizedtrust.splice.scan.store.{
-  OpenMiningRoundTxLogEntry,
-  ReceiverAmount,
-  SenderAmount,
-  TransferCommandCreated,
-  TransferCommandExpired,
-  TransferCommandFailed,
-  TransferCommandSent,
-  TransferCommandTxLogEntry,
-  TransferCommandWithdrawn,
-  TransferTxLogEntry,
-}
-import org.lfdecentralizedtrust.splice.scan.store.ScanStore
 import org.lfdecentralizedtrust.splice.scan.store.db.{
   DbScanStore,
   DbScanStoreMetrics,
   ScanAggregatesReader,
   ScanAggregator,
 }
-import org.lfdecentralizedtrust.splice.store.{PageLimit, SortOrder, StoreErrors, StoreTest}
+import org.lfdecentralizedtrust.splice.scan.store.*
 import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractState.Assigned
+import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.events.DsoRulesCloseVoteRequest
+import org.lfdecentralizedtrust.splice.store.*
 import org.lfdecentralizedtrust.splice.util.SpliceUtil.damlDecimal
-import org.lfdecentralizedtrust.splice.util.{
-  Contract,
-  ContractWithState,
-  EventId,
-  ResourceTemplateDecoder,
-  TemplateJsonDecoder,
-}
-import com.digitalasset.canton.concurrent.FutureSupervisor
-import com.digitalasset.canton.crypto.Fingerprint
-import com.digitalasset.canton.data.CantonTimestamp
-import com.daml.metrics.api.noop.NoOpMetricsFactory
-import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
-import com.digitalasset.canton.resource.DbStorage
-import com.digitalasset.canton.topology.*
-import com.digitalasset.canton.tracing.TraceContext
-import com.digitalasset.canton.{HasActorSystem, HasExecutionContext, SynchronizerAlias}
+import org.lfdecentralizedtrust.splice.util.*
 
 import java.time.Instant
+import java.time.temporal.ChronoUnit
 import java.util.{Collections, Optional}
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.jdk.CollectionConverters.*
 import scala.jdk.OptionConverters.*
 import scala.math.BigDecimal.javaBigDecimal2bigDecimal
 import scala.reflect.ClassTag
-import com.digitalasset.canton.util.MonadUtil
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import java.time.temporal.ChronoUnit
-import scala.concurrent.ExecutionContext
-
 abstract class ScanStoreTest
     extends StoreTest
     with HasExecutionContext
@@ -1833,24 +1804,26 @@ abstract class ScanStoreTest
       val recordTimeThird = now.plusSeconds(9).toInstant
       for {
         store <- mkStore()
-        _ <- store.updateHistory.ingestionSink.initialize()
+        updateHistory <- mkUpdateHistory(domainMigrationId)
+        _ <- updateHistory.ingestionSink.initialize()
         first <- dummyDomain.create(
           firstDsoRules,
           recordTime = recordTimeFirst,
         )(
-          store.updateHistory
+          updateHistory
         )
         firstRecordTime = CantonTimestamp.fromInstant(first.getRecordTime).getOrElse(now)
         _ <- dummyDomain.create(
           secondDsoRules,
           recordTime = recordTimeSecond,
-        )(store.updateHistory)
+        )(updateHistory)
         _ <- dummyDomain.create(
           thirdDsoRules,
           recordTime = recordTimeThird,
-        )(store.updateHistory)
+        )(updateHistory)
         result <- store.lookupContractByRecordTime(
           DsoRules.COMPANION,
+          updateHistory,
           firstRecordTime.plusSeconds(1),
         )
       } yield {
@@ -1870,26 +1843,28 @@ abstract class ScanStoreTest
       val recordTimeThird = now.plusSeconds(9).toInstant
       for {
         store <- mkStore()
-        _ <- store.updateHistory.ingestionSink.initialize()
+        updateHistory <- mkUpdateHistory(domainMigrationId)
+        _ <- updateHistory.ingestionSink.initialize()
         first <- dummyDomain.create(
           firstAmuletRules,
           recordTime = recordTimeFirst,
         )(
-          store.updateHistory
+          updateHistory
         )
         firstRecordTime = CantonTimestamp.fromInstant(first.getRecordTime).getOrElse(now)
         _ <- dummyDomain.create(
           secondAmuletRules,
           recordTime = recordTimeSecond,
         )(
-          store.updateHistory
+          updateHistory
         )
         _ <- dummyDomain.create(
           thirdAmuletRules,
           recordTime = recordTimeThird,
-        )(store.updateHistory)
+        )(updateHistory)
         result <- store.lookupContractByRecordTime(
           AmuletRules.COMPANION,
+          updateHistory,
           firstRecordTime.plusSeconds(1),
         )
       } yield {
@@ -1905,6 +1880,10 @@ abstract class ScanStoreTest
       dsoParty: PartyId = dsoParty
   ): Future[ScanStore]
 
+  protected def mkUpdateHistory(
+      migrationId: Long
+  ): Future[UpdateHistory]
+
   private lazy val user1 = userParty(1)
   private lazy val user2 = userParty(2)
@@ -2354,7 +2333,6 @@ class DbScanStoreTest
         None,
       ),
       participantId = mkParticipantId("ScanStoreTest"),
-      enableImportUpdateBackfill = true,
       IngestionConfig(),
       new DbScanStoreMetrics(new NoOpMetricsFactory(), loggerFactory, timeouts),
       initialRound = 0,
@@ -2370,6 +2348,24 @@
     } yield store
   }
 
+  override def mkUpdateHistory(
+      migrationId: Long
+  ): Future[UpdateHistory] = {
+    val updateHistory = new UpdateHistory(
+      storage.underlying, // not under test
+      new DomainMigrationInfo(migrationId, None),
+      "update_history_scan_store_test",
+      mkParticipantId("whatever"),
+      dsoParty,
+      BackfillingRequirement.BackfillingNotRequired,
+      loggerFactory,
+      enableissue12777Workaround = true,
+      enableImportUpdateBackfill = true,
+      HistoryMetrics(NoOpMetricsFactory, migrationId),
+    )
+    updateHistory.ingestionSink.initialize().map(_ => updateHistory)
+  }
+
   override protected def cleanDb(
       storage: DbStorage
   )(implicit traceContext: TraceContext): FutureUnlessShutdown[?] =
diff --git a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/SplitwellApp.scala b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/SplitwellApp.scala
index 22937da183..a169b77a08 100644
--- a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/SplitwellApp.scala
+++ b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/SplitwellApp.scala
@@ -9,7 +9,7 @@ import com.digitalasset.canton.config.CantonRequireTypes.InstanceName
 import com.digitalasset.canton.config.ProcessingTimeout
 import com.digitalasset.canton.lifecycle.LifeCycle
 import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.{TraceContext, TracerProvider}
@@ -57,7 +57,7 @@ class SplitwellApp(
     override val name: InstanceName,
     val config: SplitwellAppBackendConfig,
     val amuletAppParameters: SharedSpliceAppParameters,
-    storage: Storage,
+    storage: DbStorage,
     override protected val clock: Clock,
     val loggerFactory: NamedLoggerFactory,
     tracerProvider: TracerProvider,
@@ -130,6 +130,7 @@ class SplitwellApp(
         participantId,
         config.automation.ingestion,
       )
+      // Splitwell does not need an UpdateHistory
       automation = new SplitwellAutomationService(
         config.automation,
         clock,
@@ -251,7 +252,7 @@ class SplitwellApp(
 object SplitwellApp {
   case class State(
       automation: SplitwellAutomationService,
-      storage: Storage,
+      storage: DbStorage,
       store: SplitwellStore,
       scanConnection: ScanConnection,
       participantAdminConnection: ParticipantAdminConnection,
diff --git a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/automation/SplitwellAutomationService.scala b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/automation/SplitwellAutomationService.scala
index 195c3b3ba3..49ee539fdd 100644
--- a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/automation/SplitwellAutomationService.scala
+++ b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/automation/SplitwellAutomationService.scala
@@ -32,7 +32,7 @@ import org.lfdecentralizedtrust.splice.util.QualifiedName
 import org.lfdecentralizedtrust.splice.scan.admin.api.client.ScanConnection
 import org.lfdecentralizedtrust.splice.splitwell.store.SplitwellStore
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import io.opentelemetry.api.trace.Tracer
 import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
@@ -44,7 +44,7 @@ class SplitwellAutomationService(
     automationConfig: AutomationConfig,
     clock: Clock,
     store: SplitwellStore,
-    storage: Storage,
+    storage: DbStorage,
     ledgerClient: SpliceLedgerClient,
     scanConnection: ScanConnection,
     retryProvider: RetryProvider,
@@ -65,7 +65,6 @@ class SplitwellAutomationService(
       ledgerClient,
       retryProvider,
       ingestFromParticipantBegin = true,
-      ingestUpdateHistoryFromParticipantBegin = true,
       params,
     ) {
diff --git a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/SplitwellStore.scala b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/SplitwellStore.scala
index 4f707cb7e4..58a4d702ea 100644
--- a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/SplitwellStore.scala
+++ b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/SplitwellStore.scala
@@ -21,7 +21,7 @@ import org.lfdecentralizedtrust.splice.util.{
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
@@ -186,7 +186,7 @@ trait SplitwellStore extends AppStore {
 object SplitwellStore {
   def apply(
       key: Key,
-      storage: Storage,
+      storage: DbStorage,
       domainConfig: SplitwellSynchronizerConfig,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
@@ -198,20 +198,16 @@ object SplitwellStore {
       templateJsonDecoder: TemplateJsonDecoder,
       close: CloseContext,
   ): SplitwellStore =
-    storage match {
-      case dbStorage: DbStorage =>
-        new DbSplitwellStore(
-          key,
-          domainConfig,
-          dbStorage,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbSplitwellStore(
+      key,
+      domainConfig,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
 
   case class Key(
       providerParty: PartyId
diff --git a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/db/DbSplitwellStore.scala b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/db/DbSplitwellStore.scala
index 069bad4e9c..c21528770f 100644
--- a/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/db/DbSplitwellStore.scala
+++ b/apps/splitwell/src/main/scala/org/lfdecentralizedtrust/splice/splitwell/store/db/DbSplitwellStore.scala
@@ -31,7 +31,6 @@ import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.db.AcsQueries.AcsStoreId
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
@@ -66,10 +65,6 @@ class DbSplitwellStore(
         ),
       ),
       domainMigrationInfo = domainMigrationInfo,
-      participantId = participantId,
-      enableissue12777Workaround = false,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with AcsTables
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala
index 3682f6b83e..84cf099428 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala
@@ -20,7 +20,7 @@ import com.digitalasset.canton.config.{
 }
 import com.digitalasset.canton.lifecycle.{AsyncOrSyncCloseable, FlagCloseableAsync, SyncCloseable}
 import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.{TraceContext, TracerProvider}
@@ -104,7 +104,7 @@ class SvApp(
     override val name: InstanceName,
     val config: SvAppBackendConfig,
     val amuletAppParameters: SharedSpliceAppParameters,
-    storage: Storage,
+    storage: DbStorage,
     override protected val clock: Clock,
     val loggerFactory: NamedLoggerFactory,
     tracerProvider: TracerProvider,
@@ -806,7 +806,7 @@ object SvApp {
   case class State(
       participantAdminConnection: ParticipantAdminConnection,
       localSynchronizerNode: Option[LocalSynchronizerNode],
-      storage: Storage,
+      storage: DbStorage,
       domainTimeAutomationService: DomainTimeAutomationService,
       domainParamsAutomationService: DomainParamsAutomationService,
       svStore: SvSvStore,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala
index 306ef434cf..1e6aca79a4 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvAdminHandler.scala
@@ -58,7 +58,6 @@ import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory}
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.canton.util.ErrorUtil
-import io.grpc.Status
 import io.opentelemetry.api.trace.Tracer
 import org.apache.pekko.stream.Materializer
 import org.lfdecentralizedtrust.splice.config.{NetworkAppClientConfig, UpgradesConfig}
@@ -110,29 +109,18 @@ class HttpSvAdminHandler(
 
   // Similar to PublishScanConfigTrigger, this class creates its own scan connection
   // on demand, because scan might not be available at application startup.
-  private def createScanConnection(): Future[ScanConnection] =
-    config.scan match {
-      case None =>
-        Future.failed(
-          Status.UNAVAILABLE
-            .withDescription(
-              "This application is not configured to connect to a scan service. " +
-                " Check the application configuration or use the scan API to query votes information."
-            )
-            .asRuntimeException()
-        )
-      case Some(scanConfig) =>
-        implicit val tc: TraceContext = TraceContext.empty
-        ScanConnection
-          .singleUncached(
-            ScanAppClientConfig(NetworkAppClientConfig(scanConfig.internalUrl)),
-            upgradesConfig,
-            clock,
-            retryProvider,
-            loggerFactory,
-            retryConnectionOnInitialFailure = true,
-          )
-    }
+  private def createScanConnection(): Future[ScanConnection] = {
+    implicit val tc: TraceContext = TraceContext.empty
+    ScanConnection
+      .singleUncached(
+        ScanAppClientConfig(NetworkAppClientConfig(config.scan.internalUrl)),
+        upgradesConfig,
+        clock,
+        retryProvider,
+        loggerFactory,
+        retryConnectionOnInitialFailure = true,
+      )
+  }
   @SuppressWarnings(Array("org.wartremover.warts.Var"))
   private var scanConnectionV: Option[Future[ScanConnection]] = None
   private def scanConnectionF: Future[ScanConnection] = blocking {
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala
index 74c51326a5..8260ef3bdf 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvDsoAutomationService.scala
@@ -86,7 +86,6 @@ class SvDsoAutomationService(
       ledgerClient,
       retryProvider,
       config.ingestFromParticipantBegin,
-      config.ingestUpdateHistoryFromParticipantBegin,
       config.parameters,
     ) {
 
@@ -94,6 +93,8 @@ class SvDsoAutomationService(
       : org.lfdecentralizedtrust.splice.sv.automation.SvDsoAutomationService.type =
     SvDsoAutomationService
 
+  // Notice the absence of UpdateHistory: a history for the DSO party would duplicate the one kept by Scan.
+
   private[splice] val restartDsoDelegateBasedAutomationTrigger =
     new RestartDsoDelegateBasedAutomationTrigger(
       triggerContext,
@@ -378,17 +379,15 @@ class SvDsoAutomationService(
         )
       )
 
-      config.scan.foreach { scan =>
-        registerTrigger(
-          new PublishScanConfigTrigger(
-            triggerContext,
-            dsoStore,
-            connection(SpliceLedgerConnectionPriority.Low),
-            scan,
-            upgradesConfig,
-          )
+      registerTrigger(
+        new PublishScanConfigTrigger(
+          triggerContext,
+          dsoStore,
+          connection(SpliceLedgerConnectionPriority.Low),
+          config.scan,
+          upgradesConfig,
         )
-      }
+      )
 
       config.followAmuletConversionRateFeed.foreach { c =>
         registerTrigger(
@@ -450,6 +449,8 @@ class SvDsoAutomationService(
           new SequencerPruningTrigger(
             contextWithSpecificPolling,
             dsoStore,
+            config.scan,
+            upgradesConfig,
             sequencerContext.sequencerAdminConnection,
             sequencerContext.mediatorAdminConnection,
             clock,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvSvAutomationService.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvSvAutomationService.scala
index 9b1c98a434..a94806271e 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvSvAutomationService.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/SvSvAutomationService.scala
@@ -23,7 +23,7 @@ import org.lfdecentralizedtrust.splice.sv.config.SvAppBackendConfig
 import org.lfdecentralizedtrust.splice.sv.store.{SvDsoStore, SvSvStore}
 import org.lfdecentralizedtrust.splice.sv.LocalSynchronizerNode
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import io.opentelemetry.api.trace.Tracer
 import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
@@ -37,7 +37,7 @@ class SvSvAutomationService(
     config: SvAppBackendConfig,
     svStore: SvSvStore,
     dsoStore: SvDsoStore,
-    storage: Storage,
+    storage: DbStorage,
     ledgerClient: SpliceLedgerClient,
     participantAdminConnection: ParticipantAdminConnection,
     localSynchronizerNode: Option[LocalSynchronizerNode],
@@ -56,7 +56,6 @@ class SvSvAutomationService(
       ledgerClient,
       retryProvider,
       config.ingestFromParticipantBegin,
-      config.ingestUpdateHistoryFromParticipantBegin,
       config.parameters,
     ) {
   override def companion: org.lfdecentralizedtrust.splice.sv.automation.SvSvAutomationService.type =
@@ -69,6 +68,8 @@ class SvSvAutomationService(
     )
   )
 
+  // Notice the absence of UpdateHistory: the SV party does not need a history, as we don't foresee ever adding a TxLog for it.
+
   registerTrigger(
     SqlIndexInitializationTrigger(
       storage,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/SequencerPruningTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/SequencerPruningTrigger.scala
index d1c436feb6..96fbf84f24 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/SequencerPruningTrigger.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/SequencerPruningTrigger.scala
@@ -10,7 +10,7 @@ import org.lfdecentralizedtrust.splice.environment.{
   SequencerAdminConnection,
 }
 import org.lfdecentralizedtrust.splice.sv.store.SvDsoStore
-import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange
+import org.lfdecentralizedtrust.splice.util.{DomainRecordTimeRange, TemplateJsonDecoder}
 import com.digitalasset.canton.config.NonNegativeFiniteDuration
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.{AsyncOrSyncCloseable, SyncCloseable}
@@ -24,14 +24,25 @@ import io.opentelemetry.api.trace.Tracer
 
 import scala.jdk.DurationConverters.*
 import io.grpc.Status
+import org.apache.pekko.stream.Materializer
+import org.lfdecentralizedtrust.splice.config.{NetworkAppClientConfig, UpgradesConfig}
+import org.lfdecentralizedtrust.splice.http.HttpClient
+import org.lfdecentralizedtrust.splice.scan.admin.api.client.{
+  BackfillingScanConnection,
+  ScanConnection,
+}
+import org.lfdecentralizedtrust.splice.scan.config.ScanAppClientConfig
+import org.lfdecentralizedtrust.splice.sv.config.SvScanConfig
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContextExecutor, Future}
 
 /** A trigger to periodically call the sequencer pruning command */
 class SequencerPruningTrigger(
     override protected val context: TriggerContext,
     store: SvDsoStore,
+    scanConfig: SvScanConfig,
+    upgradesConfig: UpgradesConfig,
     sequencerAdminConnection: SequencerAdminConnection,
     mediatorAdminConnection: MediatorAdminConnection,
     clock: Clock,
@@ -39,19 +50,38 @@ class SequencerPruningTrigger(
     participantAdminConnection: ParticipantAdminConnection,
     migrationId: Long,
 )(implicit
-    override val ec: ExecutionContext,
+    override val ec: ExecutionContextExecutor,
     override val tracer: Tracer,
+    mat: Materializer,
+    httpClient: HttpClient,
+    templateDecoder: TemplateJsonDecoder,
 ) extends PollingTrigger {
 
   val pruningMetrics = new SequencerPruningMetrics(
     context.metricsFactory
   )
 
+  // Similar to PublishScanConfigTrigger, this class creates its own scan connection
+  // on demand, because scan might not be available at application startup.
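+  // Note that performWorkIfAvailable below requests a fresh connection on every polling iteration.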
+  private def createScanConnection()(implicit
+      tc: TraceContext
+  ): Future[BackfillingScanConnection] =
+    ScanConnection
+      .singleUncached(
+        ScanAppClientConfig(NetworkAppClientConfig(scanConfig.internalUrl)),
+        upgradesConfig,
+        clock,
+        context.retryProvider,
+        loggerFactory,
+        retryConnectionOnInitialFailure = true,
+      )
+
   override def performWorkIfAvailable()(implicit traceContext: TraceContext): Future[Boolean] =
     for {
       synchronizerId <- sequencerAdminConnection.getStatus.map(_.trySuccess.synchronizerId)
-      recordTimeRangeO <- store.updateHistory
-        .getRecordTimeRange(migrationId)
-        .map(_.get(synchronizerId.logical))
+      scanConnection <- createScanConnection()
+      recordTimeRangeO <- scanConnection
+        .getMigrationInfo(migrationId)
+        .map(_.flatMap(_.recordTimeRange.get(synchronizerId.logical)))
       _ <- recordTimeRangeO match {
         case Some(DomainRecordTimeRange(earliest, latest))
             if (latest - earliest).compareTo(retentionPeriod.asJava) > 0 =>
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala
index b5efdbea1d..bbe38a0c7e 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala
@@ -285,7 +285,7 @@ case class SvAppBackendConfig(
     initialAmuletPriceVote: Option[BigDecimal] = None,
     cometBftConfig: Option[SvCometBftConfig] = None,
     localSynchronizerNode: Option[SvSynchronizerNodeConfig],
-    scan: Option[SvScanConfig],
+    scan: SvScanConfig,
     participantBootstrappingDump: Option[ParticipantBootstrapDumpConfig] = None,
     identitiesDump: Option[BackupDumpConfig] = None,
     domainMigrationDumpPath: Option[Path] = None,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala
index 0c3a3d6ce5..67f6c3a809 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/NodeInitializerUtil.scala
@@ -31,7 +31,7 @@ import org.lfdecentralizedtrust.splice.sv.store.{SvDsoStore, SvStore, SvSvStore}
 import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.{Spanning, TraceContext}
@@ -57,7 +57,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 trait NodeInitializerUtil extends NamedLogging with Spanning with SynchronizerNodeConfigClient {
 
   protected val config: SvAppBackendConfig
-  protected val storage: Storage
+  protected val storage: DbStorage
   protected val retryProvider: RetryProvider
   protected val clock: Clock
   protected val domainTimeSync: DomainTimeSynchronization
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/SynchronizerNodeReconciler.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/SynchronizerNodeReconciler.scala
index 71d6301f24..0cf0f807db 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/SynchronizerNodeReconciler.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/SynchronizerNodeReconciler.scala
@@ -46,7 +46,7 @@ class SynchronizerNodeReconciler(
       synchronizerId: SynchronizerId,
       state: SynchronizerNodeState,
       migrationId: Long,
-      scan: Option[SvScanConfig],
+      scanConfig: SvScanConfig,
   )(implicit
       ec: ExecutionContext,
       tc: TraceContext,
@@ -54,9 +54,7 @@ class SynchronizerNodeReconciler(
     def setConfigIfRequired() = for {
       localSequencerConfig <- SvUtil.getSequencerConfig(synchronizerNode, migrationId)
       localMediatorConfig <- SvUtil.getMediatorConfig(synchronizerNode)
-      localScanConfig = scan
-        .map(scanConfig => new ScanConfig(scanConfig.publicUrl.toString()))
-        .toJava
+      localScanConfig = java.util.Optional.of(new ScanConfig(scanConfig.publicUrl.toString()))
       rulesAndState <- dsoStore.getDsoRulesWithSvNodeState(svParty)
       nodeState = rulesAndState.svNodeState.payload
       // TODO(DACH-NY/canton-network-node#4901): do not use default, but reconcile all configured domains
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala
index 625416fe57..39384b4da0 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala
@@ -59,7 +59,7 @@ import com.digitalasset.canton.admin.api.client.data.{NodeStatus, WaitingForInit
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.sequencing.SequencerConnections
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{ParticipantId, PhysicalSynchronizerId, SynchronizerId}
@@ -90,7 +90,7 @@ class DomainMigrationInitializer(
     override protected val clock: Clock,
     override protected val domainTimeSync: DomainTimeSynchronization,
     override protected val domainUnpausedSync: DomainUnpausedSynchronization,
-    override protected val storage: Storage,
+    override protected val storage: DbStorage,
     override protected val loggerFactory: NamedLoggerFactory,
     override protected val retryProvider: RetryProvider,
     override protected val spliceInstanceNamesConfig: SpliceInstanceNamesConfig,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala
index bff035a7ba..a331f70265 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala
@@ -13,7 +13,7 @@ import com.digitalasset.canton.config.SynchronizerTimeTrackerConfig
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.participant.synchronizer.SynchronizerConnectionConfig
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.sequencing.{
   GrpcSequencerConnection,
   SequencerConnectionPoolDelays,
@@ -94,7 +94,7 @@ class JoiningNodeInitializer(
     override protected val clock: Clock,
     override protected val domainTimeSync: DomainTimeSynchronization,
     override protected val domainUnpausedSync: DomainUnpausedSynchronization,
-    override protected val storage: Storage,
+    override protected val storage: DbStorage,
     override val loggerFactory: NamedLoggerFactory,
     override protected val retryProvider: RetryProvider,
     override protected val spliceInstanceNamesConfig: SpliceInstanceNamesConfig,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala
index 4210f76fd8..e58faaecf2 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala
@@ -59,7 +59,7 @@ import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.participant.synchronizer.SynchronizerConnectionConfig
 import com.digitalasset.canton.protocol.DynamicSynchronizerParameters
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.sequencing.{
   GrpcSequencerConnection,
   SequencerConnectionPoolDelays,
@@ -114,7 +114,7 @@ class SV1Initializer(
     override protected val clock: Clock,
     override protected val domainTimeSync: DomainTimeSynchronization,
     override protected val domainUnpausedSync: DomainUnpausedSynchronization,
-    override protected val storage: Storage,
+    override protected val storage: DbStorage,
     override protected val retryProvider: RetryProvider,
     override protected val spliceInstanceNamesConfig: SpliceInstanceNamesConfig,
     override protected val loggerFactory: NamedLoggerFactory,
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala
index 555950e3cf..9ff3f3baf5 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvDsoStore.scala
@@ -44,7 +44,7 @@ import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{Member, ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.canton.util.MonadUtil
@@ -984,7 +984,7 @@ trait SvDsoStore
 object SvDsoStore {
   def apply(
       key: SvStore.Key,
-      storage: Storage,
+      storage: DbStorage,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
       domainMigrationInfo: DomainMigrationInfo,
@@ -995,19 +995,15 @@ object SvDsoStore {
       templateJsonDecoder: TemplateJsonDecoder,
       closeContext: CloseContext,
   ): SvDsoStore = {
-    storage match {
-      case db: DbStorage =>
-        new DbSvDsoStore(
-          key,
-          db,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbSvDsoStore(
+      key,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
   }
 
   /** Contract filter of an sv acs store for a specific acs party. */
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvSvStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvSvStore.scala
index d6e4c4767e..3a4697567e 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvSvStore.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/SvSvStore.scala
@@ -19,7 +19,7 @@ import org.lfdecentralizedtrust.splice.sv.store.db.SvTables.SvAcsStoreRowData
 import org.lfdecentralizedtrust.splice.util.{Contract, TemplateJsonDecoder}
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.ParticipantId
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
@@ -94,7 +94,7 @@ trait SvSvStore extends AppStore {
 object SvSvStore {
   def apply(
       key: SvStore.Key,
-      storage: Storage,
+      storage: DbStorage,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
       domainMigrationInfo: DomainMigrationInfo,
@@ -105,19 +105,15 @@ object SvSvStore {
       templateJsonDecoder: TemplateJsonDecoder,
       closeContext: CloseContext,
   ): SvSvStore =
-    storage match {
-      case db: DbStorage =>
-        new DbSvSvStore(
-          key,
-          db,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbSvSvStore(
+      key,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
 
   /** Contract filter of an sv acs store for a specific acs party.
     */
   def contractFilter(key: SvStore.Key): MultiDomainAcsStore.ContractFilter[
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala
index 37a24f005f..ba0b052172 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvDsoStore.scala
@@ -60,7 +60,6 @@ import com.digitalasset.canton.topology.{Member, ParticipantId, PartyId, Synchro
 import com.digitalasset.canton.tracing.TraceContext
 import io.grpc.Status
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import slick.jdbc.GetResult
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
 import slick.jdbc.canton.SQLActionBuilder
@@ -98,10 +97,6 @@ class DbSvDsoStore(
         ),
       ),
       domainMigrationInfo,
-      participantId,
-      enableissue12777Workaround = false,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with SvDsoStore
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvSvStore.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvSvStore.scala
index ef6381573f..61b337f1b9 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvSvStore.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/store/db/DbSvSvStore.scala
@@ -21,7 +21,6 @@ import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.ParticipantId
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.db.AcsQueries.AcsStoreId
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
@@ -56,10 +55,6 @@ class DbSvSvStore(
         ),
       ),
       domainMigrationInfo = domainMigrationInfo,
-      participantId = participantId,
-      enableissue12777Workaround = false,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with SvSvStore
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/util/SvUtil.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/util/SvUtil.scala
index 90f6a6a70f..7e061c820f 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/util/SvUtil.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/util/SvUtil.scala
@@ -180,7 +180,7 @@ object SvUtil {
   def getSV1SynchronizerNodeConfig(
       cometBftNode: Option[CometBftNode],
       localSynchronizerNode: LocalSynchronizerNode,
-      scanConfig: Option[SvScanConfig],
+      scanConfig: SvScanConfig,
       synchronizerId: SynchronizerId,
       clock: Clock,
       migrationId: Long,
@@ -234,7 +234,7 @@ object SvUtil {
           cometBftConfig,
           sequencerConfig.toJava,
           mediatorConfig.toJava,
-          scanConfig.map(c => new ScanConfig(c.publicUrl.toString())).toJava,
+          Optional.of(new ScanConfig(scanConfig.publicUrl.toString())),
           Optional.empty(),
         )
       ).asJava
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala
index b71a6a4a5d..19c11de075 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala
@@ -43,7 +43,7 @@ import org.lfdecentralizedtrust.splice.setup.{
   ParticipantInitializer,
   ParticipantPartyMigrator,
 }
-import org.lfdecentralizedtrust.splice.store.{AppStoreWithIngestion, UpdateHistory}
+import org.lfdecentralizedtrust.splice.store.{AppStoreWithIngestion, HistoryMetrics, UpdateHistory}
 import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.QueryResult
 import org.lfdecentralizedtrust.splice.util.{
   AmuletConfigSchedule,
@@ -88,7 +88,7 @@ import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.ledger.api.util.DurationConversion
 import com.digitalasset.canton.lifecycle.LifeCycle
 import com.digitalasset.canton.logging.{NamedLoggerFactory, TracedLogger}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.{DbStorage, Storage}
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.{TraceContext, TracerProvider}
@@ -104,6 +104,7 @@ import org.apache.pekko.http.scaladsl.server.Directives.*
 import org.apache.pekko.http.scaladsl.server.directives.BasicDirectives
 import com.google.protobuf.ByteString
 import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
+import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 
 import scala.concurrent.{ExecutionContextExecutor, Future}
 import scala.util.{Failure, Success}
@@ -113,7 +114,7 @@ class ValidatorApp(
     override val name: InstanceName,
     val config: ValidatorAppBackendConfig,
     val amuletAppParameters: SharedSpliceAppParameters,
-    storage: Storage,
+    storage: DbStorage,
     override protected val clock: Clock,
     val loggerFactory: NamedLoggerFactory,
     tracerProvider: TracerProvider,
@@ -769,6 +770,18 @@ class ValidatorApp(
         participantId,
         config.automation.ingestion,
       )
+      validatorUpdateHistory = new UpdateHistory(
+        storage,
+        domainMigrationInfo,
+        store.storeName,
+        participantId,
+        store.acsContractFilter.ingestionFilter.primaryParty,
+        BackfillingRequirement.BackfillingNotRequired,
+        loggerFactory,
+        enableissue12777Workaround = false,
+        enableImportUpdateBackfill = false,
+        HistoryMetrics(retryProvider.metricsFactory, domainMigrationInfo.currentMigrationId),
+      )
      domainTimeAutomationService = new DomainTimeAutomationService(
        config.domains.global.alias,
        participantAdminConnection,
@@ -820,7 +833,7 @@ class ValidatorApp(
         clock,
         domainTimeAutomationService.domainTimeSync,
         domainParamsAutomationService.domainUnpausedSync,
-        storage: Storage,
+        storage,
         retryProvider,
         loggerFactory,
         domainMigrationInfo,
@@ -874,6 +887,7 @@ class ValidatorApp(
         domainParamsAutomationService.domainUnpausedSync,
         walletManagerOpt,
         store,
+        validatorUpdateHistory,
         storage,
         scanConnection,
         ledgerClient,
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/automation/ValidatorAutomationService.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/automation/ValidatorAutomationService.scala
index fd9f87b0ff..44465a2305 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/automation/ValidatorAutomationService.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/automation/ValidatorAutomationService.scala
@@ -20,6 +20,7 @@ import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection
 import org.lfdecentralizedtrust.splice.store.{
   DomainTimeSynchronization,
   DomainUnpausedSynchronization,
+  UpdateHistory,
 }
 import org.lfdecentralizedtrust.splice.util.QualifiedName
 import org.lfdecentralizedtrust.splice.validator.domain.DomainConnector
@@ -36,7 +37,7 @@ import org.lfdecentralizedtrust.splice.wallet.util.ValidatorTopupConfig
 import com.digitalasset.canton.config.NonNegativeFiniteDuration
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.tracing.TraceContext
 import io.opentelemetry.api.trace.Tracer
@@ -60,7 +61,8 @@ class ValidatorAutomationService(
     domainUnpausedSync: DomainUnpausedSynchronization,
     walletManagerOpt: Option[UserWalletManager], // None when config.enableWallet=false
     store: ValidatorStore,
-    storage: Storage,
+    val updateHistory: UpdateHistory,
+    storage: DbStorage,
     scanConnection: BftScanConnection,
     ledgerClient: SpliceLedgerClient,
     participantAdminConnection: ParticipantAdminConnection,
@@ -93,13 +95,17 @@ class ValidatorAutomationService(
       ledgerClient,
       retryProvider,
       ingestFromParticipantBegin,
-      ingestUpdateHistoryFromParticipantBegin,
       params,
     ) {
   override def companion
       : org.lfdecentralizedtrust.splice.validator.automation.ValidatorAutomationService.type =
     ValidatorAutomationService
 
+  registerUpdateHistoryIngestion(
+    updateHistory,
+    ingestUpdateHistoryFromParticipantBegin,
+  )
+
   automationConfig.topologyMetricsPollingInterval.foreach(topologyPollingInterval =>
     registerTrigger(
       new TopologyMetricsTrigger(
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/ValidatorStore.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/ValidatorStore.scala
index dcb0ae3a28..df54029818 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/ValidatorStore.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/ValidatorStore.scala
@@ -27,7 +27,7 @@ import com.digitalasset.canton.config.NonNegativeFiniteDuration
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.NamedLoggerFactory
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.daml.lf.data.Time.Timestamp
@@ -155,7 +155,7 @@ object ValidatorStore {
   def apply(
       key: Key,
-      storage: Storage,
+      storage: DbStorage,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
       domainMigrationInfo: DomainMigrationInfo,
@@ -166,19 +166,15 @@ object ValidatorStore {
       templateJsonDecoder: TemplateJsonDecoder,
       closeContext: CloseContext,
   ): ValidatorStore =
-    storage match {
-      case storage: DbStorage =>
-        new DbValidatorStore(
-          key,
-          storage,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbValidatorStore(
+      key,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
 
   case class Key(
       /** The validator party. */
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorStore.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorStore.scala
index a9f919cef0..ce699e6602 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorStore.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/store/db/DbValidatorStore.scala
@@ -38,7 +38,6 @@ import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.automation.MultiDomainExpiredContractTrigger.ListExpiredContracts
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.db.AcsQueries.AcsStoreId
 import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
@@ -73,10 +72,6 @@ class DbValidatorStore(
         ),
       ),
       domainMigrationInfo = domainMigrationInfo,
-      participantId = participantId,
-      enableissue12777Workaround = false,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with ValidatorStore
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletManager.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletManager.scala
index 8dcca6170e..e934a8164d 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletManager.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletManager.scala
@@ -16,7 +16,7 @@ import org.lfdecentralizedtrust.splice.util.{HasHealth, TemplateJsonDecoder}
 import org.lfdecentralizedtrust.splice.wallet.store.{ExternalPartyWalletStore, WalletStore}
 import com.digitalasset.canton.lifecycle.*
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{ParticipantId, PartyId}
 import com.digitalasset.canton.tracing.TraceContext
@@ -35,7 +35,7 @@ class ExternalPartyWalletManager(
     private[splice] val clock: Clock,
     domainTimeSync: DomainTimeSynchronization,
     domainUnpausedSync: DomainUnpausedSynchronization,
-    storage: Storage,
+    storage: DbStorage,
     retryProvider: RetryProvider,
     override val loggerFactory: NamedLoggerFactory,
     domainMigrationInfo: DomainMigrationInfo,
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletService.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletService.scala
index 89c580e62d..d0879bbad9 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletService.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/ExternalPartyWalletService.scala
@@ -9,17 +9,20 @@ import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
 import org.lfdecentralizedtrust.splice.store.{
   DomainTimeSynchronization,
   DomainUnpausedSynchronization,
+  HistoryMetrics,
+  UpdateHistory,
 }
 import org.lfdecentralizedtrust.splice.util.{HasHealth, TemplateJsonDecoder}
 import org.lfdecentralizedtrust.splice.wallet.automation.ExternalPartyWalletAutomationService
 import org.lfdecentralizedtrust.splice.wallet.store.ExternalPartyWalletStore
 import com.digitalasset.canton.lifecycle.{CloseContext, FlagCloseable}
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.ParticipantId
 import io.opentelemetry.api.trace.Tracer
 import org.apache.pekko.stream.Materializer
+import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 
 import scala.concurrent.ExecutionContext
@@ -31,7 +34,7 @@ class ExternalPartyWalletService(
     clock: Clock,
     domainTimeSync: DomainTimeSynchronization,
     domainUnpausedSync: DomainUnpausedSynchronization,
-    storage: Storage,
+    storage: DbStorage,
     override protected[this] val retryProvider: RetryProvider,
     override val loggerFactory: NamedLoggerFactory,
     domainMigrationInfo: DomainMigrationInfo,
@@ -61,8 +64,22 @@ class ExternalPartyWalletService(
     automationConfig.ingestion,
   )
 
+  val updateHistory = new UpdateHistory(
+    storage,
+    domainMigrationInfo,
+    store.storeName,
+    participantId,
+    store.acsContractFilter.ingestionFilter.primaryParty,
+    BackfillingRequirement.BackfillingNotRequired,
+    loggerFactory,
+    enableissue12777Workaround = false,
+    enableImportUpdateBackfill = false,
+    HistoryMetrics(retryProvider.metricsFactory, domainMigrationInfo.currentMigrationId),
+  )
+
   val automation = new ExternalPartyWalletAutomationService(
     store,
+    updateHistory,
     ledgerClient,
     automationConfig,
     clock,
@@ -80,6 +97,7 @@ class ExternalPartyWalletService(
   override def onClosed(): Unit = {
     automation.close()
+    updateHistory.close()
     store.close()
     super.onClosed()
   }
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletManager.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletManager.scala
index 54d5a0f542..3fb7c8d4ca 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletManager.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletManager.scala
@@ -31,7 +31,7 @@ import org.lfdecentralizedtrust.splice.wallet.store.{UserWalletStore, WalletStor
 import org.lfdecentralizedtrust.splice.wallet.util.ValidatorTopupConfig
 import com.digitalasset.canton.lifecycle.*
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.Storage
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.topology.{ParticipantId, PartyId}
 import com.digitalasset.canton.tracing.TraceContext
@@ -53,7 +53,7 @@ class UserWalletManager(
     domainTimeSync: DomainTimeSynchronization,
     domainUnpausedSync: DomainUnpausedSynchronization,
     treasuryConfig: TreasuryConfig,
-    storage: Storage,
+    storage: DbStorage,
     retryProvider: RetryProvider,
     scanConnection: BftScanConnection,
     packageVersionSupport: PackageVersionSupport,
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletService.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletService.scala
index ea5f5c0b06..5bfe8634e3 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletService.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/UserWalletService.scala
-11,6 +11,8 @@ import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, + HistoryMetrics, + UpdateHistory, } import org.lfdecentralizedtrust.splice.util.{HasHealth, SpliceCircuitBreaker, TemplateJsonDecoder} import org.lfdecentralizedtrust.splice.wallet.automation.UserWalletAutomationService @@ -24,13 +26,14 @@ import org.lfdecentralizedtrust.splice.wallet.treasury.TreasuryService import org.lfdecentralizedtrust.splice.wallet.util.ValidatorTopupConfig import com.digitalasset.canton.lifecycle.{CloseContext, FlagCloseable} import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.resource.Storage +import com.digitalasset.canton.resource.DbStorage import com.digitalasset.canton.time.Clock import com.digitalasset.canton.topology.ParticipantId import io.opentelemetry.api.trace.Tracer import org.apache.pekko.actor.Scheduler import org.apache.pekko.stream.Materializer import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority +import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement import scala.concurrent.ExecutionContext @@ -44,7 +47,7 @@ class UserWalletService( domainTimeSync: DomainTimeSynchronization, domainUnpausedSync: DomainUnpausedSynchronization, treasuryConfig: TreasuryConfig, - storage: Storage, + storage: DbStorage, override protected[this] val retryProvider: RetryProvider, override val loggerFactory: NamedLoggerFactory, scanConnection: BftScanConnection, @@ -84,6 +87,20 @@ class UserWalletService( automationConfig.ingestion, ) + val updateHistory: UpdateHistory = + new UpdateHistory( + storage, + domainMigrationInfo, + store.storeName, + participantId, + store.acsContractFilter.ingestionFilter.primaryParty, + BackfillingRequirement.BackfillingNotRequired, + loggerFactory, + enableissue12777Workaround = true, + enableImportUpdateBackfill = false, + HistoryMetrics(retryProvider.metricsFactory, domainMigrationInfo.currentMigrationId), + ) + val treasury: TreasuryService = new TreasuryService( // The treasury gets its own connection, and is required to manage waiting for the store on its own. 
ledgerClient.connection( @@ -107,6 +124,7 @@ class UserWalletService( val automation = new UserWalletAutomationService( store, + updateHistory, treasury, ledgerClient, automationConfig, diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/ExternalPartyWalletAutomationService.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/ExternalPartyWalletAutomationService.scala index 2e9a698797..d5f01b86f9 100644 --- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/ExternalPartyWalletAutomationService.scala +++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/ExternalPartyWalletAutomationService.scala @@ -13,6 +13,7 @@ import org.lfdecentralizedtrust.splice.environment.* import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, + UpdateHistory, } import org.lfdecentralizedtrust.splice.wallet.store.ExternalPartyWalletStore import com.digitalasset.canton.logging.NamedLoggerFactory @@ -24,6 +25,7 @@ import scala.concurrent.ExecutionContext class ExternalPartyWalletAutomationService( store: ExternalPartyWalletStore, + updateHistory: UpdateHistory, ledgerClient: SpliceLedgerClient, automationConfig: AutomationConfig, clock: Clock, @@ -47,12 +49,16 @@ class ExternalPartyWalletAutomationService( ledgerClient, retryProvider, ingestFromParticipantBegin, - ingestUpdateHistoryFromParticipantBegin, params, ) { override def companion : org.lfdecentralizedtrust.splice.wallet.automation.ExternalPartyWalletAutomationService.type = ExternalPartyWalletAutomationService + + registerUpdateHistoryIngestion( + updateHistory, + ingestUpdateHistoryFromParticipantBegin, + ) } object ExternalPartyWalletAutomationService extends AutomationServiceCompanion { diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/UserWalletAutomationService.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/UserWalletAutomationService.scala index a15b0d2210..f47324c87d 100644 --- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/UserWalletAutomationService.scala +++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/automation/UserWalletAutomationService.scala @@ -19,6 +19,7 @@ import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection import org.lfdecentralizedtrust.splice.store.{ DomainTimeSynchronization, DomainUnpausedSynchronization, + UpdateHistory, } import org.lfdecentralizedtrust.splice.util.QualifiedName import org.lfdecentralizedtrust.splice.wallet.config.{AutoAcceptTransfersConfig, WalletSweepConfig} @@ -35,6 +36,7 @@ import scala.concurrent.ExecutionContext class UserWalletAutomationService( store: UserWalletStore, + val updateHistory: UpdateHistory, treasury: TreasuryService, ledgerClient: SpliceLedgerClient, automationConfig: AutomationConfig, @@ -67,13 +69,17 @@ class UserWalletAutomationService( ledgerClient, retryProvider, ingestFromParticipantBegin, - ingestUpdateHistoryFromParticipantBegin, paramsConfig, ) { override def companion : org.lfdecentralizedtrust.splice.wallet.automation.UserWalletAutomationService.type = UserWalletAutomationService + registerUpdateHistoryIngestion( + updateHistory, + ingestUpdateHistoryFromParticipantBegin, + ) + registerTrigger( new ExpireTransferOfferTrigger( triggerContext, @@ -182,6 +188,7 @@ class UserWalletAutomationService( registerTrigger( new TxLogBackfillingTrigger( store, + 
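
Both automation services replace the ingestUpdateHistoryFromParticipantBegin constructor argument of their base class with an explicit registerUpdateHistoryIngestion call in the class body. The hook's shape below is inferred solely from the two call sites above; the actual definition lives in the automation base class and is not part of this diff:

import org.lfdecentralizedtrust.splice.store.UpdateHistory

// Assumed signature, reconstructed from the call sites; not a verbatim excerpt.
trait UpdateHistoryIngestionHooks {
  protected def registerUpdateHistoryIngestion(
      updateHistory: UpdateHistory, // the history fed by the ingestion loop
      ingestFromParticipantBegin: Boolean, // start at ledger begin vs. resume
  ): Unit
}
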
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/ExternalPartyWalletStore.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/ExternalPartyWalletStore.scala
index cd0969182e..6e0927f798 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/ExternalPartyWalletStore.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/ExternalPartyWalletStore.scala
@@ -20,7 +20,7 @@ import org.lfdecentralizedtrust.splice.wallet.store.db.WalletTables.ExternalPart
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.pretty.*
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{ParticipantId, PartyId}
 import com.digitalasset.canton.tracing.TraceContext
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
@@ -55,7 +55,7 @@ trait ExternalPartyWalletStore extends TransferInputStore with NamedLogging {
 object ExternalPartyWalletStore {
   def apply(
       key: Key,
-      storage: Storage,
+      storage: DbStorage,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
       domainMigrationInfo: DomainMigrationInfo,
@@ -66,19 +66,15 @@ object ExternalPartyWalletStore {
       templateJsonDecoder: TemplateJsonDecoder,
       close: CloseContext,
   ): ExternalPartyWalletStore = {
-    storage match {
-      case dbStorage: DbStorage =>
-        new DbExternalPartyWalletStore(
-          key,
-          dbStorage,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbExternalPartyWalletStore(
+      key,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
   }
 
   case class Key(
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/UserWalletStore.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/UserWalletStore.scala
index 227a1a362a..6f3532c45e 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/UserWalletStore.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/UserWalletStore.scala
@@ -40,7 +40,7 @@ import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.CloseContext
 import com.digitalasset.canton.logging.pretty.*
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.topology.{ParticipantId, PartyId}
 import com.digitalasset.canton.tracing.TraceContext
 import io.grpc.Status
@@ -477,7 +477,7 @@ object UserWalletStore {
 
   def apply(
       key: Key,
-      storage: Storage,
+      storage: DbStorage,
       loggerFactory: NamedLoggerFactory,
       retryProvider: RetryProvider,
       domainMigrationInfo: DomainMigrationInfo,
@@ -488,19 +488,15 @@ object UserWalletStore {
       templateJsonDecoder: TemplateJsonDecoder,
       close: CloseContext,
   ): UserWalletStore = {
-    storage match {
-      case dbStorage: DbStorage =>
-        new DbUserWalletStore(
-          key,
-          dbStorage,
-          loggerFactory,
-          retryProvider,
-          domainMigrationInfo,
-          participantId,
-          ingestionConfig,
-        )
-      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
-    }
+    new DbUserWalletStore(
+      key,
+      storage,
+      loggerFactory,
+      retryProvider,
+      domainMigrationInfo,
+      participantId,
+      ingestionConfig,
+    )
   }
 
   case class Key(
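
Narrowing storage from Storage to DbStorage in both apply methods trades the old runtime match for a compile-time guarantee. A small sketch of the difference; oldApply and newApply are hypothetical names used purely for illustration:

import com.digitalasset.canton.resource.{DbStorage, Storage}

object StorageNarrowingSketch {
  // Before: any Storage was accepted, and a non-DB backend only failed at runtime.
  def oldApply(storage: Storage): Unit = storage match {
    case _: DbStorage => ()
    case other => throw new RuntimeException(s"Unsupported storage type $other")
  }

  // After: callers must already hold a DbStorage; anything else fails to compile.
  def newApply(storage: DbStorage): Unit = ()
}
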
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbExternalPartyWalletStore.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbExternalPartyWalletStore.scala
index deed0c45f1..4b106dc088 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbExternalPartyWalletStore.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbExternalPartyWalletStore.scala
@@ -21,7 +21,6 @@ import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.util.ShowUtil.*
 import com.digitalasset.canton.topology.ParticipantId
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 
 import scala.concurrent.*
 
@@ -53,10 +52,6 @@ class DbExternalPartyWalletStore(
         ),
       ),
       domainMigrationInfo,
-      participantId,
-      enableissue12777Workaround = false,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with ExternalPartyWalletStore
@@ -67,10 +62,9 @@ class DbExternalPartyWalletStore(
   override def toString: String =
     show"DbExternalPartyWalletStore(externalParty=${key.externalParty})"
 
-  override protected def acsContractFilter
-      : org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractFilter[
-        org.lfdecentralizedtrust.splice.wallet.store.db.WalletTables.ExternalPartyWalletAcsStoreRowData,
-        AcsInterfaceViewRowData.NoInterfacesIngested,
-      ] = ExternalPartyWalletStore.contractFilter(key)
+  override def acsContractFilter: org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractFilter[
+    org.lfdecentralizedtrust.splice.wallet.store.db.WalletTables.ExternalPartyWalletAcsStoreRowData,
+    AcsInterfaceViewRowData.NoInterfacesIngested,
+  ] = ExternalPartyWalletStore.contractFilter(key)
 }
 
diff --git a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbUserWalletStore.scala b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbUserWalletStore.scala
index 9188324a94..a813b714a3 100644
--- a/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbUserWalletStore.scala
+++ b/apps/wallet/src/main/scala/org/lfdecentralizedtrust/splice/wallet/store/db/DbUserWalletStore.scala
@@ -45,7 +45,6 @@ import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInt
 import com.digitalasset.canton.resource.DbStorage.Implicits.BuilderChain.toSQLActionBuilderChain
 import com.digitalasset.canton.topology.{ParticipantId, PartyId}
 import org.lfdecentralizedtrust.splice.config.IngestionConfig
-import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingRequirement
 import org.lfdecentralizedtrust.splice.store.db.TxLogQueries.TxLogStoreId
 import slick.jdbc.canton.SQLActionBuilder
 
@@ -96,10 +95,6 @@ class DbUserWalletStore(
         ),
       ),
       domainMigrationInfo,
-      participantId,
-      enableissue12777Workaround = true,
-      enableImportUpdateBackfill = false,
-      BackfillingRequirement.BackfillingNotRequired,
       ingestionConfig,
     )
     with UserWalletStore
@@ -117,7 +112,7 @@ class DbUserWalletStore(
 
   override def toString: String = show"DbUserWalletStore(endUserParty=${key.endUserParty})"
 
-  override protected def acsContractFilter
+  override def acsContractFilter
       : org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractFilter[
         org.lfdecentralizedtrust.splice.wallet.store.db.WalletTables.UserWalletAcsStoreRowData,
         org.lfdecentralizedtrust.splice.wallet.store.db.WalletTables.UserWalletAcsInterfaceViewRowData,
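
acsContractFilter loses its protected modifier in both Db stores, which is what lets the owning services above read store.acsContractFilter.ingestionFilter.primaryParty when constructing the UpdateHistory. A stub illustration of that access path; all types here are simplified placeholders, not the real filter classes:

object AcsFilterAccessSketch {
  // Placeholder shapes; the real ContractFilter/IngestionFilter carry far more.
  final case class IngestionFilter(primaryParty: String)
  final case class ContractFilter(ingestionFilter: IngestionFilter)

  final class StoreSketch {
    // Public now (was protected), so the service can derive the history's party.
    def acsContractFilter: ContractFilter =
      ContractFilter(IngestionFilter("wallet-user::example"))
  }

  val historyParty: String =
    (new StoreSketch).acsContractFilter.ingestionFilter.primaryParty
}
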
diff --git a/cluster/helm/splice-sv-node/values.schema.json b/cluster/helm/splice-sv-node/values.schema.json
index 723c72030d..96521950d6 100644
--- a/cluster/helm/splice-sv-node/values.schema.json
+++ b/cluster/helm/splice-sv-node/values.schema.json
@@ -43,6 +43,7 @@
     "defaultJvmOptions",
     "domain",
     "imageRepo",
+    "scan",
     "nodeIdentifier",
     "onboardingName",
     "persistence",
@@ -287,7 +288,8 @@
         "internalUrl": {
           "type": "string"
         }
-      }
+      },
+      "required": ["publicUrl", "internalUrl"]
     },
     "nodeIdentifier": {
       "type": "string"
diff --git a/docs/src/release_notes.rst b/docs/src/release_notes.rst
index 3d5beb2b60..e1bedf9a60 100644
--- a/docs/src/release_notes.rst
+++ b/docs/src/release_notes.rst
@@ -38,6 +38,17 @@ Upcoming
   required to be the ``receiver`` instead of the ``sender`` of the transfer instruction.
   Upgrade to ``splice-util-featured-app-proxies`` version ``1.2.1`` or newer to get the fix.
 
+- SV app
+
+  - The SV app will no longer store the update history and will therefore not be able to answer historical queries.
+    All updates involving the DSO party will still be stored and returned by Scan.
+
+  - Deployment
+
+    - The helm values under ``scan``, that is, ``publicUrl`` and ``internalUrl``, are now mandatory.
+      All SVs already deploy Scan on DevNet, TestNet and MainNet, so this should have no impact.
+
+
 0.5.1
 -----
@@ -141,6 +152,7 @@ Note: 0.4.24 was published incorrectly and should be skipped in favor of 0.4.25.
   is in catchup mode because there are too many markers. Catchup mode only
   triggers when one or more of the SVs failed to convert the markers assigned
   to them for too long.
 
+
 0.4.21
 ------