Skip to content

Commit eb3cdc0

Browse files
authored
[FLINK-38061][mysql] Make defensive copies of collections in SnapshotPendingSplitsState to prevent concurrent modification issues (apache#4379)
1 parent 13cd198 commit eb3cdc0

2 files changed

Lines changed: 22 additions & 13 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/SnapshotPendingSplitsState.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import io.debezium.relational.TableId;
2727
import io.debezium.relational.history.TableChanges;
2828

29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.LinkedHashMap;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Objects;
@@ -89,17 +92,19 @@ public SnapshotPendingSplitsState(
8992
boolean isRemainingTablesCheckpointed,
9093
Map<String, Long> splitFinishedCheckpointIds,
9194
ChunkSplitterState chunkSplitterState) {
92-
this.alreadyProcessedTables = alreadyProcessedTables;
93-
this.remainingSplits = remainingSplits;
94-
this.assignedSplits = assignedSplits;
95-
this.splitFinishedOffsets = splitFinishedOffsets;
95+
// FLINK-38061: make defensive copy to avoid potential concurrent modification of the
96+
// collections.
97+
this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
98+
this.remainingSplits = new ArrayList<>(remainingSplits);
99+
this.assignedSplits = new LinkedHashMap<>(assignedSplits);
100+
this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
96101
this.assignerStatus = assignerStatus;
97-
this.remainingTables = remainingTables;
102+
this.remainingTables = new ArrayList<>(remainingTables);
98103
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
99104
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
100-
this.tableSchemas = tableSchemas;
105+
this.tableSchemas = new HashMap<>(tableSchemas);
101106
this.chunkSplitterState = chunkSplitterState;
102-
this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
107+
this.splitFinishedCheckpointIds = new HashMap<>(splitFinishedCheckpointIds);
103108
}
104109

105110
public Map<String, Long> getSplitFinishedCheckpointIds() {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.debezium.relational.TableId;
2828
import io.debezium.relational.history.TableChanges.TableChange;
2929

30+
import java.util.ArrayList;
31+
import java.util.HashMap;
3032
import java.util.LinkedHashMap;
3133
import java.util.List;
3234
import java.util.Map;
@@ -84,15 +86,17 @@ public SnapshotPendingSplitsState(
8486
boolean isTableIdCaseSensitive,
8587
boolean isRemainingTablesCheckpointed,
8688
ChunkSplitterState chunkSplitterState) {
87-
this.alreadyProcessedTables = alreadyProcessedTables;
88-
this.remainingSplits = remainingSplits;
89-
this.assignedSplits = assignedSplits;
90-
this.splitFinishedOffsets = splitFinishedOffsets;
89+
// FLINK-38061: make defensive copy to avoid potential concurrent modification of the
90+
// collections.
91+
this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
92+
this.remainingSplits = new ArrayList<>(remainingSplits);
93+
this.assignedSplits = new LinkedHashMap<>(assignedSplits);
94+
this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets);
9195
this.assignerStatus = assignerStatus;
92-
this.remainingTables = remainingTables;
96+
this.remainingTables = new ArrayList<>(remainingTables);
9397
this.isTableIdCaseSensitive = isTableIdCaseSensitive;
9498
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
95-
this.tableSchemas = tableSchemas;
99+
this.tableSchemas = new HashMap<>(tableSchemas);
96100
this.chunkSplitterState = chunkSplitterState;
97101
}
98102

0 commit comments

Comments
 (0)