Skip to content

Commit 59fc489

Browse files
committed
Prevent txlog backfill from including ACS imports
[ci] Signed-off-by: Oriol Muñoz <oriol.munoz@digitalasset.com>
1 parent fdb156a commit 59fc489

File tree

8 files changed

+36
-21
lines changed

8 files changed

+36
-21
lines changed

apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/DecentralizedSynchronizerMigrationIntegrationTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,8 @@ class DecentralizedSynchronizerMigrationIntegrationTest
10911091

10921092
withClueAndLog("Backfilled history includes ACS import") {
10931093
eventually() {
1094-
sv1ScanLocalBackend.appState.store.updateHistory.sourceHistory
1094+
sv1ScanLocalBackend.appState.store.updateHistory
1095+
.sourceHistory(excludeAcsImportUpdates = false)
10951096
.migrationInfo(1L)
10961097
.futureValue
10971098
.exists(_.complete) should be(true)

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/TxLogBackfilling.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ final class TxLogBackfilling(
1919
) extends NamedLogging {
2020

2121
private val currentMigrationId = updateHistory.domainMigrationInfo.currentMigrationId
22-
private val sourceHistory = updateHistory.sourceHistory
22+
// ACS import updates should not be included in txlog
23+
private val sourceHistory = updateHistory.sourceHistory(excludeAcsImportUpdates = true)
2324
private val destinationHistory = store.destinationHistory
2425
private val backfilling =
2526
new HistoryBackfilling(

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1959,7 +1959,9 @@ class UpdateHistory(
19591959
storage.query(readOffsetAction(), "readOffset")
19601960
}
19611961

1962-
lazy val sourceHistory: HistoryBackfilling.SourceHistory[UpdateHistoryResponse] =
1962+
def sourceHistory(
1963+
excludeAcsImportUpdates: Boolean
1964+
): HistoryBackfilling.SourceHistory[UpdateHistoryResponse] =
19631965
new HistoryBackfilling.SourceHistory[UpdateHistoryResponse] {
19641966
override def isReady: Boolean = state
19651967
.get()
@@ -2019,7 +2021,8 @@ class UpdateHistory(
20192021
migrationId = migrationId,
20202022
synchronizerId = synchronizerId,
20212023
beforeRecordTime = before,
2022-
atOrAfterRecordTime = None,
2024+
atOrAfterRecordTime =
2025+
Option.when(excludeAcsImportUpdates)(CantonTimestamp.MinValue.plusSeconds(1L)),
20232026
limit = PageLimit.tryCreate(count),
20242027
).map(_.map(_.update))
20252028
}

apps/common/src/test/scala/org/lfdecentralizedtrust/splice/store/UpdateHistoryBackfillingTest.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
6666
None,
6767
PageLimit.tryCreate(1000),
6868
)
69-
infoB2 <- storeB2.sourceHistory.migrationInfo(0)
69+
infoB2 <- storeB2.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(0)
7070
} yield {
7171
infoB2.value.complete shouldBe true
7272
updatesA.map(_.update.update.updateId) should contain theSameElementsAs updatesB.map(
@@ -88,7 +88,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
8888
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
8989
// If the store doesn't need backfilling, it should return the correct info
9090
// without explicit initialization of backfilling
91-
infoS <- storeA0.sourceHistory.migrationInfo(13)
91+
infoS <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
9292
} yield {
9393
infoS.value.complete shouldBe true
9494
infoS.value.recordTimeRange shouldBe Map(
@@ -107,11 +107,11 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
107107
tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1))
108108
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
109109
// Before initializing backfilling, it should not return any data
110-
infoS1 <- storeA0.sourceHistory.migrationInfo(13)
110+
infoS1 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
111111
infoD1 <- storeA0.destinationHistory.backfillingInfo
112112
// After initializing backfilling, it should return the correct data
113113
_ <- storeA0.initializeBackfilling(13, domain1, tx1.getUpdateId, complete = true)
114-
infoS2 <- storeA0.sourceHistory.migrationInfo(13)
114+
infoS2 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
115115
infoD2 <- storeA0.destinationHistory.backfillingInfo
116116
} yield {
117117
infoS1 shouldBe None
@@ -128,9 +128,9 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
128128
tx1 <- create(domain1, validContractId(1), validOffset(1), party1, storeA0, time(1))
129129
_ <- create(domain2, validContractId(2), validOffset(2), party1, storeA0, time(2))
130130
_ <- storeA0.initializeBackfilling(13, domain1, tx1.getUpdateId, complete = true)
131-
info12 <- storeA0.sourceHistory.migrationInfo(12)
132-
info13 <- storeA0.sourceHistory.migrationInfo(13)
133-
info14 <- storeA0.sourceHistory.migrationInfo(14)
131+
info12 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(12)
132+
info13 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(13)
133+
info14 <- storeA0.sourceHistory(excludeAcsImportUpdates = false).migrationInfo(14)
134134
} yield {
135135
info12 shouldBe None
136136
inside(info13) { case Some(s: SourceMigrationInfo) =>
@@ -341,7 +341,7 @@ class UpdateHistoryBackfillingTest extends UpdateHistoryTestBase {
341341
) =
342342
new HistoryBackfilling[UpdateHistoryResponse](
343343
destination.destinationHistory,
344-
source.sourceHistory,
344+
source.sourceHistory(excludeAcsImportUpdates = false),
345345
latestMigrationId,
346346
batchSize = 10,
347347
loggerFactory = loggerFactory,

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/admin/http/HttpScanHandler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1816,7 +1816,7 @@ class HttpScanHandler(
18161816
)(extracted: TraceContext): Future[ScanResource.GetMigrationInfoResponse] = {
18171817
implicit val tc = extracted
18181818
withSpan(s"$workflowId.getMigrationInfo") { _ => _ =>
1819-
val sourceHistory = store.updateHistory.sourceHistory
1819+
val sourceHistory = store.updateHistory.sourceHistory(excludeAcsImportUpdates = false)
18201820
for {
18211821
infoO <- sourceHistory.migrationInfo(body.migrationId)
18221822
} yield infoO match {

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTrigger.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ class AcsSnapshotTrigger(
6565
* And also for past migrations, whether the SV was present in them or not.
6666
*/
6767
private def isHistoryBackfilled(migrationId: Long)(implicit tc: TraceContext) = {
68-
updateHistory.sourceHistory
68+
updateHistory
69+
.sourceHistory(excludeAcsImportUpdates = false)
6970
.migrationInfo(migrationId)
7071
.map(_.exists(i => i.complete && i.importUpdatesComplete))
7172
}

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/automation/AcsSnapshotTriggerTest.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ class AcsSnapshotTriggerTest
598598
)
599599
)
600600
)
601-
when(updateHistory.sourceHistory).thenReturn(sourceHistory)
601+
when(updateHistory.sourceHistory(anyBoolean)).thenReturn(sourceHistory)
602602
when(updateHistory.getPreviousMigrationId(anyLong)(any[TraceContext])).thenAnswer { (n: Long) =>
603603
Future.successful(n match {
604604
case 0L => None
@@ -667,7 +667,9 @@ class AcsSnapshotTriggerTest
667667

668668
def historyBackfilled(migrationId: Long, complete: Boolean): Unit = {
669669
when(
670-
updateHistory.sourceHistory.migrationInfo(eqTo(migrationId))(any[TraceContext])
670+
updateHistory
671+
.sourceHistory(excludeAcsImportUpdates = false)
672+
.migrationInfo(eqTo(migrationId))(any[TraceContext])
671673
)
672674
.thenReturn(
673675
Future.successful(
@@ -690,7 +692,9 @@ class AcsSnapshotTriggerTest
690692
importUpdatesComplete: Boolean,
691693
): Unit = {
692694
when(
693-
updateHistory.sourceHistory.migrationInfo(eqTo(migrationId))(any[TraceContext])
695+
updateHistory
696+
.sourceHistory(excludeAcsImportUpdates = false)
697+
.migrationInfo(eqTo(migrationId))(any[TraceContext])
694698
)
695699
.thenReturn(
696700
Future.successful(

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/store/ScanHistoryBackfillingTest.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
3030
testData.destinationHistory,
3131
Map(domain1 -> time(0), domain2 -> time(0)),
3232
)
33-
backfillingComplete <- testData.destinationHistory.sourceHistory
33+
backfillingComplete <- testData.destinationHistory
34+
.sourceHistory(excludeAcsImportUpdates = false)
3435
.migrationInfo(0)
3536
.map(_.value.complete)
3637
// Check that the updates are the same
@@ -101,7 +102,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
101102
testData.destinationHistory,
102103
Map(domain1 -> time(5), domain2 -> time(5)),
103104
)
104-
migrationInfo0 <- testData.destinationHistory.sourceHistory
105+
migrationInfo0 <- testData.destinationHistory
106+
.sourceHistory(excludeAcsImportUpdates = false)
105107
.migrationInfo(0)
106108
updatesB1 <- testData.destinationHistory.getAllUpdates(
107109
None,
@@ -114,7 +116,8 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
114116
testData.destinationHistory,
115117
Map(domain1 -> time(0), domain2 -> time(0)),
116118
)
117-
backfillingComplete2 <- testData.destinationHistory.sourceHistory
119+
backfillingComplete2 <- testData.destinationHistory
120+
.sourceHistory(excludeAcsImportUpdates = false)
118121
.migrationInfo(0)
119122
.map(_.value.complete)
120123

@@ -306,7 +309,9 @@ class ScanHistoryBackfillingTest extends UpdateHistoryTestBase {
306309
migrationId: Long
307310
)(implicit tc: TraceContext): Future[Option[SourceMigrationInfo]] =
308311
for {
309-
original <- history.sourceHistory.migrationInfo(migrationId)(tc)
312+
original <- history
313+
.sourceHistory(excludeAcsImportUpdates = false)
314+
.migrationInfo(migrationId)(tc)
310315
filteredRange = original.map(
311316
_.recordTimeRange.toList
312317
.flatMap { case (k, v) =>

0 commit comments

Comments
 (0)