
Commit c6fe0df

[connector] Flink source supports detecting bucket changes

1 parent 8a93663 commit c6fe0df

File tree

8 files changed: +163 -2 lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/FlinkConnectorOptions.java

Lines changed: 9 additions & 0 deletions

@@ -100,6 +100,15 @@ public class FlinkConnectorOptions {
                                     + "the new partitions for partitioned table while scanning."
                                     + " A non-positive value disables the partition discovery.");

+    public static final ConfigOption<Duration> SCAN_BUCKET_DISCOVERY_INTERVAL =
+            ConfigOptions.key("scan.bucket.discovery.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription(
+                            "The time interval for the Fluss source to discover "
+                                    + "the new buckets for table while scanning."
+                                    + " A non-positive value disables the bucket discovery.");
+
     public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
             ConfigOptions.key("sink.ignore-delete")
                     .booleanType()
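
Since scan.bucket.discovery.interval is a per-table scan option, it can be set in the WITH clause of a Flink SQL table just like its partition-discovery counterpart. A minimal sketch, assuming a hypothetical orders table and a local server address; only the option key and its 10s default come from this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BucketDiscoveryOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // hypothetical Fluss table; check for new buckets every 30 seconds,
        // while a non-positive value such as '0s' disables bucket discovery
        tEnv.executeSql(
                "CREATE TABLE orders (order_id BIGINT, amount DOUBLE) WITH ("
                        + " 'connector' = 'fluss',"
                        + " 'bootstrap.servers' = 'localhost:9123',"
                        + " 'scan.bucket.discovery.interval' = '30s'"
                        + ")");
    }
}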

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 3 additions & 0 deletions

@@ -127,6 +127,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
+        long bucketDiscoveryIntervalMs =
+                tableOptions.get(FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL).toMillis();

         return new FlinkTableSource(
                 toFlussTablePath(context.getObjectIdentifier()),
@@ -142,6 +144,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
                 cache,
                 partitionDiscoveryIntervalMs,
+                bucketDiscoveryIntervalMs,
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
                 tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
                 context.getCatalogTable().getOptions());

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java

Lines changed: 7 additions & 0 deletions

@@ -67,6 +67,7 @@ public class FlinkSource<OUT>
     @Nullable private final int[] projectedFields;
     protected final OffsetsInitializer offsetsInitializer;
     protected final long scanPartitionDiscoveryIntervalMs;
+    protected final long scanBucketDiscoveryIntervalMs;
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;

@@ -83,6 +84,7 @@ public FlinkSource(
             @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
             List<FieldEqual> partitionFilters) {
@@ -95,6 +97,7 @@ public FlinkSource(
                 projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 deserializationSchema,
                 streaming,
                 partitionFilters,
@@ -110,6 +113,7 @@ public FlinkSource(
             @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
             List<FieldEqual> partitionFilters,
@@ -122,6 +126,7 @@ public FlinkSource(
         this.projectedFields = projectedFields;
         this.offsetsInitializer = offsetsInitializer;
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
         this.partitionFilters = checkNotNull(partitionFilters);
@@ -144,6 +149,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
                 splitEnumeratorContext,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
                 lakeSource);
@@ -163,6 +169,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
                 sourceEnumeratorState.getAssignedPartitions(),
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
                 lakeSource);

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 5 additions & 0 deletions

@@ -112,6 +112,7 @@ public class FlinkTableSource
     @Nullable private final LookupCache cache;

     private final long scanPartitionDiscoveryIntervalMs;
+    private final long scanBucketDiscoveryIntervalMs;
     private final boolean isDataLakeEnabled;
     @Nullable private final MergeEngineType mergeEngineType;

@@ -150,6 +151,7 @@ public FlinkTableSource(
             boolean lookupAsync,
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             boolean isDataLakeEnabled,
             @Nullable MergeEngineType mergeEngineType,
             Map<String, String> tableOptions) {
@@ -168,6 +170,7 @@ public FlinkTableSource(
         this.cache = cache;

         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
         this.isDataLakeEnabled = isDataLakeEnabled;
         this.mergeEngineType = mergeEngineType;
         this.tableOptions = tableOptions;
@@ -280,6 +283,7 @@ public boolean isBounded() {
                 projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 new RowDataDeserializationSchema(),
                 streaming,
                 partitionFilters,
@@ -371,6 +375,7 @@ public DynamicTableSource copy() {
                 lookupAsync,
                 cache,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 isDataLakeEnabled,
                 mergeEngineType,
                 tableOptions);

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSource.java

Lines changed: 3 additions & 0 deletions

@@ -44,6 +44,7 @@
  *     .setProjectedFields("orderId", "amount")
  *     .setStartingOffsets(OffsetsInitializer.earliest())
  *     .setScanPartitionDiscoveryIntervalMs(1000L)
+ *     .setScanBucketDiscoveryIntervalMs(1000L)
  *     .setDeserializationSchema(new OrderDeserializationSchema())
  *     .build();
  *
@@ -68,6 +69,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
             @Nullable int[] projectedFields,
             OffsetsInitializer offsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming) {
         // TODO: Support partition pushDown in datastream
@@ -80,6 +82,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
                 projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 deserializationSchema,
                 streaming,
                 Collections.emptyList());

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlussSourceBuilder.java

Lines changed: 24 additions & 0 deletions

@@ -54,6 +54,7 @@
  *     .setTable("orders")
  *     .setProjectedFields("orderId", "amount")
  *     .setScanPartitionDiscoveryIntervalMs(1000L)
+ *     .setScanBucketDiscoveryIntervalMs(1000L)
  *     .setStartingOffsets(OffsetsInitializer.earliest())
  *     .setDeserializationSchema(new OrderDeserializationSchema())
  *     .build();
@@ -69,6 +70,7 @@ public class FlussSourceBuilder<OUT> {
     private int[] projectedFields;
     private String[] projectedFieldNames;
     private Long scanPartitionDiscoveryIntervalMs;
+    private Long scanBucketDiscoveryIntervalMs;
     private OffsetsInitializer offsetsInitializer;
     private FlussDeserializationSchema<OUT> deserializationSchema;

@@ -131,6 +133,21 @@ public FlussSourceBuilder<OUT> setScanPartitionDiscoveryIntervalMs(
         return this;
     }

+    /**
+     * Sets the scan bucket discovery interval in milliseconds.
+     *
+     * <p>If not specified, the default value from {@link
+     * FlinkConnectorOptions#SCAN_BUCKET_DISCOVERY_INTERVAL} is used.
+     *
+     * @param scanBucketDiscoveryIntervalMs interval in milliseconds
+     * @return this builder
+     */
+    public FlussSourceBuilder<OUT> setScanBucketDiscoveryIntervalMs(
+            long scanBucketDiscoveryIntervalMs) {
+        this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
+        return this;
+    }
+
     /**
      * Sets the starting offsets strategy for the Fluss source.
      *
@@ -225,6 +242,12 @@ public FlussSource<OUT> build() {
                             .toMillis();
         }

+        // if null use the default value:
+        if (scanBucketDiscoveryIntervalMs == null) {
+            scanBucketDiscoveryIntervalMs =
+                    FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL.defaultValue().toMillis();
+        }
+
        if (this.flussConf == null) {
            this.flussConf = new Configuration();
        }
@@ -299,6 +322,7 @@ public FlussSource<OUT> build() {
                 projectedFields,
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 deserializationSchema,
                 true);
     }
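
Taken together with build() above, the new setter is optional: if it is never called, the interval falls back to the option default of 10 seconds. A usage sketch based on the class Javadoc; Order, OrderDeserializationSchema, and the no-arg constructor are assumptions, since the builder's entry point and these placeholder types are not shown in this diff:

// Sketch of the full builder chain; Order and OrderDeserializationSchema
// are the Javadoc's placeholder types, and the no-arg constructor is an
// assumption (the actual entry point is not part of this diff).
FlussSource<Order> source =
        new FlussSourceBuilder<Order>()
                .setTable("orders")
                .setProjectedFields("orderId", "amount")
                .setScanPartitionDiscoveryIntervalMs(1000L)
                // poll for bucket-count changes once per second; omitting
                // this call makes build() fall back to the 10s option default
                .setScanBucketDiscoveryIntervalMs(1000L)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializationSchema(new OrderDeserializationSchema())
                .build();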

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 103 additions & 2 deletions

@@ -111,6 +111,7 @@ public class FlinkSourceEnumerator
     private final Set<TableBucket> assignedTableBuckets;

     private final long scanPartitionDiscoveryIntervalMs;
+    private final long scanBucketDiscoveryIntervalMs;

     private final boolean streaming;
     private final OffsetsInitializer startingOffsetsInitializer;
@@ -142,6 +143,7 @@ public FlinkSourceEnumerator(
             SplitEnumeratorContext<SourceSplitBase> context,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters) {
         this(
@@ -152,6 +154,7 @@ public FlinkSourceEnumerator(
                 context,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
                 null);
@@ -165,6 +168,7 @@ public FlinkSourceEnumerator(
             SplitEnumeratorContext<SourceSplitBase> context,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters,
             LakeSource<LakeSplit> lakeSource) {
@@ -178,6 +182,7 @@ public FlinkSourceEnumerator(
                 Collections.emptyMap(),
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
+                scanBucketDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
                 lakeSource);
@@ -193,6 +198,7 @@ public FlinkSourceEnumerator(
             Map<Long, String> assignedPartitions,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
+            long scanBucketDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters,
             @Nullable LakeSource<LakeSplit> lakeSource) {
@@ -206,6 +212,7 @@ public FlinkSourceEnumerator(
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.assignedPartitions = new HashMap<>(assignedPartitions);
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
+        this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
         this.streaming = streaming;
         this.partitionFilters = checkNotNull(partitionFilters);
         this.stoppingOffsetsInitializer =
@@ -258,8 +265,26 @@ public void start() {
         if (!streaming) {
             startInBatchMode();
         } else {
-            // init bucket splits and assign
-            context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
+            // Currently, only log tables support altering the bucket count,
+            // so bucket discovery is enabled only for log tables.
+            if (scanBucketDiscoveryIntervalMs > 0 && !hasPrimaryKey) {
+                LOG.info(
+                        "Starting the FlussSourceEnumerator for table {} "
+                                + "with new bucket discovery interval of {} ms.",
+                        tablePath,
+                        scanBucketDiscoveryIntervalMs);
+                context.callAsync(
+                        this::listBuckets,
+                        this::checkBucketChanges,
+                        0,
+                        scanBucketDiscoveryIntervalMs);
+            } else {
+                LOG.info(
+                        "Starting the FlussSourceEnumerator for table {} without bucket discovery.",
+                        tablePath);
+                // init bucket splits and assign
+                context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
+            }
         }
     }
 }
@@ -292,6 +317,66 @@ private List<SourceSplitBase> initNonPartitionedSplits() {
         }
     }

+    private Set<TableBucket> listBuckets() {
+        try {
+            // always refresh tableInfo to discover bucket-count changes
+            tableInfo = flussAdmin.getTableInfo(tablePath).get();
+            int bucketNum = tableInfo.getNumBuckets();
+            Set<TableBucket> tableBuckets = new HashSet<>(bucketNum);
+            for (int bucketId = 0; bucketId < bucketNum; bucketId++) {
+                TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), null, bucketId);
+                tableBuckets.add(tableBucket);
+            }
+            return tableBuckets;
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(
+                    String.format("Failed to list buckets for %s", tablePath),
+                    ExceptionUtils.stripCompletionException(e));
+        }
+    }
+
+    private void checkBucketChanges(Set<TableBucket> tableBuckets, Throwable t) {
+        if (closed) {
+            // skip if the enumerator is closed to avoid unnecessary error logs
+            return;
+        }
+        if (t != null) {
+            LOG.error("Failed to list buckets for {}", tablePath, t);
+            return;
+        }
+        final BucketChange bucketChange = getBucketChange(tableBuckets);
+        if (bucketChange.isEmpty()) {
+            return;
+        }
+
+        // handle new buckets
+        context.callAsync(() -> initBucketSplits(bucketChange.newBuckets), this::handleSplitsAdd);
+    }
+
+    private BucketChange getBucketChange(Set<TableBucket> tableBuckets) {
+        // compare the freshly listed buckets with the already-assigned set:
+        // unassigned buckets are new, assigned buckets no longer listed
+        // have been removed
+        Set<TableBucket> newBuckets = new HashSet<>(tableBuckets);
+        newBuckets.removeAll(assignedTableBuckets);
+        Set<TableBucket> removedBuckets = new HashSet<>(assignedTableBuckets);
+        removedBuckets.removeAll(tableBuckets);
+        return new BucketChange(newBuckets, removedBuckets);
+    }
+
+    private List<SourceSplitBase> initBucketSplits(Collection<TableBucket> newBuckets) {
+        List<Integer> bucketsNeedInitOffset =
+                newBuckets.stream().map(TableBucket::getBucket).collect(Collectors.toList());
+
+        List<SourceSplitBase> splits = new ArrayList<>(bucketsNeedInitOffset.size());
+        if (!bucketsNeedInitOffset.isEmpty()) {
+            startingOffsetsInitializer
+                    .getBucketOffsets(null, bucketsNeedInitOffset, bucketOffsetsRetriever)
+                    .forEach(
+                            (bucketId, startingOffset) ->
+                                    splits.add(
+                                            new LogSplit(
+                                                    new TableBucket(
+                                                            tableInfo.getTableId(), null, bucketId),
+                                                    null,
+                                                    startingOffset)));
+        }
+        return splits;
+    }
+
     private Set<PartitionInfo> listPartitions() {
         try {
             List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
@@ -775,6 +860,22 @@ public void close() throws IOException {
     }

     // --------------- private class ---------------
+
+    /** A container class to hold the newly added buckets and removed buckets. */
+    private static class BucketChange {
+        private final Collection<TableBucket> newBuckets;
+        private final Collection<TableBucket> removedBuckets;
+
+        BucketChange(Collection<TableBucket> newBuckets, Collection<TableBucket> removedBuckets) {
+            this.newBuckets = newBuckets;
+            this.removedBuckets = removedBuckets;
+        }
+
+        public boolean isEmpty() {
+            return newBuckets.isEmpty() && removedBuckets.isEmpty();
+        }
+    }
+
     /** A container class to hold the newly added partitions and removed partitions. */
     private static class PartitionChange {
         private final Collection<Partition> newPartitions;
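
The bucket-change check above reduces to a set difference between the buckets just listed and those already assigned: unassigned buckets become new LogSplits, while removed buckets are detected but, as the "handle new buckets" branch shows, not yet retracted. A self-contained model of that comparison, with plain integers standing in for TableBucket and a local set standing in for the enumerator's assignedTableBuckets state:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public final class BucketChangeModel {
    public static void main(String[] args) {
        // buckets the enumerator is already reading
        Set<Integer> assigned = new HashSet<>(Arrays.asList(0, 1, 2));
        // buckets just listed after the bucket count grew from 3 to 5
        Set<Integer> listed = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4));

        Set<Integer> newBuckets = new HashSet<>(listed);
        newBuckets.removeAll(assigned);          // [3, 4] -> get new LogSplits
        Set<Integer> removedBuckets = new HashSet<>(assigned);
        removedBuckets.removeAll(listed);        // [] -> nothing to retract

        System.out.println("new=" + newBuckets + " removed=" + removedBuckets);
    }
}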
