Skip to content

Commit 88b7afa

Browse files
committed
[FLINK-38061][mysql] Make defensive copies of collections in SnapshotPendingSplitsState to prevent concurrent modification issues
1 parent 19596e3 commit 88b7afa

1 file changed

Lines changed: 12 additions & 7 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import io.debezium.relational.TableId;
2727
import io.debezium.relational.history.TableChanges;
2828

29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.LinkedHashMap;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Objects;
@@ -89,17 +92,19 @@ public SnapshotPendingSplitsState(
8992
boolean isRemainingTablesCheckpointed,
9093
Map<String, Long> splitFinishedCheckpointIds,
9194
ChunkSplitterState chunkSplitterState) {
92-
this.alreadyProcessedTables = alreadyProcessedTables;
93-
this.remainingSplits = remainingSplits;
94-
this.assignedSplits = assignedSplits;
95-
this.splitFinishedOffsets = splitFinishedOffsets;
95+
// FLINK-38061: make defensive copy to avoid potential concurrent modification of the
96+
// collections.
97+
this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
98+
this.remainingSplits = new ArrayList<>(remainingSplits);
99+
this.assignedSplits = new LinkedHashMap<>(assignedSplits);
100+
this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
96101
this.assignerStatus = assignerStatus;
97-
this.remainingTables = remainingTables;
102+
this.remainingTables = new ArrayList<>(remainingTables);
98103
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
99104
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
100-
this.tableSchemas = tableSchemas;
105+
this.tableSchemas = new HashMap<>(tableSchemas);
101106
this.chunkSplitterState = chunkSplitterState;
102-
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
107+
this.splitFinishedCheckpointIds = new HashMap<>(splitFinishedCheckpointIds);
103108
}
104109

105110
public Map<String, Long> getSplitFinishedCheckpointIds() {

0 commit comments

Comments
 (0)