Skip to content

Commit a60fe93

Browse files
[flink] Fluss enumerator partition discovery should be in fixed delay rather than fixed rate. (#1633)
1 parent 680ca7e commit a60fe93

File tree

4 files changed

+227
-22
lines changed

4 files changed

+227
-22
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.fluss.row.InternalRow;
4848
import org.apache.fluss.utils.ExceptionUtils;
4949

50+
import org.apache.flink.annotation.Internal;
5051
import org.apache.flink.annotation.VisibleForTesting;
5152
import org.apache.flink.api.connector.source.SourceEvent;
5253
import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -87,11 +88,13 @@
8788
* will be assigned to same reader.
8889
* </ul>
8990
*/
91+
@Internal
9092
public class FlinkSourceEnumerator
9193
implements SplitEnumerator<SourceSplitBase, SourceEnumeratorState> {
9294

9395
private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceEnumerator.class);
9496

97+
private final WorkerExecutor workerExecutor;
9598
private final TablePath tablePath;
9699
private final boolean hasPrimaryKey;
97100
private final boolean isPartitioned;
@@ -148,18 +151,22 @@ public FlinkSourceEnumerator(
148151
OffsetsInitializer startingOffsetsInitializer,
149152
long scanPartitionDiscoveryIntervalMs,
150153
boolean streaming,
151-
@Nullable Predicate partitionFilters) {
154+
@Nullable Predicate partitionFilters,
155+
@Nullable LakeSource<LakeSplit> lakeSource) {
152156
this(
153157
tablePath,
154158
flussConf,
155159
hasPrimaryKey,
156160
isPartitioned,
157161
context,
162+
Collections.emptySet(),
163+
Collections.emptyMap(),
164+
null,
158165
startingOffsetsInitializer,
159166
scanPartitionDiscoveryIntervalMs,
160167
streaming,
161168
partitionFilters,
162-
null);
169+
lakeSource);
163170
}
164171

165172
public FlinkSourceEnumerator(
@@ -168,6 +175,9 @@ public FlinkSourceEnumerator(
168175
boolean hasPrimaryKey,
169176
boolean isPartitioned,
170177
SplitEnumeratorContext<SourceSplitBase> context,
178+
Set<TableBucket> assignedTableBuckets,
179+
Map<Long, String> assignedPartitions,
180+
List<SourceSplitBase> pendingHybridLakeFlussSplits,
171181
OffsetsInitializer startingOffsetsInitializer,
172182
long scanPartitionDiscoveryIntervalMs,
173183
boolean streaming,
@@ -179,17 +189,18 @@ public FlinkSourceEnumerator(
179189
hasPrimaryKey,
180190
isPartitioned,
181191
context,
182-
Collections.emptySet(),
183-
Collections.emptyMap(),
184-
null,
192+
assignedTableBuckets,
193+
assignedPartitions,
194+
pendingHybridLakeFlussSplits,
185195
startingOffsetsInitializer,
186196
scanPartitionDiscoveryIntervalMs,
187197
streaming,
188198
partitionFilters,
189-
lakeSource);
199+
lakeSource,
200+
new WorkerExecutor(context));
190201
}
191202

192-
public FlinkSourceEnumerator(
203+
FlinkSourceEnumerator(
193204
TablePath tablePath,
194205
Configuration flussConf,
195206
boolean hasPrimaryKey,
@@ -202,7 +213,8 @@ public FlinkSourceEnumerator(
202213
long scanPartitionDiscoveryIntervalMs,
203214
boolean streaming,
204215
@Nullable Predicate partitionFilters,
205-
@Nullable LakeSource<LakeSplit> lakeSource) {
216+
@Nullable LakeSource<LakeSplit> lakeSource,
217+
WorkerExecutor workerExecutor) {
206218
this.tablePath = checkNotNull(tablePath);
207219
this.flussConf = checkNotNull(flussConf);
208220
this.hasPrimaryKey = hasPrimaryKey;
@@ -222,6 +234,7 @@ public FlinkSourceEnumerator(
222234
this.stoppingOffsetsInitializer =
223235
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
224236
this.lakeSource = lakeSource;
237+
this.workerExecutor = workerExecutor;
225238
}
226239

227240
@Override
@@ -257,8 +270,8 @@ public void start() {
257270
+ "with new partition discovery interval of {} ms.",
258271
tablePath,
259272
scanPartitionDiscoveryIntervalMs);
260-
// discover new partitions and handle new partitions
261-
context.callAsync(
273+
// discover new partitions and handle new partitions at fixed delay.
274+
workerExecutor.callAsyncAtFixedDelay(
262275
this::listPartitions,
263276
this::checkPartitionChanges,
264277
0,
@@ -268,7 +281,7 @@ public void start() {
268281
LOG.info(
269282
"Starting the FlussSourceEnumerator for table {} without partition discovery.",
270283
tablePath);
271-
context.callAsync(this::listPartitions, this::checkPartitionChanges);
284+
workerExecutor.callAsync(this::listPartitions, this::checkPartitionChanges);
272285
}
273286
} else {
274287
startInBatchMode();
@@ -434,7 +447,7 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
434447
partitionChange.newPartitions.size(),
435448
tablePath,
436449
partitionChange.newPartitions);
437-
context.callAsync(
450+
workerExecutor.callAsync(
438451
() -> initPartitionedSplits(partitionChange.newPartitions),
439452
this::handleSplitsAdd);
440453
}
@@ -872,6 +885,9 @@ public SourceEnumeratorState snapshotState(long checkpointId) {
872885
public void close() throws IOException {
873886
try {
874887
closed = true;
888+
if (workerExecutor != null) {
889+
workerExecutor.close();
890+
}
875891
if (flussAdmin != null) {
876892
flussAdmin.close();
877893
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.source.enumerator;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.exception.FlussRuntimeException;
22+
import org.apache.fluss.flink.source.split.SourceSplitBase;
23+
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
24+
25+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
26+
27+
import java.util.concurrent.Callable;
28+
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.function.BiConsumer;
33+
34+
/**
35+
* A worker executor that extends {@link SplitEnumeratorContext} with fixed delay scheduling
36+
* capabilities for asynchronous tasks.
37+
*
38+
* <p>{@link SplitEnumeratorContext} natively only supports fixed rate scheduling for asynchronous
39+
* calls, which can lead to task accumulation if individual calls take too long to complete.
40+
*
41+
* <p>This executor wraps a single-threaded {@link ScheduledExecutorService} to handle async
42+
* operations and route their results back to the coordinator thread through the {@link
43+
* SplitEnumeratorContext#callAsync} methods.
44+
*
45+
* <p>TODO: This class is a workaround and should be removed once FLINK-38335 is completed.
46+
*/
47+
@Internal
48+
public class WorkerExecutor implements AutoCloseable {
49+
protected final SplitEnumeratorContext<SourceSplitBase> context;
50+
private final ScheduledExecutorService scheduledExecutor;
51+
52+
public WorkerExecutor(SplitEnumeratorContext<SourceSplitBase> context) {
53+
this.context = context;
54+
this.scheduledExecutor =
55+
Executors.newScheduledThreadPool(
56+
1, new ExecutorThreadFactory("SplitEnumeratorContextWrapper"));
57+
}
58+
59+
public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
60+
context.callAsync(callable, handler);
61+
}
62+
63+
public <T> void callAsyncAtFixedDelay(
64+
Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long delay) {
65+
scheduledExecutor.scheduleWithFixedDelay(
66+
() -> {
67+
final CountDownLatch latch = new CountDownLatch(1);
68+
context.callAsync(
69+
() -> {
70+
try {
71+
return callable.call();
72+
} finally {
73+
latch.countDown();
74+
}
75+
},
76+
handler);
77+
// wait for the call to complete
78+
try {
79+
latch.await();
80+
} catch (InterruptedException e) {
81+
throw new FlussRuntimeException(
82+
"Interrupted while waiting for async call to complete", e);
83+
}
84+
},
85+
initialDelay,
86+
delay,
87+
TimeUnit.MILLISECONDS);
88+
}
89+
90+
@Override
91+
public void close() throws Exception {
92+
scheduledExecutor.shutdownNow();
93+
}
94+
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ void testPkTableNoSnapshotSplits() throws Throwable {
9797
OffsetsInitializer.full(),
9898
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
9999
streaming,
100+
null,
100101
null);
101102

102103
enumerator.start();
@@ -144,6 +145,7 @@ void testPkTableWithSnapshotSplits() throws Throwable {
144145
OffsetsInitializer.full(),
145146
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
146147
streaming,
148+
null,
147149
null);
148150
enumerator.start();
149151
// register all read
@@ -215,6 +217,7 @@ void testNonPkTable() throws Throwable {
215217
OffsetsInitializer.full(),
216218
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
217219
streaming,
220+
null,
218221
null);
219222

220223
enumerator.start();
@@ -261,6 +264,7 @@ void testReaderRegistrationTriggerAssignments() throws Throwable {
261264
OffsetsInitializer.full(),
262265
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
263266
streaming,
267+
null,
264268
null);
265269

266270
enumerator.start();
@@ -297,6 +301,7 @@ void testAddSplitBack() throws Throwable {
297301
OffsetsInitializer.full(),
298302
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
299303
streaming,
304+
null,
300305
null);
301306

302307
enumerator.start();
@@ -391,30 +396,36 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
391396
ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
392397
try (MockSplitEnumeratorContext<SourceSplitBase> context =
393398
new MockSplitEnumeratorContext<>(numSubtasks);
399+
MockWorkExecutor workExecutor = new MockWorkExecutor(context);
394400
FlinkSourceEnumerator enumerator =
395401
new FlinkSourceEnumerator(
396402
DEFAULT_TABLE_PATH,
397403
flussConf,
398404
isPrimaryKeyTable,
399405
true,
400406
context,
407+
Collections.emptySet(),
408+
Collections.emptyMap(),
409+
null,
401410
OffsetsInitializer.full(),
402411
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
403412
streaming,
404-
null)) {
413+
null,
414+
null,
415+
workExecutor)) {
405416
Map<Long, String> partitionNameByIds =
406417
waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
407418
enumerator.start();
408419

409420
// invoke partition discovery callable again and there should be pending assignments.
410-
runPeriodicPartitionDiscovery(context);
421+
runPeriodicPartitionDiscovery(workExecutor);
411422

412423
// register two readers
413424
registerReader(context, enumerator, 0);
414425
registerReader(context, enumerator, 1);
415426

416427
// invoke partition discovery callable again, shouldn't produce RemovePartitionEvent.
417-
runPeriodicPartitionDiscovery(context);
428+
runPeriodicPartitionDiscovery(workExecutor);
418429
assertThat(context.getSentSourceEvent()).isEmpty();
419430

420431
// now, register the third reader
@@ -434,7 +445,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
434445
createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
435446

436447
/// invoke partition discovery callable again and there should assignments.
437-
runPeriodicPartitionDiscovery(context);
448+
runPeriodicPartitionDiscovery(workExecutor);
438449

439450
expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds);
440451
actualAssignments = getLastReadersAssignments(context);
@@ -450,7 +461,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
450461
createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
451462

452463
// invoke partition discovery callable again
453-
runPeriodicPartitionDiscovery(context);
464+
runPeriodicPartitionDiscovery(workExecutor);
454465

455466
// there should be partition removed events
456467
Map<Integer, List<SourceEvent>> sentSourceEvents = context.getSentSourceEvent();
@@ -516,6 +527,7 @@ void testGetSplitOwner() throws Exception {
516527
OffsetsInitializer.full(),
517528
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
518529
streaming,
530+
null,
519531
null)) {
520532

521533
// test splits for same non-partitioned bucket, should assign to same task
@@ -587,13 +599,12 @@ private void checkAssignmentIgnoreOrder(
587599
}
588600
}
589601

590-
private void runPeriodicPartitionDiscovery(MockSplitEnumeratorContext<SourceSplitBase> context)
591-
throws Throwable {
602+
private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor) throws Throwable {
592603
// Fetch potential topic descriptions
593-
context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
604+
workExecutor.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
594605
// Initialize offsets for discovered partitions
595-
if (!context.getOneTimeCallables().isEmpty()) {
596-
context.runNextOneTimeCallable();
606+
if (!workExecutor.getOneTimeCallables().isEmpty()) {
607+
workExecutor.runNextOneTimeCallable();
597608
}
598609
}
599610

0 commit comments

Comments
 (0)