diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala
index 4e8ded9b46..95f4c1ea57 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/migration/DomainMigrationInfo.scala
@@ -16,13 +16,22 @@ import scala.concurrent.{ExecutionContext, Future}
 /** Holds information about a hard domain migration
  *
  * @param currentMigrationId The migration id of the current incarnation of the global domain.
- * @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
- *                      incarnation of the global domain.
- *                      None if this is the first incarnation of the global domain.
+ * @param migrationTimeInfo Time information about the migration from the previous incarnation.
+ *                          None if this is the first incarnation of the global domain.
  */
 final case class DomainMigrationInfo(
     currentMigrationId: Long,
-    acsRecordTime: Option[CantonTimestamp],
+    migrationTimeInfo: Option[MigrationTimeInfo],
+)
+
+/** @param acsRecordTime The record time at which the ACS snapshot was taken on the previous
+  *                      incarnation of the global domain.
+  * @param synchronizerWasPaused True if the snapshot was taken while the synchronizer was paused
+  *                              (a proper migration); false for disaster recovery.
+  */
+final case class MigrationTimeInfo(
+    acsRecordTime: CantonTimestamp,
+    synchronizerWasPaused: Boolean,
 )
 
 object DomainMigrationInfo {
@@ -34,9 +43,13 @@ object DomainMigrationInfo {
       connection.ensureUserMetadataAnnotation(
         user,
         Map(
-          BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.acsRecordTime
+          BaseLedgerConnection.DOMAIN_MIGRATION_ACS_RECORD_TIME_METADATA_KEY -> info.migrationTimeInfo
+            .map(_.acsRecordTime)
             .fold("*")(_.toProtoPrimitive.toString),
           BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY -> info.currentMigrationId.toString,
+          BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY -> info.migrationTimeInfo
+            .map(_.synchronizerWasPaused)
+            .fold("*")(_.toString),
         ),
         RetryFor.WaitingOnInitDependency,
       )
@@ -68,9 +81,22 @@ object DomainMigrationInfo {
           BaseLedgerConnection.DOMAIN_MIGRATION_CURRENT_MIGRATION_ID_METADATA_KEY,
         )
         .map(_.toLong)
+      synchronizerWasPaused <- connection
+        .waitForUserMetadata(
+          user,
+          BaseLedgerConnection.DOMAIN_MIGRATION_DOMAIN_WAS_PAUSED_KEY,
+        )
+        .map {
+          case "*" => None
+          case s =>
+            Some(s.toBoolean)
+        }
     } yield DomainMigrationInfo(
       currentMigrationId = currentMigrationId,
-      acsRecordTime = acsRecordTime,
+      migrationTimeInfo = for {
+        recordTime <- acsRecordTime
+        wasPaused <- synchronizerWasPaused
+      } yield MigrationTimeInfo(recordTime, wasPaused),
     )
   }
 }
diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
index 563d6ea69c..feec06390c 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
@@ -737,11 +737,29 @@ class UpdateHistory(
       historyId: Long
   )(implicit tc: TraceContext): Future[Unit] = {
     val previousMigrationId = domainMigrationInfo.currentMigrationId - 1
-    domainMigrationInfo.acsRecordTime match {
-      case Some(acsRecordTime) =>
+    domainMigrationInfo.migrationTimeInfo match {
+      case Some(info) =>
         for {
-          _ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, acsRecordTime)
-          _ <- deleteRolledBackUpdateHistory(historyId, 
previousMigrationId, acsRecordTime) + _ <- + if (info.synchronizerWasPaused) { + for { + _ <- verifyNoRolledBackAcsSnapshots( + historyId, + previousMigrationId, + info.acsRecordTime, + ) + _ <- verifyNoRolledBackData(historyId, previousMigrationId, info.acsRecordTime) + } yield () + } else { + for { + _ <- deleteAcsSnapshotsAfter(historyId, previousMigrationId, info.acsRecordTime) + _ <- deleteRolledBackUpdateHistory( + historyId, + previousMigrationId, + info.acsRecordTime, + ) + } yield () + } } yield () case _ => logger.debug("No previous domain migration, not checking or deleting updates") @@ -749,6 +767,52 @@ class UpdateHistory( } } + private[this] def verifyNoRolledBackData( + historyId: Long, // Not using the storeId from the state, as the state might not be updated yet + migrationId: Long, + recordTime: CantonTimestamp, + )(implicit tc: TraceContext): Future[Unit] = { + val action = DBIO + .sequence( + Seq( + sql""" + select count(*) from update_history_creates + where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime + """.as[Long].head, + sql""" + select count(*) from update_history_exercises + where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime + """.as[Long].head, + sql""" + select count(*) from update_history_transactions + where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime + """.as[Long].head, + sql""" + select count(*) from update_history_assignments + where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime + """.as[Long].head, + sql""" + select count(*) from update_history_unassignments + where history_id = $historyId and migration_id = $migrationId and record_time > $recordTime + """.as[Long].head, + ) + ) + .map(rows => + if (rows.sum > 0) { + throw new IllegalStateException( + s"Found $rows rows for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " + + "but the configuration says the domain was paused during the migration. " + + "Check the domain migration configuration and the content of the update history database." + ) + } else { + logger.debug( + s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime" + ) + } + ) + storage.query(action, "verifyNoRolledBackData") + } + private[this] def deleteRolledBackUpdateHistory( historyId: Long, // Not using the storeId from the state, as the state might not be updated yet migrationId: Long, @@ -843,6 +907,37 @@ class UpdateHistory( ) } + def verifyNoRolledBackAcsSnapshots( + historyId: Long, + migrationId: Long, + recordTime: CantonTimestamp, + )(implicit tc: TraceContext): Future[Unit] = { + val action = sql""" + select snapshot_record_time from acs_snapshot + where history_id = $historyId and migration_id = $migrationId and snapshot_record_time > $recordTime + """ + .as[CantonTimestamp] + .map(times => + if (times.length > 0) { + throw new IllegalStateException( + s"Found acs snapshots at $times for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime, " + + "but the configuration says the domain was paused during the migration. 
" + + "Check the domain migration configuration and the content of the update history database" + ) + } else { + logger.debug( + s"No updates found for $updateStreamParty where migration_id = $migrationId and record_time > $recordTime" + ) + } + ) + + storage + .query( + action, + "verifyNoRolledBackAcsSnapshots", + ) + } + /** Deletes all updates on the given domain with a record time before the given time. */ def deleteUpdatesBefore( diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala index 4182191bf0..cc5dee48a5 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStore.scala @@ -1875,9 +1875,13 @@ final class DbMultiDomainAcsStore[TXE]( )(implicit tc: TraceContext): Future[Unit] = { txLogTableNameOpt.fold(Future.unit) { _ => val previousMigrationId = domainMigrationInfo.currentMigrationId - 1 - domainMigrationInfo.acsRecordTime match { - case Some(acsRecordTime) => - deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, acsRecordTime) + domainMigrationInfo.migrationTimeInfo match { + case Some(info) => + if (info.synchronizerWasPaused) { + verifyNoRolledBackData(txLogStoreId, previousMigrationId, info.acsRecordTime) + } else { + deleteRolledBackTxLogEntries(txLogStoreId, previousMigrationId, info.acsRecordTime) + } case _ => logger.debug("No previous domain migration, not checking or deleting txlog entries") Future.unit @@ -1885,6 +1889,34 @@ final class DbMultiDomainAcsStore[TXE]( } } + private[this] def verifyNoRolledBackData( + txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet + migrationId: Long, + recordTime: CantonTimestamp, + )(implicit tc: TraceContext) = { + val action = + sql""" + select count(*) from #$txLogTableName + where store_id = $txLogStoreId and migration_id = $migrationId and record_time > $recordTime + """ + .as[Long] + .head + .map(rows => + if (rows > 0) { + throw new IllegalStateException( + s"Found $rows rows for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime, " + + "but the configuration says the domain was paused during the migration. " + + "Check the domain migration configuration and the content of the txlog database." 
+ ) + } else { + logger.debug( + s"No txlog entries found for $txLogStoreDescriptor where migration_id = $migrationId and record_time > $recordTime" + ) + } + ) + storage.query(action, "verifyNoRolledBackData") + } + private[this] def deleteRolledBackTxLogEntries( txLogStoreId: TxLogStoreId, // Not using the storeId from the state, as the state might not be updated yet migrationId: Long, diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala index 79fbee220a..e6d2d654df 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/MultiDomainAcsStoreTest.scala @@ -28,6 +28,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.{ } import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummyholding.DummyHolding import org.lfdecentralizedtrust.splice.codegen.java.splice.api.token.test.dummytwointerfaces.DummyTwoInterfaces +import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo import java.time.Instant import java.util.concurrent.atomic.AtomicReference @@ -104,6 +105,7 @@ abstract class MultiDomainAcsStoreTest[ GenericAcsRowData, GenericInterfaceRowData, ] = defaultContractFilter, + migrationTimeInfo: Option[MigrationTimeInfo] = None, ): Store protected type Store = S diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala index 900171b667..d9a2a9598f 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTest.scala @@ -20,6 +20,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{ TransactionTreeUpdate, TreeUpdateOrOffsetCheckpoint, } +import org.lfdecentralizedtrust.splice.migration.MigrationTimeInfo import org.lfdecentralizedtrust.splice.util.DomainRecordTimeRange import java.time.Instant @@ -617,6 +618,83 @@ class UpdateHistoryTest extends UpdateHistoryTestBase { } yield succeed } + "tx rollbacks after migrations are handled correctly" in { + val t0 = time(1) + val t1 = time(2) + val store1 = mkStore(party1, migration1, participant1) + val store2TimeTooEarly = mkStore( + party1, + migration2, + participant1, + migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = true)), + ) + val store2TimeCorrect = mkStore( + party1, + migration2, + participant1, + migrationTimeInfo = Some(MigrationTimeInfo(t1, synchronizerWasPaused = true)), + ) + for { + _ <- initStore(store1) + _ <- create(domain1, cid1, offset1, party1, store1, t0) + _ <- create(domain1, cid2, offset2, party1, store1, t1) + updates1 <- updates(store1) + ex <- recoverToExceptionIf[IllegalStateException](initStore(store2TimeTooEarly)) + _ = ex.getMessage should include("Found List(1, 0, 1, 0, 0) rows") + _ <- initStore(store2TimeCorrect) + updates2 <- updates(store2TimeCorrect) + } yield { + checkUpdates( + updates1, + Seq( + ExpectedCreate(cid1, domain1), + ExpectedCreate(cid2, domain1), + ), + ) + checkUpdates( + updates2, + Seq( + ExpectedCreate(cid1, domain1), + ExpectedCreate(cid2, domain1), + ), + ) + } + } + + "tx rollbacks after DR are handled correctly" in { + val t0 = time(1) + val t1 = time(2) + val store1 = 
mkStore(party1, migration1, participant1) + val store2TimeTooEarly = mkStore( + party1, + migration2, + participant1, + migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = false)), + ) + for { + _ <- initStore(store1) + _ <- create(domain1, cid1, offset1, party1, store1, t0) + _ <- create(domain1, cid2, offset2, party1, store1, t1) + updates1 <- updates(store1) + _ <- initStore(store2TimeTooEarly) + updates2 <- updates(store2TimeTooEarly) + } yield { + checkUpdates( + updates1, + Seq( + ExpectedCreate(cid1, domain1), + ExpectedCreate(cid2, domain1), + ), + ) + checkUpdates( + updates2, + Seq( + ExpectedCreate(cid1, domain1) + ), + ) + } + } + } "getImportUpdates" should { diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala index 07154d2f05..d2aec3f2fe 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryTestBase.scala @@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{ ReassignmentUpdate, TransactionTreeUpdate, } -import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo +import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo} import org.lfdecentralizedtrust.splice.store.db.{AcsJdbcTypes, AcsTables, SplicePostgresTest} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.FutureUnlessShutdown @@ -239,12 +239,13 @@ abstract class UpdateHistoryTestBase participantId: ParticipantId = participant1, storeName: String = storeName1, backfillingRequired: BackfillingRequirement = BackfillingRequirement.NeedsBackfilling, + migrationTimeInfo: Option[MigrationTimeInfo] = None, ): UpdateHistory = { new UpdateHistory( storage, DomainMigrationInfo( domainMigrationId, - None, + migrationTimeInfo, ), storeName, participantId, diff --git a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStoreTest.scala b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStoreTest.scala index eb99d8c4a6..d9489ca3d5 100644 --- a/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStoreTest.scala +++ b/apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/db/DbMultiDomainAcsStoreTest.scala @@ -9,7 +9,7 @@ import org.lfdecentralizedtrust.splice.environment.ledger.api.{ TransactionTreeUpdate, TreeUpdateOrOffsetCheckpoint, } -import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo +import org.lfdecentralizedtrust.splice.migration.{DomainMigrationInfo, MigrationTimeInfo} import org.lfdecentralizedtrust.splice.store.StoreTest.testTxLogConfig import org.lfdecentralizedtrust.splice.store.{ HardLimit, @@ -546,6 +546,64 @@ class DbMultiDomainAcsStoreTest _ = store.containsArchived(Seq()).futureValue shouldBe false } yield succeed } + + "tx rollbacks after migrations are handled correctly" in { + import com.digitalasset.canton.data.CantonTimestamp + val store1 = mkStore(acsId = 1, txLogId = Some(1), migrationId = 1L) + val coupon1 = c(1) + val coupon2 = c(2) + val t0 = CantonTimestamp.Epoch + val t1 = CantonTimestamp.Epoch.plusSeconds(60) + val store2TimeTooEarly = mkStore( + acsId = 1, + txLogId = Some(1), + migrationId = 2L, + migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = 
true)), + ) + val store2CorrectTime = mkStore( + acsId = 1, + txLogId = Some(1), + migrationId = 2L, + migrationTimeInfo = Some(MigrationTimeInfo(t1, synchronizerWasPaused = true)), + ) + for { + _ <- initWithAcs()(store1) + _ <- d1.create(coupon1, recordTime = t0.toInstant)(store1) + _ <- d1.create(coupon2, recordTime = t1.toInstant)(store1) + txLogs <- store1.listTxLogEntries() + _ = txLogs should have size 2 + ex <- recoverToExceptionIf[IllegalStateException](initWithAcs()(store2TimeTooEarly)) + _ = ex.getMessage should include("Found 1 rows") + _ <- initWithAcs()(store2CorrectTime) + txLogs <- store2CorrectTime.listTxLogEntries() + _ = txLogs should have size 2 + } yield succeed + } + + "tx rollbacks after DR are handled correctly" in { + import com.digitalasset.canton.data.CantonTimestamp + val store1 = mkStore(acsId = 1, txLogId = Some(1), migrationId = 1L) + val coupon1 = c(1) + val coupon2 = c(2) + val t0 = CantonTimestamp.Epoch + val t1 = CantonTimestamp.Epoch.plusSeconds(60) + val store2TimeTooEarlyDR = mkStore( + acsId = 1, + txLogId = Some(1), + migrationId = 2L, + migrationTimeInfo = Some(MigrationTimeInfo(t0, synchronizerWasPaused = false)), + ) + for { + _ <- initWithAcs()(store1) + _ <- d1.create(coupon1, recordTime = t0.toInstant)(store1) + _ <- d1.create(coupon2, recordTime = t1.toInstant)(store1) + txLogs <- store1.listTxLogEntries() + _ = txLogs should have size 2 + _ <- initWithAcs()(store2TimeTooEarlyDR) + txLogs <- store2TimeTooEarlyDR.listTxLogEntries() + _ = txLogs should have size 1 + } yield succeed + } } private def failedViewStatus(msg: String) = { @@ -607,6 +665,7 @@ class DbMultiDomainAcsStoreTest GenericAcsRowData, GenericInterfaceRowData, ], + migrationTimeInfo: Option[MigrationTimeInfo], ) = { mkStoreWithAcsRowDataF( acsId, @@ -617,6 +676,7 @@ class DbMultiDomainAcsStoreTest "acs_store_template", txLogId.map(_ => "txlog_store_template"), Some("interface_views_template"), + migrationTimeInfo, ) } @@ -629,6 +689,7 @@ class DbMultiDomainAcsStoreTest acsTableName: String, txLogTableName: Option[String], interfaceViewsTableNameOpt: Option[String], + migrationTimeInfo: Option[MigrationTimeInfo] = None, ) = { val packageSignatures = ResourceTemplateDecoder.loadPackageSignaturesFromResources( @@ -649,7 +710,7 @@ class DbMultiDomainAcsStoreTest testTxLogConfig, DomainMigrationInfo( migrationId, - None, + migrationTimeInfo, ), RetryProvider(loggerFactory, timeouts, FutureSupervisor.Noop, NoOpMetricsFactory), ) diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala index 28830aa479..39c95cce8b 100644 --- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala +++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala @@ -167,6 +167,26 @@ class AcsSnapshotTrigger( ): Future[Option[AcsSnapshotTrigger.Task]] = { for { migrationRecordTimeRange <- updateHistory.getRecordTimeRange(migrationIdToBackfill) + maxTime = migrationRecordTimeRange + .map(_._2.max) + .maxOption + .getOrElse( + throw new IllegalStateException( + s"SynchronizerId with no data in $migrationRecordTimeRange" + ) + ) + minTime = migrationRecordTimeRange + .map(_._2.min) + .minOption + .getOrElse( + throw new IllegalStateException( + s"SynchronizerId with no data in $migrationRecordTimeRange" + ) + ) + firstSnapshotTime = computeFirstSnapshotTime(minTime) + 
migrationLastedLongEnough = firstSnapshotTime + .plus(Duration.ofHours(snapshotPeriodHours.toLong)) + .isBefore(maxTime) latestSnapshot <- store .lookupSnapshotBefore(migrationIdToBackfill, CantonTimestamp.MaxValue) task <- latestSnapshot match { @@ -176,14 +196,7 @@ class AcsSnapshotTrigger( case Some(snapshot) if snapshot.snapshotRecordTime.plus( Duration.ofHours(snapshotPeriodHours.toLong) - ) > migrationRecordTimeRange - .map(_._2.max) - .maxOption - .getOrElse( - throw new IllegalStateException( - s"SynchronizerId with no data in $migrationRecordTimeRange" - ) - ) => + ) > maxTime => logger.info( s"Backfilling of migration id $migrationIdToBackfill is complete. Trying with next oldest." ) @@ -196,6 +209,12 @@ class AcsSnapshotTrigger( .Task(nextSnapshotTime(snapshot), migrationIdToBackfill, Some(snapshot)) ) ) + case None if !migrationLastedLongEnough => + logger.info( + s"Migration id $migrationIdToBackfill didn't last more than $snapshotPeriodHours hours (from $minTime to $maxTime), so it won't have any snapshots." + ) + lastCompleteBackfilledMigrationId.set(Right(migrationIdToBackfill)) + taskToContinueBackfillingACSSnapshots().value case None => firstSnapshotForMigrationIdTask(migrationIdToBackfill) } @@ -249,20 +268,27 @@ class AcsSnapshotTrigger( logger.info(s"No updates other than ACS imports found. Retrying snapshot creation later.") None case Some(firstNonAcsImport) => - val firstNonAcsImportRecordTime = - firstNonAcsImport.update.update.recordTime.toInstant.atOffset(ZoneOffset.UTC) - val (hourForSnapshot, plusDays) = timesToDoSnapshot - .find(_ > firstNonAcsImportRecordTime.get(ChronoField.HOUR_OF_DAY)) match { - case Some(hour) => hour -> 0 // current day at hour - case None => 0 -> 1 // next day at 00:00 - } - val until = firstNonAcsImportRecordTime.toLocalDate - .plusDays(plusDays.toLong) - .atTime(hourForSnapshot, 0) - .toInstant(ZoneOffset.UTC) - Some(AcsSnapshotTrigger.Task(CantonTimestamp.assertFromInstant(until), migrationId, None)) + val firstNonAcsImportRecordTime = firstNonAcsImport.update.update.recordTime + Some( + AcsSnapshotTrigger + .Task(computeFirstSnapshotTime(firstNonAcsImportRecordTime), migrationId, None) + ) } } + + private def computeFirstSnapshotTime(firstNonAcsImportRecordTime: CantonTimestamp) = { + val firstUpdateUTCTime = firstNonAcsImportRecordTime.toInstant.atOffset(ZoneOffset.UTC) + val (hourForSnapshot, plusDays) = timesToDoSnapshot + .find(_ > firstUpdateUTCTime.get(ChronoField.HOUR_OF_DAY)) match { + case Some(hour) => hour -> 0 // current day at hour + case None => 0 -> 1 // next day at 00:00 + } + val until = firstUpdateUTCTime.toLocalDate + .plusDays(plusDays.toLong) + .atTime(hourForSnapshot, 0) + .toInstant(ZoneOffset.UTC) + CantonTimestamp.assertFromInstant(until) + } } object AcsSnapshotTrigger { diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala index 93f3dadcd9..27df130fa2 100644 --- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala +++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala @@ -346,6 +346,31 @@ class AcsSnapshotTriggerTest trigger.isDoneBackfillingAcsSnapshots should be(false) } + "return no task if the previous migration id is backfilled but didn't last long enough" in new AcsSnapshotTriggerTestScope( + true, + currentMigrationId = 1L, + ) { + // 
no snapshot needed for the current migration id
+    historyBackfilled(currentMigrationId, complete = true)
+    previousSnapshot(now.minusSeconds(60L), currentMigrationId)
+
+    // therefore we attempt to backfill the previous migration id,
+    // which is backfilled
+    val previousMigrationId = currentMigrationId - 1
+    historyBackfilled(previousMigrationId, complete = true)
+    // the previous migration's record time range is shorter than the snapshot period
+    recordTimeRange(
+      previousMigrationId,
+      // didn't last long enough for a snapshot
+      min = CantonTimestamp.MinValue.plusSeconds(3600L - 1),
+      max = CantonTimestamp.MinValue.plusSeconds(3600L + 600),
+    )
+    noPreviousSnapshot(previousMigrationId)
+
+    trigger.retrieveTasks().futureValue should be(Seq.empty)
+    trigger.isDoneBackfillingAcsSnapshots should be(true)
+  }
+
   "return the first task to backfill the previous migration id" in new AcsSnapshotTriggerTestScope(
     true
   ) {
@@ -707,11 +732,15 @@ class AcsSnapshotTriggerTest
       )
     }
 
-    def recordTimeRange(migrationId: Long, max: CantonTimestamp): Unit = {
+    def recordTimeRange(
+        migrationId: Long,
+        max: CantonTimestamp,
+        min: CantonTimestamp = CantonTimestamp.MinValue,
+    ): Unit = {
       when(updateHistory.getRecordTimeRange(eqTo(migrationId))(any[TraceContext]))
         .thenReturn(
           Future.successful(
-            Map(dummyDomain -> DomainRecordTimeRange(CantonTimestamp.MinValue, max))
+            Map(dummyDomain -> DomainRecordTimeRange(min, max))
          )
        )
     }
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
index 45113a2db9..7ad8c797d5 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/ScanEventStoreTest.scala
@@ -827,7 +827,10 @@ class ScanEventStoreTest extends StoreTest with HasExecutionContext with SpliceP
     val participantId = mkParticipantId("ScanEventStoreTest")
     val uh = new UpdateHistory(
       storage.underlying,
-      new DomainMigrationInfo(migrationId, None),
+      new DomainMigrationInfo(
+        migrationId,
+        None,
+      ),
       "scan_event_store_test",
       participantId,
       dsoParty,
diff --git a/apps/sv/src/main/openapi/sv-internal.yaml b/apps/sv/src/main/openapi/sv-internal.yaml
index 277ab014ce..35b18706bb 100644
--- a/apps/sv/src/main/openapi/sv-internal.yaml
+++ b/apps/sv/src/main/openapi/sv-internal.yaml
@@ -648,7 +648,7 @@ components:
           minimum: 1
         party_hint:
           description: |
-            Optional alias for the validator party. If provided, it is persisted in the 
+            Optional alias for the validator party. If provided, it is persisted in the
             onboarding secret and stored in the database as the expected party
             identifier.
           type: string
@@ -676,7 +676,7 @@ components:
           type: string
         party_hint:
           description: |
-            Alias of the validator party. 
+            Alias of the validator party.
             Must match the persisted value in the onboarding secret.
type: string @@ -1074,6 +1074,8 @@ components: type: array items: $ref: "#/components/schemas/Dar" + synchronizer_was_paused: + type: boolean Dar: type: object diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshot.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshot.scala index 346494c36e..dc0c931ad0 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshot.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshot.scala @@ -18,6 +18,8 @@ final case class DomainDataSnapshot( acsSnapshot: ByteString, acsTimestamp: Instant, dars: Seq[Dar], + // true if we exported for a proper migration, false for DR. + synchronizerWasPaused: Boolean, ) extends PrettyPrinting { def toHttp: http.DomainDataSnapshot = http.DomainDataSnapshot( genesisState.map(s => Base64.getEncoder.encodeToString(s.toByteArray)), @@ -27,6 +29,7 @@ final case class DomainDataSnapshot( val content = Base64.getEncoder.encodeToString(dar.content.toByteArray) http.Dar(dar.mainPackageId, content) }.toVector, + synchronizerWasPaused = Some(synchronizerWasPaused), ) override def pretty: Pretty[DomainDataSnapshot.this.type] = @@ -36,6 +39,7 @@ final case class DomainDataSnapshot( param("acsSnapshotSize", _.acsSnapshot.size), param("acsTimestamp", _.acsTimestamp), param("darsSize", _.dars.size), + param("synchronizerWasPaused", _.synchronizerWasPaused), ) } @@ -74,6 +78,7 @@ object DomainDataSnapshot { acsSnapshot, acsTimestamp, dars, + src.synchronizerWasPaused.getOrElse(false), ) ) } diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala index 4b497527d3..95a3abb180 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/migration/DomainDataSnapshotGenerator.scala @@ -67,6 +67,7 @@ class DomainDataSnapshotGenerator( acsSnapshot, acsTimestamp = timestamp, dars, + synchronizerWasPaused = false, ) // This is the safe version used for migrations that exports at the timestamp where we pause the synchronizer. 
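Note: the two export paths in this file differ only in the `synchronizerWasPaused` flag they stamp on the snapshot, and the stores branch on that flag when initializing (see the `UpdateHistory` and `DbMultiDomainAcsStore` hunks above). A minimal sketch of that consumer-side branch, with stand-in types and stub handlers; the names and simplified signatures here are assumptions for illustration, not part of this diff:

```scala
import scala.concurrent.Future

// Stand-in for MigrationTimeInfo: a Long replaces CantonTimestamp, and the
// real handlers also take a historyId/storeId and run SQL against the store.
final case class TimeInfo(acsRecordTime: Long, synchronizerWasPaused: Boolean)

def verifyNoRolledBackData(recordTime: Long): Future[Unit] =
  Future.unit // real code: fail if any row has record_time > recordTime

def deleteRolledBackUpdateHistory(recordTime: Long): Future[Unit] =
  Future.unit // real code: delete rows with record_time > recordTime

def handleMigration(info: Option[TimeInfo]): Future[Unit] =
  info match {
    // Paused migration: nothing can have been sequenced after the snapshot,
    // so any later row is a misconfiguration and must fail loudly.
    case Some(i) if i.synchronizerWasPaused => verifyNoRolledBackData(i.acsRecordTime)
    // Disaster recovery: updates after the snapshot were rolled back; delete them.
    case Some(i) => deleteRolledBackUpdateHistory(i.acsRecordTime)
    // First incarnation of the domain: nothing to check or delete.
    case None => Future.unit
  }
```

The asymmetry is deliberate: after a paused migration, rows past the snapshot time can only come from a misconfigured node, whereas after DR they are expected leftovers of the rollback.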
@@ -146,6 +147,7 @@ class DomainDataSnapshotGenerator( acsSnapshot, acsTimestamp, dars, + synchronizerWasPaused = true, ) logger.info(show"Finished generating $result") result diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala index f0a4498264..b68054cb09 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/domainmigration/DomainMigrationInitializer.scala @@ -21,6 +21,7 @@ import org.lfdecentralizedtrust.splice.identities.NodeIdentitiesDump import org.lfdecentralizedtrust.splice.migration.{ DomainDataRestorer, DomainMigrationInfo, + MigrationTimeInfo, ParticipantUsersDataRestorer, } import org.lfdecentralizedtrust.splice.store.{ @@ -165,8 +166,11 @@ class DomainMigrationInitializer( migrationInfo = DomainMigrationInfo( currentMigrationId = config.domainMigrationId, - acsRecordTime = Some( - CantonTimestamp.assertFromInstant(migrationDump.domainDataSnapshot.acsTimestamp) + migrationTimeInfo = Some( + MigrationTimeInfo( + CantonTimestamp.assertFromInstant(migrationDump.domainDataSnapshot.acsTimestamp), + synchronizerWasPaused = migrationDump.domainDataSnapshot.synchronizerWasPaused, + ) ), ) svStore = newSvStore(storeKey, migrationInfo, participantId) diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala index 8e9a8a7558..2509c8979d 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeInitializer.scala @@ -185,7 +185,7 @@ class JoiningNodeInitializer( migrationInfo = DomainMigrationInfo( currentMigrationId = config.domainMigrationId, - acsRecordTime = None, // This SV doesn't know about any migrations + migrationTimeInfo = None, // This SV doesn't know about any migrations ) svStore = newSvStore(storeKey, migrationInfo, participantId) dsoStore = newDsoStore(svStore.key, migrationInfo, participantId) diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala index c64fec849f..6fe3297e4b 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sv1/SV1Initializer.scala @@ -259,7 +259,7 @@ class SV1Initializer( migrationInfo = DomainMigrationInfo( currentMigrationId = config.domainMigrationId, // Note: not guaranteed to be 0 for sv1 - acsRecordTime = None, // No previous migration, we're starting the network + migrationTimeInfo = None, // No previous migration, we're starting the network ) svStore = newSvStore(storeKey, migrationInfo, participantId) dsoStore = newDsoStore(svStore.key, migrationInfo, participantId) diff --git a/apps/validator/src/main/openapi/validator-internal.yaml b/apps/validator/src/main/openapi/validator-internal.yaml index 6e2727bcca..bcfae2799b 100644 --- a/apps/validator/src/main/openapi/validator-internal.yaml +++ 
b/apps/validator/src/main/openapi/validator-internal.yaml @@ -775,6 +775,8 @@ components: type: string created_at: type: string + synchronizer_was_paused: + type: boolean GetValidatorDomainDataSnapshotResponse: type: object diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala index 386844beac..7d51befd85 100644 --- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala +++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/ValidatorApp.scala @@ -27,6 +27,7 @@ import org.lfdecentralizedtrust.splice.identities.NodeIdentitiesStore import org.lfdecentralizedtrust.splice.migration.{ DomainDataRestorer, DomainMigrationInfo, + MigrationTimeInfo, ParticipantUsersDataRestorer, } import org.lfdecentralizedtrust.splice.scan.admin.api.client @@ -438,26 +439,27 @@ class ValidatorApp( } } yield initialSynchronizerTime - private def readRestoreDump = config.restoreFromMigrationDump.map { path => - if (config.svValidator) - throw Status.INVALID_ARGUMENT - .withDescription("SV Validator should not be configured with a dump file") - .asRuntimeException() + private def readRestoreDump: Option[DomainMigrationDump] = config.restoreFromMigrationDump.map { + path => + if (config.svValidator) + throw Status.INVALID_ARGUMENT + .withDescription("SV Validator should not be configured with a dump file") + .asRuntimeException() - val migrationDump = BackupDump.readFromPath[DomainMigrationDump](path) match { - case Failure(exception) => + val migrationDump = BackupDump.readFromPath[DomainMigrationDump](path) match { + case Failure(exception) => + throw Status.INVALID_ARGUMENT + .withDescription(s"Failed to read migration dump from $path: ${exception.getMessage}") + .asRuntimeException() + case Success(value) => value + } + if (migrationDump.migrationId != config.domainMigrationId) throw Status.INVALID_ARGUMENT - .withDescription(s"Failed to read migration dump from $path: ${exception.getMessage}") + .withDescription( + s"Migration id from the dump ${migrationDump.migrationId} does not match the configured migration id in the validator ${config.domainMigrationId}. Please check if the validator app is configured with the correct migration id" + ) .asRuntimeException() - case Success(value) => value - } - if (migrationDump.migrationId != config.domainMigrationId) - throw Status.INVALID_ARGUMENT - .withDescription( - s"Migration id from the dump ${migrationDump.migrationId} does not match the configured migration id in the validator ${config.domainMigrationId}. 
Please check if the validator app is configured with the correct migration id"
-        )
-        .asRuntimeException()
-    migrationDump
+      migrationDump
   }
 
   private def getAcsSnapshotFromSingleScan(
@@ -737,13 +739,17 @@ class ValidatorApp(
         )
       }
     } else {
-      val acsTimestamp =
-        readRestoreDump.map(dump => CantonTimestamp.assertFromInstant(dump.acsTimestamp))
+      val dump = readRestoreDump
       Future.successful(
         // TODO(DACH-NY/canton-network-node#9731): get migration id from sponsor sv / scan instead of configuring here
         DomainMigrationInfo(
           config.domainMigrationId,
-          acsTimestamp,
+          dump.map(d =>
+            MigrationTimeInfo(
+              CantonTimestamp.assertFromInstant(d.acsTimestamp),
+              d.synchronizerWasPaused,
+            )
+          ),
         )
       )
     }
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDump.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDump.scala
index 3f8324b2d2..aefa110edd 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDump.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDump.scala
@@ -24,6 +24,8 @@ final case class DomainMigrationDump(
     acsTimestamp: Instant,
     dars: Seq[Dar],
     createdAt: Instant,
+    // true if we exported for a proper migration, false for DR.
+    synchronizerWasPaused: Boolean,
 ) extends PrettyPrinting {
   override def pretty: Pretty[DomainMigrationDump.this.type] =
     Pretty.prettyNode(
@@ -36,6 +38,7 @@ final case class DomainMigrationDump(
       param("acsTimestamp", _.acsTimestamp),
       param("darsSize", _.dars.size),
       param("createdAt", _.createdAt),
+      param("synchronizerWasPaused", _.synchronizerWasPaused),
     )
 
   def toHttp: http.DomainMigrationDump = http.DomainMigrationDump(
@@ -50,6 +53,7 @@ final case class DomainMigrationDump(
     migrationId = migrationId,
     domainId = domainId.toProtoPrimitive,
     createdAt = createdAt.toString,
+    synchronizerWasPaused = Some(synchronizerWasPaused),
   )
 }
 
@@ -87,5 +91,6 @@ object DomainMigrationDump {
     acsTimestamp = Instant.parse(response.acsTimestamp),
     dars = dars,
     createdAt = createdAt,
+    synchronizerWasPaused = response.synchronizerWasPaused.getOrElse(false),
   )
 }
diff --git a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDumpGenerator.scala b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDumpGenerator.scala
index c7b88485b2..fa57697b77 100644
--- a/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDumpGenerator.scala
+++ b/apps/validator/src/main/scala/org/lfdecentralizedtrust/splice/validator/migration/DomainMigrationDumpGenerator.scala
@@ -39,6 +39,7 @@ class DomainMigrationDumpGenerator(
   private val darExporter = new DarExporter(participantConnection)
   private val participantUsersDataExporter = new ParticipantUsersDataExporter(ledgerConnection)
 
+  // This is the safe option used for migrations, taken while the synchronizer is paused.
   def generateDomainDump(
       migrationId: Long,
       domain: SynchronizerId,
@@ -67,6 +68,7 @@ class DomainMigrationDumpGenerator(
       acsTimestamp = acsTimestamp,
       dars = dars,
       createdAt = createdAt,
+      synchronizerWasPaused = true,
     )
     logger.info(
       show"Finished generating $result"
@@ -75,6 +77,7 @@ class DomainMigrationDumpGenerator(
     }
   }
 
+  // This is the option used for DR, where the synchronizer was not paused.
   def getDomainDataSnapshot(
       timestamp: Instant,
       domain: SynchronizerId,
@@ -111,6 +114,7 @@ class DomainMigrationDumpGenerator(
       acsTimestamp = timestamp,
       dars = dars,
createdAt = Instant.now(), + synchronizerWasPaused = false, ) } }
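Note on the user-metadata encoding in `DomainMigrationInfo.saveToUserMetadata` above: both optional values are serialized with `"*"` standing for "absent", and `getFromUserMetadata` maps that sentinel back to `None`; `MigrationTimeInfo` is only reconstructed when both keys decode to `Some`. A self-contained round-trip sketch of that convention; the helper object and its names are illustrative, not taken from this diff:

```scala
// Illustrative helper mirroring the "*" sentinel convention used for the
// DOMAIN_MIGRATION_* user-metadata keys; not part of the actual change.
object MetadataSentinel {
  private val Absent = "*"

  // Render Some(a) via `render`, or the sentinel when the value is absent.
  def encode[A](value: Option[A])(render: A => String): String =
    value.fold(Absent)(render)

  // Parse a stored string back, mapping the sentinel to None.
  def decode[A](raw: String)(parse: String => A): Option[A] =
    if (raw == Absent) None else Some(parse(raw))
}

object MetadataSentinelExample extends App {
  // A present flag round-trips through its string form...
  assert(MetadataSentinel.decode(MetadataSentinel.encode(Some(true))(_.toString))(_.toBoolean).contains(true))
  // ...and an absent value round-trips to None.
  assert(MetadataSentinel.decode(MetadataSentinel.encode(Option.empty[Boolean])(_.toString))(_.toBoolean).isEmpty)
}
```

One consequence worth noting for operators: because `getFromUserMetadata` combines the two keys in a for-comprehension, a record time without a paused flag (or vice versa) yields `migrationTimeInfo = None`, i.e. the node behaves as if there had been no previous migration.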