|
27 | 27 | import io.debezium.relational.TableId; |
28 | 28 | import io.debezium.relational.history.TableChanges.TableChange; |
29 | 29 |
|
| 30 | +import java.util.ArrayList; |
| 31 | +import java.util.HashMap; |
30 | 32 | import java.util.LinkedHashMap; |
31 | 33 | import java.util.List; |
32 | 34 | import java.util.Map; |
@@ -84,15 +86,16 @@ public SnapshotPendingSplitsState( |
84 | 86 | boolean isTableIdCaseSensitive, |
85 | 87 | boolean isRemainingTablesCheckpointed, |
86 | 88 | ChunkSplitterState chunkSplitterState) { |
87 | | - this.alreadyProcessedTables = alreadyProcessedTables; |
88 | | - this.remainingSplits = remainingSplits; |
89 | | - this.assignedSplits = assignedSplits; |
90 | | - this.splitFinishedOffsets = splitFinishedOffsets; |
| 89 | + // FLINK-38061: make defensive copy to avoid potential concurrent modification of the collections. |
| 90 | + this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables); |
| 91 | + this.remainingSplits = new ArrayList<>(remainingSplits); |
| 92 | + this.assignedSplits = new LinkedHashMap<>(assignedSplits); |
| 93 | + this.splitFinishedOffsets = new HashMap<>(splitFinishedOffsets); |
91 | 94 | this.assignerStatus = assignerStatus; |
92 | | - this.remainingTables = remainingTables; |
| 95 | + this.remainingTables = new ArrayList<>(remainingTables); |
93 | 96 | this.isTableIdCaseSensitive = isTableIdCaseSensitive; |
94 | 97 | this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed; |
95 | | - this.tableSchemas = tableSchemas; |
| 98 | + this.tableSchemas = new HashMap<>(tableSchemas); |
96 | 99 | this.chunkSplitterState = chunkSplitterState; |
97 | 100 | } |
98 | 101 |
|
|
0 commit comments