diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
index 1999cd3bc4..35c5abcd86 100644
--- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
+++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/store/UpdateHistory.scala
@@ -1031,17 +1031,15 @@ class UpdateHistory(
   private def afterFilters(
       afterO: Option[(Long, CantonTimestamp)],
       includeImportUpdates: Boolean,
-      afterIsInclusive: Boolean,
   ): NonEmptyList[SQLActionBuilder] = {
     val gtMin = if (includeImportUpdates) ">=" else ">"
-    val gtAfter = if (afterIsInclusive) ">=" else ">"
     afterO match {
       case None =>
         NonEmptyList.of(sql"migration_id >= 0 and record_time #$gtMin ${CantonTimestamp.MinValue}")
       case Some((afterMigrationId, afterRecordTime)) =>
         // This makes it so that the two queries use updt_hist_tran_hi_mi_rt_di,
         NonEmptyList.of(
-          sql"migration_id = ${afterMigrationId} and record_time #$gtAfter ${afterRecordTime} ",
+          sql"migration_id = ${afterMigrationId} and record_time > ${afterRecordTime} ",
           sql"migration_id > ${afterMigrationId} and record_time #$gtMin ${CantonTimestamp.MinValue}",
         )
     }
@@ -1227,9 +1225,8 @@ class UpdateHistory(
   def getUpdatesWithoutImportUpdates(
       afterO: Option[(Long, CantonTimestamp)],
       limit: Limit,
-      afterIsInclusive: Boolean,
   )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
-    val filters = afterFilters(afterO, includeImportUpdates = false, afterIsInclusive)
+    val filters = afterFilters(afterO, includeImportUpdates = false)
     val orderBy = sql"migration_id, record_time, domain_id"
     for {
       txs <- getTxUpdates(filters, orderBy, limit)
@@ -1240,17 +1237,11 @@ class UpdateHistory(
     }
   }

-  def getUpdatesWithoutImportUpdates(
-      afterO: Option[(Long, CantonTimestamp)],
-      limit: Limit,
-  )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] =
-    getUpdatesWithoutImportUpdates(afterO, limit, false)
-
   def getAllUpdates(
       afterO: Option[(Long, CantonTimestamp)],
       limit: PageLimit,
   )(implicit tc: TraceContext): Future[Seq[TreeUpdateWithMigrationId]] = {
-    val filters = afterFilters(afterO, includeImportUpdates = true, afterIsInclusive = false)
+    val filters = afterFilters(afterO, includeImportUpdates = true)
     // With import updates, we have to include the update id to get a deterministic order.
     // We don't have an index for this order, but this is only used in test code and deprecated scan endpoints.
     val orderBy = sql"migration_id, record_time, domain_id, update_id"
diff --git a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistorySegmentBulkStorage.scala b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistorySegmentBulkStorage.scala
index 59b607990b..680d9ebedf 100644
--- a/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistorySegmentBulkStorage.scala
+++ b/apps/scan/src/main/scala/org/lfdecentralizedtrust/splice/scan/store/bulk/UpdateHistorySegmentBulkStorage.scala
@@ -112,20 +112,17 @@ class UpdateHistorySegmentBulkStorage(
         updateHistory.getUpdatesWithoutImportUpdates(
           Some((fromMigrationId, fromTimestamp)),
           HardLimit.tryCreate(config.dbReadChunkSize),
-          afterIsInclusive = true,
         )
       )(after =>
         updateHistory.getUpdatesWithoutImportUpdates(
           Some((after._1, after._2)),
           HardLimit.tryCreate(config.dbReadChunkSize),
-          afterIsInclusive = false,
         )
       )

-      // TODO(#3429): Figure out the < vs <= issue
       updatesInSegment = updates.filter(update =>
         update.migrationId < toMigrationId ||
-          update.migrationId == toMigrationId && update.update.update.recordTime < toTimestamp
+          update.migrationId == toMigrationId && update.update.update.recordTime <= toTimestamp
       )
       _ <-
         if (updatesInSegment.length < updates.length || updates.length == config.dbReadChunkSize) {
diff --git a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala
index 326f93e4ec..a3878fe5ff 100644
--- a/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala
+++ b/apps/scan/src/test/scala/org/lfdecentralizedtrust/splice/scan/store/UpdateHistoryBulkStorageTest.scala
@@ -47,16 +47,21 @@ class UpdateHistoryBulkStorageTest
       withS3Mock {
         val initialStoreSize = 1500
         val segmentSize = 2200L
+        val segmentFromTimestamp = 100L
         val mockStore = new MockUpdateHistoryStore(initialStoreSize)
         val bucketConnection = getS3BucketConnection(loggerFactory)
+        val fromTimestamp =
+          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp))
+        val toTimestamp =
+          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentFromTimestamp + segmentSize))
         val segment = new UpdateHistorySegmentBulkStorage(
           bulkStorageTestConfig,
           mockStore.store,
           bucketConnection,
           0,
-          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(0)),
+          fromTimestamp,
           0,
-          CantonTimestamp.tryFromInstant(Instant.ofEpochMilli(segmentSize)),
+          toTimestamp,
           loggerFactory,
         )
         clue(
@@ -85,8 +90,11 @@ class UpdateHistoryBulkStorageTest
             .asScala
           allUpdates <- mockStore.store.getUpdatesWithoutImportUpdates(
             None,
-            HardLimit.tryCreate(segmentSize.toInt, segmentSize.toInt),
-            afterIsInclusive = true,
+            HardLimit.tryCreate(segmentSize.toInt * 2, segmentSize.toInt * 2),
+          )
+          segmentUpdates = allUpdates.filter(update =>
+            update.update.update.recordTime > fromTimestamp &&
+              update.update.update.recordTime <= toTimestamp
           )
         } yield {
           val objectKeys = s3Objects.contents.asScala.sortBy(_.key())
@@ -95,10 +103,11 @@ class UpdateHistoryBulkStorageTest
           val allUpdatesFromS3 = objectKeys.flatMap(
             readUncompressAndDecode(bucketConnection, io.circe.parser.decode[UpdateHistoryItemV2])
           )
+          allUpdatesFromS3.length shouldBe segmentUpdates.length
           allUpdatesFromS3
             .map(
               CompactJsonScanHttpEncodingsWithFieldLabels().httpToLapiUpdate
-            ) should contain theSameElementsInOrderAs allUpdates
+            ) should contain theSameElementsInOrderAs segmentUpdates
         }
       }
     }
@@ -120,17 +129,14 @@ class UpdateHistoryBulkStorageTest
       store.getUpdatesWithoutImportUpdates(
         any[Option[(Long, CantonTimestamp)]],
         any[Limit],
-        anyBoolean,
       )(any[TraceContext])
     ).thenAnswer {
       (
           afterO: Option[(Long, CantonTimestamp)],
           limit: Limit,
-          afterIsInclusive: Boolean,
       ) =>
         Future {
-          val afterIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L)
-          val fromIdx = if (afterIsInclusive) afterIdx else afterIdx + 1
+          val fromIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L) + 1
           val remaining = storeSize - fromIdx
           val numElems = math.min(limit.limit.toLong, remaining)
           Seq