Skip to content

Commit 2e79073

Browse files
committed
Use AtomicInteger for thread-safe reference counting.
1 parent 5366137 commit 2e79073

1 file changed

Lines changed: 7 additions & 6 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.concurrent.Executors;
4848
import java.util.concurrent.ThreadFactory;
4949
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicInteger;
5051

5152
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isEndWatermarkEvent;
5253

@@ -57,9 +58,9 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
5758
private final FetchTask.Context taskContext;
5859
private final ExecutorService executorService;
5960
private final Set<TableId> pureStreamPhaseTables;
61+
private final AtomicInteger numberOfRunningTasks;
6062

6163
private volatile ChangeEventQueue<DataChangeEvent> queue;
62-
private volatile boolean currentTaskRunning;
6364
private volatile Throwable readException;
6465

6566
private FetchTask<SourceSplitBase> streamFetchTask;
@@ -77,10 +78,10 @@ public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTask
7778
ThreadFactory threadFactory =
7879
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
7980
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
80-
this.currentTaskRunning = true;
8181
this.pureStreamPhaseTables = new HashSet<>();
8282
this.isBackfillSkipped = taskContext.getSourceConfig().isSkipSnapshotBackfill();
8383
this.supportsSplitKeyOptimization = taskContext.supportsSplitKeyOptimization();
84+
this.numberOfRunningTasks = new AtomicInteger(0);
8485
}
8586

8687
@Override
@@ -108,7 +109,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
108109

109110
@Override
110111
public boolean isFinished() {
111-
return currentStreamSplit == null || !currentTaskRunning;
112+
return currentStreamSplit == null || numberOfRunningTasks.get() == 0;
112113
}
113114

114115
@Nullable
@@ -117,7 +118,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
117118
checkReadException();
118119
final List<SourceRecord> sourceRecords = new ArrayList<>();
119120
// what happens if currentTaskRunning
120-
if (currentTaskRunning) {
121+
if (numberOfRunningTasks.get() > 0) {
121122
List<DataChangeEvent> batch = queue.poll();
122123
for (DataChangeEvent event : batch) {
123124
if (isEndWatermarkEvent(event.getRecord())) {
@@ -284,11 +285,11 @@ private void configureFilter() {
284285
}
285286

286287
public void startReadTask() {
287-
this.currentTaskRunning = true;
288+
this.numberOfRunningTasks.incrementAndGet();
288289
}
289290

290291
public void stopReadTask() throws Exception {
291-
this.currentTaskRunning = false;
292+
this.numberOfRunningTasks.decrementAndGet();
292293

293294
if (taskContext != null) {
294295
taskContext.close();

0 commit comments

Comments
 (0)