@@ -47,16 +47,21 @@ class UpdateHistoryBulkStorageTest
4747 withS3Mock {
4848 val initialStoreSize = 1500
4949 val segmentSize = 2200L
50+ val segmentFromTimestamp = 100L
5051 val mockStore = new MockUpdateHistoryStore (initialStoreSize)
5152 val bucketConnection = getS3BucketConnection(loggerFactory)
53+ val fromTimestamp =
54+ CantonTimestamp .tryFromInstant(Instant .ofEpochMilli(segmentFromTimestamp))
55+ val toTimestamp =
56+ CantonTimestamp .tryFromInstant(Instant .ofEpochMilli(segmentFromTimestamp + segmentSize))
5257 val segment = new UpdateHistorySegmentBulkStorage (
5358 bulkStorageTestConfig,
5459 mockStore.store,
5560 bucketConnection,
5661 0 ,
57- CantonTimestamp .tryFromInstant( Instant .ofEpochMilli( 0 )) ,
62+ fromTimestamp ,
5863 0 ,
59- CantonTimestamp .tryFromInstant( Instant .ofEpochMilli(segmentSize)) ,
64+ toTimestamp ,
6065 loggerFactory,
6166 )
6267 clue(
@@ -85,8 +90,11 @@ class UpdateHistoryBulkStorageTest
8590 .asScala
8691 allUpdates <- mockStore.store.getUpdatesWithoutImportUpdates(
8792 None ,
88- HardLimit .tryCreate(segmentSize.toInt, segmentSize.toInt),
89- afterIsInclusive = true ,
93+ HardLimit .tryCreate(segmentSize.toInt * 2 , segmentSize.toInt * 2 ),
94+ )
95+ segmentUpdates = allUpdates.filter(update =>
96+ update.update.update.recordTime > fromTimestamp &&
97+ update.update.update.recordTime <= toTimestamp
9098 )
9199 } yield {
92100 val objectKeys = s3Objects.contents.asScala.sortBy(_.key())
@@ -95,10 +103,11 @@ class UpdateHistoryBulkStorageTest
95103 val allUpdatesFromS3 = objectKeys.flatMap(
96104 readUncompressAndDecode(bucketConnection, io.circe.parser.decode[UpdateHistoryItemV2 ])
97105 )
106+ allUpdatesFromS3.length shouldBe segmentUpdates.length
98107 allUpdatesFromS3
99108 .map(
100109 CompactJsonScanHttpEncodingsWithFieldLabels ().httpToLapiUpdate
101- ) should contain theSameElementsInOrderAs allUpdates
110+ ) should contain theSameElementsInOrderAs segmentUpdates
102111 }
103112 }
104113 }
@@ -120,17 +129,14 @@ class UpdateHistoryBulkStorageTest
120129 store.getUpdatesWithoutImportUpdates(
121130 any[Option [(Long , CantonTimestamp )]],
122131 any[Limit ],
123- anyBoolean,
124132 )(any[TraceContext ])
125133 ).thenAnswer {
126134 (
127135 afterO : Option [(Long , CantonTimestamp )],
128136 limit : Limit ,
129- afterIsInclusive : Boolean ,
130137 ) =>
131138 Future {
132- val afterIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L )
133- val fromIdx = if (afterIsInclusive) afterIdx else afterIdx + 1
139+ val fromIdx = afterO.map { case (_, t) => t.toEpochMilli }.getOrElse(0L ) + 1
134140 val remaining = storeSize - fromIdx
135141 val numElems = math.min(limit.limit.toLong, remaining)
136142 Seq
0 commit comments