diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java index 8716fffc420..522d7dd2647 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java @@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase { private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java new file mode 100644 index 00000000000..2907a040574 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Collections; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; + +/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc utility. */ +class MySqlOscITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; + + protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final String GH_OST_DOWNLOAD_LINK = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + private final PrintStream standardOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + @BeforeEach + void takeoverOutput() { + System.setOut(new PrintStream(outCaptor)); + } + + @AfterEach + protected void handInStdOut() { + System.setOut(standardOut); + outCaptor.reset(); + } + + @BeforeAll + static void beforeClass() { + LOG.info("Starting MySql8 containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Container MySql8 is started."); + } + + @AfterAll + static void afterClass() { + LOG.info("Stopping MySql8 containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.stop(); + LOG.info("Container MySql8 is stopped."); + } + + @BeforeEach + void before() { + customerDatabase.createAndInitialize(); + TestValuesTableFactory.clearAllData(); + ValuesDatabase.clear(); + env.setParallelism(4); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @AfterEach + void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container container) { + try { + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer createPerconaToolkitContainer() { + GenericContainer perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + private void insertRecordsPhase1(UniqueDatabase database, int startIndex, int count) + throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = startIndex; i < startIndex + count; i++) { + statement.execute( + String.format( + "insert into customers (id, name, address, phone_number) values (%s, '%s', '%s', '%s');", + i, "flink_" + i, "Address Line #" + i, 1000000000L + i)); + } + } + } + + private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int count) + throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = startIndex; i < startIndex + count; i++) { + statement.execute( + String.format( + "insert into customers (id, name, address, phone_number, ext) values (%s, '%s', '%s', '%s', %s);", + i, "flink_" + i, "Address Line #" + i, 1000000000L + i, i)); + } + } + } + + private void insertRecordsPhase1ForAutoId(UniqueDatabase database, int count) throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = 0; i < count; i++) { + statement.execute( + String.format( + "insert into customers_auto_id (name, address, phone_number) values ('%s', '%s', '%s');", + "flink_" + i, "Address Line #" + i, 1000000000L + i)); + } + } + } + + private void insertRecordsPhase2ForAutoId(UniqueDatabase database, int count) throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = 0; i < count; i++) { + statement.execute( + String.format( + "insert into customers_auto_id (name, address, phone_number, ext) values ('%s', '%s', '%s', %s);", + "flink_ext_" + i, "Address Line Ext #" + i, 1000000000L + i, i)); + } + } + } + + @Test + void testGhOstSchemaMigration() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + Thread yamlJob = runJob(databaseName, "customers"); + yamlJob.start(); + + LOG.info("Step 2: Start pipeline job"); + insertRecordsPhase1(customerDatabase, 5000, 1000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + databaseName, + "--table=customers", + "--alter=add column ext int first", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated + // temporary tables + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + thread.start(); + insertRecordsPhase1(customerDatabase, 7000, 3000); + + thread.join(); + insertRecordsPhase2(customerDatabase, 12000, 1000); + + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5023); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + + @Test + void testPtOscSchemaMigration() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + Thread yamlJob = runJob(databaseName, "customers"); + yamlJob.start(); + + LOG.info("Step 2: Start pipeline job"); + insertRecordsPhase1(customerDatabase, 5000, 1000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + databaseName, + "--alter", + "add column ext int first", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + LOG.info("Insertion Phase 1 finishes"); + thread.start(); + insertRecordsPhase1(customerDatabase, 7000, 3000); + LOG.info("Insertion Phase 2 finishes"); + + thread.join(); + insertRecordsPhase2(customerDatabase, 12000, 1000); + LOG.info("Insertion Phase 3 finishes"); + + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5023); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + + @Test + void testPtOscSchemaMigrationWithAutoIncrementId() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Start pipeline job for auto increment id table"); + + Thread yamlJob = runJob(databaseName, "customers_auto_id"); + yamlJob.start(); + + LOG.info("Step 2: Insert initial records (Phase 1)"); + insertRecordsPhase1ForAutoId(customerDatabase, 1000); + + LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers_auto_id,D=" + databaseName, + "--alter", + "add column ext int first", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + LOG.info("Insertion Phase 1 finishes"); + thread.start(); + insertRecordsPhase1ForAutoId(customerDatabase, 3000); + LOG.info("Insertion Phase 2 finishes"); + + thread.join(); + insertRecordsPhase2ForAutoId(customerDatabase, 1000); + LOG.info("Insertion Phase 3 finishes"); + + // Initial 21 records + Phase1 1000 + Phase2 3000 + Phase3 1000 = 5021 + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5021); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + + private static void execInContainer(Container container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + String.join(" ", commands)); + } + } + } + + private Thread runJob(String databaseName, String tableName) { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup MySQL source + Configuration sourceConfig = new Configuration(); + sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost()); + sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort()); + sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER); + sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD); + sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC"); + sourceConfig.set(MySqlDataSourceOptions.TABLES, databaseName + "." + tableName); + sourceConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, true); + + SourceDef sourceDef = + new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig); + + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.singletonList( + new RouteDef( + databaseName + "." + tableName, + "sink_db.sink_tbl", + null, + null)), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + PipelineExecution execution = composer.compose(pipelineDef); + return new Thread( + () -> { + try { + execution.execute(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql index e4df63f1a33..b840f4358a8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql @@ -326,3 +326,34 @@ CREATE TABLE default_value_test ( INSERT INTO default_value_test VALUES (1,'user1','Shanghai',123567), (2,'user2','Shanghai',123567); + +-- table has auto increment primary key for pt-osc testing +CREATE TABLE customers_auto_id ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_auto_id +VALUES (default, 'user_1', 'Shanghai', '123567891234'), + (default, 'user_2', 'Shanghai', '123567891234'), + (default, 'user_3', 'Shanghai', '123567891234'), + (default, 'user_4', 'Shanghai', '123567891234'), + (default, 'user_5', 'Shanghai', '123567891234'), + (default, 'user_6', 'Shanghai', '123567891234'), + (default, 'user_7', 'Shanghai', '123567891234'), + (default, 'user_8', 'Shanghai', '123567891234'), + (default, 'user_9', 'Shanghai', '123567891234'), + (default, 'user_10', 'Shanghai', '123567891234'), + (default, 'user_11', 'Shanghai', '123567891234'), + (default, 'user_12', 'Shanghai', '123567891234'), + (default, 'user_13', 'Shanghai', '123567891234'), + (default, 'user_14', 'Shanghai', '123567891234'), + (default, 'user_15', 'Shanghai', '123567891234'), + (default, 'user_16', 'Shanghai', '123567891234'), + (default, 'user_17', 'Shanghai', '123567891234'), + (default, 'user_18', 'Shanghai', '123567891234'), + (default, 'user_19', 'Shanghai', '123567891234'), + (default, 'user_20', 'Shanghai', '123567891234'), + (default, 'user_21', 'Shanghai', '123567891234'); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 77682534f55..b1e6d1dfc86 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; +import org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils; import org.apache.flink.cdc.connectors.mysql.table.StartupMode; @@ -92,6 +93,7 @@ public class BinlogSplitReader implements DebeziumReader> pendingSchemaChangeEvents; private static final long READER_CLOSE_TIMEOUT = 30L; @@ -114,6 +116,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) this.isParsingOnLineSchemaChanges = statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges(); this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill(); + this.pendingSchemaChangeEvents = new HashMap<>(); } public void submitSplit(MySqlSplit mySqlSplit) { @@ -181,8 +184,35 @@ public Iterator pollSplitRecords() throws InterruptedException { Optional oscRecord = parseOnLineSchemaChangeEvent(event.getRecord()); if (oscRecord.isPresent()) { - sourceRecords.add(oscRecord.get()); - continue; + TableId tableId = RecordUtils.getTableId(oscRecord.get()); + if (tableId != null) { + LOG.info( + "Received the start event of online schema change: {}. Save it for later.", + oscRecord.get()); + pendingSchemaChangeEvents + .computeIfAbsent(tableId.toString(), k -> new ArrayList<>()) + .add(oscRecord.get()); + continue; + } + } + + Optional finishedTables = + OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord()); + if (finishedTables.isPresent()) { + TableId tableId = RecordUtils.getTableId(event.getRecord()); + String finishedTableId = tableId.catalog() + "." + finishedTables.get(); + LOG.info( + "Received the ending event of table {}. Emit corresponding DDL event now.", + finishedTableId); + + if (pendingSchemaChangeEvents.containsKey(finishedTableId)) { + sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId)); + } else { + LOG.error( + "Error: met an unexpected osc finish event. Current pending events: {}, Record: {}", + pendingSchemaChangeEvents, + event); + } } } if (shouldEmit(event.getRecord())) { @@ -228,11 +258,11 @@ public void close() { } private Optional parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) { - if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) { + if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) { // This is a gh-ost initialized schema change event and should be emitted if the // peeled tableId matches the predicate. TableId originalTableId = RecordUtils.getTableId(sourceRecord); - TableId peeledTableId = RecordUtils.peelTableId(originalTableId); + TableId peeledTableId = OnlineSchemaChangeUtils.peelTableId(originalTableId); if (capturedTableFilter.test(peeledTableId)) { return Optional.of( RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java new file mode 100644 index 00000000000..aef47a4d326 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.data.Envelope; +import io.debezium.relational.TableId; +import io.debezium.relational.history.HistoryRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; +import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD; + +/** Utility class for handling gh-ost/pt-osc online schema change events. */ +public class OnlineSchemaChangeUtils { + + private static final Logger LOG = LoggerFactory.getLogger(OnlineSchemaChangeUtils.class); + + private OnlineSchemaChangeUtils() {} + + /** + * Pattern matching gh-ost shadow table ({@code __gho}) and pt-osc new table ({@code + * __new}), which carry the actual ALTER DDL during an online schema change. + */ + private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); + + /** + * Pattern matching gh-ost delete table ({@code __del}) and pt-osc old table ({@code + * __old}), which are the temporary backup tables created during an online schema change. + */ + private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); + + /** + * Checks whether the given source record is a gh-ost/pt-osc initiated schema change event by + * inspecting the ALTER DDL statement targeting a shadow/new table. + * + *

There will be these schema change events generated in total during one transaction. + * + *

gh-ost: + * + *

+     * DROP TABLE IF EXISTS `db`.`_tb1_gho`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * create /* gh-ost */ table `db`.`_tb1_ghc` ...
+     * create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
+     * alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
+     * alter /* gh-ost */ table `db`.`_tb1_gho` AUTO_INCREMENT=N  (only present when the table has an AUTO_INCREMENT column)
+     * create /* gh-ost */ table `db`.`_tb1_del` ...
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
+     * rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * 
+ * + *

pt-osc: + * + *

+     * CREATE TABLE `db`.`_test_tb1_new`
+     * ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+     * CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+     * ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
+     * RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO `db`.`test_tb1`
+     * DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+     * 
+ * + *

Among all these, only the ALTER statement targeting the {@code _gho}/{@code _new} table is + * stored temporarily, and emitted when the subsequent RENAME TABLE event arrives. + */ + public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { + if (!RecordUtils.isSchemaChangeEvent(record)) { + return false; + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("alter")) { + String tableName = + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY); + return OSC_TABLE_ID_PATTERN.matcher(tableName).matches(); + } + return false; + } catch (JsonProcessingException e) { + return false; + } + } + + /** + * Parses a gh-ost/pt-osc RENAME TABLE event and returns the original (user-facing) table name + * if the event represents the completion of an online schema change. + * + * @return the original table name if the record is an OSC completion rename, or {@link + * Optional#empty()} otherwise. + */ + public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) { + if (!RecordUtils.isSchemaChangeEvent(record)) { + return Optional.empty(); + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { + LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl); + List tableNames = + Arrays.asList( + value.getStruct(Envelope.FieldName.SOURCE) + .getString(TABLE_NAME_KEY) + .split(",")); + if (tableNames.size() != 2) { + LOG.info( + "Table name {} is malformed, skip it.", + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); + return Optional.empty(); + } + + String renamedFromTableName = + Collections.min(tableNames, Comparator.comparingInt(String::length)); + String renamedToTableName = + Collections.max(tableNames, Comparator.comparingInt(String::length)); + + LOG.info( + "Determined the shorter TableId {} is the renaming source.", + renamedFromTableName); + LOG.info( + "Determined the longer TableId {} is the renaming target.", + renamedToTableName); + + if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + LOG.info( + "Renamed to TableId {} does not match any RegEx pattern, skip it.", + renamedToTableName); + } + return Optional.empty(); + } catch (JsonProcessingException e) { + LOG.warn("Failed to parse schema change event {}", value, e); + return Optional.empty(); + } + } + + /** + * Peels out a gh-ost/pt-osc mangled {@link TableId} back to the original user-facing one. + * + *

For example, {@code _customers_gho} → {@code customers}, {@code _orders_new} → {@code + * orders}. + */ + public static TableId peelTableId(TableId tableId) { + Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table()); + if (matchingResult.matches()) { + return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1)); + } + return tableId; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index bf4d4f29f56..1f9a041d78f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -25,9 +25,6 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - import io.debezium.data.Envelope; import io.debezium.document.Document; import io.debezium.document.DocumentReader; @@ -51,8 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY; @@ -469,75 +464,4 @@ private static Optional getWatermarkKind(SourceRecord record) { } return Optional.empty(); } - - /** - * This utility method checks if given source record is a gh-ost/pt-osc initiated schema change - * event by checking the "alter" ddl. - */ - public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { - if (!isSchemaChangeEvent(record)) { - return false; - } - Struct value = (Struct) record.value(); - ObjectMapper mapper = new ObjectMapper(); - try { - // There will be these schema change events generated in total during one transaction. - // - // gh-ost: - // DROP TABLE IF EXISTS `db`.`_tb1_gho` - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // DROP TABLE IF EXISTS `db`.`_tb1_ghc` - // create /* gh-ost */ table `db`.`_tb1_ghc` ... - // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1` - // alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255) - // create /* gh-ost */ table `db`.`_tb1_del` ... - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del` - // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1` - // DROP TABLE IF EXISTS `db`.`_tb1_ghc` - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // - // pt-osc: - // CREATE TABLE `db`.`_test_tb1_new` - // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50) - // CREATE TRIGGER `pt_osc_db_test_tb1_del`... - // CREATE TRIGGER `pt_osc_db_test_tb1_upd`... - // CREATE TRIGGER `pt_osc_db_test_tb1_ins`... - // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */ - // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO - // `db`.`test_tb1` - // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */ - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del` - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd` - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins` - // - // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new` - // table. - String ddl = - mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) - .get(HistoryRecord.Fields.DDL_STATEMENTS) - .asText() - .toLowerCase(); - if (ddl.startsWith("alter")) { - String tableName = - value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY); - return OSC_TABLE_ID_PATTERN.matcher(tableName).matches(); - } - - return false; - } catch (JsonProcessingException e) { - return false; - } - } - - private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); - - /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */ - public static TableId peelTableId(TableId tableId) { - Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table()); - if (matchingResult.matches()) { - return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1)); - } - return tableId; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java index ec9da16d141..145bf4c0e40 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java @@ -77,7 +77,7 @@ class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); @@ -174,7 +174,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception { .serverId(getServerId()) .deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("UTC") - .includeSchemaChanges(true) // output the schema changes as well + .includeSchemaChanges(true) + .parseOnLineSchemaChanges(true) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -334,7 +335,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception { .serverId(getServerId()) .deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("UTC") - .includeSchemaChanges(true) // output the schema changes as well + .includeSchemaChanges(true) + .parseOnLineSchemaChanges(true) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java index 8bc09a4808a..752b54f240d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java @@ -78,7 +78,7 @@ class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); @@ -177,7 +177,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception { + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'server-time-zone' = 'UTC'," - + " 'server-id' = '%s'" + + " 'server-id' = '%s'," + + " 'scan.parse.online.schema.changes.enabled' = 'true'" + ")", MYSQL8_CONTAINER.getHost(), MYSQL8_CONTAINER.getDatabasePort(), @@ -346,7 +347,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception { + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'server-time-zone' = 'UTC'," - + " 'server-id' = '%s'" + + " 'server-id' = '%s'," + + " 'scan.parse.online.schema.changes.enabled' = 'true'" + ")", MYSQL8_CONTAINER.getHost(), MYSQL8_CONTAINER.getDatabasePort(),