@@ -1031,17 +1031,15 @@ class UpdateHistory(
   private def afterFilters(
       afterO: Option[(Long, CantonTimestamp)],
       includeImportUpdates: Boolean,
-      afterIsInclusive: Boolean,
   ): NonEmptyList[SQLActionBuilder] = {
     val gtMin = if (includeImportUpdates) ">=" else ">"
-    val gtAfter = if (afterIsInclusive) ">=" else ">"
     afterO match {
       case None =>
         NonEmptyList.of(sql"migration_id >= 0 and record_time #$gtMin ${CantonTimestamp.MinValue}")
       case Some((afterMigrationId, afterRecordTime)) =>
         // This makes it so that the two queries use updt_hist_tran_hi_mi_rt_di,
         NonEmptyList.of(
-          sql"migration_id = ${afterMigrationId} and record_time #$gtAfter ${afterRecordTime} ",
+          sql"migration_id = ${afterMigrationId} and record_time > ${afterRecordTime} ",
           sql"migration_id > ${afterMigrationId} and record_time #$gtMin ${CantonTimestamp.MinValue}",
         )
     }
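
Note: with afterIsInclusive removed, the lower bound on record_time is always strict, so resuming from a cursor never re-reads the cursor row. A minimal sketch of the resulting predicate shape, modelled over plain Longs instead of SQLActionBuilder (Cursor and matches are illustrative names, not from this PR):

final case class Cursor(migrationId: Long, recordTime: Long)

// Mirrors the two SQL branches above: same migration strictly after the
// cursor's record_time, or any later migration (with ">" skipping import
// updates, which sit at record_time = MinValue).
def matches(afterO: Option[Cursor], includeImportUpdates: Boolean)(
    migrationId: Long,
    recordTime: Long,
): Boolean = {
  val minValue = Long.MinValue // stand-in for CantonTimestamp.MinValue
  val aboveMin =
    if (includeImportUpdates) recordTime >= minValue else recordTime > minValue
  afterO match {
    case None => migrationId >= 0 && aboveMin
    case Some(Cursor(afterMigrationId, afterRecordTime)) =>
      (migrationId == afterMigrationId && recordTime > afterRecordTime) ||
        (migrationId > afterMigrationId && aboveMin)
  }
}

Keeping the predicate as two separate disjuncts, rather than one OR-expression, is what lets both queries use the updt_hist_tran_hi_mi_rt_di index, per the comment in the hunk.
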
@@ -1227,9 +1225,8 @@ class UpdateHistory(
   def getUpdatesWithoutImportUpdates(
       afterO: Option[(Long, CantonTimestamp)],
       limit: Limit,
-      afterIsInclusive: Boolean,
   )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
-    val filters = afterFilters(afterO, includeImportUpdates = false, afterIsInclusive)
+    val filters = afterFilters(afterO, includeImportUpdates = false)
     val orderBy = sql"migration_id, record_time, domain_id"
     for {
       txs <- getTxUpdates(filters, orderBy, limit)
@@ -1240,17 +1237,11 @@ class UpdateHistory(
     }
   }
 
-  def getUpdatesWithoutImportUpdates(
-      afterO: Option[(Long, CantonTimestamp)],
-      limit: Limit,
-  )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
-    getUpdatesWithoutImportUpdates(afterO, limit, false)
-
   def getAllUpdates(
       afterO: Option[(Long, CantonTimestamp)],
       limit: PageLimit,
   )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
-    val filters = afterFilters(afterO, includeImportUpdates = true, afterIsInclusive = false)
+    val filters = afterFilters(afterO, includeImportUpdates = true)
     // With import updates, we have to include the update id to get a deterministic order.
     // We don't have an index for this order, but this is only used in test code and deprecated scan endpoints.
     val orderBy = sql"migration_id, record_time, domain_id, update_id"
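
Note on the ordering comment above: import updates share a single record_time (CantonTimestamp.MinValue), so (migration_id, record_time, domain_id) is not a total order over them; appending update_id breaks ties deterministically. A sketch under an assumed Row shape (hypothetical, for illustration):

final case class Row(migrationId: Long, recordTime: Long, domainId: String, updateId: String)

// Same sort key as the orderBy above; updateId is the deterministic
// tiebreaker for rows that agree on the first three components.
def sortKey(r: Row): (Long, Long, String, String) =
  (r.migrationId, r.recordTime, r.domainId, r.updateId)

val imports = Seq(
  Row(0, Long.MinValue, "domain::a", "u2"),
  Row(0, Long.MinValue, "domain::a", "u1"),
)
assert(imports.sortBy(sortKey).map(_.updateId) == Seq("u1", "u2"))
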
@@ -112,20 +112,17 @@ class UpdateHistorySegmentBulkStorage(
         updateHistory.getUpdatesWithoutImportUpdates(
           Some((fromMigrationId, fromTimestamp)),
           HardLimit.tryCreate(config.dbReadChunkSize),
-          afterIsInclusive = true,
         )
       )(after =>
         updateHistory.getUpdatesWithoutImportUpdates(
           Some((after._1, after._2)),
           HardLimit.tryCreate(config.dbReadChunkSize),
-          afterIsInclusive = false,
         )
       )
 
-      // TODO(#3429): Figure out the < vs <= issue
       updatesInSegment = updates.filter(update =>
         update.migrationId < toMigrationId ||
-          update.migrationId == toMigrationId && update.update.update.recordTime < toTimestamp
+          update.migrationId == toMigrationId && update.update.update.recordTime <= toTimestamp
       )
       _ <-
         if (updatesInSegment.length < updates.length || updates.length == config.dbReadChunkSize) {
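
Note: this hunk settles the former TODO(#3429) on < vs <=. With the read cursor's strict lower bound (previous file) and the inclusive <= upper bound here, a segment covers the record-time interval (fromTimestamp, toTimestamp] within a migration id, so consecutive segments tile the update stream with no gap and no double-written update. A small sketch of that invariant, collapsing migration ids for brevity (an illustrative simplification):

// Segment membership is half-open below and closed above: (from, to].
def inSegment(recordTime: Long, from: Long, to: Long): Boolean =
  recordTime > from && recordTime <= to

// Adjacent segments share a boundary timestamp, yet every record time
// falls into exactly one segment.
val segments = Seq((0L, 100L), (100L, 200L))
assert(segments.count { case (from, to) => inSegment(100L, from, to) } == 1)
assert(segments.count { case (from, to) => inSegment(150L, from, to) } == 1)
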
@@ -47,16 +47,21 @@ class UpdateHistoryBulkStorageTest
       withS3Mock {
         val initialStoreSize = 1500
         val segmentSize = 2200L
+        val segmentFromTimestamp = 100L
         val mockStore = new MockUpdateHistoryStore(initialStoreSize)
         val bucketConnection = getS3BucketConnection(loggerFactory)
+        val fromTimestamp =
+          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp))
+        val toTimestamp =
+          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp + segmentSize))
         val segment = new UpdateHistorySegmentBulkStorage(
           bulkStorageTestConfig,
           mockStore.store,
           bucketConnection,
           0,
-          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(0)),
+          fromTimestamp,
           0,
-          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentSize)),
+          toTimestamp,
           loggerFactory,
         )
         clue(
@@ -85,8 +90,11 @@ class UpdateHistoryBulkStorageTest
             .asScala
           allUpdates <- mockStore.store.getUpdatesWithoutImportUpdates(
             None,
-            HardLimit.tryCreate(segmentSize.toInt, segmentSize.toInt),
-            afterIsInclusive = true,
+            HardLimit.tryCreate(segmentSize.toInt * 2, segmentSize.toInt * 2),
           )
+          segmentUpdates = allUpdates.filter(update =>
+            update.update.update.recordTime > fromTimestamp &&
+              update.update.update.recordTime <= toTimestamp
+          )
         } yield {
           val objectKeys = s3Objects.contents.asScala.sortBy(_.key())
@@ -95,10 +103,11 @@
           val allUpdatesFromS3 = objectKeys.flatMap(
             readUncompressAndDecode(bucketConnection, io.circe.parser.decode[UpdateHistoryItemV2])
           )
+          allUpdatesFromS3.length shouldBe segmentUpdates.length
           allUpdatesFromS3
             .map(
               CompactJsonScanHttpEncodingsWithFieldLabels().httpToLapiUpdate
-            ) should contain theSameElementsInOrderAs allUpdates
+            ) should contain theSameElementsInOrderAs segmentUpdates
         }
       }
     }
@@ -120,17 +129,14 @@ class UpdateHistoryBulkStorageTest
         store.getUpdatesWithoutImportUpdates(
           any[Option[(Long, CantonTimestamp)]],
           any[Limit],
-          anyBoolean,
         )(any[TraceContext])
       ).thenAnswer {
         (
             afterO: Option[(Long, CantonTimestamp)],
             limit: Limit,
-            afterIsInclusive: Boolean,
         ) =>
           Future {
-            val afterIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L)
-            val fromIdx = if (afterIsInclusive) afterIdx else afterIdx + 1
+            val fromIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L) + 1
             val remaining = storeSize - fromIdx
             val numElems = math.min(limit.limit.toLong, remaining)
             Seq
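
Note: the mock's answer now hard-codes the exclusive cursor, resuming one index past it unconditionally, matching the store's semantics after this PR. A sketch of that paging contract over plain epoch-milli indices (mockPage is an illustrative name; the one-update-per-milli layout is an assumption about MockUpdateHistoryStore):

// Resume strictly after the cursor (index 1 when there is none), capped by
// the limit and the store size.
def mockPage(afterMillisO: Option[Long], limit: Int, storeSize: Long): Seq[Long] = {
  val fromIdx = afterMillisO.getOrElse(0L) + 1
  val numElems = math.max(math.min(limit.toLong, storeSize - fromIdx), 0L)
  (fromIdx until fromIdx + numElems).toSeq
}

// Paging with the last returned element as the next cursor never repeats:
val first = mockPage(None, 10, 100L)
val second = mockPage(first.lastOption, 10, 100L)
assert(first.toSet.intersect(second.toSet).isEmpty)
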