Skip to content

Commit eb4f325

Browse files
committed
[flink] Flink source supports detecting new buckets for an existing table or partition
1 parent c40ba30 commit eb4f325

File tree

10 files changed

+424
-56
lines changed

10 files changed

+424
-56
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,21 @@ public class FlinkConnectorOptions {
107107
+ "as a small value would cause frequent requests and increase server load. In the future, "
108108
+ "once list partitions is optimized, the default value of this parameter can be reduced.");
109109

110+
public static final ConfigOption<Duration> SCAN_BUCKET_DISCOVERY_INTERVAL =
111+
ConfigOptions.key("scan.bucket.discovery.interval")
112+
.durationType()
113+
.defaultValue(Duration.ofMinutes(1))
114+
.withDescription(
115+
"The time interval for the Fluss source to discover "
116+
+ "the new buckets for table while scanning."
117+
+ " A non-positive value disables the bucket discovery. "
118+
+ "For partitioned tables, if "
119+
+ SCAN_PARTITION_DISCOVERY_INTERVAL.key()
120+
+ " is also set, then the minimum value of "
121+
+ SCAN_PARTITION_DISCOVERY_INTERVAL.key()
122+
+ " and the current value will be used as the time interval for "
123+
+ "discovering partitions and buckets.");
124+
110125
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
111126
ConfigOptions.key("sink.ignore-delete")
112127
.booleanType()

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
137137
tableOptions
138138
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
139139
.toMillis();
140+
long bucketDiscoveryIntervalMs =
141+
tableOptions.get(FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL).toMillis();
140142

141143
return new FlinkTableSource(
142144
toFlussTablePath(context.getObjectIdentifier()),
@@ -151,6 +153,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
151153
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
152154
cache,
153155
partitionDiscoveryIntervalMs,
156+
bucketDiscoveryIntervalMs,
154157
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
155158
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
156159
context.getCatalogTable().getOptions());

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class FlinkSource<OUT>
6363
@Nullable private final int[] projectedFields;
6464
protected final OffsetsInitializer offsetsInitializer;
6565
protected final long scanPartitionDiscoveryIntervalMs;
66+
protected final long scanBucketDiscoveryIntervalMs;
6667
private final boolean streaming;
6768
private final FlussDeserializationSchema<OUT> deserializationSchema;
6869
@Nullable private final Predicate partitionFilters;
@@ -77,6 +78,7 @@ public FlinkSource(
7778
@Nullable int[] projectedFields,
7879
OffsetsInitializer offsetsInitializer,
7980
long scanPartitionDiscoveryIntervalMs,
81+
long scanBucketDiscoveryIntervalMs,
8082
FlussDeserializationSchema<OUT> deserializationSchema,
8183
boolean streaming,
8284
@Nullable Predicate partitionFilters) {
@@ -89,6 +91,7 @@ public FlinkSource(
8991
projectedFields,
9092
offsetsInitializer,
9193
scanPartitionDiscoveryIntervalMs,
94+
scanBucketDiscoveryIntervalMs,
9295
deserializationSchema,
9396
streaming,
9497
partitionFilters,
@@ -104,6 +107,7 @@ public FlinkSource(
104107
@Nullable int[] projectedFields,
105108
OffsetsInitializer offsetsInitializer,
106109
long scanPartitionDiscoveryIntervalMs,
110+
long scanBucketDiscoveryIntervalMs,
107111
FlussDeserializationSchema<OUT> deserializationSchema,
108112
boolean streaming,
109113
@Nullable Predicate partitionFilters,
@@ -116,6 +120,7 @@ public FlinkSource(
116120
this.projectedFields = projectedFields;
117121
this.offsetsInitializer = offsetsInitializer;
118122
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
123+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
119124
this.deserializationSchema = deserializationSchema;
120125
this.streaming = streaming;
121126
this.partitionFilters = partitionFilters;
@@ -138,6 +143,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
138143
splitEnumeratorContext,
139144
offsetsInitializer,
140145
scanPartitionDiscoveryIntervalMs,
146+
scanBucketDiscoveryIntervalMs,
141147
streaming,
142148
partitionFilters,
143149
lakeSource);
@@ -158,6 +164,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
158164
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
159165
offsetsInitializer,
160166
scanPartitionDiscoveryIntervalMs,
167+
scanBucketDiscoveryIntervalMs,
161168
streaming,
162169
partitionFilters,
163170
lakeSource);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public class FlinkTableSource
131131
@Nullable private final LookupCache cache;
132132

133133
private final long scanPartitionDiscoveryIntervalMs;
134+
private final long scanBucketDiscoveryIntervalMs;
134135
private final boolean isDataLakeEnabled;
135136
@Nullable private final MergeEngineType mergeEngineType;
136137

@@ -168,6 +169,7 @@ public FlinkTableSource(
168169
boolean lookupAsync,
169170
@Nullable LookupCache cache,
170171
long scanPartitionDiscoveryIntervalMs,
172+
long scanBucketDiscoveryIntervalMs,
171173
boolean isDataLakeEnabled,
172174
@Nullable MergeEngineType mergeEngineType,
173175
Map<String, String> tableOptions) {
@@ -185,6 +187,7 @@ public FlinkTableSource(
185187
this.cache = cache;
186188

187189
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
190+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
188191
this.isDataLakeEnabled = isDataLakeEnabled;
189192
this.mergeEngineType = mergeEngineType;
190193
this.tableOptions = tableOptions;
@@ -328,6 +331,7 @@ public boolean isBounded() {
328331
projectedFields,
329332
offsetsInitializer,
330333
scanPartitionDiscoveryIntervalMs,
334+
scanBucketDiscoveryIntervalMs,
331335
new RowDataDeserializationSchema(),
332336
streaming,
333337
partitionFilters,
@@ -446,6 +450,7 @@ public DynamicTableSource copy() {
446450
lookupAsync,
447451
cache,
448452
scanPartitionDiscoveryIntervalMs,
453+
scanBucketDiscoveryIntervalMs,
449454
isDataLakeEnabled,
450455
mergeEngineType,
451456
tableOptions);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* .setProjectedFields("orderId", "amount")
4343
* .setStartingOffsets(OffsetsInitializer.earliest())
4444
* .setScanPartitionDiscoveryIntervalMs(1000L)
45+
* .setScanBucketDiscoveryIntervalMs(1000L)
4546
* .setDeserializationSchema(new OrderDeserializationSchema())
4647
* .build();
4748
*
@@ -66,6 +67,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
6667
@Nullable int[] projectedFields,
6768
OffsetsInitializer offsetsInitializer,
6869
long scanPartitionDiscoveryIntervalMs,
70+
long scanBucketDiscoveryIntervalMs,
6971
FlussDeserializationSchema<OUT> deserializationSchema,
7072
boolean streaming) {
7173
// TODO: Support partition pushDown in datastream
@@ -78,6 +80,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
7880
projectedFields,
7981
offsetsInitializer,
8082
scanPartitionDiscoveryIntervalMs,
83+
scanBucketDiscoveryIntervalMs,
8184
deserializationSchema,
8285
streaming,
8386
null);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public class FlussSourceBuilder<OUT> {
6969
private int[] projectedFields;
7070
private String[] projectedFieldNames;
7171
private Long scanPartitionDiscoveryIntervalMs;
72+
private Long scanBucketDiscoveryIntervalMs;
7273
private OffsetsInitializer offsetsInitializer;
7374
private FlussDeserializationSchema<OUT> deserializationSchema;
7475

@@ -131,6 +132,21 @@ public FlussSourceBuilder<OUT> setScanPartitionDiscoveryIntervalMs(
131132
return this;
132133
}
133134

135+
/**
136+
* Sets the scan bucket discovery interval in milliseconds.
137+
*
138+
* <p>If not specified, the default value from {@link
139+
* FlinkConnectorOptions#SCAN_BUCKET_DISCOVERY_INTERVAL} is used.
140+
*
141+
* @param scanBucketDiscoveryIntervalMs interval in milliseconds
142+
* @return this builder
143+
*/
144+
public FlussSourceBuilder<OUT> setScanBucketDiscoveryIntervalMs(
145+
long scanBucketDiscoveryIntervalMs) {
146+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
147+
return this;
148+
}
149+
134150
/**
135151
* Sets the starting offsets strategy for the Fluss source.
136152
*
@@ -225,6 +241,12 @@ public FlussSource<OUT> build() {
225241
.toMillis();
226242
}
227243

244+
// if null use the default value:
245+
if (scanBucketDiscoveryIntervalMs == null) {
246+
scanBucketDiscoveryIntervalMs =
247+
FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL.defaultValue().toMillis();
248+
}
249+
228250
if (this.flussConf == null) {
229251
this.flussConf = new Configuration();
230252
}
@@ -299,6 +321,7 @@ public FlussSource<OUT> build() {
299321
projectedFields,
300322
offsetsInitializer,
301323
scanPartitionDiscoveryIntervalMs,
324+
scanBucketDiscoveryIntervalMs,
302325
deserializationSchema,
303326
true);
304327
}

0 commit comments

Comments
 (0)