From 733daa94c8a4fb7079f0bca9d4e1940c702a8361 Mon Sep 17 00:00:00 2001 From: "yuxiqian.yxq" Date: Thu, 10 Jul 2025 10:52:38 +0800 Subject: [PATCH 1/5] [FLINK-38139] Fix consecutive online schema change causes job failure. --- .../mysql/source/MySqlOscITCase.java | 355 ++++++++++++++++++ .../debezium/reader/BinlogSplitReader.java | 31 +- .../mysql/source/utils/RecordUtils.java | 72 +++- ...ySqlOnLineSchemaMigrationSourceITCase.java | 6 +- ...MySqlOnLineSchemaMigrationTableITCase.java | 6 +- 5 files changed, 463 insertions(+), 7 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java new file mode 100644 index 00000000000..aeea8d54af0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.utils.TestCaseUtils; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.cdc.composer.definition.PipelineDef; +import org.apache.flink.cdc.composer.definition.RouteDef; +import org.apache.flink.cdc.composer.definition.SinkDef; +import org.apache.flink.cdc.composer.definition.SourceDef; +import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer; +import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.connectors.values.ValuesDatabase; +import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; +import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.sql.Connection; +import java.sql.Statement; +import java.util.Collections; +import java.util.stream.Stream; + +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; +import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; + +/** A more complicated IT case for Evolving MySQL schema with gh-ost/pt-osc utility. */ +class MySqlOscITCase extends MySqlSourceTestBase { + private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + + protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = + createPerconaToolkitContainer(); + + private final UniqueDatabase customerDatabase = + new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + + private static final String GH_OST_RESOURCE_NAME = + DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") + ? "ghost-cli/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "ghost-cli/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + + private final PrintStream standardOut = System.out; + private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); + + @BeforeEach + void takeoverOutput() { + System.setOut(new PrintStream(outCaptor)); + } + + @AfterEach + protected void handInStdOut() { + System.setOut(standardOut); + outCaptor.reset(); + } + + @BeforeAll + static void beforeClass() { + LOG.info("Starting MySql8 containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join(); + LOG.info("Container MySql8 is started."); + } + + @AfterAll + static void afterClass() { + LOG.info("Stopping MySql8 containers..."); + MYSQL8_CONTAINER.stop(); + PERCONA_TOOLKIT_CONTAINER.stop(); + LOG.info("Container MySql8 is stopped."); + } + + @BeforeEach + void before() { + customerDatabase.createAndInitialize(); + TestValuesTableFactory.clearAllData(); + ValuesDatabase.clear(); + env.setParallelism(4); + env.enableCheckpointing(200); + env.setRestartStrategy(RestartStrategies.noRestart()); + } + + @AfterEach + void after() { + customerDatabase.dropDatabase(); + } + + private static void installGhOstCli(Container container) { + try { + container.copyFileToContainer( + MountableFile.forClasspathResource(GH_OST_RESOURCE_NAME), "/tmp/gh-ost.tar.gz"); + execInContainer( + container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static GenericContainer createPerconaToolkitContainer() { + GenericContainer perconaToolkit = + new GenericContainer<>(PERCONA_TOOLKIT) + // keep container alive + .withCommand("tail", "-f", "/dev/null") + .withNetwork(NETWORK) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + return perconaToolkit; + } + + private void insertRecordsPhase1(UniqueDatabase database, int startIndex, int count) + throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = startIndex; i < startIndex + count; i++) { + statement.execute( + String.format( + "insert into customers (id, name, address, phone_number) values (%s, '%s', '%s', '%s');", + i, "flink_" + i, "Address Line #" + i, 1000000000L + i)); + } + } + } + + private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int count) + throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = startIndex; i < startIndex + count; i++) { + statement.execute( + String.format( + "insert into customers (id, name, address, phone_number, ext) values (%s, '%s', '%s', '%s', %s);", + i, "flink_" + i, "Address Line #" + i, 1000000000L + i, i)); + } + } + } + + @Test + void testGhOstSchemaMigration() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + Thread yamlJob = runJob(databaseName, "customers"); + yamlJob.start(); + + LOG.info("Step 2: Start pipeline job"); + insertRecordsPhase1(customerDatabase, 5000, 1000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + MYSQL8_CONTAINER, + "evolve schema", + "gh-ost", + "--user=" + TEST_USER, + "--password=" + TEST_PASSWORD, + "--database=" + databaseName, + "--table=customers", + "--alter=add column ext int first", + "--allow-on-master", // because we don't have a replica + "--initially-drop-old-table", // drop previously generated + // temporary tables + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + thread.start(); + insertRecordsPhase1(customerDatabase, 7000, 3000); + + thread.join(); + insertRecordsPhase2(customerDatabase, 12000, 1000); + + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5023); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + + @Test + void testPtOscSchemaMigration() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Install gh-ost command line utility"); + installGhOstCli(MYSQL8_CONTAINER); + + Thread yamlJob = runJob(databaseName, "customers"); + yamlJob.start(); + + LOG.info("Step 2: Start pipeline job"); + insertRecordsPhase1(customerDatabase, 5000, 1000); + + LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers,D=" + databaseName, + "--alter", + "add column ext int first", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + LOG.info("Insertion Phase 1 finishes"); + thread.start(); + insertRecordsPhase1(customerDatabase, 7000, 3000); + LOG.info("Insertion Phase 2 finishes"); + + thread.join(); + insertRecordsPhase2(customerDatabase, 12000, 1000); + LOG.info("Insertion Phase 3 finishes"); + + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5023); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + + private static void execInContainer(Container container, String prompt, String... commands) + throws IOException, InterruptedException { + { + LOG.info( + "Starting to {} with the following command: `{}`", + prompt, + String.join(" ", commands)); + Container.ExecResult execResult = container.execInContainer(commands); + if (execResult.getExitCode() == 0) { + LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout()); + } else { + LOG.error( + "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}", + prompt, + execResult.getExitCode(), + execResult.getStdout(), + execResult.getStderr()); + throw new IOException("Failed to execute commands: " + String.join(" ", commands)); + } + } + } + + private Thread runJob(String databaseName, String tableName) { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup MySQL source + Configuration sourceConfig = new Configuration(); + sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost()); + sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort()); + sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER); + sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD); + sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC"); + sourceConfig.set(MySqlDataSourceOptions.TABLES, databaseName + "." + tableName); + sourceConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, true); + + SourceDef sourceDef = + new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig); + + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.singletonList( + new RouteDef( + databaseName + "." + tableName, + "sink_db.sink_tbl", + null, + null)), + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + PipelineExecution execution = composer.compose(pipelineDef); + return new Thread( + () -> { + try { + execution.execute(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 77682534f55..1c454755a60 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -92,6 +92,7 @@ public class BinlogSplitReader implements DebeziumReader pendingSchemaChangeEvents; private static final long READER_CLOSE_TIMEOUT = 30L; @@ -114,6 +115,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) this.isParsingOnLineSchemaChanges = statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges(); this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill(); + this.pendingSchemaChangeEvents = new HashMap<>(); } public void submitSplit(MySqlSplit mySqlSplit) { @@ -181,8 +183,33 @@ public Iterator pollSplitRecords() throws InterruptedException { Optional oscRecord = parseOnLineSchemaChangeEvent(event.getRecord()); if (oscRecord.isPresent()) { - sourceRecords.add(oscRecord.get()); - continue; + TableId tableId = RecordUtils.getTableId(oscRecord.get()); + if (tableId != null) { + LOG.info( + "Received the start event of online schema change: {}. Save it for later.", + oscRecord.get()); + pendingSchemaChangeEvents.put(tableId.toString(), oscRecord.get()); + continue; + } + } + + Optional finishedTables = + RecordUtils.parseOnLineSchemaRenameEvent(event.getRecord()); + if (finishedTables.isPresent()) { + TableId tableId = RecordUtils.getTableId(event.getRecord()); + String finishedTableId = tableId.catalog() + "." + finishedTables.get(); + LOG.info( + "Received the ending event of table {}. Emit corresponding DDL event now.", + finishedTableId); + + if (pendingSchemaChangeEvents.containsKey(finishedTableId)) { + sourceRecords.add(pendingSchemaChangeEvents.remove(finishedTableId)); + } else { + LOG.error( + "Error: met an unexpected osc finish event. Current pending events: {}, Record: {}", + pendingSchemaChangeEvents, + event); + } } } if (shouldEmit(event.getRecord())) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index bf4d4f29f56..37ca4144caf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -46,6 +46,7 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -512,7 +513,7 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins` // // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new` - // table. + // table and store them temporarily, and emit them when the RENAME TABLE event pops up. String ddl = mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) .get(HistoryRecord.Fields.DDL_STATEMENTS) @@ -530,8 +531,77 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { } } + public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) { + if (!isSchemaChangeEvent(record)) { + return Optional.empty(); + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { + LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl); + List tableNames = + Arrays.asList( + value.getStruct(Envelope.FieldName.SOURCE) + .getString(TABLE_NAME_KEY) + .split(",")); + if (tableNames.size() != 2) { + LOG.info( + "Table name {} is malformed, skip it.", + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); + return Optional.empty(); + } + + String renamedFromTableName = + Collections.min(tableNames, Comparator.comparingInt(String::length)); + String renamedToTableName = + Collections.max(tableNames, Comparator.comparingInt(String::length)); + + LOG.info( + "Determined the shorter TableId {} is the renaming source.", + renamedFromTableName); + LOG.info( + "Determined the longer TableId {} is the renaming target.", + renamedToTableName); + + if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + if (RDS_OGT_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches RDS temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + LOG.info( + "Renamed to TableId {} does not match any RegEx pattern, skip it.", + renamedToTableName); + } + return Optional.empty(); + } catch (JsonProcessingException e) { + return Optional.empty(); + } + } + private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); + private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); + private static final Pattern RDS_OGT_TEMP_TABLE_ID_PATTERN = + Pattern.compile("^tp_\\d*_del_(.*)$"); + /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */ public static TableId peelTableId(TableId tableId) { Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java index ec9da16d141..25b3df8378b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java @@ -174,7 +174,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception { .serverId(getServerId()) .deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("UTC") - .includeSchemaChanges(true) // output the schema changes as well + .includeSchemaChanges(true) + .parseOnLineSchemaChanges(true) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -334,7 +335,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception { .serverId(getServerId()) .deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("UTC") - .includeSchemaChanges(true) // output the schema changes as well + .includeSchemaChanges(true) + .parseOnLineSchemaChanges(true) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java index 8bc09a4808a..97e18f1127f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java @@ -177,7 +177,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception { + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'server-time-zone' = 'UTC'," - + " 'server-id' = '%s'" + + " 'server-id' = '%s'," + + " 'scan.parse.online.schema.changes.enabled' = 'true'" + ")", MYSQL8_CONTAINER.getHost(), MYSQL8_CONTAINER.getDatabasePort(), @@ -346,7 +347,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception { + " 'table-name' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'server-time-zone' = 'UTC'," - + " 'server-id' = '%s'" + + " 'server-id' = '%s'," + + " 'scan.parse.online.schema.changes.enabled' = 'true'" + ")", MYSQL8_CONTAINER.getHost(), MYSQL8_CONTAINER.getDatabasePort(), From 14bcdbfa163bd576c2934470da91b13c87b045d8 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Tue, 18 Nov 2025 19:39:05 +0800 Subject: [PATCH 2/5] [FLINK-38139][cdc/mysql][follow] Cache Online Schema change DDL Alter AUTO_INCREMENT --- .../mysql/debezium/reader/BinlogSplitReader.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 1c454755a60..139e465d8e6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -92,7 +92,7 @@ public class BinlogSplitReader implements DebeziumReader pendingSchemaChangeEvents; + private final Map> pendingSchemaChangeEvents; private static final long READER_CLOSE_TIMEOUT = 30L; @@ -188,7 +188,9 @@ public Iterator pollSplitRecords() throws InterruptedException { LOG.info( "Received the start event of online schema change: {}. Save it for later.", oscRecord.get()); - pendingSchemaChangeEvents.put(tableId.toString(), oscRecord.get()); + pendingSchemaChangeEvents + .computeIfAbsent(tableId.toString(), k -> new ArrayList<>()) + .add(oscRecord.get()); continue; } } @@ -203,7 +205,7 @@ public Iterator pollSplitRecords() throws InterruptedException { finishedTableId); if (pendingSchemaChangeEvents.containsKey(finishedTableId)) { - sourceRecords.add(pendingSchemaChangeEvents.remove(finishedTableId)); + sourceRecords.addAll(pendingSchemaChangeEvents.remove(finishedTableId)); } else { LOG.error( "Error: met an unexpected osc finish event. Current pending events: {}, Record: {}", From 5d9737f20d52be3bd79bc2045d730e9040ef3ea8 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 19 Mar 2026 15:54:07 +0800 Subject: [PATCH 3/5] add auto increment ID support for pt-osc schema migration testing --- .../mysql/source/MySqlOscITCase.java | 81 +++++++++++++++++++ .../src/test/resources/ddl/customer.sql | 31 +++++++ 2 files changed, 112 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java index aeea8d54af0..fffe1c57d9d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java @@ -173,6 +173,30 @@ private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int co } } + private void insertRecordsPhase1ForAutoId(UniqueDatabase database, int count) throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = 0; i < count; i++) { + statement.execute( + String.format( + "insert into customers_auto_id (name, address, phone_number) values ('%s', '%s', '%s');", + "flink_" + i, "Address Line #" + i, 1000000000L + i)); + } + } + } + + private void insertRecordsPhase2ForAutoId(UniqueDatabase database, int count) throws Exception { + try (Connection connection = database.getJdbcConnection(); + Statement statement = connection.createStatement()) { + for (int i = 0; i < count; i++) { + statement.execute( + String.format( + "insert into customers_auto_id (name, address, phone_number, ext) values ('%s', '%s', '%s', %s);", + "flink_ext_" + i, "Address Line Ext #" + i, 1000000000L + i, i)); + } + } + } + @Test void testGhOstSchemaMigration() throws Exception { String databaseName = customerDatabase.getDatabaseName(); @@ -283,6 +307,63 @@ void testPtOscSchemaMigration() throws Exception { } } + @Test + void testPtOscSchemaMigrationWithAutoIncrementId() throws Exception { + String databaseName = customerDatabase.getDatabaseName(); + + LOG.info("Step 1: Start pipeline job for auto increment id table"); + + Thread yamlJob = runJob(databaseName, "customers_auto_id"); + yamlJob.start(); + + LOG.info("Step 2: Insert initial records (Phase 1)"); + insertRecordsPhase1ForAutoId(customerDatabase, 1000); + + LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN"); + + Thread thread = + new Thread( + () -> { + try { + execInContainer( + PERCONA_TOOLKIT_CONTAINER, + "evolve schema", + "pt-online-schema-change", + "--user=" + TEST_USER, + "--host=" + INTER_CONTAINER_MYSQL_ALIAS, + "--password=" + TEST_PASSWORD, + "P=3306,t=customers_auto_id,D=" + databaseName, + "--alter", + "add column ext int first", + "--charset=utf8", + "--recursion-method=NONE", // Do not look for slave nodes + "--print", + "--execute"); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }); + + LOG.info("Insertion Phase 1 finishes"); + thread.start(); + insertRecordsPhase1ForAutoId(customerDatabase, 3000); + LOG.info("Insertion Phase 2 finishes"); + + thread.join(); + insertRecordsPhase2ForAutoId(customerDatabase, 1000); + LOG.info("Insertion Phase 3 finishes"); + + // Initial 21 records + Phase1 1000 + Phase2 3000 + Phase3 1000 = 5021 + try { + TestCaseUtils.repeatedCheck( + () -> outCaptor.toString().split(System.lineSeparator()).length == 5021); + } catch (Exception e) { + LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e); + } finally { + yamlJob.interrupt(); + } + } + private static void execInContainer(Container container, String prompt, String... commands) throws IOException, InterruptedException { { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql index e4df63f1a33..b840f4358a8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/customer.sql @@ -326,3 +326,34 @@ CREATE TABLE default_value_test ( INSERT INTO default_value_test VALUES (1,'user1','Shanghai',123567), (2,'user2','Shanghai',123567); + +-- table has auto increment primary key for pt-osc testing +CREATE TABLE customers_auto_id ( + id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + address VARCHAR(1024), + phone_number VARCHAR(512) +); + +INSERT INTO customers_auto_id +VALUES (default, 'user_1', 'Shanghai', '123567891234'), + (default, 'user_2', 'Shanghai', '123567891234'), + (default, 'user_3', 'Shanghai', '123567891234'), + (default, 'user_4', 'Shanghai', '123567891234'), + (default, 'user_5', 'Shanghai', '123567891234'), + (default, 'user_6', 'Shanghai', '123567891234'), + (default, 'user_7', 'Shanghai', '123567891234'), + (default, 'user_8', 'Shanghai', '123567891234'), + (default, 'user_9', 'Shanghai', '123567891234'), + (default, 'user_10', 'Shanghai', '123567891234'), + (default, 'user_11', 'Shanghai', '123567891234'), + (default, 'user_12', 'Shanghai', '123567891234'), + (default, 'user_13', 'Shanghai', '123567891234'), + (default, 'user_14', 'Shanghai', '123567891234'), + (default, 'user_15', 'Shanghai', '123567891234'), + (default, 'user_16', 'Shanghai', '123567891234'), + (default, 'user_17', 'Shanghai', '123567891234'), + (default, 'user_18', 'Shanghai', '123567891234'), + (default, 'user_19', 'Shanghai', '123567891234'), + (default, 'user_20', 'Shanghai', '123567891234'), + (default, 'user_21', 'Shanghai', '123567891234'); \ No newline at end of file From 9775fc02550b4fe23c63d06a6e78e06e6bf8a368 Mon Sep 17 00:00:00 2001 From: Junbo Wang Date: Thu, 19 Mar 2026 18:59:46 +0800 Subject: [PATCH 4/5] update percona toolkit version and improve gh-ost download mechanism --- .../MySqlOnLineSchemaMigrationITCase.java | 2 +- .../mysql/source/MySqlOscITCase.java | 19 ++++++++++++------- .../mysql/source/utils/RecordUtils.java | 12 ++---------- ...ySqlOnLineSchemaMigrationSourceITCase.java | 2 +- ...MySqlOnLineSchemaMigrationTableITCase.java | 2 +- 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java index 8716fffc420..522d7dd2647 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationITCase.java @@ -82,7 +82,7 @@ class MySqlOnLineSchemaMigrationITCase extends MySqlSourceTestBase { private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java index fffe1c57d9d..2907a040574 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java @@ -48,7 +48,6 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -65,7 +64,7 @@ class MySqlOscITCase extends MySqlSourceTestBase { private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); @@ -76,10 +75,10 @@ class MySqlOscITCase extends MySqlSourceTestBase { private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - private static final String GH_OST_RESOURCE_NAME = + private static final String GH_OST_DOWNLOAD_LINK = DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64") - ? "ghost-cli/gh-ost-binary-linux-amd64-20231207144046.tar.gz" - : "ghost-cli/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; + ? "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-amd64-20231207144046.tar.gz" + : "https://github.com/github/gh-ost/releases/download/v1.1.6/gh-ost-binary-linux-arm64-20231207144046.tar.gz"; private final PrintStream standardOut = System.out; private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream(); @@ -128,8 +127,14 @@ void after() { private static void installGhOstCli(Container container) { try { - container.copyFileToContainer( - MountableFile.forClasspathResource(GH_OST_RESOURCE_NAME), "/tmp/gh-ost.tar.gz"); + execInContainer( + container, + "download gh-ost tarball", + "curl", + "-L", + "-o", + "/tmp/gh-ost.tar.gz", + GH_OST_DOWNLOAD_LINK); execInContainer( container, "unzip binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin"); } catch (IOException | InterruptedException e) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 37ca4144caf..332e2c825d9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -44,6 +44,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -578,20 +579,13 @@ public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) return Optional.of(renamedFromTableName); } - if (RDS_OGT_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { - LOG.info( - "Renamed to TableId name {} matches RDS temporary TableId pattern, yield {}.", - renamedToTableName, - renamedFromTableName); - return Optional.of(renamedFromTableName); - } - LOG.info( "Renamed to TableId {} does not match any RegEx pattern, skip it.", renamedToTableName); } return Optional.empty(); } catch (JsonProcessingException e) { + LOG.warn("Failed to parse schema change event {}", value, e); return Optional.empty(); } } @@ -599,8 +593,6 @@ public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); - private static final Pattern RDS_OGT_TEMP_TABLE_ID_PATTERN = - Pattern.compile("^tp_\\d*_del_(.*)$"); /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */ public static TableId peelTableId(TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java index 25b3df8378b..145bf4c0e40 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java @@ -77,7 +77,7 @@ class MySqlOnLineSchemaMigrationSourceITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java index 97e18f1127f..752b54f240d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java @@ -78,7 +78,7 @@ class MySqlOnLineSchemaMigrationTableITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7"; + private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.7.1"; protected static final GenericContainer PERCONA_TOOLKIT_CONTAINER = createPerconaToolkitContainer(); From 34ad9222c813bc0b116b057921616572f5637d83 Mon Sep 17 00:00:00 2001 From: "boyu.wjb" Date: Fri, 20 Mar 2026 09:27:35 +0800 Subject: [PATCH 5/5] refactor(mysql-cdc): move online schema change utilities to dedicated class --- .../debezium/reader/BinlogSplitReader.java | 7 +- .../source/utils/OnlineSchemaChangeUtils.java | 205 ++++++++++++++++++ .../mysql/source/utils/RecordUtils.java | 138 ------------ 3 files changed, 209 insertions(+), 141 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 139e465d8e6..b1e6d1dfc86 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; +import org.apache.flink.cdc.connectors.mysql.source.utils.OnlineSchemaChangeUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.SplitKeyUtils; import org.apache.flink.cdc.connectors.mysql.table.StartupMode; @@ -196,7 +197,7 @@ public Iterator pollSplitRecords() throws InterruptedException { } Optional finishedTables = - RecordUtils.parseOnLineSchemaRenameEvent(event.getRecord()); + OnlineSchemaChangeUtils.parseOnLineSchemaRenameEvent(event.getRecord()); if (finishedTables.isPresent()) { TableId tableId = RecordUtils.getTableId(event.getRecord()); String finishedTableId = tableId.catalog() + "." + finishedTables.get(); @@ -257,11 +258,11 @@ public void close() { } private Optional parseOnLineSchemaChangeEvent(SourceRecord sourceRecord) { - if (RecordUtils.isOnLineSchemaChangeEvent(sourceRecord)) { + if (OnlineSchemaChangeUtils.isOnLineSchemaChangeEvent(sourceRecord)) { // This is a gh-ost initialized schema change event and should be emitted if the // peeled tableId matches the predicate. TableId originalTableId = RecordUtils.getTableId(sourceRecord); - TableId peeledTableId = RecordUtils.peelTableId(originalTableId); + TableId peeledTableId = OnlineSchemaChangeUtils.peelTableId(originalTableId); if (capturedTableFilter.test(peeledTableId)) { return Optional.of( RecordUtils.setTableId(sourceRecord, originalTableId, peeledTableId)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java new file mode 100644 index 00000000000..aef47a4d326 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/OnlineSchemaChangeUtils.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.data.Envelope; +import io.debezium.relational.TableId; +import io.debezium.relational.history.HistoryRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; +import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.HISTORY_RECORD_FIELD; + +/** Utility class for handling gh-ost/pt-osc online schema change events. */ +public class OnlineSchemaChangeUtils { + + private static final Logger LOG = LoggerFactory.getLogger(OnlineSchemaChangeUtils.class); + + private OnlineSchemaChangeUtils() {} + + /** + * Pattern matching gh-ost shadow table ({@code __gho}) and pt-osc new table ({@code + * __new}), which carry the actual ALTER DDL during an online schema change. + */ + private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); + + /** + * Pattern matching gh-ost delete table ({@code __del}) and pt-osc old table ({@code + * __old}), which are the temporary backup tables created during an online schema change. + */ + private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); + + /** + * Checks whether the given source record is a gh-ost/pt-osc initiated schema change event by + * inspecting the ALTER DDL statement targeting a shadow/new table. + * + *

There will be these schema change events generated in total during one transaction. + * + *

gh-ost: + * + *

+     * DROP TABLE IF EXISTS `db`.`_tb1_gho`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * create /* gh-ost */ table `db`.`_tb1_ghc` ...
+     * create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1`
+     * alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255)
+     * alter /* gh-ost */ table `db`.`_tb1_gho` AUTO_INCREMENT=N  (only present when the table has an AUTO_INCREMENT column)
+     * create /* gh-ost */ table `db`.`_tb1_del` ...
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del`
+     * rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1`
+     * DROP TABLE IF EXISTS `db`.`_tb1_ghc`
+     * DROP TABLE IF EXISTS `db`.`_tb1_del`
+     * 
+ * + *

pt-osc: + * + *

+     * CREATE TABLE `db`.`_test_tb1_new`
+     * ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50)
+     * CREATE TRIGGER `pt_osc_db_test_tb1_del`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_upd`...
+     * CREATE TRIGGER `pt_osc_db_test_tb1_ins`...
+     * ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */
+     * RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO `db`.`test_tb1`
+     * DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd`
+     * DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
+     * 
+ * + *

Among all these, only the ALTER statement targeting the {@code _gho}/{@code _new} table is + * stored temporarily, and emitted when the subsequent RENAME TABLE event arrives. + */ + public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { + if (!RecordUtils.isSchemaChangeEvent(record)) { + return false; + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("alter")) { + String tableName = + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY); + return OSC_TABLE_ID_PATTERN.matcher(tableName).matches(); + } + return false; + } catch (JsonProcessingException e) { + return false; + } + } + + /** + * Parses a gh-ost/pt-osc RENAME TABLE event and returns the original (user-facing) table name + * if the event represents the completion of an online schema change. + * + * @return the original table name if the record is an OSC completion rename, or {@link + * Optional#empty()} otherwise. + */ + public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) { + if (!RecordUtils.isSchemaChangeEvent(record)) { + return Optional.empty(); + } + Struct value = (Struct) record.value(); + ObjectMapper mapper = new ObjectMapper(); + + try { + String ddl = + mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) + .get(HistoryRecord.Fields.DDL_STATEMENTS) + .asText() + .toLowerCase(); + if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { + LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl); + List tableNames = + Arrays.asList( + value.getStruct(Envelope.FieldName.SOURCE) + .getString(TABLE_NAME_KEY) + .split(",")); + if (tableNames.size() != 2) { + LOG.info( + "Table name {} is malformed, skip it.", + value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); + return Optional.empty(); + } + + String renamedFromTableName = + Collections.min(tableNames, Comparator.comparingInt(String::length)); + String renamedToTableName = + Collections.max(tableNames, Comparator.comparingInt(String::length)); + + LOG.info( + "Determined the shorter TableId {} is the renaming source.", + renamedFromTableName); + LOG.info( + "Determined the longer TableId {} is the renaming target.", + renamedToTableName); + + if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { + LOG.info( + "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", + renamedToTableName, + renamedFromTableName); + return Optional.of(renamedFromTableName); + } + + LOG.info( + "Renamed to TableId {} does not match any RegEx pattern, skip it.", + renamedToTableName); + } + return Optional.empty(); + } catch (JsonProcessingException e) { + LOG.warn("Failed to parse schema change event {}", value, e); + return Optional.empty(); + } + } + + /** + * Peels out a gh-ost/pt-osc mangled {@link TableId} back to the original user-facing one. + * + *

For example, {@code _customers_gho} → {@code customers}, {@code _orders_new} → {@code + * orders}. + */ + public static TableId peelTableId(TableId tableId) { + Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table()); + if (matchingResult.matches()) { + return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1)); + } + return tableId; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 332e2c825d9..1f9a041d78f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -25,9 +25,6 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - import io.debezium.data.Envelope; import io.debezium.document.Document; import io.debezium.document.DocumentReader; @@ -44,17 +41,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY; @@ -471,135 +464,4 @@ private static Optional getWatermarkKind(SourceRecord record) { } return Optional.empty(); } - - /** - * This utility method checks if given source record is a gh-ost/pt-osc initiated schema change - * event by checking the "alter" ddl. - */ - public static boolean isOnLineSchemaChangeEvent(SourceRecord record) { - if (!isSchemaChangeEvent(record)) { - return false; - } - Struct value = (Struct) record.value(); - ObjectMapper mapper = new ObjectMapper(); - try { - // There will be these schema change events generated in total during one transaction. - // - // gh-ost: - // DROP TABLE IF EXISTS `db`.`_tb1_gho` - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // DROP TABLE IF EXISTS `db`.`_tb1_ghc` - // create /* gh-ost */ table `db`.`_tb1_ghc` ... - // create /* gh-ost */ table `db`.`_tb1_gho` like `db`.`tb1` - // alter /* gh-ost */ table `db`.`_tb1_gho` add column c varchar(255) - // create /* gh-ost */ table `db`.`_tb1_del` ... - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // rename /* gh-ost */ table `db`.`tb1` to `db`.`_tb1_del` - // rename /* gh-ost */ table `db`.`_tb1_gho` to `db`.`tb1` - // DROP TABLE IF EXISTS `db`.`_tb1_ghc` - // DROP TABLE IF EXISTS `db`.`_tb1_del` - // - // pt-osc: - // CREATE TABLE `db`.`_test_tb1_new` - // ALTER TABLE `db`.`_test_tb1_new` add column c varchar(50) - // CREATE TRIGGER `pt_osc_db_test_tb1_del`... - // CREATE TRIGGER `pt_osc_db_test_tb1_upd`... - // CREATE TRIGGER `pt_osc_db_test_tb1_ins`... - // ANALYZE TABLE `db`.`_test_tb1_new` /* pt-online-schema-change */ - // RENAME TABLE `db`.`test_tb1` TO `db`.`_test_tb1_old`, `db`.`_test_tb1_new` TO - // `db`.`test_tb1` - // DROP TABLE IF EXISTS `_test_tb1_old` /* generated by server */ - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_del` - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_upd` - // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins` - // - // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new` - // table and store them temporarily, and emit them when the RENAME TABLE event pops up. - String ddl = - mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) - .get(HistoryRecord.Fields.DDL_STATEMENTS) - .asText() - .toLowerCase(); - if (ddl.startsWith("alter")) { - String tableName = - value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY); - return OSC_TABLE_ID_PATTERN.matcher(tableName).matches(); - } - - return false; - } catch (JsonProcessingException e) { - return false; - } - } - - public static Optional parseOnLineSchemaRenameEvent(SourceRecord record) { - if (!isSchemaChangeEvent(record)) { - return Optional.empty(); - } - Struct value = (Struct) record.value(); - ObjectMapper mapper = new ObjectMapper(); - - try { - String ddl = - mapper.readTree(value.getString(HISTORY_RECORD_FIELD)) - .get(HistoryRecord.Fields.DDL_STATEMENTS) - .asText() - .toLowerCase(); - if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) { - LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl); - List tableNames = - Arrays.asList( - value.getStruct(Envelope.FieldName.SOURCE) - .getString(TABLE_NAME_KEY) - .split(",")); - if (tableNames.size() != 2) { - LOG.info( - "Table name {} is malformed, skip it.", - value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY)); - return Optional.empty(); - } - - String renamedFromTableName = - Collections.min(tableNames, Comparator.comparingInt(String::length)); - String renamedToTableName = - Collections.max(tableNames, Comparator.comparingInt(String::length)); - - LOG.info( - "Determined the shorter TableId {} is the renaming source.", - renamedFromTableName); - LOG.info( - "Determined the longer TableId {} is the renaming target.", - renamedToTableName); - - if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) { - LOG.info( - "Renamed to TableId name {} matches OSC temporary TableId pattern, yield {}.", - renamedToTableName, - renamedFromTableName); - return Optional.of(renamedFromTableName); - } - - LOG.info( - "Renamed to TableId {} does not match any RegEx pattern, skip it.", - renamedToTableName); - } - return Optional.empty(); - } catch (JsonProcessingException e) { - LOG.warn("Failed to parse schema change event {}", value, e); - return Optional.empty(); - } - } - - private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$"); - - private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$"); - - /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */ - public static TableId peelTableId(TableId tableId) { - Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table()); - if (matchingResult.matches()) { - return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1)); - } - return tableId; - } }