Skip to content

Commit ee48e61

Browse files
moritzkiefer-dacocreatureOriolMunoz-da
authored
Don't rollback update history after migration (#2674)
* Don't rollback update history after migration [ci] fixes DACH-NY/canton-network-internal#1490 This essentially reverts DACH-NY/canton-network-node#11483 which we can do because we switched to pausing through mediator timeout = 0. Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * Log snapshot record times for debugging [ci] Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * Fix ACS snapshots on short-lived migrations (#2781) [ci] --------- Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> --------- Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Co-authored-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent 3206b4e commit ee48e61

File tree

20 files changed

+447
-66
lines changed

20 files changed

+447
-66
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@ import scala.concurrent.{ExecutionContext, Future}
1616
/** Holds information about a hard domain migration
1717
*
1818
* @param currentMigrationId The migration id of the current incarnation of the global domain.
19-
* @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
20-
* incarnation of the global domain.
21-
* None if this is the first incarnation of the global domain.
2219
*/
2320
final case class DomainMigrationInfo(
2421
currentMigrationId: Long,
25-
acsRecordTime: Option[CantonTimestamp],
22+
migrationTimeInfo: Option[MigrationTimeInfo],
23+
)
24+
25+
/** @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
26+
* incarnation of the global domain.
27+
* None if this is the first incarnation of the global domain.
28+
* @param synchronizerWasPaused True when we ran through a proper migration, false for disaster recovery
29+
*/
30+
final case class MigrationTimeInfo(
31+
acsRecordTime: CantonTimestamp,
32+
synchronizerWasPaused: Boolean,
2633
)
2734

2835
object DomainMigrationInfo {
@@ -34,9 +41,13 @@ object DomainMigrationInfo {
3441
connection.ensureUserMetadataAnnotation(
3542
user,
3643
Map(
37-
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.acsRecordTime
44+
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.migrationTimeInfo
45+
.map(_.acsRecordTime)
3846
.fold("*")(_.toProtoPrimitive.toString),
3947
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY -> info.currentMigrationId.toString,
48+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY -> info.migrationTimeInfo
49+
.map(_.synchronizerWasPaused)
50+
.fold("*")(_.toString),
4051
),
4152
RetryFor.WaitingOnInitDependency,
4253
)
@@ -68,9 +79,22 @@ object DomainMigrationInfo {
6879
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY,
6980
)
7081
.map(_.toLong)
82+
synchronizerWasPaused <- connection
83+
.waitForUserMetadata(
84+
user,
85+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY,
86+
)
87+
.map {
88+
case "*" => None
89+
case s =>
90+
Some(s.toBoolean)
91+
}
7192
} yield DomainMigrationInfo(
7293
currentMigrationId = currentMigrationId,
73-
acsRecordTime = acsRecordTime,
94+
migrationTimeInfo = for {
95+
recordTime <- acsRecordTime
96+
wasPaused <- synchronizerWasPaused
97+
} yield MigrationTimeInfo(recordTime, wasPaused),
7498
)
7599
}
76100
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -737,18 +737,82 @@ class UpdateHistory(
737737
historyId: Long
738738
)(implicit tc: TraceContext): Future[Unit] = {
739739
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
740-
domainMigrationInfo.acsRecordTime match {
741-
case Some(acsRecordTime) =>
740+
domainMigrationInfo.migrationTimeInfo match {
741+
case Some(info) =>
742742
for {
743-
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, acsRecordTime)
744-
_ <- deleteRolledBackUpdateHistory(historyId, previousMigrationId, acsRecordTime)
743+
_ <-
744+
if (info.synchronizerWasPaused) {
745+
for {
746+
_ <- verifyNoRolledBackAcsSnapshots(
747+
historyId,
748+
previousMigrationId,
749+
info.acsRecordTime,
750+
)
751+
_ <- verifyNoRolledBackData(historyId, previousMigrationId, info.acsRecordTime)
752+
} yield ()
753+
} else {
754+
for {
755+
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, info.acsRecordTime)
756+
_ <- deleteRolledBackUpdateHistory(
757+
historyId,
758+
previousMigrationId,
759+
info.acsRecordTime,
760+
)
761+
} yield ()
762+
}
745763
} yield ()
746764
case _ =>
747765
logger.debug("No previous domain migration, not checking or deleting updates")
748766
Future.unit
749767
}
750768
}
751769

770+
private[this] def verifyNoRolledBackData(
771+
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
772+
migrationId: Long,
773+
recordTime: CantonTimestamp,
774+
)(implicit tc: TraceContext): Future[Unit] = {
775+
val action = DBIO
776+
.sequence(
777+
Seq(
778+
sql"""
779+
select count(*) from update_history_creates
780+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
781+
""".as[Long].head,
782+
sql"""
783+
select count(*) from update_history_exercises
784+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
785+
""".as[Long].head,
786+
sql"""
787+
select count(*) from update_history_transactions
788+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
789+
""".as[Long].head,
790+
sql"""
791+
select count(*) from update_history_assignments
792+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
793+
""".as[Long].head,
794+
sql"""
795+
select count(*) from update_history_unassignments
796+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
797+
""".as[Long].head,
798+
)
799+
)
800+
.map(rows =>
801+
if (rows.sum > 0) {
802+
throw new IllegalStateException(
803+
s"Found $rows rows for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
804+
"but the configuration says the domain was paused during the migration. " +
805+
"Check the domain migration configuration and the content of the update history database."
806+
)
807+
} else {
808+
logger.debug(
809+
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
810+
)
811+
}
812+
)
813+
storage.query(action, "verifyNoRolledBackData")
814+
}
815+
752816
private[this] def deleteRolledBackUpdateHistory(
753817
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
754818
migrationId: Long,
@@ -843,6 +907,37 @@ class UpdateHistory(
843907
)
844908
}
845909

910+
def verifyNoRolledBackAcsSnapshots(
911+
historyId: Long,
912+
migrationId: Long,
913+
recordTime: CantonTimestamp,
914+
)(implicit tc: TraceContext): Future[Unit] = {
915+
val action = sql"""
916+
select snapshot_record_time from acs_snapshot
917+
where history_id = $historyId and migration_id = $migrationId and snapshot_record_time > $recordTime
918+
"""
919+
.as[CantonTimestamp]
920+
.map(times =>
921+
if (times.length > 0) {
922+
throw new IllegalStateException(
923+
s"Found acs snapshots at $times for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
924+
"but the configuration says the domain was paused during the migration. " +
925+
"Check the domain migration configuration and the content of the update history database"
926+
)
927+
} else {
928+
logger.debug(
929+
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
930+
)
931+
}
932+
)
933+
934+
storage
935+
.query(
936+
action,
937+
"verifyNoRolledBackAcsSnapshots",
938+
)
939+
}
940+
846941
/** Deletes all updates on the given domain with a record time before the given time.
847942
*/
848943
def deleteUpdatesBefore(

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,16 +1875,48 @@ final class DbMultiDomainAcsStore[TXE](
18751875
)(implicit tc: TraceContext): Future[Unit] = {
18761876
txLogTableNameOpt.fold(Future.unit) { _ =>
18771877
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
1878-
domainMigrationInfo.acsRecordTime match {
1879-
case Some(acsRecordTime) =>
1880-
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime)
1878+
domainMigrationInfo.migrationTimeInfo match {
1879+
case Some(info) =>
1880+
if (info.synchronizerWasPaused) {
1881+
verifyNoRolledBackData(txLogStoreId, previousMigrationId, info.acsRecordTime)
1882+
} else {
1883+
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, info.acsRecordTime)
1884+
}
18811885
case _ =>
18821886
logger.debug("No previous domain migration, not checking or deleting txlog entries")
18831887
Future.unit
18841888
}
18851889
}
18861890
}
18871891

1892+
private[this] def verifyNoRolledBackData(
1893+
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
1894+
migrationId: Long,
1895+
recordTime: CantonTimestamp,
1896+
)(implicit tc: TraceContext) = {
1897+
val action =
1898+
sql"""
1899+
select count(*) from #$txLogTableName
1900+
where store_id = $txLogStoreId and migration_id = $migrationId and record_time > $recordTime
1901+
"""
1902+
.as[Long]
1903+
.head
1904+
.map(rows =>
1905+
if (rows > 0) {
1906+
throw new IllegalStateException(
1907+
s"Found $rows rows for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime, " +
1908+
"but the configuration says the domain was paused during the migration. " +
1909+
"Check the domain migration configuration and the content of the txlog database."
1910+
)
1911+
} else {
1912+
logger.debug(
1913+
s"No txlog entries found for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime"
1914+
)
1915+
}
1916+
)
1917+
storage.query(action, "verifyNoRolledBackData")
1918+
}
1919+
18881920
private[this] def deleteRolledBackTxLogEntries(
18891921
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
18901922
migrationId: Long,

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{
2828
}
2929
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummyholding.DummyHolding
3030
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummytwointerfaces.DummyTwoInterfaces
31+
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo
3132

3233
import java.time.Instant
3334
import java.util.concurrent.atomic.AtomicReference
@@ -104,6 +105,7 @@ abstract class MultiDomainAcsStoreTest[
104105
GenericAcsRowData,
105106
GenericInterfaceRowData,
106107
] = defaultContractFilter,
108+
migrationTimeInfo: Option[MigrationTimeInfo] = None,
107109
): Store
108110

109111
protected type Store = S

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
2020
TransactionTreeUpdate,
2121
TreeUpdateOrOffsetCheckpoint,
2222
}
23+
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo
2324
import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange
2425

2526
import java.time.Instant
@@ -617,6 +618,83 @@ class UpdateHistoryTest extends UpdateHistoryTestBase {
617618
} yield succeed
618619
}
619620

621+
"tx rollbacks after migrations are handled correctly" in {
622+
val t0 = time(1)
623+
val t1 = time(2)
624+
val store1 = mkStore(party1, migration1, participant1)
625+
val store2TimeTooEarly = mkStore(
626+
party1,
627+
migration2,
628+
participant1,
629+
migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = true)),
630+
)
631+
val store2TimeCorrect = mkStore(
632+
party1,
633+
migration2,
634+
participant1,
635+
migrationTimeInfo = Some(MigrationTimeInfo(t1, synchronizerWasPaused = true)),
636+
)
637+
for {
638+
_ <- initStore(store1)
639+
_ <- create(domain1, cid1, offset1, party1, store1, t0)
640+
_ <- create(domain1, cid2, offset2, party1, store1, t1)
641+
updates1 <- updates(store1)
642+
ex <- recoverToExceptionIf[IllegalStateException](initStore(store2TimeTooEarly))
643+
_ = ex.getMessage should include("Found List(1, 0, 1, 0, 0) rows")
644+
_ <- initStore(store2TimeCorrect)
645+
updates2 <- updates(store2TimeCorrect)
646+
} yield {
647+
checkUpdates(
648+
updates1,
649+
Seq(
650+
ExpectedCreate(cid1, domain1),
651+
ExpectedCreate(cid2, domain1),
652+
),
653+
)
654+
checkUpdates(
655+
updates2,
656+
Seq(
657+
ExpectedCreate(cid1, domain1),
658+
ExpectedCreate(cid2, domain1),
659+
),
660+
)
661+
}
662+
}
663+
664+
"tx rollbacks after DR are handled correctly" in {
665+
val t0 = time(1)
666+
val t1 = time(2)
667+
val store1 = mkStore(party1, migration1, participant1)
668+
val store2TimeTooEarly = mkStore(
669+
party1,
670+
migration2,
671+
participant1,
672+
migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = false)),
673+
)
674+
for {
675+
_ <- initStore(store1)
676+
_ <- create(domain1, cid1, offset1, party1, store1, t0)
677+
_ <- create(domain1, cid2, offset2, party1, store1, t1)
678+
updates1 <- updates(store1)
679+
_ <- initStore(store2TimeTooEarly)
680+
updates2 <- updates(store2TimeTooEarly)
681+
} yield {
682+
checkUpdates(
683+
updates1,
684+
Seq(
685+
ExpectedCreate(cid1, domain1),
686+
ExpectedCreate(cid2, domain1),
687+
),
688+
)
689+
checkUpdates(
690+
updates2,
691+
Seq(
692+
ExpectedCreate(cid1, domain1)
693+
),
694+
)
695+
}
696+
}
697+
620698
}
621699

622700
"getImportUpdates" should {

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
99
ReassignmentUpdate,
1010
TransactionTreeUpdate,
1111
}
12-
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
12+
import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo}
1313
import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsTables, SplicePostgresTest}
1414
import com.digitalasset.canton.data.CantonTimestamp
1515
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
@@ -239,12 +239,13 @@ abstract class UpdateHistoryTestBase
239239
participantId: ParticipantId = participant1,
240240
storeName: String = storeName1,
241241
backfillingRequired: BackfillingRequirement = BackfillingRequirement.NeedsBackfilling,
242+
migrationTimeInfo: Option[MigrationTimeInfo] = None,
242243
): UpdateHistory = {
243244
new UpdateHistory(
244245
storage,
245246
DomainMigrationInfo(
246247
domainMigrationId,
247-
None,
248+
migrationTimeInfo,
248249
),
249250
storeName,
250251
participantId,

0 commit comments

Comments
 (0)