From bdc5ed520232a9f26d2e80219b958ecb4c23c861 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 16 Apr 2026 20:02:05 +0800 Subject: [PATCH] [FLINK-38061][mysql] Make defensive copies of collections in SnapshotPendingSplitsState to prevent concurrent modification issues --- .../state/SnapshotPendingSplitsState.java | 19 ++++++++++++------- .../state/SnapshotPendingSplitsState.java | 16 ++++++++++------ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java index 367bc792d14..ca5e578bddc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java @@ -26,6 +26,9 @@ import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -89,17 +92,19 @@ public SnapshotPendingSplitsState( boolean isRemainingTablesCheckpointed, Map splitFinishedCheckpointIds, ChunkSplitterState chunkSplitterState) { - this.alreadyProcessedTables = alreadyProcessedTables; - this.remainingSplits = remainingSplits; - this.assignedSplits = assignedSplits; - this.splitFinishedOffsets = splitFinishedOffsets; + // FLINK-38061: make defensive copy to avoid potential concurrent modification of the + // collections. + this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables); + this.remainingSplits = new ArrayList<>(remainingSplits); + this.assignedSplits = new LinkedHashMap<>(assignedSplits); + this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets); this.assignerStatus = assignerStatus; - this.remainingTables = remainingTables; + this.remainingTables = new ArrayList<>(remainingTables); this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; - this.tableSchemas = tableSchemas; + this.tableSchemas = new HashMap<>(tableSchemas); this.chunkSplitterState = chunkSplitterState; - this.splitFinishedCheckpointIds = splitFinishedCheckpointIds; + this.splitFinishedCheckpointIds = new HashMap<>(splitFinishedCheckpointIds); } public Map getSplitFinishedCheckpointIds() { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java index c49069a60b9..55df2e7dafa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java @@ -27,6 +27,8 @@ import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges.TableChange; +import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -84,15 +86,17 @@ public SnapshotPendingSplitsState( boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed, ChunkSplitterState chunkSplitterState) { - this.alreadyProcessedTables = alreadyProcessedTables; - this.remainingSplits = remainingSplits; - this.assignedSplits = assignedSplits; - this.splitFinishedOffsets = splitFinishedOffsets; + // FLINK-38061: make defensive copy to avoid potential concurrent modification of the + // collections. + this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables); + this.remainingSplits = new ArrayList<>(remainingSplits); + this.assignedSplits = new LinkedHashMap<>(assignedSplits); + this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets); this.assignerStatus = assignerStatus; - this.remainingTables = remainingTables; + this.remainingTables = new ArrayList<>(remainingTables); this.isTableIdCaseSensitive = isTableIdCaseSensitive; this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; - this.tableSchemas = tableSchemas; + this.tableSchemas = new HashMap<>(tableSchemas); this.chunkSplitterState = chunkSplitterState; }