Skip to content

Commit 6b6abb0

Browse files
committed
[lake] Avoid generating empty splits in tiering source enumerator
1 parent f9b0072 commit 6b6abb0

File tree

3 files changed

+104
-46
lines changed

3 files changed

+104
-46
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ private Optional<TieringSplit> generateSplitForPrimaryKeyTableBucket(
243243
@Nullable Long latestOffsetOfSnapshot,
244244
@Nullable Long lastCommittedBucketOffset,
245245
long latestBucketOffset) {
246+
if (latestBucketOffset <= 0) {
247+
LOG.info(
248+
"The latestBucketOffset {} is equals or less than 0, skip generating split for bucket {}",
249+
latestBucketOffset,
250+
tableBucket);
251+
return Optional.empty();
252+
}
246253

247254
// the bucket is never been tiered, read kv snapshot is more efficient
248255
if (lastCommittedBucketOffset == null) {
@@ -280,6 +287,11 @@ private Optional<TieringSplit> generateSplitForPrimaryKeyTableBucket(
280287
latestBucketOffset,
281288
0));
282289
} else {
290+
LOG.info(
291+
"The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
292+
lastCommittedBucketOffset,
293+
latestBucketOffset,
294+
tableBucket);
283295
return Optional.empty();
284296
}
285297
}
@@ -291,6 +303,13 @@ private Optional<TieringSplit> generateSplitForLogTableBucket(
291303
@Nullable String partitionName,
292304
@Nullable Long lastCommittedBucketOffset,
293305
long latestBucketOffset) {
306+
if (latestBucketOffset <= 0) {
307+
LOG.info(
308+
"The latestBucketOffset {} is equals or less than 0, skip generating split for bucket {}",
309+
latestBucketOffset,
310+
tableBucket);
311+
return Optional.empty();
312+
}
294313

295314
// the bucket is never been tiered
296315
if (lastCommittedBucketOffset == null) {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ static void afterAll() throws Exception {
180180
private static Configuration flussClusterConfig() {
181181
Configuration conf = new Configuration();
182182
// set snapshot interval to 1s for testing purposes
183-
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1));
183+
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(10));
184184
// not to clean snapshots for test purpose
185185
conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
186186

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 84 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,24 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable {
8989
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
9090
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
9191
}
92-
waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 200L);
92+
93+
// write one row to one bucket, keep the other buckets empty
94+
Map<Integer, Long> bucketOffsetOfFirstWrite =
95+
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 1);
96+
97+
// only non-empty buckets should generate splits
98+
waitUntilTieringTableSplitAssignmentReady(context, 1, 200L);
9399
List<TieringSplit> expectedAssignment = new ArrayList<>();
94-
for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
100+
// we don't wait snapshot here, so the generated split is TieringLogSplit
101+
for (int bucketId : bucketOffsetOfFirstWrite.keySet()) {
95102
expectedAssignment.add(
96103
new TieringLogSplit(
97104
tablePath,
98105
new TableBucket(tableId, bucketId),
99106
null,
100107
EARLIEST_OFFSET,
101-
0,
102-
expectNumberOfSplits));
108+
bucketOffsetOfFirstWrite.get(bucketId),
109+
bucketOffsetOfFirstWrite.size()));
103110
}
104111
List<TieringSplit> actualAssignment = new ArrayList<>();
105112
context.getSplitsAssignmentSequence()
@@ -113,7 +120,8 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable {
113120
final Map<Integer, Long> bucketOffsetOfInitialWrite = new HashMap<>();
114121
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
115122
bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
116-
bucketOffsetOfInitialWrite.put(tableBucket, 0L);
123+
bucketOffsetOfInitialWrite.put(
124+
tableBucket, bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L));
117125
}
118126
// commit and notify this table tiering task finished
119127
coordinatorGateway
@@ -128,8 +136,9 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable {
128136

129137
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
130138

139+
// write rows to every bucket
131140
Map<Integer, Long> bucketOffsetOfSecondWrite =
132-
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 0, 10);
141+
upsertRow(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, 1, 10);
133142
long snapshotId = 0;
134143
waitUntilSnapshot(tableId, snapshotId);
135144

@@ -282,18 +291,23 @@ void testLogTableSplits() throws Throwable {
282291
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
283292
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
284293
}
285-
waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 200);
286294

295+
// write one row to one bucket, keep the other buckets empty
296+
Map<Integer, Long> bucketOffsetOfFirstWrite =
297+
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 1);
298+
299+
// only non-empty buckets should generate splits
300+
waitUntilTieringTableSplitAssignmentReady(context, 1, 200L);
287301
List<TieringSplit> expectedAssignment = new ArrayList<>();
288-
for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
302+
for (int bucketId : bucketOffsetOfFirstWrite.keySet()) {
289303
expectedAssignment.add(
290304
new TieringLogSplit(
291305
tablePath,
292306
new TableBucket(tableId, bucketId),
293307
null,
294308
EARLIEST_OFFSET,
295-
0L,
296-
expectNumberOfSplits));
309+
bucketOffsetOfFirstWrite.get(bucketId),
310+
bucketOffsetOfFirstWrite.size()));
297311
}
298312
List<TieringSplit> actualAssignment = new ArrayList<>();
299313
context.getSplitsAssignmentSequence()
@@ -307,7 +321,8 @@ void testLogTableSplits() throws Throwable {
307321
final Map<Integer, Long> bucketOffsetOfInitialWrite = new HashMap<>();
308322
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
309323
bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
310-
bucketOffsetOfInitialWrite.put(tableBucket, 0L);
324+
bucketOffsetOfInitialWrite.put(
325+
tableBucket, bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L));
311326
}
312327
// commit and notify this table tiering task finished
313328
coordinatorGateway
@@ -321,8 +336,9 @@ void testLogTableSplits() throws Throwable {
321336
.get();
322337
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
323338

339+
// write rows to every bucket
324340
Map<Integer, Long> bucketOffsetOfSecondWrite =
325-
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
341+
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 1, 10);
326342

327343
// request tiering table splits
328344
for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
@@ -362,12 +378,6 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
362378
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsCreated(
363379
tablePath, TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
364380

365-
final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite =
366-
upsertRowForPartitionedTable(
367-
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 0, 10);
368-
long snapshotId = 0;
369-
waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, snapshotId);
370-
371381
int numSubtasks = 6;
372382
int expectNumberOfSplits = 6;
373383
// test get snapshot split assignment
@@ -384,21 +394,28 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
384394
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
385395
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
386396
}
387-
waitUntilTieringTableSplitAssignmentReady(
388-
context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 3000L);
389397

390-
List<TieringSplit> expectedSnapshotAssignment = new ArrayList<>();
398+
// write one row to one bucket of each partition
399+
final Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
400+
upsertRowForPartitionedTable(
401+
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 0, 1);
402+
403+
// only non-empty buckets should generate splits
404+
waitUntilTieringTableSplitAssignmentReady(context, partitionNameByIds.size(), 3000L);
405+
406+
List<TieringSplit> expectedAssignment = new ArrayList<>();
407+
// we don't wait snapshot here, so the generated split is TieringLogSplit
391408
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
392-
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
393-
long partitionId = partitionNameById.getValue();
394-
expectedSnapshotAssignment.add(
395-
new TieringSnapshotSplit(
409+
long partitionId = partitionNameById.getValue();
410+
for (int tableBucket : bucketOffsetOfFirstWrite.get(partitionId).keySet()) {
411+
expectedAssignment.add(
412+
new TieringLogSplit(
396413
tablePath,
397414
new TableBucket(tableId, partitionId, tableBucket),
398415
partitionNameById.getKey(),
399-
snapshotId,
400-
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
401-
expectNumberOfSplits));
416+
EARLIEST_OFFSET,
417+
bucketOffsetOfFirstWrite.get(partitionId).get(tableBucket),
418+
bucketOffsetOfFirstWrite.size()));
402419
}
403420
}
404421
List<TieringSplit> actualSnapshotAssignment = new ArrayList<>();
@@ -407,15 +424,24 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
407424
splitsAssignment.assignment().values().forEach(actualSnapshotAssignment::addAll);
408425
}
409426
assertThat(sortSplits(actualSnapshotAssignment))
410-
.isEqualTo(sortSplits(expectedSnapshotAssignment));
427+
.isEqualTo(sortSplits(expectedAssignment));
411428

412429
// mock finished tiered this round, check second round
413430
context.getSplitsAssignmentSequence().clear();
431+
final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite = new HashMap<>();
414432
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
433+
long partitionId = partitionNameById.getValue();
415434
Map<Integer, Long> partitionInitialBucketOffsets = new HashMap<>();
435+
Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new HashMap<>();
416436
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
417437
partitionInitialBucketOffsets.put(tableBucket, EARLIEST_OFFSET);
438+
partitionBucketOffsetOfInitialWrite.put(
439+
tableBucket,
440+
bucketOffsetOfFirstWrite
441+
.getOrDefault(partitionId, Collections.emptyMap())
442+
.getOrDefault(tableBucket, 0L));
418443
}
444+
bucketOffsetOfInitialWrite.put(partitionId, partitionBucketOffsetOfInitialWrite);
419445
// commit lake table partition
420446
coordinatorGateway
421447
.commitLakeTableSnapshot(
@@ -424,8 +450,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
424450
partitionNameById.getValue(),
425451
1,
426452
partitionInitialBucketOffsets,
427-
bucketOffsetOfInitialWrite.get(
428-
partitionNameById.getValue())))
453+
bucketOffsetOfInitialWrite.get(partitionId)))
429454
.get();
430455
}
431456
// notify this table tiering task finished
@@ -434,7 +459,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
434459
Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
435460
upsertRowForPartitionedTable(
436461
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 10, 20);
437-
snapshotId = 1;
462+
long snapshotId = 0;
438463
waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, snapshotId);
439464

440465
// request tiering table splits
@@ -447,16 +472,16 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
447472
List<TieringSplit> expectedLogAssignment = new ArrayList<>();
448473
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
449474
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
450-
long partionId = partitionNameById.getValue();
475+
long partitionId = partitionNameById.getValue();
451476
expectedLogAssignment.add(
452477
new TieringLogSplit(
453478
tablePath,
454-
new TableBucket(tableId, partionId, tableBucket),
479+
new TableBucket(tableId, partitionId, tableBucket),
455480
partitionNameById.getKey(),
456-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket),
457-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket)
481+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
482+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket)
458483
+ bucketOffsetOfSecondWrite
459-
.get(partionId)
484+
.get(partitionId)
460485
.get(tableBucket),
461486
expectNumberOfSplits));
462487
}
@@ -496,21 +521,29 @@ void testPartitionedLogTableSplits() throws Throwable {
496521
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
497522
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
498523
}
499-
waitUntilTieringTableSplitAssignmentReady(
500-
context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 3000L);
524+
525+
Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
526+
appendRowForPartitionedTable(
527+
tablePath,
528+
DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
529+
partitionNameByIds,
530+
0,
531+
1);
532+
533+
waitUntilTieringTableSplitAssignmentReady(context, partitionNameByIds.size(), 3000L);
501534

502535
List<TieringSplit> expectedAssignment = new ArrayList<>();
503536
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
504-
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
505-
long partitionId = partitionNameById.getValue();
537+
long partitionId = partitionNameById.getValue();
538+
for (int tableBucket : bucketOffsetOfFirstWrite.get(partitionId).keySet()) {
506539
expectedAssignment.add(
507540
new TieringLogSplit(
508541
tablePath,
509542
new TableBucket(tableId, partitionId, tableBucket),
510543
partitionNameById.getKey(),
511544
EARLIEST_OFFSET,
512-
0L,
513-
expectNumberOfSplits));
545+
bucketOffsetOfFirstWrite.get(partitionId).get(tableBucket),
546+
bucketOffsetOfFirstWrite.size()));
514547
}
515548
}
516549
List<TieringSplit> actualAssignment = new ArrayList<>();
@@ -529,7 +562,11 @@ void testPartitionedLogTableSplits() throws Throwable {
529562
Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new HashMap<>();
530563
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
531564
partitionInitialBucketOffsets.put(tableBucket, EARLIEST_OFFSET);
532-
partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
565+
partitionBucketOffsetOfInitialWrite.put(
566+
tableBucket,
567+
bucketOffsetOfFirstWrite
568+
.getOrDefault(partitionId, Collections.emptyMap())
569+
.getOrDefault(tableBucket, 0L));
533570
}
534571
bucketOffsetOfInitialWrite.put(partitionId, partitionBucketOffsetOfInitialWrite);
535572
// commit lake table partition
@@ -551,7 +588,7 @@ void testPartitionedLogTableSplits() throws Throwable {
551588
tablePath,
552589
DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
553590
partitionNameByIds,
554-
0,
591+
1,
555592
10);
556593

557594
// request tiering table splits
@@ -649,9 +686,11 @@ void testHandleFailedTieringTableEvent() throws Throwable {
649686
void testHandleFailOverEvent() throws Throwable {
650687
TablePath tablePath1 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table1");
651688
createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
689+
appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
652690

653691
TablePath tablePath2 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table2");
654692
createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
693+
appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
655694

656695
int numSubtasks = 1;
657696
try (MockSplitEnumeratorContext<TieringSplit> context =

0 commit comments

Comments
 (0)