Skip to content

Commit fc60308

Browse files
authored
[lake] Avoid generating empty splits in tiering source enumerator (#1925)
1 parent b94b2b6 commit fc60308

File tree

2 files changed

+80
-60
lines changed

2 files changed

+80
-60
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ private Optional<TieringSplit> generateSplitForPrimaryKeyTableBucket(
243243
@Nullable Long latestOffsetOfSnapshot,
244244
@Nullable Long lastCommittedBucketOffset,
245245
long latestBucketOffset) {
246+
if (latestBucketOffset <= 0) {
247+
LOG.debug(
248+
"The latestBucketOffset {} is less than or equal to 0, skipping split generation for bucket {}",
249+
latestBucketOffset,
250+
tableBucket);
251+
return Optional.empty();
252+
}
246253

247254
// the bucket is never been tiered, read kv snapshot is more efficient
248255
if (lastCommittedBucketOffset == null) {
@@ -280,6 +287,11 @@ private Optional<TieringSplit> generateSplitForPrimaryKeyTableBucket(
280287
latestBucketOffset,
281288
0));
282289
} else {
290+
LOG.info(
291+
"The lastCommittedBucketOffset {} is greater than or equal to latestBucketOffset {}, skipping split generation for bucket {}",
292+
lastCommittedBucketOffset,
293+
latestBucketOffset,
294+
tableBucket);
283295
return Optional.empty();
284296
}
285297
}
@@ -291,6 +303,13 @@ private Optional<TieringSplit> generateSplitForLogTableBucket(
291303
@Nullable String partitionName,
292304
@Nullable Long lastCommittedBucketOffset,
293305
long latestBucketOffset) {
306+
if (latestBucketOffset <= 0) {
307+
LOG.debug(
308+
"The latestBucketOffset {} is less than or equal to 0, skipping split generation for bucket {}",
309+
latestBucketOffset,
310+
tableBucket);
311+
return Optional.empty();
312+
}
294313

295314
// the bucket is never been tiered
296315
if (lastCommittedBucketOffset == null) {

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java

Lines changed: 61 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -89,23 +89,16 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable {
8989
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
9090
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
9191
}
92-
waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 200L);
93-
List<TieringSplit> expectedAssignment = new ArrayList<>();
94-
for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
95-
expectedAssignment.add(
96-
new TieringLogSplit(
97-
tablePath,
98-
new TableBucket(tableId, bucketId),
99-
null,
100-
EARLIEST_OFFSET,
101-
0,
102-
expectNumberOfSplits));
103-
}
92+
93+
// try to assign splits
94+
context.runPeriodicCallable(0);
95+
10496
List<TieringSplit> actualAssignment = new ArrayList<>();
10597
context.getSplitsAssignmentSequence()
10698
.forEach(a -> a.assignment().values().forEach(actualAssignment::addAll));
10799

108-
assertThat(actualAssignment).isEqualTo(expectedAssignment);
100+
// no split assignment for empty buckets
101+
assertThat(actualAssignment).isEmpty();
109102

110103
// mock finished tiered this round, check second round
111104
context.getSplitsAssignmentSequence().clear();
@@ -282,18 +275,23 @@ void testLogTableSplits() throws Throwable {
282275
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
283276
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
284277
}
285-
waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 200);
286278

279+
// write one row to one bucket, keep the other buckets empty
280+
Map<Integer, Long> bucketOffsetOfFirstWrite =
281+
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 1);
282+
283+
// only non-empty buckets should generate splits
284+
waitUntilTieringTableSplitAssignmentReady(context, 1, 200L);
287285
List<TieringSplit> expectedAssignment = new ArrayList<>();
288-
for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
286+
for (int bucketId : bucketOffsetOfFirstWrite.keySet()) {
289287
expectedAssignment.add(
290288
new TieringLogSplit(
291289
tablePath,
292290
new TableBucket(tableId, bucketId),
293291
null,
294292
EARLIEST_OFFSET,
295-
0L,
296-
expectNumberOfSplits));
293+
bucketOffsetOfFirstWrite.get(bucketId),
294+
bucketOffsetOfFirstWrite.size()));
297295
}
298296
List<TieringSplit> actualAssignment = new ArrayList<>();
299297
context.getSplitsAssignmentSequence()
@@ -307,7 +305,8 @@ void testLogTableSplits() throws Throwable {
307305
final Map<Integer, Long> bucketOffsetOfInitialWrite = new HashMap<>();
308306
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
309307
bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
310-
bucketOffsetOfInitialWrite.put(tableBucket, 0L);
308+
bucketOffsetOfInitialWrite.put(
309+
tableBucket, bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L));
311310
}
312311
// commit and notify this table tiering task finished
313312
coordinatorGateway
@@ -321,8 +320,9 @@ void testLogTableSplits() throws Throwable {
321320
.get();
322321
enumerator.handleSourceEvent(1, new FinishedTieringEvent(tableId));
323322

323+
// write rows to every bucket
324324
Map<Integer, Long> bucketOffsetOfSecondWrite =
325-
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
325+
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 1, 10);
326326

327327
// request tiering table splits
328328
for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
@@ -362,12 +362,6 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
362362
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionsCreated(
363363
tablePath, TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
364364

365-
final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite =
366-
upsertRowForPartitionedTable(
367-
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 0, 10);
368-
long snapshotId = 0;
369-
waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, snapshotId);
370-
371365
int numSubtasks = 6;
372366
int expectNumberOfSplits = 6;
373367
// test get snapshot split assignment
@@ -384,46 +378,39 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
384378
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
385379
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
386380
}
387-
waitUntilTieringTableSplitAssignmentReady(
388-
context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 3000L);
389381

390-
List<TieringSplit> expectedSnapshotAssignment = new ArrayList<>();
391-
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
392-
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
393-
long partitionId = partitionNameById.getValue();
394-
expectedSnapshotAssignment.add(
395-
new TieringSnapshotSplit(
396-
tablePath,
397-
new TableBucket(tableId, partitionId, tableBucket),
398-
partitionNameById.getKey(),
399-
snapshotId,
400-
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
401-
expectNumberOfSplits));
402-
}
403-
}
382+
// try to assign splits
383+
context.runPeriodicCallable(0);
384+
404385
List<TieringSplit> actualSnapshotAssignment = new ArrayList<>();
405386
for (SplitsAssignment<TieringSplit> splitsAssignment :
406387
context.getSplitsAssignmentSequence()) {
407388
splitsAssignment.assignment().values().forEach(actualSnapshotAssignment::addAll);
408389
}
409-
assertThat(sortSplits(actualSnapshotAssignment))
410-
.isEqualTo(sortSplits(expectedSnapshotAssignment));
390+
391+
// no snapshot split should be assigned for empty buckets
392+
assertThat(actualSnapshotAssignment).isEmpty();
411393

412394
// mock finished tiered this round, check second round
413395
context.getSplitsAssignmentSequence().clear();
396+
final Map<Long, Map<Integer, Long>> bucketOffsetOfInitialWrite = new HashMap<>();
414397
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
415-
Map<Integer, Long> partitionInitialBucketOffsets = new HashMap<>();
398+
Map<Integer, Long> partitionBucketOffsetOfEarliest = new HashMap<>();
399+
Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new HashMap<>();
416400
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
417-
partitionInitialBucketOffsets.put(tableBucket, EARLIEST_OFFSET);
401+
partitionBucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET);
402+
partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
418403
}
404+
bucketOffsetOfInitialWrite.put(
405+
partitionNameById.getValue(), partitionBucketOffsetOfInitialWrite);
419406
// commit lake table partition
420407
coordinatorGateway
421408
.commitLakeTableSnapshot(
422409
genCommitLakeTableSnapshotRequest(
423410
tableId,
424411
partitionNameById.getValue(),
425412
1,
426-
partitionInitialBucketOffsets,
413+
partitionBucketOffsetOfEarliest,
427414
bucketOffsetOfInitialWrite.get(
428415
partitionNameById.getValue())))
429416
.get();
@@ -434,7 +421,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
434421
Map<Long, Map<Integer, Long>> bucketOffsetOfSecondWrite =
435422
upsertRowForPartitionedTable(
436423
tablePath, DEFAULT_PK_TABLE_DESCRIPTOR, partitionNameByIds, 10, 20);
437-
snapshotId = 1;
424+
long snapshotId = 0;
438425
waitUntilPartitionTableSnapshot(tableId, partitionNameByIds, snapshotId);
439426

440427
// request tiering table splits
@@ -447,16 +434,16 @@ void testPartitionedPrimaryKeyTable() throws Throwable {
447434
List<TieringSplit> expectedLogAssignment = new ArrayList<>();
448435
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
449436
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
450-
long partionId = partitionNameById.getValue();
437+
long partitionId = partitionNameById.getValue();
451438
expectedLogAssignment.add(
452439
new TieringLogSplit(
453440
tablePath,
454-
new TableBucket(tableId, partionId, tableBucket),
441+
new TableBucket(tableId, partitionId, tableBucket),
455442
partitionNameById.getKey(),
456-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket),
457-
bucketOffsetOfInitialWrite.get(partionId).get(tableBucket)
443+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket),
444+
bucketOffsetOfInitialWrite.get(partitionId).get(tableBucket)
458445
+ bucketOffsetOfSecondWrite
459-
.get(partionId)
446+
.get(partitionId)
460447
.get(tableBucket),
461448
expectNumberOfSplits));
462449
}
@@ -496,21 +483,29 @@ void testPartitionedLogTableSplits() throws Throwable {
496483
registerReader(context, enumerator, subtaskId, "localhost-" + subtaskId);
497484
enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
498485
}
499-
waitUntilTieringTableSplitAssignmentReady(
500-
context, DEFAULT_BUCKET_NUM * partitionNameByIds.size(), 3000L);
486+
487+
Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
488+
appendRowForPartitionedTable(
489+
tablePath,
490+
DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
491+
partitionNameByIds,
492+
0,
493+
1);
494+
495+
waitUntilTieringTableSplitAssignmentReady(context, partitionNameByIds.size(), 3000L);
501496

502497
List<TieringSplit> expectedAssignment = new ArrayList<>();
503498
for (Map.Entry<String, Long> partitionNameById : partitionNameByIds.entrySet()) {
504-
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
505-
long partitionId = partitionNameById.getValue();
499+
long partitionId = partitionNameById.getValue();
500+
for (int tableBucket : bucketOffsetOfFirstWrite.get(partitionId).keySet()) {
506501
expectedAssignment.add(
507502
new TieringLogSplit(
508503
tablePath,
509504
new TableBucket(tableId, partitionId, tableBucket),
510505
partitionNameById.getKey(),
511506
EARLIEST_OFFSET,
512-
0L,
513-
expectNumberOfSplits));
507+
bucketOffsetOfFirstWrite.get(partitionId).get(tableBucket),
508+
bucketOffsetOfFirstWrite.size()));
514509
}
515510
}
516511
List<TieringSplit> actualAssignment = new ArrayList<>();
@@ -529,7 +524,11 @@ void testPartitionedLogTableSplits() throws Throwable {
529524
Map<Integer, Long> partitionBucketOffsetOfInitialWrite = new HashMap<>();
530525
for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) {
531526
partitionInitialBucketOffsets.put(tableBucket, EARLIEST_OFFSET);
532-
partitionBucketOffsetOfInitialWrite.put(tableBucket, 0L);
527+
partitionBucketOffsetOfInitialWrite.put(
528+
tableBucket,
529+
bucketOffsetOfFirstWrite
530+
.getOrDefault(partitionId, Collections.emptyMap())
531+
.getOrDefault(tableBucket, 0L));
533532
}
534533
bucketOffsetOfInitialWrite.put(partitionId, partitionBucketOffsetOfInitialWrite);
535534
// commit lake table partition
@@ -551,7 +550,7 @@ void testPartitionedLogTableSplits() throws Throwable {
551550
tablePath,
552551
DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR,
553552
partitionNameByIds,
554-
0,
553+
1,
555554
10);
556555

557556
// request tiering table splits
@@ -649,9 +648,11 @@ void testHandleFailedTieringTableEvent() throws Throwable {
649648
void testHandleFailOverEvent() throws Throwable {
650649
TablePath tablePath1 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table1");
651650
createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
651+
appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
652652

653653
TablePath tablePath2 = TablePath.of(DEFAULT_DB, "tiering-failover-test-log-table2");
654654
createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
655+
appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
655656

656657
int numSubtasks = 1;
657658
try (MockSplitEnumeratorContext<TieringSplit> context =

0 commit comments

Comments
 (0)