4949
5050import javax .annotation .Nullable ;
5151
52- import java .io .IOException ;
5352import java .util .ArrayList ;
5453import java .util .Collections ;
5554import java .util .HashMap ;
5655import java .util .List ;
5756import java .util .Map ;
5857
59- import static org .apache .fluss .flink .tiering .committer .TieringCommitOperator .fromLogOffsetProperty ;
60- import static org .apache .fluss .flink .tiering .committer .TieringCommitOperator .toBucketOffsetsProperty ;
6158import static org .apache .fluss .lake .committer .BucketOffset .FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY ;
6259import static org .apache .fluss .record .TestData .DATA1_PARTITIONED_TABLE_DESCRIPTOR ;
6360import static org .assertj .core .api .Assertions .assertThat ;
@@ -181,7 +178,6 @@ void testCommitPartitionedTable() throws Exception {
181178 Map <String , Long > partitionIdByNames =
182179 FLUSS_CLUSTER_EXTENSION .waitUntilPartitionAllReady (tablePath );
183180 Map <TableBucket , Long > expectedLogEndOffsets = new HashMap <>();
184- Map <TableBucket , Long > expectedMaxTimestamps = new HashMap <>();
185181 int numberOfWriteResults = 3 * partitionIdByNames .size ();
186182 long offset = 0 ;
187183 long timestamp = System .currentTimeMillis ();
@@ -201,7 +197,6 @@ void testCommitPartitionedTable() throws Exception {
201197 currentTimestamp ,
202198 numberOfWriteResults ));
203199 expectedLogEndOffsets .put (tableBucket , currentOffset );
204- expectedMaxTimestamps .put (tableBucket , currentTimestamp );
205200 }
206201 if (bucket == 2 ) {
207202 verifyLakeSnapshot (tablePath , tableId , 1 , expectedLogEndOffsets );
@@ -261,10 +256,17 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
261256 long tableId = createTable (tablePath , DEFAULT_PK_TABLE_DESCRIPTOR );
262257 int numberOfWriteResults = 3 ;
263258
264- CommittedLakeSnapshot mockCommittedSnapshot =
265- mockCommittedLakeSnapshot (Collections .singletonList (null ), tableId , 2 );
259+ Map <TableBucket , Long > expectedLogEndOffsets = new HashMap <>();
260+ for (int bucket = 0 ; bucket < 3 ; bucket ++) {
261+ TableBucket tableBucket = new TableBucket (tableId , bucket );
262+ expectedLogEndOffsets .put (tableBucket , 3L );
263+ }
264+
265+ CommittedLakeSnapshot mockMissingCommittedLakeSnapshot =
266+ mockCommittedLakeSnapshot (tableId , tablePath , 0 , expectedLogEndOffsets );
266267 TestingLakeTieringFactory .TestingLakeCommitter testingLakeCommitter =
267- new TestingLakeTieringFactory .TestingLakeCommitter (mockCommittedSnapshot );
268+ new TestingLakeTieringFactory .TestingLakeCommitter (
269+ mockMissingCommittedLakeSnapshot );
268270 committerOperator =
269271 new TieringCommitOperator <>(
270272 parameters ,
@@ -283,18 +285,18 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
283285 verifyLakeSnapshot (
284286 tablePath ,
285287 tableId ,
286- 2 ,
287- getExpectedLogEndOffsets ( tableId , mockCommittedSnapshot ) ,
288+ 0 ,
289+ expectedLogEndOffsets ,
288290 String .format (
289291 "The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d},"
290292 + " missing snapshot: %s." ,
291293 null ,
292- mockCommittedSnapshot .getLakeSnapshotId (),
294+ mockMissingCommittedLakeSnapshot .getLakeSnapshotId (),
293295 tablePath ,
294296 tableId ,
295- mockCommittedSnapshot ));
297+ mockMissingCommittedLakeSnapshot ));
296298
297- Map < TableBucket , Long > expectedLogEndOffsets = new HashMap <>();
299+ expectedLogEndOffsets = new HashMap <>();
298300 for (int bucket = 0 ; bucket < 3 ; bucket ++) {
299301 TableBucket tableBucket = new TableBucket (tableId , bucket );
300302 long offset = bucket * bucket ;
@@ -305,7 +307,7 @@ void testTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
305307 expectedLogEndOffsets .put (tableBucket , offset );
306308 }
307309
308- verifyLakeSnapshot (tablePath , tableId , 3 , expectedLogEndOffsets );
310+ verifyLakeSnapshot (tablePath , tableId , 1 , expectedLogEndOffsets );
309311 }
310312
311313 @ Test
@@ -318,10 +320,21 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
318320 Map <String , Long > partitionIdByNames =
319321 FLUSS_CLUSTER_EXTENSION .waitUntilPartitionAllReady (tablePath );
320322
321- CommittedLakeSnapshot mockCommittedSnapshot =
322- mockCommittedLakeSnapshot (Collections .singletonList (null ), tableId , 3 );
323+ Map <TableBucket , Long > expectedLogEndOffsets = new HashMap <>();
324+ for (int bucket = 0 ; bucket < 3 ; bucket ++) {
325+ for (String partitionName : partitionIdByNames .keySet ()) {
326+ long partitionId = partitionIdByNames .get (partitionName );
327+ TableBucket tableBucket = new TableBucket (tableId , partitionId , bucket );
328+ expectedLogEndOffsets .put (tableBucket , 3L );
329+ }
330+ }
331+
332+ CommittedLakeSnapshot mockMissingCommittedLakeSnapshot =
333+ mockCommittedLakeSnapshot (tableId , tablePath , 0 , expectedLogEndOffsets );
334+
323335 TestingLakeTieringFactory .TestingLakeCommitter testingLakeCommitter =
324- new TestingLakeTieringFactory .TestingLakeCommitter (mockCommittedSnapshot );
336+ new TestingLakeTieringFactory .TestingLakeCommitter (
337+ mockMissingCommittedLakeSnapshot );
325338 committerOperator =
326339 new TieringCommitOperator <>(
327340 parameters ,
@@ -351,35 +364,15 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception {
351364 verifyLakeSnapshot (
352365 tablePath ,
353366 tableId ,
354- 3 ,
355- getExpectedLogEndOffsets ( tableId , mockCommittedSnapshot ) ,
367+ 0 ,
368+ expectedLogEndOffsets ,
356369 String .format (
357370 "The current Fluss's lake snapshot %s is less than lake actual snapshot %d committed by Fluss for table: {tablePath=%s, tableId=%d}, missing snapshot: %s." ,
358371 null ,
359- mockCommittedSnapshot .getLakeSnapshotId (),
372+ mockMissingCommittedLakeSnapshot .getLakeSnapshotId (),
360373 tablePath ,
361374 tableId ,
362- mockCommittedSnapshot ));
363- }
364-
365- private CommittedLakeSnapshot mockCommittedLakeSnapshot (
366- List <Long > partitions , long tableId , int snapshotId ) throws IOException {
367- Map <TableBucket , Long > logEndOffsets = new HashMap <>();
368- for (Long partition : partitions ) {
369- for (int bucket = 0 ; bucket < DEFAULT_BUCKET_NUM ; bucket ++) {
370- logEndOffsets .put (new TableBucket (tableId , partition , bucket ), bucket + 1L );
371- }
372- }
373- return new CommittedLakeSnapshot (snapshotId , toBucketOffsetsProperty (logEndOffsets ));
374- }
375-
376- private Map <TableBucket , Long > getExpectedLogEndOffsets (
377- long tableId , CommittedLakeSnapshot committedLakeSnapshot ) throws IOException {
378- return fromLogOffsetProperty (
379- tableId ,
380- committedLakeSnapshot
381- .getSnapshotProperties ()
382- .get (FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY ));
375+ mockMissingCommittedLakeSnapshot ));
383376 }
384377
385378 private StreamRecord <TableBucketWriteResult <TestingWriteResult >>
@@ -461,10 +454,25 @@ private void verifyLakeSnapshot(
461454 List <OperatorEvent > operatorEvents = mockOperatorEventGateway .getEventsSent ();
462455 SourceEventWrapper sourceEventWrapper =
463456 (SourceEventWrapper ) operatorEvents .get (operatorEvents .size () - 1 );
464- FailedTieringEvent finishTieringEvent =
457+ FailedTieringEvent failedTieringEvent =
465458 (FailedTieringEvent ) sourceEventWrapper .getSourceEvent ();
466- assertThat (finishTieringEvent .getTableId ()).isEqualTo (tableId );
467- assertThat (finishTieringEvent .failReason ()).contains (failedReason );
459+ assertThat (failedTieringEvent .getTableId ()).isEqualTo (tableId );
460+ assertThat (failedTieringEvent .failReason ()).contains (failedReason );
461+ }
462+
463+ private CommittedLakeSnapshot mockCommittedLakeSnapshot (
464+ long tableId , TablePath tablePath , int snapshotId , Map <TableBucket , Long > logEndOffsets )
465+ throws Exception {
466+ try (FlussTableLakeSnapshotCommitter lakeSnapshotCommitter =
467+ new FlussTableLakeSnapshotCommitter (FLUSS_CLUSTER_EXTENSION .getClientConfig ())) {
468+ lakeSnapshotCommitter .open ();
469+ String lakeSnapshotFile =
470+ lakeSnapshotCommitter .prepareCommit (tableId , tablePath , logEndOffsets );
471+ return new CommittedLakeSnapshot (
472+ snapshotId ,
473+ Collections .singletonMap (
474+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY , lakeSnapshotFile ));
475+ }
468476 }
469477
470478 private static class MockOperatorEventDispatcher implements OperatorEventDispatcher {
0 commit comments