Skip to content

Commit 520c0bf

Browse files
beryllwboyu.wjb
authored andcommitted
[FLINK-38139][cdc/mysql][follow] Cache Online Schema change DDL Alter AUTO_INCREMENT
1 parent a052ad9 commit 520c0bf

1 file changed

Lines changed: 5 additions & 3 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
9090
new StoppableChangeEventSourceContext();
9191
private final boolean isParsingOnLineSchemaChanges;
9292
private final boolean isBackfillSkipped;
93-
private final Map<String, SourceRecord> pendingSchemaChangeEvents;
93+
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;
9494

9595
private static final long READER_CLOSE_TIMEOUT = 30L;
9696

@@ -178,7 +178,9 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
178178
LOG.info(
179179
"Received the start event of online schema change: {}. Save it for later.",
180180
oscRecord.get());
181-
pendingSchemaChangeEvents.put(tableId.toString(), oscRecord.get());
181+
pendingSchemaChangeEvents
182+
.computeIfAbsent(tableId.toString(), k -> new ArrayList<>())
183+
.add(oscRecord.get());
182184
continue;
183185
}
184186
}
@@ -193,7 +195,7 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
193195
finishedTableId);
194196

195197
if (pendingSchemaChangeEvents.containsKey(finishedTableId)) {
196-
sourceRecords.add(pendingSchemaChangeEvents.remove(finishedTableId));
198+
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
197199
} else {
198200
LOG.error(
199201
"Error: met an unexpected osc finish event. Current pending events: {}, Record: {}",

0 commit comments

Comments
 (0)