Skip to content

Commit 93d50d9

Browse files
committed
Don't rollback update history after migration
[ci] fixes https://github.com/DACH-NY/canton-network-internal/issues/1490 This essentially reverts DACH-NY/canton-network-node#11483 which we can do because we switched to pausing through mediator timeout = 0. Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent 506f2c1 commit 93d50d9

File tree

26 files changed

+182
-25
lines changed

26 files changed

+182
-25
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ import scala.concurrent.{ExecutionContext, Future}
1919
* @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
2020
* incarnation of the global domain.
2121
* None if this is the first incarnation of the global domain.
22+
* @param synchronizerWasPaused True when we ran through a proper migration, false for disaster recovery
2223
*/
2324
final case class DomainMigrationInfo(
2425
currentMigrationId: Long,
2526
acsRecordTime: Option[CantonTimestamp],
27+
synchronizerWasPaused: Boolean,
2628
)
2729

2830
object DomainMigrationInfo {
@@ -37,6 +39,7 @@ object DomainMigrationInfo {
3739
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.acsRecordTime
3840
.fold("*")(_.toProtoPrimitive.toString),
3941
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY -> info.currentMigrationId.toString,
42+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY -> info.synchronizerWasPaused.toString,
4043
),
4144
RetryFor.WaitingOnInitDependency,
4245
)
@@ -68,9 +71,16 @@ object DomainMigrationInfo {
6871
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY,
6972
)
7073
.map(_.toLong)
74+
synchronizerWasPaused <- connection
75+
.waitForUserMetadata(
76+
user,
77+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY,
78+
)
79+
.map(_.toBoolean)
7180
} yield DomainMigrationInfo(
7281
currentMigrationId = currentMigrationId,
7382
acsRecordTime = acsRecordTime,
83+
synchronizerWasPaused = synchronizerWasPaused,
7484
)
7585
}
7686
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -746,15 +746,75 @@ class UpdateHistory(
746746
domainMigrationInfo.acsRecordTime match {
747747
case Some(acsRecordTime) =>
748748
for {
749+
// FIXME
749750
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, acsRecordTime)
750-
_ <- deleteRolledBackUpdateHistory(historyId, previousMigrationId, acsRecordTime)
751+
_ <-
752+
if (domainMigrationInfo.synchronizerWasPaused) {
753+
verifyNoRolledBackData(historyId, previousMigrationId, acsRecordTime)
754+
} else {
755+
deleteRolledBackUpdateHistory(historyId, previousMigrationId, acsRecordTime)
756+
}
751757
} yield ()
752758
case _ =>
753759
logger.debug("No previous domain migration, not checking or deleting updates")
754760
Future.unit
755761
}
756762
}
757763

764+
/** Verifies that the update history holds no rows that a rollback would have to remove.
  *
  * Counts the rows stored for the given history and migration whose record time is strictly
  * greater than `recordTime` in the creates, exercises, transactions, assignments and
  * unassignments tables. The caller runs this check instead of `deleteRolledBackUpdateHistory`
  * when the migration info states that the synchronizer was paused.
  *
  * @param historyId   history whose rows are checked
  * @param migrationId migration id whose rows are checked
  * @param recordTime  rows with a record time strictly greater than this value are counted
  * @return a future that completes once the check has run; it fails with an
  *         `IllegalStateException` if any such row exists
  */
private[this] def verifyNoRolledBackData(
    historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
    migrationId: Long,
    recordTime: CantonTimestamp,
)(implicit tc: TraceContext): Future[Unit] = {
  val action = DBIO
    .sequence(
      Seq(
        sql"""
          select count(*) from update_history_creates
          where update_row_id in (
            select row_id
            from update_history_transactions
            where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
          )
        """.as[Long].head,
        sql"""
          select count(*) from update_history_exercises
          where update_row_id in (
            select row_id
            from update_history_transactions
            where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
          )
        """.as[Long].head,
        sql"""
          select count(*) from update_history_transactions
          where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
        """.as[Long].head,
        sql"""
          select count(*) from update_history_assignments
          where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
        """.as[Long].head,
        sql"""
          select count(*) from update_history_unassignments
          where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
        """.as[Long].head,
      )
    )
    .map { rows =>
      if (rows.sum > 0) {
        val message =
          s"Found $rows rows for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
            "but the configuration says the domain was paused during the migration. " +
            "Check the domain migration configuration and the content of the update history database."
        logger.error(message)
        // Fail the check instead of continuing on top of data that contradicts the
        // configuration; this matches the txlog verification, which also throws.
        throw new IllegalStateException(message)
      } else {
        logger.debug(
          s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
        )
      }
    }
  // The operation name describes this read-only check, not the delete routine it was copied from.
  storage.query(action, "verifyNoRolledBackData")
}
817+
758818
private[this] def deleteRolledBackUpdateHistory(
759819
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
760820
migrationId: Long,

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1911,14 +1911,47 @@ final class DbMultiDomainAcsStore[TXE](
19111911
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
19121912
domainMigrationInfo.acsRecordTime match {
19131913
case Some(acsRecordTime) =>
1914-
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime)
1914+
if (domainMigrationInfo.synchronizerWasPaused) {
1915+
verifyNoRolledBackData(txLogStoreId, previousMigrationId, acsRecordTime)
1916+
} else {
1917+
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime)
1918+
}
19151919
case _ =>
19161920
logger.debug("No previous domain migration, not checking or deleting txlog entries")
19171921
Future.unit
19181922
}
19191923
}
19201924
}
19211925

1926+
/** Verifies that the txlog table holds no entries that a rollback would have to remove.
  *
  * Counts the rows of the txlog table for the given store and migration whose record time is
  * strictly greater than `recordTime`. The caller runs this check instead of
  * `deleteRolledBackTxLogEntries` when the migration info states that the synchronizer was
  * paused.
  *
  * @param txLogStoreId txlog store whose rows are checked
  * @param migrationId  migration id whose rows are checked
  * @param recordTime   rows with a record time strictly greater than this value are counted
  * @return the result of running the check through `storage.query`; the check fails with an
  *         `IllegalStateException` if any such row exists
  */
private[this] def verifyNoRolledBackData(
    txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
    migrationId: Long,
    recordTime: CantonTimestamp,
)(implicit tc: TraceContext) = {
  val action =
    sql"""
      select count(*) from #$txLogTableName
      where store_id = $txLogStoreId and migration_id = $migrationId and record_time > $recordTime
    """
      .as[Long]
      .head
      .map { rows =>
        if (rows > 0) {
          val message =
            s"Found $rows rows for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime, " +
              "but the configuration says the domain was paused during the migration. " +
              "Check the domain migration configuration and the content of the txlog database."
          logger.error(message)
          // Carry the diagnostic in the exception itself rather than a placeholder message.
          // IllegalStateException is a RuntimeException, so existing handlers still match.
          throw new IllegalStateException(message)
        } else {
          logger.debug(
            s"No txlog entries found for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime"
          )
        }
      }
  // The operation name describes this read-only check, not the delete routine it was copied from.
  storage.query(action, "verifyNoRolledBackData")
}
1954+
19221955
private[this] def deleteRolledBackTxLogEntries(
19231956
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
19241957
migrationId: Long,

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfillingStoreTest.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,8 @@ class TxLogBackfillingStoreTest
551551
DomainMigrationInfo(
552552
migrationId,
553553
None,
554+
// FIXME
555+
synchronizerWasPaused = true,
554556
),
555557
"TxLogBackfillingStoreTest",
556558
participantId,
@@ -589,6 +591,8 @@ class TxLogBackfillingStoreTest
589591
DomainMigrationInfo(
590592
migrationId,
591593
None,
594+
// FIXME
595+
synchronizerWasPaused = true,
592596
),
593597
participantId,
594598
RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory),

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ abstract class UpdateHistoryTestBase
245245
DomainMigrationInfo(
246246
domainMigrationId,
247247
None,
248+
// FIXME
249+
synchronizerWasPaused = true,
248250
),
249251
storeName,
250252
participantId,

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStoreTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,8 @@ class DbMultiDomainAcsStoreTest
650650
DomainMigrationInfo(
651651
migrationId,
652652
None,
653+
// FIXME
654+
synchronizerWasPaused = true,
653655
),
654656
participantId,
655657
RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory),

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,8 @@ class ScanEventStoreTest extends StoreTest with HasExecutionContext with SpliceP
747747
val participantId = mkParticipantId("ScanEventStoreTest")
748748
val uh = new UpdateHistory(
749749
storage.underlying,
750-
new DomainMigrationInfo(migrationId, None),
750+
// FIXME
751+
new DomainMigrationInfo(migrationId, None, synchronizerWasPaused = true),
751752
"scan_event_store_test",
752753
participantId,
753754
dsoParty,

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/AcsSnapshotStoreTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,8 @@ class AcsSnapshotStoreTest
10181018
): Future[UpdateHistory] = {
10191019
val updateHistory = new UpdateHistory(
10201020
storage.underlying, // not under test
1021-
new DomainMigrationInfo(migrationId, None),
1021+
// FIXME
1022+
new DomainMigrationInfo(migrationId, None, synchronizerWasPaused = true),
10221023
"update_history_acs_snapshot_test",
10231024
mkParticipantId(participantId),
10241025
dsoParty,

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanAggregatorTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -975,6 +975,8 @@ class ScanAggregatorTest
975975
DomainMigrationInfo(
976976
0,
977977
None,
978+
// FIXME
979+
synchronizerWasPaused = true,
978980
),
979981
participantId = mkParticipantId("ScanAggregatorTest"),
980982
enableImportUpdateBackfill = true,

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/db/ScanStoreTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2351,6 +2351,8 @@ class DbScanStoreTest
23512351
DomainMigrationInfo(
23522352
domainMigrationId,
23532353
None,
2354+
// FIXME
2355+
synchronizerWasPaused = true,
23542356
),
23552357
participantId = mkParticipantId("ScanStoreTest"),
23562358
enableImportUpdateBackfill = true,

0 commit comments

Comments
 (0)