Merged

26 commits
7b8dc43
Disable UpdateHistory for SV and Splitwell apps
OriolMunoz-da Oct 13, 2025
ad2b047
[ci] when has lazy not fixed a npe
OriolMunoz-da Oct 13, 2025
3aefc00
[ci] remove register duplicates
OriolMunoz-da Oct 14, 2025
695a66e
[ci] DbStorage errywhere
OriolMunoz-da Oct 14, 2025
c1a2fcf
[ci] Docs
OriolMunoz-da Oct 14, 2025
2827e8b
Merge branch 'main' into oriol/non-optional-metrics
OriolMunoz-da Oct 14, 2025
2ff977b
[ci] run pls gh
OriolMunoz-da Oct 14, 2025
428dbac
Merge branch 'main' into oriol/non-optional-metrics
OriolMunoz-da Oct 15, 2025
4a38d9f
[ci] truncate tables for sv app
OriolMunoz-da Oct 15, 2025
f1f167a
Merge branch 'main' into oriol/non-optional-metrics
OriolMunoz-da Oct 20, 2025
2a99661
fix migration
OriolMunoz-da Oct 20, 2025
3f938d6
[ci] chekc update history of externals
OriolMunoz-da Oct 20, 2025
6163ffd
Make scanConfig mandatory in SV app (#2686)
OriolMunoz-da Oct 20, 2025
865e29e
Merge branch 'main' into oriol/non-optional-metrics
OriolMunoz-da Nov 11, 2025
f02341e
Merge branch 'oriol/non-optional-metrics' of github.com:hyperledger-l…
OriolMunoz-da Nov 11, 2025
7ee24fc
[ci] move migration
OriolMunoz-da Nov 11, 2025
44a8269
[ci] update comments
OriolMunoz-da Nov 11, 2025
f1ad5c7
[ci] fixes
OriolMunoz-da Nov 11, 2025
eadbf72
[ci] fix metrics issue?
OriolMunoz-da Nov 11, 2025
7218048
[ci] is this the problem? am i the problem?
OriolMunoz-da Nov 11, 2025
32c9f2d
[ci] fix it properly
OriolMunoz-da Nov 12, 2025
d652463
[ci] not unused
OriolMunoz-da Nov 12, 2025
96524d5
Merge branch 'main' into oriol/non-optional-metrics
OriolMunoz-da Nov 18, 2025
1ce8dc5
[static] run
OriolMunoz-da Nov 18, 2025
6e84fe3
[ci] run
OriolMunoz-da Nov 18, 2025
0cb1b63
[ci] i cannot have nice things for too long
OriolMunoz-da Nov 18, 2025
@@ -247,7 +247,7 @@ class UpdateHistorySanityCheckPlugin(
interval = Span(100, Millis),
)
eventually {
-scan.automation.store.updateHistory
+scan.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
}(
@@ -1093,14 +1093,14 @@ class DecentralizedSynchronizerMigrationIntegrationTest

withClueAndLog("Backfilled history includes ACS import") {
eventually() {
-sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory
+sv1ScanLocalBackend.appState.automation.updateHistory.sourceHistory
.migrationInfo(1L)
.futureValue
.exists(_.complete) should be(true)
}

val backfilledUpdates =
-sv1ScanLocalBackend.appState.store.updateHistory
+sv1ScanLocalBackend.appState.automation.updateHistory
.getAllUpdates(None, PageLimit.tryCreate(1000))
.futureValue
backfilledUpdates.collect {
@@ -281,8 +281,8 @@ class ScanHistoryBackfillingIntegrationTest
)(
"History marked as free of corrupt snapshots",
_ => {
-sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
-sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+sv1ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
+sv2ScanBackend.appState.automation.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
},
)

@@ -309,15 +309,15 @@ class ScanHistoryBackfillingIntegrationTest
)(
"Backfilling is complete only on the founding SV",
_ => {
-sv1ScanBackend.appState.store.updateHistory
+sv1ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete at this point, but the status endpoint only reports
// as complete if the txlog is also backfilled
sv1ScanBackend.getBackfillingStatus().complete shouldBe false
readUpdateHistoryFromScan(sv1ScanBackend) should not be empty

-sv2ScanBackend.appState.store.updateHistory
+sv2ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.InProgress(false, false))
sv2ScanBackend.getBackfillingStatus().complete shouldBe false
@@ -356,12 +356,12 @@ class ScanHistoryBackfillingIntegrationTest
)(
"All backfilling is complete",
_ => {
-sv1ScanBackend.appState.store.updateHistory
+sv1ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete, TxLog is not
sv1ScanBackend.getBackfillingStatus().complete shouldBe false
-sv2ScanBackend.appState.store.updateHistory
+sv2ScanBackend.appState.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
// Update history is complete, TxLog is not
@@ -446,7 +446,7 @@ class ScanHistoryBackfillingIntegrationTest
clue("Compare scan history with participant update stream") {
compareHistory(
sv1Backend.participantClient,
-sv1ScanBackend.appState.store.updateHistory,
+sv1ScanBackend.appState.automation.updateHistory,
ledgerBeginSv1,
)
}
@@ -556,7 +556,7 @@ class ScanHistoryBackfillingIntegrationTest

private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = {
// Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled
-scanBackend.appState.store.updateHistory
+scanBackend.appState.automation.updateHistory
.getAllUpdates(None, PageLimit.tryCreate(1000))
.futureValue
}
@@ -434,7 +434,7 @@ class ScanTimeBasedIntegrationTest
"Wait for backfilling to complete, as the ACS snapshot trigger is paused until then"
) {
eventually() {
-sv1ScanBackend.automation.store.updateHistory
+sv1ScanBackend.automation.updateHistory
.getBackfillingState()
.futureValue should be(BackfillingState.Complete)
advanceTime(sv1ScanBackend.config.automation.pollingInterval.asJava)
@@ -314,7 +314,7 @@ class ScanTotalSupplyBigQueryIntegrationTest
case db: DbStorage => db
case s => fail(s"non-DB storage configured, unsupported for BigQuery: ${s.getClass}")
}
-val sourceHistoryId = sv1ScanBackend.appState.store.updateHistory.historyId
+val sourceHistoryId = sv1ScanBackend.appState.automation.updateHistory.historyId

copyTableToBigQuery(
"update_history_creates",
@@ -156,7 +156,7 @@ class UpdateHistoryIntegrationTest
eventually() {
compareHistory(
sv1Backend.participantClient,
-sv1ScanBackend.appState.store.updateHistory,
+sv1ScanBackend.appState.automation.updateHistory,
ledgerBeginSv1,
)
}
@@ -181,30 +181,16 @@
.lookupUserWallet(aliceWalletClient.config.ledgerApiUser)
.futureValue
.getOrElse(throw new RuntimeException("Alice wallet should exist"))
-.store
+.automation
.updateHistory,
ledgerBeginAlice,
true,
)
}
-eventually() {
-compareHistory(
-sv1Backend.participantClient,
-sv1Backend.appState.svStore.updateHistory,
-ledgerBeginSv1,
-)
-}
-eventually() {
-compareHistory(
-sv1Backend.participantClient,
-sv1Backend.appState.dsoStore.updateHistory,
-ledgerBeginSv1,
-)
-}
eventually() {
compareHistory(
aliceValidatorBackend.participantClient,
-aliceValidatorBackend.appState.store.updateHistory,
+aliceValidatorBackend.appState.automation.updateHistory,
ledgerBeginAlice,
)
}
@@ -137,7 +137,7 @@ trait UpdateHistoryTestUtil extends TestCommon {
scanBackend: ScanAppBackendReference,
scanClient: ScanAppClientReference,
): Assertion = {
-val historyFromStore = scanBackend.appState.store.updateHistory
+val historyFromStore = scanBackend.appState.automation.updateHistory
.getAllUpdates(
None,
PageLimit.tryCreate(1000),
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag

/** Shared base class for running ingestion and task-handler automation in applications. */
abstract class AutomationService(
-private val automationConfig: AutomationConfig,
+protected val automationConfig: AutomationConfig,
clock: Clock,
domainTimeSync: DomainTimeSynchronization,
domainUnpausedSync: DomainUnpausedSynchronization,
@@ -37,7 +37,6 @@ abstract class SpliceAppAutomationService[Store <: AppStore](
ledgerClient: SpliceLedgerClient,
retryProvider: RetryProvider,
ingestFromParticipantBegin: Boolean,
-ingestUpdateHistoryFromParticipantBegin: Boolean,
parametersConfig: SpliceParametersConfig,
)(implicit
ec: ExecutionContext,
@@ -113,19 +112,6 @@
)
)

-registerService(
-new UpdateIngestionService(
-store.updateHistory.getClass.getSimpleName,
-store.updateHistory.ingestionSink,
-connection(SpliceLedgerConnectionPriority.High),
-automationConfig,
-backoffClock = triggerContext.pollingClock,
-triggerContext.retryProvider,
-triggerContext.loggerFactory,
-ingestUpdateHistoryFromParticipantBegin,
-)
-)

registerTrigger(
new DomainIngestionService(
store.domains.ingestionSink,
@@ -10,6 +10,7 @@ import org.lfdecentralizedtrust.splice.store.{
HistoryMetrics,
TxLogAppStore,
TxLogBackfilling,
+UpdateHistory,
}
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.topology.PartyId
@@ -23,6 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}

class TxLogBackfillingTrigger[TXE](
store: TxLogAppStore[TXE],
+updateHistory: UpdateHistory,
batchSize: Int,
override protected val context: TriggerContext,
)(implicit
@@ -31,13 +33,13 @@
mat: Materializer,
) extends PollingParallelTaskExecutionTrigger[TxLogBackfillingTrigger.Task] {

-private def party: PartyId = store.updateHistory.updateStreamParty
+private def party: PartyId = updateHistory.updateStreamParty

override protected def extraMetricLabels = Seq(
"party" -> party.toProtoPrimitive
)

-private val currentMigrationId = store.updateHistory.domainMigrationInfo.currentMigrationId
+private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId

private val historyMetrics = new HistoryMetrics(context.metricsFactory)(
MetricsContext.Empty
@@ -48,23 +50,23 @@
)
private val backfilling = new TxLogBackfilling(
store.multiDomainAcsStore,
-store.updateHistory,
+updateHistory,
batchSize,
context.loggerFactory,
)

override def retrieveTasks()(implicit
tc: TraceContext
): Future[Seq[TxLogBackfillingTrigger.Task]] = {
-if (!store.updateHistory.isReady) {
+if (!updateHistory.isReady) {
logger.debug("UpdateHistory is not yet ready")
Future.successful(Seq.empty)
} else if (!store.multiDomainAcsStore.destinationHistory.isReady) {
logger.debug("MultiDomainAcsStore is not yet ready")
Future.successful(Seq.empty)
} else {
for {
-sourceState <- store.updateHistory.getBackfillingState()
+sourceState <- updateHistory.getBackfillingState()
destinationState <- store.multiDomainAcsStore.getTxLogBackfillingState()
} yield {
sourceState match {
@@ -14,7 +14,7 @@ import com.digitalasset.canton.crypto.Crypto
import com.digitalasset.canton.environment.{CantonNode, CantonNodeBootstrap, CantonNodeParameters}
import com.digitalasset.canton.lifecycle.{HasCloseContext, LifeCycle}
import com.digitalasset.canton.logging.NamedLoggerFactory
-import com.digitalasset.canton.resource.StorageFactory
+import com.digitalasset.canton.resource.{DbStorage, StorageFactory}
import com.digitalasset.canton.telemetry.ConfiguredOpenTelemetry
import com.digitalasset.canton.time.Clock
import com.digitalasset.canton.tracing.{NoTracing, TracerProvider}
@@ -102,7 +102,10 @@ abstract class NodeBootstrapBase[
nodeMetrics.storageMetrics,
parameterConfig.processingTimeouts,
loggerFactory,
-)
+) match {
+case storage: DbStorage => storage
+case storageType => throw new RuntimeException(s"Unsupported storage type $storageType")
+}
protected val httpAdminService: HttpAdminService =
HttpAdminService(
nodeConfig.nodeTypeName,
@@ -12,17 +12,18 @@ import scala.concurrent.ExecutionContext
*/
trait AppStore extends NamedLogging with AutoCloseable with StoreErrors {

+val storeName: String
+
implicit protected def ec: ExecutionContext

/** Defines which create events are to be ingested into the store. */
-protected def acsContractFilter
+def acsContractFilter
: MultiDomainAcsStore.ContractFilter[_ <: AcsRowData, _ <: AcsInterfaceViewRowData]

def domains: SynchronizerStore

def multiDomainAcsStore: MultiDomainAcsStore

+def updateHistory: UpdateHistory
}

trait TxLogAppStore[TXE] extends AppStore {
@@ -209,3 +209,9 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
ImportUpdatesBackfilling.completed.close()
}
}

+object HistoryMetrics {
+def apply(metricsFactory: LabeledMetricsFactory, currentMigrationId: Long) = new HistoryMetrics(
+metricsFactory
+)(MetricsContext("current_migration_id" -> currentMigrationId.toString))
+}
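
A minimal usage sketch for the new HistoryMetrics factory, assuming a LabeledMetricsFactory named metricsFactory is in scope; the migration id value is illustrative, not taken from this PR's call sites:

// Hypothetical wiring: every instrument created through this HistoryMetrics
// instance carries a `current_migration_id` label via the baked-in MetricsContext.
val historyMetrics = HistoryMetrics(metricsFactory, currentMigrationId = 0L)
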
@@ -96,14 +96,15 @@ class UpdateHistory(
override protected val loggerFactory: NamedLoggerFactory,
enableissue12777Workaround: Boolean,
enableImportUpdateBackfill: Boolean,
-val oMetrics: Option[HistoryMetrics] = None,
+metrics: HistoryMetrics,
)(implicit
ec: ExecutionContext,
closeContext: CloseContext,
) extends HasIngestionSink
with AcsJdbcTypes
with AcsQueries
-with NamedLogging {
+with NamedLogging
+with AutoCloseable {

override lazy val profile: JdbcProfile = storage.api.jdbcProfile

@@ -133,6 +134,8 @@

def isReady: Boolean = state.get().historyId.isDefined

+override def close(): Unit = metrics.close()
+
lazy val ingestionSink: MultiDomainAcsStore.IngestionSink =
new MultiDomainAcsStore.IngestionSink {
override def ingestionFilter: IngestionFilter = IngestionFilter(
@@ -442,7 +445,7 @@ class UpdateHistory(
val safeParticipantOffset = lengthLimited(LegacyOffset.Api.fromLong(reassignment.offset))
val safeUnassignId = lengthLimited(event.unassignId)
val safeContractId = lengthLimited(event.contractId.contractId)
-oMetrics.foreach(_.UpdateHistory.unassignments.mark())
+metrics.UpdateHistory.unassignments.mark()
sqlu"""
insert into update_history_unassignments(
history_id,update_id,record_time,
@@ -488,7 +491,7 @@
val safeCreatedAt = CantonTimestamp.assertFromInstant(event.createdEvent.createdAt)
val safeSignatories = event.createdEvent.getSignatories.asScala.toSeq.map(lengthLimited)
val safeObservers = event.createdEvent.getObservers.asScala.toSeq.map(lengthLimited)
-oMetrics.foreach(_.UpdateHistory.assignments.mark())
+metrics.UpdateHistory.assignments.mark()
sqlu"""
insert into update_history_assignments(
history_id,update_id,record_time,
@@ -518,7 +521,7 @@
tree: TransactionTree,
migrationId: Long,
): DBIOAction[?, NoStream, Effect.Read & Effect.Write] = {
-oMetrics.foreach(_.UpdateHistory.transactionsTrees.mark())
+metrics.UpdateHistory.transactionsTrees.mark()
insertTransactionUpdateRow(tree, migrationId).flatMap(updateRowId => {
// Note: the order of elements in the eventsById map doesn't matter, and is not preserved here.
// The order of elements in the rootEventIds and childEventIds lists DOES matter, and needs to be preserved.
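
Taken together, the UpdateHistory changes above follow one pattern: the optional oMetrics: Option[HistoryMetrics] = None constructor parameter becomes a required metrics: HistoryMetrics, so each guarded call site collapses from a foreach over the Option to a direct mark(). A before/after sketch of the pattern, copied from this file:

// Before: metrics were optional, so every call site guarded on the Option.
oMetrics.foreach(_.UpdateHistory.transactionsTrees.mark())

// After: metrics are always present and marked directly; UpdateHistory now
// extends AutoCloseable and releases them in its close() override.
metrics.UpdateHistory.transactionsTrees.mark()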