Skip to content

Commit 1e26e17

Browse files
beryllwMrart
authored andcommitted
[FLINK-38139] Fix consecutive online schema change causes job failure (apache#4324)
1 parent bbb9bdd commit 1e26e17

8 files changed

Lines changed: 722 additions & 87 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase {
8282
private static final MySqlContainer MYSQL8_CONTAINER =
8383
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");
8484

85-
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
85+
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1";
8686

8787
protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
8888
createPerconaToolkitContainer();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java

Lines changed: 441 additions & 0 deletions
Large diffs are not rendered by default.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
326326
INSERT INTO default_value_test
327327
VALUES (1,'user1','Shanghai',123567),
328328
(2,'user2','Shanghai',123567);
329+
330+
-- table has auto increment primary key for pt-osc testing
331+
CREATE TABLE customers_auto_id (
332+
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
333+
name VARCHAR(255) NOT NULL DEFAULT 'flink',
334+
address VARCHAR(1024),
335+
phone_number VARCHAR(512)
336+
);
337+
338+
INSERT INTO customers_auto_id
339+
VALUES (default, 'user_1', 'Shanghai', '123567891234'),
340+
(default, 'user_2', 'Shanghai', '123567891234'),
341+
(default, 'user_3', 'Shanghai', '123567891234'),
342+
(default, 'user_4', 'Shanghai', '123567891234'),
343+
(default, 'user_5', 'Shanghai', '123567891234'),
344+
(default, 'user_6', 'Shanghai', '123567891234'),
345+
(default, 'user_7', 'Shanghai', '123567891234'),
346+
(default, 'user_8', 'Shanghai', '123567891234'),
347+
(default, 'user_9', 'Shanghai', '123567891234'),
348+
(default, 'user_10', 'Shanghai', '123567891234'),
349+
(default, 'user_11', 'Shanghai', '123567891234'),
350+
(default, 'user_12', 'Shanghai', '123567891234'),
351+
(default, 'user_13', 'Shanghai', '123567891234'),
352+
(default, 'user_14', 'Shanghai', '123567891234'),
353+
(default, 'user_15', 'Shanghai', '123567891234'),
354+
(default, 'user_16', 'Shanghai', '123567891234'),
355+
(default, 'user_17', 'Shanghai', '123567891234'),
356+
(default, 'user_18', 'Shanghai', '123567891234'),
357+
(default, 'user_19', 'Shanghai', '123567891234'),
358+
(default, 'user_20', 'Shanghai', '123567891234'),
359+
(default, 'user_21', 'Shanghai', '123567891234');

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
2828
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
2929
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
30+
import org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils;
3031
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
3132
import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils;
3233
import org.apache.flink.cdc.connectors.mysql.table.StartupMode;
@@ -92,6 +93,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
9293
new StoppableChangeEventSourceContext();
9394
private final boolean isParsingOnLineSchemaChanges;
9495
private final boolean isBackfillSkipped;
96+
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;
9597

9698
private static final long READER_CLOSE_TIMEOUT = 30L;
9799

@@ -114,6 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
114116
this.isParsingOnLineSchemaChanges =
115117
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
116118
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
119+
this.pendingSchemaChangeEvents = new HashMap<>();
117120
}
118121

119122
public void submitSplit(MySqlSplit mySqlSplit) {
@@ -181,8 +184,35 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
181184
Optional<SourceRecord> oscRecord =
182185
parseOnLineSchemaChangeEvent(event.getRecord());
183186
if (oscRecord.isPresent()) {
184-
sourceRecords.add(oscRecord.get());
185-
continue;
187+
TableId tableId = RecordUtils.getTableId(oscRecord.get());
188+
if (tableId != null) {
189+
LOG.info(
190+
"Received the start event of online schema change: {}. Save it for later.",
191+
oscRecord.get());
192+
pendingSchemaChangeEvents
193+
.computeIfAbsent(tableId.toString(), k -> new ArrayList<>())
194+
.add(oscRecord.get());
195+
continue;
196+
}
197+
}
198+
199+
Optional<String> finishedTables =
200+
OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord());
201+
if (finishedTables.isPresent()) {
202+
TableId tableId = RecordUtils.getTableId(event.getRecord());
203+
String finishedTableId = tableId.catalog() + "." + finishedTables.get();
204+
LOG.info(
205+
"Received the ending event of table {}. Emit corresponding DDL event now.",
206+
finishedTableId);
207+
208+
if (pendingSchemaChangeEvents.containsKey(finishedTableId)) {
209+
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
210+
} else {
211+
LOG.error(
212+
"Error: met an unexpected osc finish event. Current pending events: {}, Record: {}",
213+
pendingSchemaChangeEvents,
214+
event);
215+
}
186216
}
187217
}
188218
if (shouldEmit(event.getRecord())) {
@@ -228,11 +258,11 @@ public void close() {
228258
}
229259

230260
private Optional<SourceRecord> parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) {
231-
if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
261+
if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) {
232262
// This is a gh-ost initialized schema change event and should be emitted if the
233263
// peeled tableId matches the predicate.
234264
TableId originalTableId = RecordUtils.getTableId(sourceRecord);
235-
TableId peeledTableId = RecordUtils.peelTableId(originalTableId);
265+
TableId peeledTableId = OnlineSchemaChangeUtils.peelTableId(originalTableId);
236266
if (capturedTableFilter.test(peeledTableId)) {
237267
return Optional.of(
238268
RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source.utils;
19+
20+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
21+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
22+
23+
import io.debezium.data.Envelope;
24+
import io.debezium.relational.TableId;
25+
import io.debezium.relational.history.HistoryRecord;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.source.SourceRecord;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.util.Arrays;
32+
import java.util.Collections;
33+
import java.util.Comparator;
34+
import java.util.List;
35+
import java.util.Optional;
36+
import java.util.regex.Matcher;
37+
import java.util.regex.Pattern;
38+
39+
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
40+
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD;
41+
42+
/** Utility class for handling gh-ost/pt-osc online schema change events. */
43+
public class OnlineSchemaChangeUtils {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(OnlineSchemaChangeUtils.class);
46+
47+
private OnlineSchemaChangeUtils() {}
48+
49+
/**
50+
* Pattern matching gh-ost shadow table ({@code _<name>_gho}) and pt-osc new table ({@code
51+
* _<name>_new}), which carry the actual ALTER DDL during an online schema change.
52+
*/
53+
private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
54+
55+
/**
56+
* Pattern matching gh-ost delete table ({@code _<name>_del}) and pt-osc old table ({@code
57+
* _<name>_old}), which are the temporary backup tables created during an online schema change.
58+
*/
59+
private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");
60+
61+
/**
62+
* Checks whether the given source record is a gh-ost/pt-osc initiated schema change event by
63+
* inspecting the ALTER DDL statement targeting a shadow/new table.
64+
*
65+
* <p>There will be these schema change events generated in total during one transaction.
66+
*
67+
* <p>gh-ost:
68+
*
69+
* <pre>
70+
* DROP TABLE IF EXISTS `db`.`_tb1_gho`
71+
* DROP TABLE IF EXISTS `db`.`_tb1_del`
72+
* DROP TABLE IF EXISTS `db`.`_tb1_ghc`
73+
* create /* gh-ost *&#47; table `db`.`_tb1_ghc` ...
74+
* create /* gh-ost *&#47; table `db`.`_tb1_gho` like `db`.`tb1`
75+
* alter /* gh-ost *&#47; table `db`.`_tb1_gho` add column c varchar(255)
76+
* alter /* gh-ost *&#47; table `db`.`_tb1_gho` AUTO_INCREMENT=N (only present when the table has an AUTO_INCREMENT column)
77+
* create /* gh-ost *&#47; table `db`.`_tb1_del` ...
78+
* DROP TABLE IF EXISTS `db`.`_tb1_del`
79+
* rename /* gh-ost *&#47; table `db`.`tb1` to `db`.`_tb1_del`
80+
* rename /* gh-ost *&#47; table `db`.`_tb1_gho` to `db`.`tb1`
81+
* DROP TABLE IF EXISTS `db`.`_tb1_ghc`
82+
* DROP TABLE IF EXISTS `db`.`_tb1_del`
83+
* </pre>
84+
*
85+
* <p>pt-osc:
86+
*
87+
* <pre>
88+
* CREATE TABLE `db`.`_test_tb1_new`
89+
* ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
90+
* CREATE TRIGGER `pt_osc_db_test_tb1_del`...
91+
* CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
92+
* CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
93+
* ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change *&#47;
94+
* RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO `db`.`test_tb1`
95+
* DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server *&#47;
96+
* DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
97+
* DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
98+
* DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
99+
* </pre>
100+
*
101+
* <p>Among all these, only the ALTER statement targeting the {@code _gho}/{@code _new} table is
102+
* stored temporarily, and emitted when the subsequent RENAME TABLE event arrives.
103+
*/
104+
public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
105+
if (!RecordUtils.isSchemaChangeEvent(record)) {
106+
return false;
107+
}
108+
Struct value = (Struct) record.value();
109+
ObjectMapper mapper = new ObjectMapper();
110+
try {
111+
String ddl =
112+
mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
113+
.get(HistoryRecord.Fields.DDL_STATEMENTS)
114+
.asText()
115+
.toLowerCase();
116+
if (ddl.startsWith("alter")) {
117+
String tableName =
118+
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY);
119+
return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
120+
}
121+
return false;
122+
} catch (JsonProcessingException e) {
123+
return false;
124+
}
125+
}
126+
127+
/**
128+
* Parses a gh-ost/pt-osc RENAME TABLE event and returns the original (user-facing) table name
129+
* if the event represents the completion of an online schema change.
130+
*
131+
* @return the original table name if the record is an OSC completion rename, or {@link
132+
* Optional#empty()} otherwise.
133+
*/
134+
public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) {
135+
if (!RecordUtils.isSchemaChangeEvent(record)) {
136+
return Optional.empty();
137+
}
138+
Struct value = (Struct) record.value();
139+
ObjectMapper mapper = new ObjectMapper();
140+
141+
try {
142+
String ddl =
143+
mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
144+
.get(HistoryRecord.Fields.DDL_STATEMENTS)
145+
.asText()
146+
.toLowerCase();
147+
if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) {
148+
LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl);
149+
List<String> tableNames =
150+
Arrays.asList(
151+
value.getStruct(Envelope.FieldName.SOURCE)
152+
.getString(TABLE_NAME_KEY)
153+
.split(","));
154+
if (tableNames.size() != 2) {
155+
LOG.info(
156+
"Table name {} is malformed, skip it.",
157+
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY));
158+
return Optional.empty();
159+
}
160+
161+
String renamedFromTableName =
162+
Collections.min(tableNames, Comparator.comparingInt(String::length));
163+
String renamedToTableName =
164+
Collections.max(tableNames, Comparator.comparingInt(String::length));
165+
166+
LOG.info(
167+
"Determined the shorter TableId {} is the renaming source.",
168+
renamedFromTableName);
169+
LOG.info(
170+
"Determined the longer TableId {} is the renaming target.",
171+
renamedToTableName);
172+
173+
if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
174+
LOG.info(
175+
"Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.",
176+
renamedToTableName,
177+
renamedFromTableName);
178+
return Optional.of(renamedFromTableName);
179+
}
180+
181+
LOG.info(
182+
"Renamed to TableId {} does not match any RegEx pattern, skip it.",
183+
renamedToTableName);
184+
}
185+
return Optional.empty();
186+
} catch (JsonProcessingException e) {
187+
LOG.warn("Failed to parse schema change event {}", value, e);
188+
return Optional.empty();
189+
}
190+
}
191+
192+
/**
193+
* Peels out a gh-ost/pt-osc mangled {@link TableId} back to the original user-facing one.
194+
*
195+
* <p>For example, {@code _customers_gho} → {@code customers}, {@code _orders_new} → {@code
196+
* orders}.
197+
*/
198+
public static TableId peelTableId(TableId tableId) {
199+
Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
200+
if (matchingResult.matches()) {
201+
return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
202+
}
203+
return tableId;
204+
}
205+
}

0 commit comments

Comments
 (0)