Skip to content

Commit c0cc07e

Browse files
committed
[WIP] Flink: support detecting bucket changes
1 parent 78f6641 commit c0cc07e

File tree

9 files changed

+531
-34
lines changed

9 files changed

+531
-34
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/FlinkConnectorOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,20 @@ public class FlinkConnectorOptions {
100100
+ "the new partitions for partitioned table while scanning."
101101
+ " A non-positive value disables the partition discovery.");
102102

103+
public static final ConfigOption<Duration> SCAN_BUCKET_DISCOVERY_INTERVAL =
104+
ConfigOptions.key("scan.bucket.discovery.interval")
105+
.durationType()
106+
.defaultValue(Duration.ofSeconds(10))
107+
.withDescription(
108+
"The time interval for the Fluss source to discover "
109+
+ "the new buckets for table while scanning."
110+
+ " A non-positive value disables the bucket discovery. "
111+
+ "For partitioned tables, if "
112+
+ SCAN_PARTITION_DISCOVERY_INTERVAL.key()
113+
+ " is also set, then the minimum value of "
114+
+ SCAN_PARTITION_DISCOVERY_INTERVAL.key()
115+
+ " and the current value will be used as the time interval for discovering partitions and buckets.");
116+
103117
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
104118
ConfigOptions.key("sink.ignore-delete")
105119
.booleanType()

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
127127
tableOptions
128128
.get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
129129
.toMillis();
130+
long bucketDiscoveryIntervalMs =
131+
tableOptions.get(FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL).toMillis();
130132

131133
return new FlinkTableSource(
132134
toFlussTablePath(context.getObjectIdentifier()),
@@ -142,6 +144,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
142144
tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
143145
cache,
144146
partitionDiscoveryIntervalMs,
147+
bucketDiscoveryIntervalMs,
145148
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
146149
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
147150
context.getCatalogTable().getOptions());

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public class FlinkSource<OUT>
6767
@Nullable private final int[] projectedFields;
6868
protected final OffsetsInitializer offsetsInitializer;
6969
protected final long scanPartitionDiscoveryIntervalMs;
70+
protected final long scanBucketDiscoveryIntervalMs;
7071
private final boolean streaming;
7172
private final FlussDeserializationSchema<OUT> deserializationSchema;
7273

@@ -83,6 +84,7 @@ public FlinkSource(
8384
@Nullable int[] projectedFields,
8485
OffsetsInitializer offsetsInitializer,
8586
long scanPartitionDiscoveryIntervalMs,
87+
long scanBucketDiscoveryIntervalMs,
8688
FlussDeserializationSchema<OUT> deserializationSchema,
8789
boolean streaming,
8890
List<FieldEqual> partitionFilters) {
@@ -95,6 +97,7 @@ public FlinkSource(
9597
projectedFields,
9698
offsetsInitializer,
9799
scanPartitionDiscoveryIntervalMs,
100+
scanBucketDiscoveryIntervalMs,
98101
deserializationSchema,
99102
streaming,
100103
partitionFilters,
@@ -110,6 +113,7 @@ public FlinkSource(
110113
@Nullable int[] projectedFields,
111114
OffsetsInitializer offsetsInitializer,
112115
long scanPartitionDiscoveryIntervalMs,
116+
long scanBucketDiscoveryIntervalMs,
113117
FlussDeserializationSchema<OUT> deserializationSchema,
114118
boolean streaming,
115119
List<FieldEqual> partitionFilters,
@@ -122,6 +126,7 @@ public FlinkSource(
122126
this.projectedFields = projectedFields;
123127
this.offsetsInitializer = offsetsInitializer;
124128
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
129+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
125130
this.deserializationSchema = deserializationSchema;
126131
this.streaming = streaming;
127132
this.partitionFilters = checkNotNull(partitionFilters);
@@ -144,6 +149,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
144149
splitEnumeratorContext,
145150
offsetsInitializer,
146151
scanPartitionDiscoveryIntervalMs,
152+
scanBucketDiscoveryIntervalMs,
147153
streaming,
148154
partitionFilters,
149155
lakeSource);
@@ -164,6 +170,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
164170
sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
165171
offsetsInitializer,
166172
scanPartitionDiscoveryIntervalMs,
173+
scanBucketDiscoveryIntervalMs,
167174
streaming,
168175
partitionFilters,
169176
lakeSource);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public class FlinkTableSource
125125
@Nullable private final LookupCache cache;
126126

127127
private final long scanPartitionDiscoveryIntervalMs;
128+
private final long scanBucketDiscoveryIntervalMs;
128129
private final boolean isDataLakeEnabled;
129130
@Nullable private final MergeEngineType mergeEngineType;
130131

@@ -163,6 +164,7 @@ public FlinkTableSource(
163164
boolean lookupAsync,
164165
@Nullable LookupCache cache,
165166
long scanPartitionDiscoveryIntervalMs,
167+
long scanBucketDiscoveryIntervalMs,
166168
boolean isDataLakeEnabled,
167169
@Nullable MergeEngineType mergeEngineType,
168170
Map<String, String> tableOptions) {
@@ -181,6 +183,7 @@ public FlinkTableSource(
181183
this.cache = cache;
182184

183185
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
186+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
184187
this.isDataLakeEnabled = isDataLakeEnabled;
185188
this.mergeEngineType = mergeEngineType;
186189
this.tableOptions = tableOptions;
@@ -311,6 +314,7 @@ public boolean isBounded() {
311314
projectedFields,
312315
offsetsInitializer,
313316
scanPartitionDiscoveryIntervalMs,
317+
scanBucketDiscoveryIntervalMs,
314318
new RowDataDeserializationSchema(),
315319
streaming,
316320
partitionFilters,
@@ -432,6 +436,7 @@ public DynamicTableSource copy() {
432436
lookupAsync,
433437
cache,
434438
scanPartitionDiscoveryIntervalMs,
439+
scanBucketDiscoveryIntervalMs,
435440
isDataLakeEnabled,
436441
mergeEngineType,
437442
tableOptions);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* .setProjectedFields("orderId", "amount")
4545
* .setStartingOffsets(OffsetsInitializer.earliest())
4646
* .setScanPartitionDiscoveryIntervalMs(1000L)
47+
* .setScanBucketDiscoveryIntervalMs(1000L)
4748
* .setDeserializationSchema(new OrderDeserializationSchema())
4849
* .build();
4950
*
@@ -68,6 +69,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
6869
@Nullable int[] projectedFields,
6970
OffsetsInitializer offsetsInitializer,
7071
long scanPartitionDiscoveryIntervalMs,
72+
long scanBucketDiscoveryIntervalMs,
7173
FlussDeserializationSchema<OUT> deserializationSchema,
7274
boolean streaming) {
7375
// TODO: Support partition pushDown in datastream
@@ -80,6 +82,7 @@ public class FlussSource<OUT> extends FlinkSource<OUT> {
8082
projectedFields,
8183
offsetsInitializer,
8284
scanPartitionDiscoveryIntervalMs,
85+
scanBucketDiscoveryIntervalMs,
8386
deserializationSchema,
8487
streaming,
8588
Collections.emptyList());

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* .setTable("orders")
5555
* .setProjectedFields("orderId", "amount")
5656
* .setScanPartitionDiscoveryIntervalMs(1000L)
57+
* .setScanBucketDiscoveryIntervalMs(1000L)
5758
* .setStartingOffsets(OffsetsInitializer.earliest())
5859
* .setDeserializationSchema(new OrderDeserializationSchema())
5960
* .build();
@@ -69,6 +70,7 @@ public class FlussSourceBuilder<OUT> {
6970
private int[] projectedFields;
7071
private String[] projectedFieldNames;
7172
private Long scanPartitionDiscoveryIntervalMs;
73+
private Long scanBucketDiscoveryIntervalMs;
7274
private OffsetsInitializer offsetsInitializer;
7375
private FlussDeserializationSchema<OUT> deserializationSchema;
7476

@@ -131,6 +133,21 @@ public FlussSourceBuilder<OUT> setScanPartitionDiscoveryIntervalMs(
131133
return this;
132134
}
133135

136+
/**
137+
* Sets the scan bucket discovery interval in milliseconds.
138+
*
139+
* <p>If not specified, the default value from {@link
140+
* FlinkConnectorOptions#SCAN_BUCKET_DISCOVERY_INTERVAL} is used.
141+
*
142+
* @param scanBucketDiscoveryIntervalMs interval in milliseconds
143+
* @return this builder
144+
*/
145+
public FlussSourceBuilder<OUT> setScanBucketDiscoveryIntervalMs(
146+
long scanBucketDiscoveryIntervalMs) {
147+
this.scanBucketDiscoveryIntervalMs = scanBucketDiscoveryIntervalMs;
148+
return this;
149+
}
150+
134151
/**
135152
* Sets the starting offsets strategy for the Fluss source.
136153
*
@@ -225,6 +242,12 @@ public FlussSource<OUT> build() {
225242
.toMillis();
226243
}
227244

245+
// if null use the default value:
246+
if (scanBucketDiscoveryIntervalMs == null) {
247+
scanBucketDiscoveryIntervalMs =
248+
FlinkConnectorOptions.SCAN_BUCKET_DISCOVERY_INTERVAL.defaultValue().toMillis();
249+
}
250+
228251
if (this.flussConf == null) {
229252
this.flussConf = new Configuration();
230253
}
@@ -299,6 +322,7 @@ public FlussSource<OUT> build() {
299322
projectedFields,
300323
offsetsInitializer,
301324
scanPartitionDiscoveryIntervalMs,
325+
scanBucketDiscoveryIntervalMs,
302326
deserializationSchema,
303327
true);
304328
}

0 commit comments

Comments (0)