
Commit a362ac1

[flink] Fluss enumerator partition discovery at a fixed delay rather than a fixed rate.
1 parent 1f9de70 commit a362ac1
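Why fixed delay matters: with fixed-rate scheduling, a partition-discovery call that runs longer than the discovery interval causes overdue runs to fire back-to-back; with fixed delay, the next discovery starts only a full interval after the previous one completes. A minimal, self-contained sketch of the difference (plain java.util.concurrent, not Fluss code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Standalone illustration (not Fluss code): a 250 ms task scheduled every 100 ms.
// Fixed rate: overdue runs start immediately after the previous one finishes,
// so the task runs back-to-back with no idle gap.
// Fixed delay: each run starts a full 100 ms after the previous one completes.
public class FixedDelayVsFixedRate {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService rate = Executors.newSingleThreadScheduledExecutor();
        ScheduledExecutorService delay = Executors.newSingleThreadScheduledExecutor();

        rate.scheduleAtFixedRate(task("fixed-rate "), 0, 100, TimeUnit.MILLISECONDS);
        delay.scheduleWithFixedDelay(task("fixed-delay"), 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(1_500);
        rate.shutdownNow();
        delay.shutdownNow();
    }

    private static Runnable task(String label) {
        return () -> {
            System.out.printf("%s run at %d ms%n", label, System.currentTimeMillis() % 10_000);
            try {
                Thread.sleep(250); // stands in for a slow listPartitions() RPC
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}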

File tree: 4 files changed, +227 -28 lines changed


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java

Lines changed: 34 additions & 13 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.flink.source.enumerator;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
@@ -69,6 +70,11 @@
 import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -85,11 +91,14 @@
  *       will be assigned to same reader.
  * </ul>
  */
+@Internal
 public class FlinkSourceEnumerator
         implements SplitEnumerator<SourceSplitBase, SourceEnumeratorState> {
+    // TODO: remove it until SplitEnumeratorContext.callAsync supports fixed delay.
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceEnumerator.class);
 
+    private final WorkerExecutor workerExecutor;
     private final TablePath tablePath;
     private final boolean hasPrimaryKey;
     private final boolean isPartitioned;
@@ -146,18 +155,22 @@ public FlinkSourceEnumerator(
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            List<FieldEqual> partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
                 flussConf,
                 hasPrimaryKey,
                 isPartitioned,
                 context,
+                Collections.emptySet(),
+                Collections.emptyMap(),
+                null,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
-                null);
+                lakeSource);
     }
 
     public FlinkSourceEnumerator(
@@ -166,6 +179,9 @@ public FlinkSourceEnumerator(
             boolean hasPrimaryKey,
             boolean isPartitioned,
             SplitEnumeratorContext<SourceSplitBase> context,
+            Set<TableBucket> assignedTableBuckets,
+            Map<Long, String> assignedPartitions,
+            List<SourceSplitBase> pendingHybridLakeFlussSplits,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
@@ -177,17 +193,19 @@ public FlinkSourceEnumerator(
                 hasPrimaryKey,
                 isPartitioned,
                 context,
-                Collections.emptySet(),
-                Collections.emptyMap(),
-                null,
+                assignedTableBuckets,
+                assignedPartitions,
+                pendingHybridLakeFlussSplits,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
                 partitionFilters,
-                lakeSource);
+                lakeSource,
+                new WorkerExecutor(context));
+
     }
 
-    public FlinkSourceEnumerator(
+    FlinkSourceEnumerator(
             TablePath tablePath,
             Configuration flussConf,
             boolean hasPrimaryKey,
@@ -200,7 +218,9 @@ public FlinkSourceEnumerator(
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters,
-            @Nullable LakeSource<LakeSplit> lakeSource) {
+            @Nullable LakeSource<LakeSplit> lakeSource,
+            WorkerExecutor workerExecutor
+            ) {
         this.tablePath = checkNotNull(tablePath);
         this.flussConf = checkNotNull(flussConf);
         this.hasPrimaryKey = hasPrimaryKey;
@@ -220,8 +240,8 @@ public FlinkSourceEnumerator(
         this.stoppingOffsetsInitializer =
                 streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
         this.lakeSource = lakeSource;
+        this.workerExecutor = workerExecutor;
     }
-
     @Override
     public void start() {
         // init admin client
@@ -255,8 +275,8 @@ public void start() {
                             + "with new partition discovery interval of {} ms.",
                     tablePath,
                     scanPartitionDiscoveryIntervalMs);
-            // discover new partitions and handle new partitions
-            context.callAsync(
+            // discover new partitions and handle new partitions at fixed delay.
+            workerExecutor.callAsyncAtFixedDelay(
                     this::listPartitions,
                     this::checkPartitionChanges,
                     0,
@@ -266,7 +286,7 @@ public void start() {
                 LOG.info(
                         "Starting the FlussSourceEnumerator for table {} without partition discovery.",
                         tablePath);
-                context.callAsync(this::listPartitions, this::checkPartitionChanges);
+                workerExecutor.callAsync(this::listPartitions, this::checkPartitionChanges);
             }
         } else {
             startInBatchMode();
@@ -399,7 +419,7 @@ private void checkPartitionChanges(Set<PartitionInfo> partitionInfos, Throwable
         handlePartitionsRemoved(partitionChange.removedPartitions);
 
         // handle new partitions
-        context.callAsync(
+        workerExecutor.callAsync(
                 () -> initPartitionedSplits(partitionChange.newPartitions), this::handleSplitsAdd);
     }
 
@@ -846,6 +866,7 @@ public void close() throws IOException {
         }
     }
 
+
     // --------------- private class ---------------
     /** A container class to hold the newly added partitions and removed partitions. */
     private static class PartitionChange {
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/WorkerExecutor.java

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+/**
+ * A worker executor that is used to schedule asynchronous tasks to extend {@link SplitEnumeratorContext}
+ * with fixed delay capabilities. This class serves as a workaround until {@link SplitEnumeratorContext}
+ * natively supports asynchronous calls with fixed delay scheduling.
+ *
+ * <p>This executor wraps a single-threaded {@link ScheduledExecutorService} to handle async operations
+ * and route their results back to the coordinator thread through the {@link SplitEnumeratorContext#callAsync} methods.
+ */
+@Internal
+public class WorkerExecutor {
+    protected final SplitEnumeratorContext<SourceSplitBase> context;
+    private final ScheduledExecutorService workerExecutor;
+
+    public WorkerExecutor(SplitEnumeratorContext<SourceSplitBase> context) {
+        this.context = context;
+        this.workerExecutor = Executors.newScheduledThreadPool(1);
+    }
+
+    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+        workerExecutor.execute(
+                () -> {
+                    try {
+                        T result = callable.call();
+                        // reuse the context async call to notify coordinator thread.
+                        context.callAsync(() -> result, handler);
+                    } catch (Throwable t) {
+                        context.callAsync(
+                                () -> {
+                                    throw t;
+                                },
+                                handler);
+                    }
+                });
+    }
+
+    public <T> void callAsyncAtFixedDelay(
+            Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long delay) {
+        workerExecutor.scheduleWithFixedDelay(
+                () -> {
+                    try {
+                        T result = callable.call();
+                        // reuse the context async call to notify coordinator thread.
+                        context.callAsync(() -> result, handler);
+                    } catch (Throwable t) {
+                        context.callAsync(
+                                () -> {
+                                    throw t;
+                                },
+                                handler);
+                    }
+                },
+                initialDelay,
+                delay,
+                TimeUnit.MILLISECONDS);
+    }
+
+}
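Note how both methods above route errors back: a Throwable caught on the worker thread is rethrown inside a fresh callable handed to SplitEnumeratorContext#callAsync, so the handler receives either the result or the exception on the coordinator thread. A standalone sketch of that routing pattern (a plain single-threaded executor stands in for the coordinator; hypothetical names, not Fluss code):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Standalone sketch of WorkerExecutor's result routing (not Fluss code).
// `coordinatorCallAsync` stands in for SplitEnumeratorContext.callAsync: it
// invokes the callable and hands (result, error) to the handler on the
// coordinator thread.
public class ResultRoutingDemo {
    static <T> void coordinatorCallAsync(
            ExecutorService coordinator, Callable<T> callable, BiConsumer<T, Throwable> handler) {
        coordinator.execute(() -> {
            try {
                handler.accept(callable.call(), null);
            } catch (Throwable t) {
                handler.accept(null, t);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService coordinator = Executors.newSingleThreadExecutor();
        BiConsumer<String, Throwable> handler =
                (result, error) -> System.out.println("result=" + result + ", error=" + error);

        // Success path: the worker wraps its result in a trivial callable.
        String result = "partitions";
        coordinatorCallAsync(coordinator, () -> result, handler);

        // Failure path: the worker rethrows the captured exception inside the
        // callable, so the same handler observes it, mirroring WorkerExecutor's
        // catch block.
        RuntimeException failure = new RuntimeException("discovery failed");
        coordinatorCallAsync(coordinator, () -> { throw failure; }, handler);

        coordinator.shutdown();
        coordinator.awaitTermination(1, TimeUnit.SECONDS);
    }
}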

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java

Lines changed: 27 additions & 15 deletions
@@ -43,6 +43,7 @@
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -58,6 +59,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
 import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -97,7 +102,8 @@ void testPkTableNoSnapshotSplits() throws Throwable {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(),
+                            null);
 
             enumerator.start();
 
@@ -144,7 +150,7 @@ void testPkTableWithSnapshotSplits() throws Throwable {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(), null);
             enumerator.start();
             // register all read
             for (int i = 0; i < numSubtasks; i++) {
@@ -215,7 +221,7 @@ void testNonPkTable() throws Throwable {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(), null);
 
             enumerator.start();
 
@@ -261,7 +267,7 @@ void testReaderRegistrationTriggerAssignments() throws Throwable {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(), null);
 
             enumerator.start();
 
@@ -297,7 +303,7 @@ void testAddSplitBack() throws Throwable {
                             OffsetsInitializer.full(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
-                            Collections.emptyList());
+                            Collections.emptyList(), null);
 
             enumerator.start();
 
@@ -339,6 +345,7 @@ void testRestore() throws Throwable {
         try (MockSplitEnumeratorContext<SourceSplitBase> context =
                 new MockSplitEnumeratorContext<>(numSubtasks)) {
 
+
             // mock bucket1 has been assigned
             TableBucket bucket1 = new TableBucket(tableId, 1);
             Set<TableBucket> assignedBuckets = new HashSet<>(Collections.singletonList(bucket1));
@@ -391,30 +398,35 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
         ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
         try (MockSplitEnumeratorContext<SourceSplitBase> context =
                         new MockSplitEnumeratorContext<>(numSubtasks);
+                MockWorkExecutor workExecutor = new MockWorkExecutor(context);
                 FlinkSourceEnumerator enumerator =
                         new FlinkSourceEnumerator(
                                 DEFAULT_TABLE_PATH,
                                 flussConf,
                                 isPrimaryKeyTable,
                                 true,
                                 context,
+                                Collections.emptySet(),
+                                Collections.emptyMap(),
+                                null,
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
-                                Collections.emptyList())) {
+                                Collections.emptyList(), null,
+                                workExecutor)) {
             Map<Long, String> partitionNameByIds =
                     waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
             enumerator.start();
 
             // invoke partition discovery callable again and there should be pending assignments.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             // register two readers
             registerReader(context, enumerator, 0);
             registerReader(context, enumerator, 1);
 
             // invoke partition discovery callable again, shouldn't produce RemovePartitionEvent.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
             assertThat(context.getSentSourceEvent()).isEmpty();
 
             // now, register the third reader
@@ -434,7 +446,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
             createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
 
             /// invoke partition discovery callable again and there should assignments.
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             expectedAssignment = expectAssignments(enumerator, tableId, newPartitionNameIds);
             actualAssignments = getLastReadersAssignments(context);
@@ -450,7 +462,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa
             createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions);
 
             // invoke partition discovery callable again
-            runPeriodicPartitionDiscovery(context);
+            runPeriodicPartitionDiscovery(workExecutor);
 
             // there should be partition removed events
             Map<Integer, List<SourceEvent>> sentSourceEvents = context.getSentSourceEvent();
@@ -516,7 +528,7 @@ void testGetSplitOwner() throws Exception {
                                 OffsetsInitializer.full(),
                                 DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                                 streaming,
-                                Collections.emptyList())) {
+                                Collections.emptyList(), null)) {
 
             // test splits for same non-partitioned bucket, should assign to same task
             TableBucket t1 = new TableBucket(tableId, 0);
@@ -587,13 +599,13 @@ private void checkAssignmentIgnoreOrder(
         }
     }
 
-    private void runPeriodicPartitionDiscovery(MockSplitEnumeratorContext<SourceSplitBase> context)
+    private void runPeriodicPartitionDiscovery(MockWorkExecutor workExecutor)
             throws Throwable {
         // Fetch potential topic descriptions
-        context.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
+        workExecutor.runPeriodicCallable(PARTITION_DISCOVERY_CALLABLE_INDEX);
         // Initialize offsets for discovered partitions
-        if (!context.getOneTimeCallables().isEmpty()) {
-            context.runNextOneTimeCallable();
+        if (!workExecutor.getOneTimeCallables().isEmpty()) {
+            workExecutor.runNextOneTimeCallable();
         }
     }
 
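The fourth changed file, a MockWorkExecutor used by this test, is not shown in the commit view above. Judging from its usage (constructed over the mock context, closed by try-with-resources, and driven via runPeriodicCallable, getOneTimeCallables, and runNextOneTimeCallable), a plausible sketch — an assumption, not the committed code — records the callables instead of scheduling them, so the test can run them deterministically:

package org.apache.fluss.flink.source.enumerator;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.fluss.flink.source.split.SourceSplitBase;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

// Hypothetical reconstruction of MockWorkExecutor (the actual file is not shown
// in this diff). Instead of scheduling work, it records each callable/handler
// pair so tests can execute them on demand, mirroring how
// MockSplitEnumeratorContext captures callables for manual execution.
public class MockWorkExecutor extends WorkerExecutor implements AutoCloseable {

    private static class Task<T> {
        final Callable<T> callable;
        final BiConsumer<T, Throwable> handler;

        Task(Callable<T> callable, BiConsumer<T, Throwable> handler) {
            this.callable = callable;
            this.handler = handler;
        }

        void run() {
            // Deliver either the result or the error to the handler, matching
            // how the coordinator-side callAsync would invoke it.
            try {
                handler.accept(callable.call(), null);
            } catch (Throwable t) {
                handler.accept(null, t);
            }
        }
    }

    private final List<Task<?>> periodicCallables = new ArrayList<>();
    private final Deque<Task<?>> oneTimeCallables = new ArrayDeque<>();

    public MockWorkExecutor(SplitEnumeratorContext<SourceSplitBase> context) {
        super(context);
    }

    @Override
    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        oneTimeCallables.add(new Task<>(callable, handler));
    }

    @Override
    public <T> void callAsyncAtFixedDelay(
            Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long delay) {
        periodicCallables.add(new Task<>(callable, handler));
    }

    public Deque<Task<?>> getOneTimeCallables() {
        return oneTimeCallables;
    }

    public void runNextOneTimeCallable() {
        oneTimeCallables.poll().run();
    }

    public void runPeriodicCallable(int index) {
        periodicCallables.get(index).run();
    }

    @Override
    public void close() {
        // Both scheduling methods are overridden, so the superclass's scheduler
        // is never used; nothing to shut down in this sketch.
    }
}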