Skip to content

Commit 09bcdc7

Browse files
committed
[flink] Support detect buckets change for partitioned table
1 parent c009e11 commit 09bcdc7

File tree

3 files changed

+166
-13
lines changed

3 files changed

+166
-13
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,21 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
451451
() -> initPartitionedSplits(partitionChange.newPartitions),
452452
this::handleSplitsAdd);
453453
}
454+
455+
// We also need to handle unchanged partitions, because the assigned buckets within a
456+
// partition may also change.
457+
// This mainly occurs in two situations:
458+
// 1. During restore, only a subset of buckets under a partition may have been assigned,
459+
// and we need to assign the remaining buckets. This is required for all types of
460+
// partitioned tables.
461+
// 2. When the number of buckets in a table is changed, new buckets may be added under a
462+
// partition, and we need to assign these newly added buckets. Currently, this situation
463+
// only occurs in log tables without bucket keys.
464+
if (!partitionChange.unchangedPartitions.isEmpty()) {
465+
workerExecutor.callAsync(
466+
() -> initPartitionedSplits(partitionChange.unchangedPartitions),
467+
this::handleSplitsAdd);
468+
}
454469
}
455470

456471
private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) {
@@ -459,6 +474,7 @@ private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionIn
459474
.map(p -> new Partition(p.getPartitionId(), p.getPartitionName()))
460475
.collect(Collectors.toSet());
461476
final Set<Partition> removedPartitions = new HashSet<>();
477+
final Set<Partition> unChangedPartitions = new HashSet<>();
462478

463479
Set<Partition> assignedOrPendingPartitions = new HashSet<>();
464480
assignedPartitions.forEach(
@@ -485,6 +501,8 @@ private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionIn
485501
p -> {
486502
if (!newPartitions.remove(p)) {
487503
removedPartitions.add(p);
504+
} else {
505+
unChangedPartitions.add(p);
488506
}
489507
});
490508

@@ -495,7 +513,7 @@ private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionIn
495513
LOG.info("Discovered new partitions: {}", newPartitions);
496514
}
497515

498-
return new PartitionChange(newPartitions, removedPartitions);
516+
return new PartitionChange(newPartitions, removedPartitions, unChangedPartitions);
499517
}
500518

501519
private List<SourceSplitBase> initPartitionedSplits(Collection<Partition> newPartitions) {
@@ -636,9 +654,14 @@ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
636654
}
637655

638656
private boolean ignoreTableBucket(TableBucket tableBucket) {
639-
// if the bucket has been assigned, we can ignore it
640-
// the bucket has been assigned, skip
641-
return assignedTableBuckets.contains(tableBucket);
657+
// if the bucket has already been assigned or is pending assignment, we can ignore it
658+
// the bucket has been assigned or pending, skip
659+
Set<TableBucket> pendingBuckets =
660+
pendingSplitAssignment.values().stream()
661+
.flatMap(Collection::stream)
662+
.map(SourceSplitBase::getTableBucket)
663+
.collect(Collectors.toSet());
664+
return assignedTableBuckets.contains(tableBucket) || pendingBuckets.contains(tableBucket);
642665
}
643666

644667
private void handlePartitionsRemoved(Collection<Partition> removedPartitionInfo) {
@@ -904,15 +927,21 @@ public void close() throws IOException {
904927
private static class PartitionChange {
905928
private final Collection<Partition> newPartitions;
906929
private final Collection<Partition> removedPartitions;
930+
private final Collection<Partition> unchangedPartitions;
907931

908932
PartitionChange(
909-
Collection<Partition> newPartitions, Collection<Partition> removedPartitions) {
933+
Collection<Partition> newPartitions,
934+
Collection<Partition> removedPartitions,
935+
Collection<Partition> noChangePartitions) {
910936
this.newPartitions = newPartitions;
911937
this.removedPartitions = removedPartitions;
938+
this.unchangedPartitions = noChangePartitions;
912939
}
913940

914941
public boolean isEmpty() {
915-
return newPartitions.isEmpty() && removedPartitions.isEmpty();
942+
return newPartitions.isEmpty()
943+
&& removedPartitions.isEmpty()
944+
&& unchangedPartitions.isEmpty();
916945
}
917946
}
918947

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 121 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -336,9 +336,14 @@ void testAddSplitBack() throws Throwable {
336336
}
337337
}
338338

339-
@Test
340-
void testRestore() throws Throwable {
341-
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
339+
@ParameterizedTest
340+
@ValueSource(booleans = {true, false})
341+
void testRestoreForNonPartitionedTable(boolean isPrimaryKeyTable) throws Throwable {
342+
TableDescriptor tableDescriptor =
343+
isPrimaryKeyTable
344+
? DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR
345+
: DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR;
346+
long tableId = createTable(DEFAULT_TABLE_PATH, tableDescriptor);
342347
int numSubtasks = 3;
343348
// test get snapshot split & log split and the assignment
344349
try (MockSplitEnumeratorContext<SourceSplitBase> context =
@@ -353,7 +358,7 @@ void testRestore() throws Throwable {
353358
new FlinkSourceEnumerator(
354359
DEFAULT_TABLE_PATH,
355360
flussConf,
356-
false,
361+
isPrimaryKeyTable,
357362
false,
358363
context,
359364
assignedBuckets,
@@ -384,6 +389,96 @@ void testRestore() throws Throwable {
384389
}
385390
}
386391

392+
@ParameterizedTest
393+
@ValueSource(booleans = {true, false})
394+
void testRestoreForPartitionedTable(boolean isPrimaryKeyTable) throws Throwable {
395+
int numSubtasks = 3;
396+
TableDescriptor tableDescriptor =
397+
isPrimaryKeyTable
398+
? DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR
399+
: DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR;
400+
long tableId = createTable(DEFAULT_TABLE_PATH, tableDescriptor);
401+
ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
402+
403+
// Wait until partitions are created
404+
Map<Long, String> partitionNameByIds =
405+
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
406+
assertThat(partitionNameByIds.size()).isGreaterThanOrEqualTo(2);
407+
408+
// Get first two partitions for testing
409+
List<Map.Entry<Long, String>> partitionEntries =
410+
new ArrayList<>(partitionNameByIds.entrySet());
411+
Long partition1Id = partitionEntries.get(0).getKey();
412+
String partition1Name = partitionEntries.get(0).getValue();
413+
Long partition2Id = partitionEntries.get(1).getKey();
414+
String partition2Name = partitionEntries.get(1).getValue();
415+
416+
// Mock that all of partition1's buckets and partition2's bucket0 have been assigned
417+
Set<TableBucket> assignedBuckets = new HashSet<>();
418+
// All buckets of partition1 are assigned
419+
for (int bucketId = 0; bucketId < DEFAULT_BUCKET_NUM; bucketId++) {
420+
assignedBuckets.add(new TableBucket(tableId, partition1Id, bucketId));
421+
}
422+
// Only bucket0 of partition2 is assigned
423+
assignedBuckets.add(new TableBucket(tableId, partition2Id, 0));
424+
425+
// Mock assigned partitions (partition1 and partition2 are both assigned)
426+
Map<Long, String> assignedPartitions = new HashMap<>();
427+
assignedPartitions.put(partition1Id, partition1Name);
428+
assignedPartitions.put(partition2Id, partition2Name);
429+
430+
try (MockSplitEnumeratorContext<SourceSplitBase> context =
431+
new MockSplitEnumeratorContext<>(numSubtasks);
432+
MockWorkExecutor workerExecutor = new MockWorkExecutor(context);
433+
// Mock restore with assigned buckets and partitions
434+
FlinkSourceEnumerator enumerator =
435+
new FlinkSourceEnumerator(
436+
DEFAULT_TABLE_PATH,
437+
flussConf,
438+
isPrimaryKeyTable,
439+
true,
440+
context,
441+
assignedBuckets,
442+
assignedPartitions,
443+
Collections.emptyList(),
444+
OffsetsInitializer.earliest(),
445+
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
446+
streaming,
447+
null,
448+
null,
449+
workerExecutor)) {
450+
451+
enumerator.start();
452+
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
453+
454+
// invoke partition discovery callable again and there should be pending assignments.
455+
runPeriodicPartitionDiscovery(workerExecutor);
456+
457+
// Register all readers
458+
for (int i = 0; i < numSubtasks; i++) {
459+
registerReader(context, enumerator, i);
460+
}
461+
462+
// Check assignment: should contain partition2's bucket1 and bucket2
463+
// (all of partition1's buckets are already assigned, and partition2's bucket0 is assigned)
464+
Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
465+
LogSplit split1 = genLogSplit(tableId, partition2Id, 1, partition2Name);
466+
LogSplit split2 = genLogSplit(tableId, partition2Id, 2, partition2Name);
467+
int task1 = enumerator.getSplitOwner(split1);
468+
int task2 = enumerator.getSplitOwner(split2);
469+
expectedAssignment.computeIfAbsent(task1, k -> new ArrayList<>()).add(split1);
470+
expectedAssignment.computeIfAbsent(task2, k -> new ArrayList<>()).add(split2);
471+
472+
Map<Integer, List<SourceSplitBase>> actualAssignment = getReadersAssignments(context);
473+
checkAssignmentIgnoreOrder(actualAssignment, expectedAssignment);
474+
475+
// Verify that assigned partitions are correctly tracked
476+
assertThat(enumerator.getAssignedPartitions())
477+
.containsEntry(partition1Id, partition1Name)
478+
.containsEntry(partition2Id, partition2Name);
479+
}
480+
}
481+
387482
@ParameterizedTest
388483
@ValueSource(booleans = {true, false})
389484
void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwable {
@@ -444,7 +539,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
444539
Map<Long, String> newPartitionNameIds =
445540
createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
446541

447-
/// invoke partition discovery callable again and there should assignments.
542+
// invoke partition discovery callable again and there should be assignments.
448543
runPeriodicPartitionDiscovery(workExecutor);
449544

450545
expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds);
@@ -570,6 +665,19 @@ private void registerReader(
570665
enumerator.addReader(readerId);
571666
}
572667

668+
/** Get the expected assignments of a non-partitioned table. */
669+
private Map<Integer, List<SourceSplitBase>> expectAssignments(
670+
FlinkSourceEnumerator enumerator, long tableId, int bucketNum) {
671+
Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
672+
for (int i = 0; i < bucketNum; i++) {
673+
TableBucket tableBucket = new TableBucket(tableId, i);
674+
LogSplit logSplit = new LogSplit(tableBucket, null, EARLIEST_OFFSET);
675+
int task = enumerator.getSplitOwner(logSplit);
676+
expectedAssignment.computeIfAbsent(task, k -> new ArrayList<>()).add(logSplit);
677+
}
678+
return expectedAssignment;
679+
}
680+
573681
private Map<Integer, List<SourceSplitBase>> expectAssignments(
574682
FlinkSourceEnumerator enumerator, long tableId, Map<Long, String> partitionNameIds) {
575683
Map<Integer, List<SourceSplitBase>> expectedAssignment = new HashMap<>();
@@ -603,13 +711,19 @@ private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor) throws
603711
// Fetch potential topic descriptions
604712
workExecutor.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
605713
// Initialize offsets for discovered partitions
606-
if (!workExecutor.getOneTimeCallables().isEmpty()) {
714+
while (!workExecutor.getOneTimeCallables().isEmpty()) {
607715
workExecutor.runNextOneTimeCallable();
608716
}
609717
}
610718

611719
private LogSplit genLogSplit(long tableId, int bucketId) {
612-
return new LogSplit(new TableBucket(tableId, bucketId), null, -2L);
720+
return new LogSplit(new TableBucket(tableId, bucketId), null, EARLIEST_OFFSET);
721+
}
722+
723+
private LogSplit genLogSplit(
724+
long tableId, Long partitionId, int bucketId, String partitionName) {
725+
return new LogSplit(
726+
new TableBucket(tableId, partitionId, bucketId), partitionName, EARLIEST_OFFSET);
613727
}
614728

615729
private Map<Integer, List<SourceSplitBase>> getReadersAssignments(

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkTestBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.metadata.DatabaseDescriptor;
3232
import org.apache.fluss.metadata.Schema;
3333
import org.apache.fluss.metadata.TableBucket;
34+
import org.apache.fluss.metadata.TableChange;
3435
import org.apache.fluss.metadata.TableDescriptor;
3536
import org.apache.fluss.metadata.TableInfo;
3637
import org.apache.fluss.metadata.TablePath;
@@ -52,6 +53,7 @@
5253
import java.time.Duration;
5354
import java.util.ArrayList;
5455
import java.util.Collection;
56+
import java.util.Collections;
5557
import java.util.HashMap;
5658
import java.util.List;
5759
import java.util.Map;
@@ -261,6 +263,14 @@ public static void dropPartitions(
261263
}
262264
}
263265

266+
public static void alterTableBucket(TablePath tablePath, int bucketNum) throws Exception {
267+
admin.alterTable(
268+
tablePath,
269+
Collections.singletonList(TableChange.bucketNum(bucketNum)),
270+
false)
271+
.get();
272+
}
273+
264274
public static List<String> writeRowsToPartition(
265275
Connection connection, TablePath tablePath, Collection<String> partitions)
266276
throws Exception {

0 commit comments

Comments
 (0)