Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase {
private static final MySqlContainer MYSQL8_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf");

private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1";

protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,34 @@ CREATE TABLE default_value_test (
INSERT INTO default_value_test
VALUES (1,'user1','Shanghai',123567),
(2,'user2','Shanghai',123567);

-- table has auto increment primary key for pt-osc testing
CREATE TABLE customers_auto_id (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);

INSERT INTO customers_auto_id
VALUES (default, 'user_1', 'Shanghai', '123567891234'),
(default, 'user_2', 'Shanghai', '123567891234'),
(default, 'user_3', 'Shanghai', '123567891234'),
(default, 'user_4', 'Shanghai', '123567891234'),
(default, 'user_5', 'Shanghai', '123567891234'),
(default, 'user_6', 'Shanghai', '123567891234'),
(default, 'user_7', 'Shanghai', '123567891234'),
(default, 'user_8', 'Shanghai', '123567891234'),
(default, 'user_9', 'Shanghai', '123567891234'),
(default, 'user_10', 'Shanghai', '123567891234'),
(default, 'user_11', 'Shanghai', '123567891234'),
(default, 'user_12', 'Shanghai', '123567891234'),
(default, 'user_13', 'Shanghai', '123567891234'),
(default, 'user_14', 'Shanghai', '123567891234'),
(default, 'user_15', 'Shanghai', '123567891234'),
(default, 'user_16', 'Shanghai', '123567891234'),
(default, 'user_17', 'Shanghai', '123567891234'),
(default, 'user_18', 'Shanghai', '123567891234'),
(default, 'user_19', 'Shanghai', '123567891234'),
(default, 'user_20', 'Shanghai', '123567891234'),
(default, 'user_21', 'Shanghai', '123567891234');
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private final boolean isBackfillSkipped;
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;

private static final long READER_CLOSE_TIMEOUT = 30L;

Expand All @@ -114,6 +115,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
this.pendingSchemaChangeEvents = new HashMap<>();
}

public void submitSplit(MySqlSplit mySqlSplit) {
Expand Down Expand Up @@ -181,8 +183,35 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
Optional<SourceRecord> oscRecord =
parseOnLineSchemaChangeEvent(event.getRecord());
if (oscRecord.isPresent()) {
sourceRecords.add(oscRecord.get());
continue;
TableId tableId = RecordUtils.getTableId(oscRecord.get());
if (tableId != null) {
LOG.info(
"Received the start event of online schema change: {}. Save it for later.",
oscRecord.get());
pendingSchemaChangeEvents
.computeIfAbsent(tableId.toString(), k -> new ArrayList<>())
.add(oscRecord.get());
continue;
}
}

Optional<String> finishedTables =
RecordUtils.parseOnLineSchemaRenameEvent(event.getRecord());
if (finishedTables.isPresent()) {
TableId tableId = RecordUtils.getTableId(event.getRecord());
String finishedTableId = tableId.catalog() + "." + finishedTables.get();
LOG.info(
"Received the ending event of table {}. Emit corresponding DDL event now.",
finishedTableId);

if (pendingSchemaChangeEvents.containsKey(finishedTableId)) {
sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId));
} else {
LOG.error(
"Error: met an unexpected osc finish event. Current pending events: {}, Record: {}",
pendingSchemaChangeEvents,
event);
}
}
}
if (shouldEmit(event.getRecord())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -512,7 +514,7 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
// DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
//
// Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new`
// table.
// table and store them temporarily, and emit them when the RENAME TABLE event pops up.
String ddl =
mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
.get(HistoryRecord.Fields.DDL_STATEMENTS)
Expand All @@ -530,8 +532,68 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
}
}

public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) {
Comment thread
beryllw marked this conversation as resolved.
Outdated
if (!isSchemaChangeEvent(record)) {
return Optional.empty();
}
Struct value = (Struct) record.value();
ObjectMapper mapper = new ObjectMapper();

try {
String ddl =
mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
.get(HistoryRecord.Fields.DDL_STATEMENTS)
.asText()
.toLowerCase();
if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) {
LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl);
List<String> tableNames =
Arrays.asList(
value.getStruct(Envelope.FieldName.SOURCE)
.getString(TABLE_NAME_KEY)
.split(","));
if (tableNames.size() != 2) {
LOG.info(
"Table name {} is malformed, skip it.",
value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY));
return Optional.empty();
}

String renamedFromTableName =
Collections.min(tableNames, Comparator.comparingInt(String::length));
String renamedToTableName =
Collections.max(tableNames, Comparator.comparingInt(String::length));

LOG.info(
"Determined the shorter TableId {} is the renaming source.",
renamedFromTableName);
LOG.info(
"Determined the longer TableId {} is the renaming target.",
renamedToTableName);

if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
LOG.info(
"Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.",
renamedToTableName,
renamedFromTableName);
return Optional.of(renamedFromTableName);
}

LOG.info(
"Renamed to TableId {} does not match any RegEx pattern, skip it.",
renamedToTableName);
}
return Optional.empty();
} catch (JsonProcessingException e) {
LOG.warn("Failed to parse schema change event {}", value, e);
return Optional.empty();
Comment thread
beryllw marked this conversation as resolved.
Outdated
}
}

private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");

private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");

/** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */
public static TableId peelTableId(TableId tableId) {
Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1";

protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
Expand Down Expand Up @@ -174,7 +174,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception {
.serverId(getServerId())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.includeSchemaChanges(true) // output the schema changes as well
.includeSchemaChanges(true)
.parseOnLineSchemaChanges(true)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -334,7 +335,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception {
.serverId(getServerId())
.deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("UTC")
.includeSchemaChanges(true) // output the schema changes as well
.includeSchemaChanges(true)
.parseOnLineSchemaChanges(true)
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase {
private static final String TEST_USER = "mysqluser";
private static final String TEST_PASSWORD = "mysqlpw";

private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1";

protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
createPerconaToolkitContainer();
Expand Down Expand Up @@ -177,7 +177,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ " 'server-id' = '%s',"
+ " 'scan.parse.online.schema.changes.enabled' = 'true'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),
Expand Down Expand Up @@ -346,7 +347,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception {
+ " 'table-name' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-time-zone' = 'UTC',"
+ " 'server-id' = '%s'"
+ " 'server-id' = '%s',"
+ " 'scan.parse.online.schema.changes.enabled' = 'true'"
+ ")",
MYSQL8_CONTAINER.getHost(),
MYSQL8_CONTAINER.getDatabasePort(),
Expand Down
Loading