Skip to content

Commit 2557922

Browse files
OriolMunoz-dacocreature
authored andcommitted
Fix ACS snapshots on short-lived migrations (#2781)
[ci] --------- Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com> Signed-off-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
1 parent c44806b commit 2557922

File tree

2 files changed

+77
-22
lines changed

2 files changed

+77
-22
lines changed

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,26 @@ class AcsSnapshotTrigger(
167167
): Future[Option[AcsSnapshotTrigger.Task]] = {
168168
for {
169169
migrationRecordTimeRange <- updateHistory.getRecordTimeRange(migrationIdToBackfill)
170+
maxTime = migrationRecordTimeRange
171+
.map(_._2.max)
172+
.maxOption
173+
.getOrElse(
174+
throw new IllegalStateException(
175+
s"SynchronizerId with no data in $migrationRecordTimeRange"
176+
)
177+
)
178+
minTime = migrationRecordTimeRange
179+
.map(_._2.min)
180+
.minOption
181+
.getOrElse(
182+
throw new IllegalStateException(
183+
s"SynchronizerId with no data in $migrationRecordTimeRange"
184+
)
185+
)
186+
firstSnapshotTime = computeFirstSnapshotTime(minTime)
187+
migrationLastedLongEnough = firstSnapshotTime
188+
.plus(Duration.ofHours(snapshotPeriodHours.toLong))
189+
.isBefore(maxTime)
170190
latestSnapshot <- store
171191
.lookupSnapshotBefore(migrationIdToBackfill, CantonTimestamp.MaxValue)
172192
task <- latestSnapshot match {
@@ -176,14 +196,7 @@ class AcsSnapshotTrigger(
176196
case Some(snapshot)
177197
if snapshot.snapshotRecordTime.plus(
178198
Duration.ofHours(snapshotPeriodHours.toLong)
179-
) > migrationRecordTimeRange
180-
.map(_._2.max)
181-
.maxOption
182-
.getOrElse(
183-
throw new IllegalStateException(
184-
s"SynchronizerId with no data in $migrationRecordTimeRange"
185-
)
186-
) =>
199+
) > maxTime =>
187200
logger.info(
188201
s"Backfilling of migration id $migrationIdToBackfill is complete. Trying with next oldest."
189202
)
@@ -196,6 +209,12 @@ class AcsSnapshotTrigger(
196209
.Task(nextSnapshotTime(snapshot), migrationIdToBackfill, Some(snapshot))
197210
)
198211
)
212+
case None if !migrationLastedLongEnough =>
213+
logger.info(
214+
s"Migration id $migrationIdToBackfill didn't last more than $snapshotPeriodHours hours (from $minTime to $maxTime), so it won't have any snapshots."
215+
)
216+
lastCompleteBackfilledMigrationId.set(Right(migrationIdToBackfill))
217+
taskToContinueBackfillingACSSnapshots().value
199218
case None =>
200219
firstSnapshotForMigrationIdTask(migrationIdToBackfill)
201220
}
@@ -249,20 +268,27 @@ class AcsSnapshotTrigger(
249268
logger.info(s"No updates other than ACS imports found. Retrying snapshot creation later.")
250269
None
251270
case Some(firstNonAcsImport) =>
252-
val firstNonAcsImportRecordTime =
253-
firstNonAcsImport.update.update.recordTime.toInstant.atOffset(ZoneOffset.UTC)
254-
val (hourForSnapshot, plusDays) = timesToDoSnapshot
255-
.find(_ > firstNonAcsImportRecordTime.get(ChronoField.HOUR_OF_DAY)) match {
256-
case Some(hour) => hour -> 0 // current day at hour
257-
case None => 0 -> 1 // next day at 00:00
258-
}
259-
val until = firstNonAcsImportRecordTime.toLocalDate
260-
.plusDays(plusDays.toLong)
261-
.atTime(hourForSnapshot, 0)
262-
.toInstant(ZoneOffset.UTC)
263-
Some(AcsSnapshotTrigger.Task(CantonTimestamp.assertFromInstant(until), migrationId, None))
271+
val firstNonAcsImportRecordTime = firstNonAcsImport.update.update.recordTime
272+
Some(
273+
AcsSnapshotTrigger
274+
.Task(computeFirstSnapshotTime(firstNonAcsImportRecordTime), migrationId, None)
275+
)
264276
}
265277
}
278+
279+
private def computeFirstSnapshotTime(firstNonAcsImportRecordTime: CantonTimestamp) = {
280+
val firstUpdateUTCTime = firstNonAcsImportRecordTime.toInstant.atOffset(ZoneOffset.UTC)
281+
val (hourForSnapshot, plusDays) = timesToDoSnapshot
282+
.find(_ > firstUpdateUTCTime.get(ChronoField.HOUR_OF_DAY)) match {
283+
case Some(hour) => hour -> 0 // current day at hour
284+
case None => 0 -> 1 // next day at 00:00
285+
}
286+
val until = firstUpdateUTCTime.toLocalDate
287+
.plusDays(plusDays.toLong)
288+
.atTime(hourForSnapshot, 0)
289+
.toInstant(ZoneOffset.UTC)
290+
CantonTimestamp.assertFromInstant(until)
291+
}
266292
}
267293

268294
object AcsSnapshotTrigger {

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,31 @@ class AcsSnapshotTriggerTest
346346
trigger.isDoneBackfillingAcsSnapshots should be(false)
347347
}
348348

349+
"return no task if the previous migration id is backfilled but didn't last long enough" in new AcsSnapshotTriggerTestScope(
350+
true,
351+
currentMigrationId = 1L,
352+
) {
353+
// no snapshot needed for the current migration id
354+
historyBackfilled(currentMigrationId, complete = true)
355+
previousSnapshot(now.minusSeconds(60L), currentMigrationId)
356+
357+
// therefore we attempt to backfill the previous migration id,
358+
// which is backfilled
359+
val previousMigrationId = currentMigrationId - 1
360+
historyBackfilled(previousMigrationId, complete = true)
361+
// last update happened shortly after the end
362+
recordTimeRange(
363+
previousMigrationId,
364+
// didn't last enough
365+
min = CantonTimestamp.MinValue.plusSeconds(3600L - 1),
366+
max = CantonTimestamp.MinValue.plusSeconds(3600L + 600),
367+
)
368+
noPreviousSnapshot(previousMigrationId)
369+
370+
trigger.retrieveTasks().futureValue should be(Seq.empty)
371+
trigger.isDoneBackfillingAcsSnapshots should be(true)
372+
}
373+
349374
"return the first task to backfill the previous migration id" in new AcsSnapshotTriggerTestScope(
350375
true
351376
) {
@@ -707,11 +732,15 @@ class AcsSnapshotTriggerTest
707732
)
708733
}
709734

710-
def recordTimeRange(migrationId: Long, max: CantonTimestamp): Unit = {
735+
def recordTimeRange(
736+
migrationId: Long,
737+
max: CantonTimestamp,
738+
min: CantonTimestamp = CantonTimestamp.MinValue,
739+
): Unit = {
711740
when(updateHistory.getRecordTimeRange(eqTo(migrationId))(any[TraceContext]))
712741
.thenReturn(
713742
Future.successful(
714-
Map(dummyDomain -> DomainRecordTimeRange(CantonTimestamp.MinValue, max))
743+
Map(dummyDomain -> DomainRecordTimeRange(min, max))
715744
)
716745
)
717746
}

0 commit comments

Comments
 (0)