Skip to content

Commit 60a163a

Browse files
authored
adapt bulk storage to fixed ACS semantics (#3551)
Signed-off-by: Itai Segall <itai.segall@digitalasset.com>
1 parent 565d699 commit 60a163a

File tree

3 files changed

+19
-25
lines changed

3 files changed

+19
-25
lines changed

apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala

Lines changed: 3 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1031,17 +1031,15 @@ class UpdateHistory(
10311031
private def afterFilters(
10321032
afterO: Option[(Long, CantonTimestamp)],
10331033
includeImportUpdates: Boolean,
1034-
afterIsInclusive: Boolean,
10351034
): NonEmptyList[SQLActionBuilder] = {
10361035
val gtMin = if (includeImportUpdates) ">=" else ">"
1037-
val gtAfter = if (afterIsInclusive) ">=" else ">"
10381036
afterO match {
10391037
case None =>
10401038
NonEmptyList.of(sql"migration_id >= 0 and record_time #$gtMin ${CantonTimestamp.MinValue}")
10411039
case Some((afterMigrationId, afterRecordTime)) =>
10421040
// This makes it so that the two queries use updt_hist_tran_hi_mi_rt_di,
10431041
NonEmptyList.of(
1044-
sql"migration_id = ${afterMigrationId} and record_time #$gtAfter ${afterRecordTime} ",
1042+
sql"migration_id = ${afterMigrationId} and record_time > ${afterRecordTime} ",
10451043
sql"migration_id > ${afterMigrationId} and record_time #$gtMin ${CantonTimestamp.MinValue}",
10461044
)
10471045
}
@@ -1227,9 +1225,8 @@ class UpdateHistory(
12271225
def getUpdatesWithoutImportUpdates(
12281226
afterO: Option[(Long, CantonTimestamp)],
12291227
limit: Limit,
1230-
afterIsInclusive: Boolean,
12311228
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
1232-
val filters = afterFilters(afterO, includeImportUpdates = false, afterIsInclusive)
1229+
val filters = afterFilters(afterO, includeImportUpdates = false)
12331230
val orderBy = sql"migration_id, record_time, domain_id"
12341231
for {
12351232
txs <- getTxUpdates(filters, orderBy, limit)
@@ -1240,17 +1237,11 @@ class UpdateHistory(
12401237
}
12411238
}
12421239

1243-
def getUpdatesWithoutImportUpdates(
1244-
afterO: Option[(Long, CantonTimestamp)],
1245-
limit: Limit,
1246-
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
1247-
getUpdatesWithoutImportUpdates(afterO, limit, false)
1248-
12491240
def getAllUpdates(
12501241
afterO: Option[(Long, CantonTimestamp)],
12511242
limit: PageLimit,
12521243
)(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
1253-
val filters = afterFilters(afterO, includeImportUpdates = true, afterIsInclusive = false)
1244+
val filters = afterFilters(afterO, includeImportUpdates = true)
12541245
// With import updates, we have to include the update id to get a deterministic order.
12551246
// We don't have an index for this order, but this is only used in test code and deprecated scan endpoints.
12561247
val orderBy = sql"migration_id, record_time, domain_id, update_id"

apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistorySegmentBulkStorage.scala

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -112,20 +112,17 @@ class UpdateHistorySegmentBulkStorage(
112112
updateHistory.getUpdatesWithoutImportUpdates(
113113
Some((fromMigrationId, fromTimestamp)),
114114
HardLimit.tryCreate(config.dbReadChunkSize),
115-
afterIsInclusive = true,
116115
)
117116
)(after =>
118117
updateHistory.getUpdatesWithoutImportUpdates(
119118
Some((after._1, after._2)),
120119
HardLimit.tryCreate(config.dbReadChunkSize),
121-
afterIsInclusive = false,
122120
)
123121
)
124122

125-
// TODO(#3429): Figure out the < vs <= issue
126123
updatesInSegment = updates.filter(update =>
127124
update.migrationId < toMigrationId ||
128-
update.migrationId == toMigrationId && update.update.update.recordTime < toTimestamp
125+
update.migrationId == toMigrationId && update.update.update.recordTime <= toTimestamp
129126
)
130127
_ <-
131128
if (updatesInSegment.length < updates.length || updates.length == config.dbReadChunkSize) {

apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala

Lines changed: 15 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -47,16 +47,21 @@ class UpdateHistoryBulkStorageTest
4747
withS3Mock {
4848
val initialStoreSize = 1500
4949
val segmentSize = 2200L
50+
val segmentFromTimestamp = 100L
5051
val mockStore = new MockUpdateHistoryStore(initialStoreSize)
5152
val bucketConnection = getS3BucketConnection(loggerFactory)
53+
val fromTimestamp =
54+
CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp))
55+
val toTimestamp =
56+
CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp + segmentSize))
5257
val segment = new UpdateHistorySegmentBulkStorage(
5358
bulkStorageTestConfig,
5459
mockStore.store,
5560
bucketConnection,
5661
0,
57-
CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(0)),
62+
fromTimestamp,
5863
0,
59-
CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentSize)),
64+
toTimestamp,
6065
loggerFactory,
6166
)
6267
clue(
@@ -85,8 +90,11 @@ class UpdateHistoryBulkStorageTest
8590
.asScala
8691
allUpdates <- mockStore.store.getUpdatesWithoutImportUpdates(
8792
None,
88-
HardLimit.tryCreate(segmentSize.toInt, segmentSize.toInt),
89-
afterIsInclusive = true,
93+
HardLimit.tryCreate(segmentSize.toInt * 2, segmentSize.toInt * 2),
94+
)
95+
segmentUpdates = allUpdates.filter(update =>
96+
update.update.update.recordTime > fromTimestamp &&
97+
update.update.update.recordTime <= toTimestamp
9098
)
9199
} yield {
92100
val objectKeys = s3Objects.contents.asScala.sortBy(_.key())
@@ -95,10 +103,11 @@ class UpdateHistoryBulkStorageTest
95103
val allUpdatesFromS3 = objectKeys.flatMap(
96104
readUncompressAndDecode(bucketConnection, io.circe.parser.decode[UpdateHistoryItemV2])
97105
)
106+
allUpdatesFromS3.length shouldBe segmentUpdates.length
98107
allUpdatesFromS3
99108
.map(
100109
CompactJsonScanHttpEncodingsWithFieldLabels().httpToLapiUpdate
101-
) should contain theSameElementsInOrderAs allUpdates
110+
) should contain theSameElementsInOrderAs segmentUpdates
102111
}
103112
}
104113
}
@@ -120,17 +129,14 @@ class UpdateHistoryBulkStorageTest
120129
store.getUpdatesWithoutImportUpdates(
121130
any[Option[(Long, CantonTimestamp)]],
122131
any[Limit],
123-
anyBoolean,
124132
)(any[TraceContext])
125133
).thenAnswer {
126134
(
127135
afterO: Option[(Long, CantonTimestamp)],
128136
limit: Limit,
129-
afterIsInclusive: Boolean,
130137
) =>
131138
Future {
132-
val afterIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L)
133-
val fromIdx = if (afterIsInclusive) afterIdx else afterIdx + 1
139+
val fromIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L) + 1
134140
val remaining = storeSize - fromIdx
135141
val numElems = math.min(limit.limit.toLong, remaining)
136142
Seq

0 commit comments

Comments (0)