Merged

Commits (26)

7b8dc43  Disable UpdateHistory for SV and Splitwell apps (OriolMunoz-da, Oct 13, 2025)
ad2b047  [ci] when has lazy not fixed a npe (OriolMunoz-da, Oct 13, 2025)
3aefc00  [ci] remove register duplicates (OriolMunoz-da, Oct 14, 2025)
695a66e  [ci] DbStorage errywhere (OriolMunoz-da, Oct 14, 2025)
c1a2fcf  [ci] Docs (OriolMunoz-da, Oct 14, 2025)
2827e8b  Merge branch 'main' into oriol/non-optional-metrics (OriolMunoz-da, Oct 14, 2025)
2ff977b  [ci] run pls gh (OriolMunoz-da, Oct 14, 2025)
428dbac  Merge branch 'main' into oriol/non-optional-metrics (OriolMunoz-da, Oct 15, 2025)
4a38d9f  [ci] truncate tables for sv app (OriolMunoz-da, Oct 15, 2025)
f1f167a  Merge branch 'main' into oriol/non-optional-metrics (OriolMunoz-da, Oct 20, 2025)
2a99661  fix migration (OriolMunoz-da, Oct 20, 2025)
3f938d6  [ci] chekc update history of externals (OriolMunoz-da, Oct 20, 2025)
6163ffd  Make scanConfig mandatory in SV app (#2686) (OriolMunoz-da, Oct 20, 2025)
865e29e  Merge branch 'main' into oriol/non-optional-metrics (OriolMunoz-da, Nov 11, 2025)
f02341e  Merge branch 'oriol/non-optional-metrics' of github.com:hyperledger-l… (OriolMunoz-da, Nov 11, 2025)
7ee24fc  [ci] move migration (OriolMunoz-da, Nov 11, 2025)
44a8269  [ci] update comments (OriolMunoz-da, Nov 11, 2025)
f1ad5c7  [ci] fixes (OriolMunoz-da, Nov 11, 2025)
eadbf72  [ci] fix metrics issue? (OriolMunoz-da, Nov 11, 2025)
7218048  [ci] is this the problem? am i the problem? (OriolMunoz-da, Nov 11, 2025)
32c9f2d  [ci] fix it properly (OriolMunoz-da, Nov 12, 2025)
d652463  [ci] not unused (OriolMunoz-da, Nov 12, 2025)
96524d5  Merge branch 'main' into oriol/non-optional-metrics (OriolMunoz-da, Nov 18, 2025)
1ce8dc5  [static] run (OriolMunoz-da, Nov 18, 2025)
6e84fe3  [ci] run (OriolMunoz-da, Nov 18, 2025)
0cb1b63  [ci] i cannot have nice things for too long (OriolMunoz-da, Nov 18, 2025)

Files changed:
@@ -247,7 +247,7 @@ class UpdateHistorySanityCheckPlugin(
         interval = Span(100, Millis),
       )
       eventually {
-        scan.automation.store.updateHistory
+        scan.automation.updateHistory
           .getBackfillingState()
           .futureValue should be(BackfillingState.Complete)
       }(
@@ -1093,14 +1093,14 @@ class DecentralizedSynchronizerMigrationIntegrationTest
 
     withClueAndLog("Backfilled history includes ACS import") {
       eventually() {
-        sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory
+        sv1ScanLocalBackend.appState.automation.updateHistory.sourceHistory
           .migrationInfo(1L)
           .futureValue
           .exists(_.complete) should be(true)
       }
 
       val backfilledUpdates =
-        sv1ScanLocalBackend.appState.store.updateHistory
+        sv1ScanLocalBackend.appState.automation.updateHistory
           .getAllUpdates(None, PageLimit.tryCreate(1000))
           .futureValue
       backfilledUpdates.collect {
@@ -279,8 +279,8 @@ class ScanHistoryBackfillingIntegrationTest
     )(
       "History marked as free of corrupt snapshots",
       _ => {
-        sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
-        sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+        sv1ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+        sv2ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
       },
     )
 
@@ -307,15 +307,15 @@ class ScanHistoryBackfillingIntegrationTest
     )(
       "Backfilling is complete only on the founding SV",
       _ => {
-        sv1ScanBackend.appState.store.updateHistory
+        sv1ScanBackend.appState.automation.updateHistory
          .getBackfillingState()
          .futureValue should be(BackfillingState.Complete)
        // Update history is complete at this point, but the status endpoint only reports
        // as complete if the txlog is also backfilled
        sv1ScanBackend.getBackfillingStatus().complete shouldBe false
        readUpdateHistoryFromScan(sv1ScanBackend) should not be empty
 
-        sv2ScanBackend.appState.store.updateHistory
+        sv2ScanBackend.appState.automation.updateHistory
          .getBackfillingState()
          .futureValue should be(BackfillingState.InProgress(false, false))
        sv2ScanBackend.getBackfillingStatus().complete shouldBe false

@@ -354,12 +354,12 @@ class ScanHistoryBackfillingIntegrationTest
     )(
       "All backfilling is complete",
       _ => {
-        sv1ScanBackend.appState.store.updateHistory
+        sv1ScanBackend.appState.automation.updateHistory
          .getBackfillingState()
          .futureValue should be(BackfillingState.Complete)
        // Update history is complete, TxLog is not
        sv1ScanBackend.getBackfillingStatus().complete shouldBe false
-        sv2ScanBackend.appState.store.updateHistory
+        sv2ScanBackend.appState.automation.updateHistory
          .getBackfillingState()
          .futureValue should be(BackfillingState.Complete)
        // Update history is complete, TxLog is not

@@ -444,7 +444,7 @@ class ScanHistoryBackfillingIntegrationTest
       clue("Compare scan history with participant update stream") {
         compareHistory(
           sv1Backend.participantClient,
-          sv1ScanBackend.appState.store.updateHistory,
+          sv1ScanBackend.appState.automation.updateHistory,
           ledgerBeginSv1,
         )
       }

@@ -554,7 +554,7 @@ class ScanHistoryBackfillingIntegrationTest
 
   private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = {
     // Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled
-    scanBackend.appState.store.updateHistory
+    scanBackend.appState.automation.updateHistory
       .getAllUpdates(None, PageLimit.tryCreate(1000))
       .futureValue
   }
@@ -434,7 +434,7 @@ class ScanTimeBasedIntegrationTest
       "Wait for backfilling to complete, as the ACS snapshot trigger is paused until then"
     ) {
       eventually() {
-        sv1ScanBackend.automation.store.updateHistory
+        sv1ScanBackend.automation.updateHistory
          .getBackfillingState()
          .futureValue should be(BackfillingState.Complete)
        advanceTime(sv1ScanBackend.config.automation.pollingInterval.asJava)
@@ -314,7 +314,7 @@ class ScanTotalSupplyBigQueryIntegrationTest
       case db: DbStorage => db
       case s => fail(s"non-DB storage configured, unsupported for BigQuery: ${s.getClass}")
     }
-    val sourceHistoryId = sv1ScanBackend.appState.store.updateHistory.historyId
+    val sourceHistoryId = sv1ScanBackend.appState.automation.updateHistory.historyId
 
     copyTableToBigQuery(
       "update_history_creates",
@@ -154,7 +154,7 @@ class UpdateHistoryIntegrationTest
       eventually() {
         compareHistory(
           sv1Backend.participantClient,
-          sv1ScanBackend.appState.store.updateHistory,
+          sv1ScanBackend.appState.automation.updateHistory,
           ledgerBeginSv1,
         )
       }

@@ -179,30 +179,16 @@ class UpdateHistoryIntegrationTest
           .lookupUserWallet(aliceWalletClient.config.ledgerApiUser)
           .futureValue
           .getOrElse(throw new RuntimeException("Alice wallet should exist"))
-          .store
+          .automation
           .updateHistory,
         ledgerBeginAlice,
         true,
       )
     }
-    eventually() {
-      compareHistory(
-        sv1Backend.participantClient,
-        sv1Backend.appState.svStore.updateHistory,
-        ledgerBeginSv1,
-      )
-    }
-    eventually() {
-      compareHistory(
-        sv1Backend.participantClient,
-        sv1Backend.appState.dsoStore.updateHistory,
-        ledgerBeginSv1,
-      )
-    }
     eventually() {
       compareHistory(
         aliceValidatorBackend.participantClient,
-        aliceValidatorBackend.appState.store.updateHistory,
+        aliceValidatorBackend.appState.automation.updateHistory,
         ledgerBeginAlice,
       )
     }
@@ -137,7 +137,7 @@ trait UpdateHistoryTestUtil extends TestCommon {
       scanBackend: ScanAppBackendReference,
       scanClient: ScanAppClientReference,
   ): Assertion = {
-    val historyFromStore = scanBackend.appState.store.updateHistory
+    val historyFromStore = scanBackend.appState.automation.updateHistory
       .getAllUpdates(
         None,
         PageLimit.tryCreate(1000),
@@ -0,0 +1,26 @@
+-- if the only 2 store descriptors belong to the SV app, that means we can truncate the update_history tables
+
+DO $$
+DECLARE
+    descriptors TEXT[];
+BEGIN
+
+    -- array equality (ordered) ensures that exactly these two, no more, no less, are there <=> it's the SV app
+    select array_agg(store_name order by store_name) into descriptors
+    from update_history_descriptors;
+
+    IF descriptors = '{"DbSvDsoStore", "DbSvSvStore"}' THEN
Comment from OriolMunoz-da (Author): [screenshot attachment]

Comment from a contributor: This is stricter than it needs to be; some of the descriptors in update_history_descriptors might not be used by the tables that we want to truncate. Also, if someone has a very unusual production setup where multiple apps write into the same database, this will fail to delete the unused data. I still think it's fine as is; in real setups it should work, and worst case it just leaves around unused data.
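
A minimal sketch of the looser check the reviewer is describing - hypothetical, not part of this migration - would accept any subset of the SV app's descriptors by using array containment instead of ordered equality:

    -- Hypothetical alternative: truncate whenever every descriptor present
    -- belongs to the SV app, even if only one of the two stores exists.
    IF descriptors <@ ARRAY['DbSvDsoStore', 'DbSvSvStore'] THEN
        RAISE NOTICE 'Only SV app descriptors present: %', descriptors::text;
        -- ... TRUNCATE statements as in the migration below ...
    END IF;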

+        RAISE NOTICE 'Truncating update history tables as only SV app descriptors are present. Descriptors: %', descriptors::text;

Comment from OriolMunoz-da (Author): fun fact, flyway logs these, very useful! [screenshot attachment]


+        EXECUTE 'TRUNCATE TABLE update_history_assignments CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_unassignments CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_backfilling CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_creates CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_exercises CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_transactions CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_last_ingested_offsets CASCADE';
+        EXECUTE 'TRUNCATE TABLE update_history_descriptors CASCADE';
+    ELSE
+        RAISE NOTICE 'This is not the SV app, NOT truncating update history tables. Descriptors: %', descriptors::text;
+    END IF;
+
+END $$;
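
As a sanity check before letting the migration run - hypothetical, not part of this PR - you can inspect which store descriptors a database actually holds:

    -- List the store descriptors the migration will compare against.
    select store_name, count(*)
    from update_history_descriptors
    group by store_name
    order by store_name;
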
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 
 /** Shared base class for running ingestion and task-handler automation in applications. */
 abstract class AutomationService(
-    private val automationConfig: AutomationConfig,
+    protected val automationConfig: AutomationConfig,
     clock: Clock,
     domainTimeSync: DomainTimeSynchronization,
     domainUnpausedSync: DomainUnpausedSynchronization,
@@ -15,6 +15,7 @@ import org.lfdecentralizedtrust.splice.store.{
   AppStoreWithIngestion,
   DomainTimeSynchronization,
   DomainUnpausedSynchronization,
+  UpdateHistory,
 }
 import com.digitalasset.canton.time.{Clock, WallClock}
 import com.digitalasset.canton.tracing.TraceContext

@@ -37,7 +38,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
     ledgerClient: SpliceLedgerClient,
     retryProvider: RetryProvider,
     ingestFromParticipantBegin: Boolean,
-    ingestUpdateHistoryFromParticipantBegin: Boolean,
     parametersConfig: SpliceParametersConfig,
 )(implicit
     ec: ExecutionContext,

@@ -97,6 +97,24 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
     case SpliceLedgerConnectionPriority.AmuletExpiry => amuletExpiryConnection
   }
 
+  final protected def registerUpdateHistoryIngestion(
+      updateHistory: UpdateHistory,
+      ingestUpdateHistoryFromParticipantBegin: Boolean,
+  ): Unit = {
+    registerService(
+      new UpdateIngestionService(
+        updateHistory.getClass.getSimpleName,
+        updateHistory.ingestionSink,
+        connection(SpliceLedgerConnectionPriority.High),
+        automationConfig,
+        backoffClock = triggerContext.pollingClock,
+        triggerContext.retryProvider,
+        triggerContext.loggerFactory,
+        ingestUpdateHistoryFromParticipantBegin,
+      )
+    )
+  }
+
   private def completionOffsetCallback(offset: Long): Future[Unit] =
     store.multiDomainAcsStore.signalWhenIngestedOrShutdown(offset)(TraceContext.empty)

@@ -113,19 +131,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
       )
     )
 
-    registerService(
-      new UpdateIngestionService(
-        store.updateHistory.getClass.getSimpleName,
-        store.updateHistory.ingestionSink,
-        connection(SpliceLedgerConnectionPriority.High),
-        automationConfig,
-        backoffClock = triggerContext.pollingClock,
-        triggerContext.retryProvider,
-        triggerContext.loggerFactory,
-        ingestUpdateHistoryFromParticipantBegin,
-      )
-    )
-
     registerTrigger(
       new DomainIngestionService(
         store.domains.ingestionSink,
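
With this change, update-history ingestion is no longer registered unconditionally in the base class constructor; each app's automation service opts in by calling the new helper. A hypothetical opt-in from a subclass body (wiring assumed, not from this PR):

    // In an app's automation service that keeps an update history (e.g. Scan),
    // ingestion is now registered explicitly; SV and Splitwell omit this call.
    registerUpdateHistoryIngestion(
      updateHistory, // assumed to be owned by the app's state, not the store
      ingestUpdateHistoryFromParticipantBegin = true,
    )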
@@ -8,7 +8,7 @@ import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.discard.Implicits.DiscardOps
 import com.digitalasset.canton.lifecycle.{FutureUnlessShutdown, *}
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
-import com.digitalasset.canton.resource.{DbStorage, Storage}
+import com.digitalasset.canton.resource.DbStorage
 import com.digitalasset.canton.tracing.TraceContext
 import io.opentelemetry.api.trace.Tracer
 import org.apache.pekko.stream.Materializer

@@ -150,23 +150,19 @@ class SqlIndexInitializationTrigger(
 object SqlIndexInitializationTrigger {
 
   def apply(
-      storage: Storage,
+      storage: DbStorage,
       triggerContext: TriggerContext,
       indexActions: List[IndexAction] = defaultIndexActions,
   )(implicit
       ec: ExecutionContextExecutor,
       tracer: Tracer,
       mat: Materializer,
-  ): SqlIndexInitializationTrigger = storage match {
-    case dbStorage: DbStorage =>
-      new SqlIndexInitializationTrigger(
-        dbStorage,
-        triggerContext,
-        indexActions,
-      )
-    case storageType =>
-      // Same behavior as in `ScanStore.apply` and similar - we only really support DbStorage in our apps.
-      throw new RuntimeException(s"Unsupported storage type $storageType")
+  ): SqlIndexInitializationTrigger = {
+    new SqlIndexInitializationTrigger(
+      storage,
+      triggerContext,
+      indexActions,
+    )
   }
 
 sealed trait IndexStatus
@@ -10,6 +10,7 @@ import org.lfdecentralizedtrust.splice.store.{
   HistoryMetrics,
   TxLogAppStore,
   TxLogBackfilling,
+  UpdateHistory,
 }
 import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
 import com.digitalasset.canton.topology.PartyId

@@ -23,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}
 
 class TxLogBackfillingTrigger[TXE](
     store: TxLogAppStore[TXE],
+    updateHistory: UpdateHistory,
     batchSize: Int,
     override protected val context: TriggerContext,
 )(implicit

@@ -31,13 +33,13 @@ class TxLogBackfillingTrigger[TXE](
     mat: Materializer,
 ) extends PollingParallelTaskExecutionTrigger[TxLogBackfillingTrigger.Task] {
 
-  private def party: PartyId = store.updateHistory.updateStreamParty
+  private def party: PartyId = updateHistory.updateStreamParty
 
   override protected def extraMetricLabels = Seq(
     "party" -> party.toProtoPrimitive
   )
 
-  private val currentMigrationId = store.updateHistory.domainMigrationInfo.currentMigrationId
+  private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId
 
   private val historyMetrics = new HistoryMetrics(context.metricsFactory)(
     MetricsContext.Empty

@@ -48,23 +50,23 @@ class TxLogBackfillingTrigger[TXE](
   )
   private val backfilling = new TxLogBackfilling(
     store.multiDomainAcsStore,
-    store.updateHistory,
+    updateHistory,
     batchSize,
     context.loggerFactory,
   )
 
   override def retrieveTasks()(implicit
       tc: TraceContext
   ): Future[Seq[TxLogBackfillingTrigger.Task]] = {
-    if (!store.updateHistory.isReady) {
+    if (!updateHistory.isReady) {
       logger.debug("UpdateHistory is not yet ready")
       Future.successful(Seq.empty)
     } else if (!store.multiDomainAcsStore.destinationHistory.isReady) {
       logger.debug("MultiDomainAcsStore is not yet ready")
       Future.successful(Seq.empty)
     } else {
       for {
-        sourceState <- store.updateHistory.getBackfillingState()
+        sourceState <- updateHistory.getBackfillingState()
         destinationState <- store.multiDomainAcsStore.getTxLogBackfillingState()
       } yield {
         sourceState match {
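
Call sites now hand the trigger its UpdateHistory explicitly instead of letting it reach into the store. A hypothetical construction (names assumed, not from this PR):

    // Hypothetical call site: the history is supplied by the caller,
    // matching the new constructor (store, updateHistory, batchSize, context).
    val trigger = new TxLogBackfillingTrigger(
      txLogStore,               // some TxLogAppStore[MyTxLogEntry]
      automation.updateHistory, // the history owned by the automation service
      100,                      // batchSize
      triggerContext,
    )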
@@ -14,7 +14,7 @@ import com.digitalasset.canton.crypto.Crypto
 import com.digitalasset.canton.environment.{CantonNode, CantonNodeBootstrap, CantonNodeParameters}
 import com.digitalasset.canton.lifecycle.{HasCloseContext, LifeCycle}
 import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.StorageFactory
+import com.digitalasset.canton.resource.{DbStorage, StorageFactory}
 import com.digitalasset.canton.telemetry.ConfiguredOpenTelemetry
 import com.digitalasset.canton.time.Clock
 import com.digitalasset.canton.tracing.{NoTracing, TracerProvider}

@@ -102,7 +102,10 @@ abstract class NodeBootstrapBase[
       nodeMetrics.storageMetrics,
       parameterConfig.processingTimeouts,
       loggerFactory,
-    )
+    ) match {
+      case storage: DbStorage => storage
+      case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
+    }
   protected val httpAdminService: HttpAdminService =
     HttpAdminService(
       nodeConfig.nodeTypeName,
@@ -12,17 +12,18 @@ import scala.concurrent.ExecutionContext
  */
 trait AppStore extends NamedLogging with AutoCloseable with StoreErrors {
 
+  val storeName: String
+
   implicit protected def ec: ExecutionContext
 
   /** Defines which create events are to be ingested into the store. */
-  protected def acsContractFilter
+  def acsContractFilter
       : MultiDomainAcsStore.ContractFilter[_ <: AcsRowData, _ <: AcsInterfaceViewRowData]
 
   def domains: SynchronizerStore
 
   def multiDomainAcsStore: MultiDomainAcsStore
 
-  def updateHistory: UpdateHistory
 }
 
 trait TxLogAppStore[TXE] extends AppStore {
Expand Down