Skip to content

Commit 19cb419

Browse files
moritzkiefer-dacocreatureOriolMunoz-da
authored
[release-line-0.4] Don't rollback update history after migration (#2790)
* Don't rollback update history after migration backport from #2674 We technically only need the change to the export format on 0.4 but seems safer to backport the whole PR. [ci] --------- Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> --------- Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Co-authored-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> * Sneak in release notes [ci] Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> --------- Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org> Co-authored-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent 4dfbc9a commit 19cb419

File tree

21 files changed

+461
-66
lines changed

21 files changed

+461
-66
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@ import scala.concurrent.{ExecutionContext, Future}
1616
/** Holds information about a hard domain migration
1717
*
1818
* @param currentMigrationId The migration id of the current incarnation of the global domain.
19-
* @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
20-
* incarnation of the global domain.
21-
* None if this is the first incarnation of the global domain.
2219
*/
2320
final case class DomainMigrationInfo(
2421
currentMigrationId: Long,
25-
acsRecordTime: Option[CantonTimestamp],
22+
migrationTimeInfo: Option[MigrationTimeInfo],
23+
)
24+
25+
/** @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
26+
* incarnation of the global domain.
27+
* None if this is the first incarnation of the global domain.
28+
* @param synchronizerWasPaused True when we ran through a proper migration, false for disaster recovery
29+
*/
30+
final case class MigrationTimeInfo(
31+
acsRecordTime: CantonTimestamp,
32+
synchronizerWasPaused: Boolean,
2633
)
2734

2835
object DomainMigrationInfo {
@@ -34,9 +41,13 @@ object DomainMigrationInfo {
3441
connection.ensureUserMetadataAnnotation(
3542
user,
3643
Map(
37-
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.acsRecordTime
44+
BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.migrationTimeInfo
45+
.map(_.acsRecordTime)
3846
.fold("*")(_.toProtoPrimitive.toString),
3947
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY -> info.currentMigrationId.toString,
48+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY -> info.migrationTimeInfo
49+
.map(_.synchronizerWasPaused)
50+
.fold("*")(_.toString),
4051
),
4152
RetryFor.WaitingOnInitDependency,
4253
)
@@ -68,9 +79,22 @@ object DomainMigrationInfo {
6879
BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY,
6980
)
7081
.map(_.toLong)
82+
synchronizerWasPaused <- connection
83+
.waitForUserMetadata(
84+
user,
85+
BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY,
86+
)
87+
.map {
88+
case "*" => None
89+
case s =>
90+
Some(s.toBoolean)
91+
}
7192
} yield DomainMigrationInfo(
7293
currentMigrationId = currentMigrationId,
73-
acsRecordTime = acsRecordTime,
94+
migrationTimeInfo = for {
95+
recordTime <- acsRecordTime
96+
wasPaused <- synchronizerWasPaused
97+
} yield MigrationTimeInfo(recordTime, wasPaused),
7498
)
7599
}
76100
}

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -743,18 +743,82 @@ class UpdateHistory(
743743
historyId: Long
744744
)(implicit tc: TraceContext): Future[Unit] = {
745745
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
746-
domainMigrationInfo.acsRecordTime match {
747-
case Some(acsRecordTime) =>
746+
domainMigrationInfo.migrationTimeInfo match {
747+
case Some(info) =>
748748
for {
749-
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, acsRecordTime)
750-
_ <- deleteRolledBackUpdateHistory(historyId, previousMigrationId, acsRecordTime)
749+
_ <-
750+
if (info.synchronizerWasPaused) {
751+
for {
752+
_ <- verifyNoRolledBackAcsSnapshots(
753+
historyId,
754+
previousMigrationId,
755+
info.acsRecordTime,
756+
)
757+
_ <- verifyNoRolledBackData(historyId, previousMigrationId, info.acsRecordTime)
758+
} yield ()
759+
} else {
760+
for {
761+
_ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, info.acsRecordTime)
762+
_ <- deleteRolledBackUpdateHistory(
763+
historyId,
764+
previousMigrationId,
765+
info.acsRecordTime,
766+
)
767+
} yield ()
768+
}
751769
} yield ()
752770
case _ =>
753771
logger.debug("No previous domain migration, not checking or deleting updates")
754772
Future.unit
755773
}
756774
}
757775

776+
private[this] def verifyNoRolledBackData(
777+
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
778+
migrationId: Long,
779+
recordTime: CantonTimestamp,
780+
)(implicit tc: TraceContext): Future[Unit] = {
781+
val action = DBIO
782+
.sequence(
783+
Seq(
784+
sql"""
785+
select count(*) from update_history_creates
786+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
787+
""".as[Long].head,
788+
sql"""
789+
select count(*) from update_history_exercises
790+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
791+
""".as[Long].head,
792+
sql"""
793+
select count(*) from update_history_transactions
794+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
795+
""".as[Long].head,
796+
sql"""
797+
select count(*) from update_history_assignments
798+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
799+
""".as[Long].head,
800+
sql"""
801+
select count(*) from update_history_unassignments
802+
where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime
803+
""".as[Long].head,
804+
)
805+
)
806+
.map(rows =>
807+
if (rows.sum > 0) {
808+
throw new IllegalStateException(
809+
s"Found $rows rows for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
810+
"but the configuration says the domain was paused during the migration. " +
811+
"Check the domain migration configuration and the content of the update history database."
812+
)
813+
} else {
814+
logger.debug(
815+
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
816+
)
817+
}
818+
)
819+
storage.query(action, "verifyNoRolledBackData")
820+
}
821+
758822
private[this] def deleteRolledBackUpdateHistory(
759823
historyId: Long, // Not using the storeId from the state, as the state might not be updated yet
760824
migrationId: Long,
@@ -849,6 +913,37 @@ class UpdateHistory(
849913
)
850914
}
851915

916+
def verifyNoRolledBackAcsSnapshots(
917+
historyId: Long,
918+
migrationId: Long,
919+
recordTime: CantonTimestamp,
920+
)(implicit tc: TraceContext): Future[Unit] = {
921+
val action = sql"""
922+
select snapshot_record_time from acs_snapshot
923+
where history_id = $historyId and migration_id = $migrationId and snapshot_record_time > $recordTime
924+
"""
925+
.as[CantonTimestamp]
926+
.map(times =>
927+
if (times.length > 0) {
928+
throw new IllegalStateException(
929+
s"Found acs snapshots at $times for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " +
930+
"but the configuration says the domain was paused during the migration. " +
931+
"Check the domain migration configuration and the content of the update history database"
932+
)
933+
} else {
934+
logger.debug(
935+
s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime"
936+
)
937+
}
938+
)
939+
940+
storage
941+
.query(
942+
action,
943+
"verifyNoRolledBackAcsSnapshots",
944+
)
945+
}
946+
852947
/** Deletes all updates on the given domain with a record time before the given time.
853948
*/
854949
def deleteUpdatesBefore(

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1909,16 +1909,48 @@ final class DbMultiDomainAcsStore[TXE](
19091909
)(implicit tc: TraceContext): Future[Unit] = {
19101910
txLogTableNameOpt.fold(Future.unit) { _ =>
19111911
val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
1912-
domainMigrationInfo.acsRecordTime match {
1913-
case Some(acsRecordTime) =>
1914-
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime)
1912+
domainMigrationInfo.migrationTimeInfo match {
1913+
case Some(info) =>
1914+
if (info.synchronizerWasPaused) {
1915+
verifyNoRolledBackData(txLogStoreId, previousMigrationId, info.acsRecordTime)
1916+
} else {
1917+
deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, info.acsRecordTime)
1918+
}
19151919
case _ =>
19161920
logger.debug("No previous domain migration, not checking or deleting txlog entries")
19171921
Future.unit
19181922
}
19191923
}
19201924
}
19211925

1926+
private[this] def verifyNoRolledBackData(
1927+
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
1928+
migrationId: Long,
1929+
recordTime: CantonTimestamp,
1930+
)(implicit tc: TraceContext) = {
1931+
val action =
1932+
sql"""
1933+
select count(*) from #$txLogTableName
1934+
where store_id = $txLogStoreId and migration_id = $migrationId and record_time > $recordTime
1935+
"""
1936+
.as[Long]
1937+
.head
1938+
.map(rows =>
1939+
if (rows > 0) {
1940+
throw new IllegalStateException(
1941+
s"Found $rows rows for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime, " +
1942+
"but the configuration says the domain was paused during the migration. " +
1943+
"Check the domain migration configuration and the content of the txlog database."
1944+
)
1945+
} else {
1946+
logger.debug(
1947+
s"No txlog entries found for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime"
1948+
)
1949+
}
1950+
)
1951+
storage.query(action, "verifyNoRolledBackData")
1952+
}
1953+
19221954
private[this] def deleteRolledBackTxLogEntries(
19231955
txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet
19241956
migrationId: Long,

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{
2828
}
2929
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummyholding.DummyHolding
3030
import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummytwointerfaces.DummyTwoInterfaces
31+
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo
3132

3233
import java.time.Instant
3334
import java.util.concurrent.atomic.AtomicReference
@@ -104,6 +105,7 @@ abstract class MultiDomainAcsStoreTest[
104105
GenericAcsRowData,
105106
GenericInterfaceRowData,
106107
] = defaultContractFilter,
108+
migrationTimeInfo: Option[MigrationTimeInfo] = None,
107109
): Store
108110

109111
protected type Store = S

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
2020
TransactionTreeUpdate,
2121
TreeUpdateOrOffsetCheckpoint,
2222
}
23+
import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo
2324
import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange
2425

2526
import java.time.Instant
@@ -614,6 +615,83 @@ class UpdateHistoryTest extends UpdateHistoryTestBase {
614615
} yield succeed
615616
}
616617

618+
"tx rollbacks after migrations are handled correctly" in {
619+
val t0 = time(1)
620+
val t1 = time(2)
621+
val store1 = mkStore(party1, migration1, participant1)
622+
val store2TimeTooEarly = mkStore(
623+
party1,
624+
migration2,
625+
participant1,
626+
migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = true)),
627+
)
628+
val store2TimeCorrect = mkStore(
629+
party1,
630+
migration2,
631+
participant1,
632+
migrationTimeInfo = Some(MigrationTimeInfo(t1, synchronizerWasPaused = true)),
633+
)
634+
for {
635+
_ <- initStore(store1)
636+
_ <- create(domain1, cid1, offset1, party1, store1, t0)
637+
_ <- create(domain1, cid2, offset2, party1, store1, t1)
638+
updates1 <- updates(store1)
639+
ex <- recoverToExceptionIf[IllegalStateException](initStore(store2TimeTooEarly))
640+
_ = ex.getMessage should include("Found List(1, 0, 1, 0, 0) rows")
641+
_ <- initStore(store2TimeCorrect)
642+
updates2 <- updates(store2TimeCorrect)
643+
} yield {
644+
checkUpdates(
645+
updates1,
646+
Seq(
647+
ExpectedCreate(cid1, domain1),
648+
ExpectedCreate(cid2, domain1),
649+
),
650+
)
651+
checkUpdates(
652+
updates2,
653+
Seq(
654+
ExpectedCreate(cid1, domain1),
655+
ExpectedCreate(cid2, domain1),
656+
),
657+
)
658+
}
659+
}
660+
661+
"tx rollbacks after DR are handled correctly" in {
662+
val t0 = time(1)
663+
val t1 = time(2)
664+
val store1 = mkStore(party1, migration1, participant1)
665+
val store2TimeTooEarly = mkStore(
666+
party1,
667+
migration2,
668+
participant1,
669+
migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = false)),
670+
)
671+
for {
672+
_ <- initStore(store1)
673+
_ <- create(domain1, cid1, offset1, party1, store1, t0)
674+
_ <- create(domain1, cid2, offset2, party1, store1, t1)
675+
updates1 <- updates(store1)
676+
_ <- initStore(store2TimeTooEarly)
677+
updates2 <- updates(store2TimeTooEarly)
678+
} yield {
679+
checkUpdates(
680+
updates1,
681+
Seq(
682+
ExpectedCreate(cid1, domain1),
683+
ExpectedCreate(cid2, domain1),
684+
),
685+
)
686+
checkUpdates(
687+
updates2,
688+
Seq(
689+
ExpectedCreate(cid1, domain1)
690+
),
691+
)
692+
}
693+
}
694+
617695
}
618696

619697
"getImportUpdates" should {

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{
99
ReassignmentUpdate,
1010
TransactionTreeUpdate,
1111
}
12-
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
12+
import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo}
1313
import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsTables, SplicePostgresTest}
1414
import com.digitalasset.canton.data.CantonTimestamp
1515
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown
@@ -239,12 +239,13 @@ abstract class UpdateHistoryTestBase
239239
participantId: ParticipantId = participant1,
240240
storeName: String = storeName1,
241241
backfillingRequired: BackfillingRequirement = BackfillingRequirement.NeedsBackfilling,
242+
migrationTimeInfo: Option[MigrationTimeInfo] = None,
242243
): UpdateHistory = {
243244
new UpdateHistory(
244245
storage,
245246
DomainMigrationInfo(
246247
domainMigrationId,
247-
None,
248+
migrationTimeInfo,
248249
),
249250
storeName,
250251
participantId,

0 commit comments

Comments
 (0)