diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
new file mode 100644
index 00000000000..aeea8d54af0
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOscITCase.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.composer.PipelineExecution;
+import org.apache.flink.cdc.composer.definition.PipelineDef;
+import org.apache.flink.cdc.composer.definition.RouteDef;
+import org.apache.flink.cdc.composer.definition.SinkDef;
+import org.apache.flink.cdc.composer.definition.SourceDef;
+import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.connectors.values.ValuesDatabase;
+import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
+import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+
+/** A more involved IT case that evolves a MySQL schema with the gh-ost/pt-osc utilities. */
+class MySqlOscITCase extends MySqlSourceTestBase {
+    private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+    private static final String PERCONA_TOOLKIT = "perconalab/percona-toolkit:3.5.7";
+
+    protected static final GenericContainer<?> PERCONA_TOOLKIT_CONTAINER =
+            createPerconaToolkitContainer();
+
+    private final UniqueDatabase customerDatabase =
+            new UniqueDatabase(MYSQL8_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    private static final String GH_OST_RESOURCE_NAME =
+            DockerClientFactory.instance().client().versionCmd().exec().getArch().equals("amd64")
+                    ? "ghost-cli/gh-ost-binary-linux-amd64-20231207144046.tar.gz"
+                    : "ghost-cli/gh-ost-binary-linux-arm64-20231207144046.tar.gz";
+
+    private final PrintStream standardOut = System.out;
+    private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
+
+    @BeforeEach
+    void takeoverOutput() {
+        System.setOut(new PrintStream(outCaptor));
+    }
+
+    @AfterEach
+    protected void handInStdOut() {
+        System.setOut(standardOut);
+        outCaptor.reset();
+    }
+
+    @BeforeAll
+    static void beforeClass() {
+        LOG.info("Starting MySql8 containers...");
+        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
+        Startables.deepStart(Stream.of(PERCONA_TOOLKIT_CONTAINER)).join();
+        LOG.info("Container MySql8 is started.");
+    }
+
+    @AfterAll
+    static void afterClass() {
+        LOG.info("Stopping MySql8 containers...");
+        MYSQL8_CONTAINER.stop();
+        PERCONA_TOOLKIT_CONTAINER.stop();
+        LOG.info("Container MySql8 is stopped.");
+    }
+
+    @BeforeEach
+    void before() {
+        customerDatabase.createAndInitialize();
+        TestValuesTableFactory.clearAllData();
+        ValuesDatabase.clear();
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+    }
+
+    @AfterEach
+    void after() {
+        customerDatabase.dropDatabase();
+    }
+
+    private static void installGhOstCli(Container<?> container) {
+        try {
+            container.copyFileToContainer(
+                    MountableFile.forClasspathResource(GH_OST_RESOURCE_NAME), "/tmp/gh-ost.tar.gz");
+            execInContainer(
+                    container, "extract binary", "tar", "-xzvf", "/tmp/gh-ost.tar.gz", "-C", "/bin");
+        } catch (IOException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static GenericContainer<?> createPerconaToolkitContainer() {
+        GenericContainer<?> perconaToolkit =
+                new GenericContainer<>(PERCONA_TOOLKIT)
+                        // keep container alive
+                        .withCommand("tail", "-f", "/dev/null")
+                        .withNetwork(NETWORK)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        return perconaToolkit;
+    }
+
+    private void insertRecordsPhase1(UniqueDatabase database, int startIndex, int count)
+            throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = startIndex; i < startIndex + count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers (id, name, address, phone_number) values (%s, '%s', '%s', '%s');",
+                                i, "flink_" + i, "Address Line #" + i, 1000000000L + i));
+            }
+        }
+    }
+
+    private void insertRecordsPhase2(UniqueDatabase database, int startIndex, int count)
+            throws Exception {
+        try (Connection connection = database.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            for (int i = startIndex; i < startIndex + count; i++) {
+                statement.execute(
+                        String.format(
+                                "insert into customers (id, name, address, phone_number, ext) values (%s, '%s', '%s', '%s', %s);",
+                                i, "flink_" + i, "Address Line #" + i, 1000000000L + i, i));
+            }
+        }
+    }
+
+    @Test
+    void testGhOstSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+
+        LOG.info("Step 3: Evolve schema with gh-ost - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        MYSQL8_CONTAINER,
+                                        "evolve schema",
+                                        "gh-ost",
+                                        "--user=" + TEST_USER,
+                                        "--password=" + TEST_PASSWORD,
+                                        "--database=" + databaseName,
+                                        "--table=customers",
+                                        "--alter=add column ext int first",
+                                        "--allow-on-master", // because we don't have a replica
+                                        "--initially-drop-old-table", // drop previously generated
+                                        // temporary tables
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
+            throw e;
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    @Test
+    void testPtOscSchemaMigration() throws Exception {
+        String databaseName = customerDatabase.getDatabaseName();
+
+        LOG.info("Step 1: Install gh-ost command line utility");
+        installGhOstCli(MYSQL8_CONTAINER);
+
+        LOG.info("Step 2: Start pipeline job");
+        Thread yamlJob = runJob(databaseName, "customers");
+        yamlJob.start();
+        insertRecordsPhase1(customerDatabase, 5000, 1000);
+        LOG.info("Insertion Phase 1 finishes");
+
+        LOG.info("Step 3: Evolve schema with pt-osc - ADD COLUMN");
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                execInContainer(
+                                        PERCONA_TOOLKIT_CONTAINER,
+                                        "evolve schema",
+                                        "pt-online-schema-change",
+                                        "--user=" + TEST_USER,
+                                        "--host=" + INTER_CONTAINER_MYSQL_ALIAS,
+                                        "--password=" + TEST_PASSWORD,
+                                        "P=3306,t=customers,D=" + databaseName,
+                                        "--alter",
+                                        "add column ext int first",
+                                        "--charset=utf8",
+                                        "--recursion-method=NONE", // Do not look for replica nodes
+                                        "--print",
+                                        "--execute");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        thread.start();
+        insertRecordsPhase1(customerDatabase, 7000, 3000);
+        LOG.info("Insertion Phase 2 finishes");
+
+        thread.join();
+        insertRecordsPhase2(customerDatabase, 12000, 1000);
+        LOG.info("Insertion Phase 3 finishes");
+
+        try {
+            TestCaseUtils.repeatedCheck(
+                    () -> outCaptor.toString().split(System.lineSeparator()).length == 5023);
+        } catch (Exception e) {
+            LOG.error("Failed to verify results. Captured stdout: {}", outCaptor.toString(), e);
+            throw e;
+        } finally {
+            yamlJob.interrupt();
+        }
+    }
+
+    private static void execInContainer(Container<?> container, String prompt, String... commands)
+            throws IOException, InterruptedException {
+        LOG.info(
+                "Starting to {} with the following command: `{}`",
+                prompt,
+                String.join(" ", commands));
+        Container.ExecResult execResult = container.execInContainer(commands);
+        if (execResult.getExitCode() == 0) {
+            LOG.info("Successfully {}. Stdout: {}", prompt, execResult.getStdout());
+        } else {
+            LOG.error(
+                    "Failed to {}. Exit code: {}, Stdout: {}, Stderr: {}",
+                    prompt,
+                    execResult.getExitCode(),
+                    execResult.getStdout(),
+                    execResult.getStderr());
+            throw new IOException("Failed to execute commands: " + String.join(" ", commands));
+        }
+    }
+
+    private Thread runJob(String databaseName, String tableName) {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Set up the MySQL source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost());
+        sourceConfig.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort());
+        sourceConfig.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
+        sourceConfig.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
+        sourceConfig.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
+        sourceConfig.set(MySqlDataSourceOptions.TABLES, databaseName + "." + tableName);
+        sourceConfig.set(MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES, true);
+
+        SourceDef sourceDef =
+                new SourceDef(MySqlDataSourceFactory.IDENTIFIER, "MySQL Source", sourceConfig);
+
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        databaseName + "." + tableName,
+                                        "sink_db.sink_tbl",
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        return new Thread(
+                () -> {
+                    try {
+                        execution.execute();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+}
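Note on the assertions above: both ITCases redirect `System.out` and then count the lines printed while the pipeline ran (the magic number 5023 presumably covers the 5,000 inserted rows plus the schema-change and bootstrap output of the Values sink). A minimal, standalone sketch of that capture-and-count pattern; the class name and the printed rows are illustrative, not part of this PR:

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class StdoutCaptureDemo {
    public static void main(String[] args) {
        PrintStream standardOut = System.out;
        ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outCaptor)); // everything printed from here on is captured
        try {
            System.out.println("-I[1, flink_1, Address Line #1, 1000000001]");
            System.out.println("-I[2, flink_2, Address Line #2, 1000000002]");
        } finally {
            System.setOut(standardOut); // always restore, as handInStdOut() does above
        }
        // The ITCase asserts on exactly this quantity: the number of lines
        // the sink printed while the pipeline was running.
        int lines = outCaptor.toString().split(System.lineSeparator()).length;
        System.out.println("captured " + lines + " lines");
    }
}
```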
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
index 87a435ff62b..806142e57df 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java
@@ -90,6 +90,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSplit> {
+    private final Map<String, SourceRecord> pendingSchemaChangeEvents;
     private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -112,6 +113,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
         this.isParsingOnLineSchemaChanges =
                 statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
         this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
+        this.pendingSchemaChangeEvents = new HashMap<>();
     }
 
     public void submitSplit(MySqlSplit mySqlSplit) {
@@ -171,8 +173,33 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
                     Optional<SourceRecord> oscRecord =
                             parseOnLineSchemaChangeEvent(event.getRecord());
                     if (oscRecord.isPresent()) {
-                        sourceRecords.add(oscRecord.get());
-                        continue;
+                        TableId tableId = RecordUtils.getTableId(oscRecord.get());
+                        if (tableId != null) {
+                            LOG.info(
+                                    "Received the start event of an online schema change: {}. Saving it for later.",
+                                    oscRecord.get());
+                            pendingSchemaChangeEvents.put(tableId.toString(), oscRecord.get());
+                            continue;
+                        }
+                    }
+
+                    Optional<String> finishedTables =
+                            RecordUtils.parseOnLineSchemaRenameEvent(event.getRecord());
+                    if (finishedTables.isPresent()) {
+                        TableId tableId = RecordUtils.getTableId(event.getRecord());
+                        String finishedTableId = tableId.catalog() + "." + finishedTables.get();
+                        LOG.info(
+                                "Received the ending event of table {}. Emitting the pending DDL event now.",
+                                finishedTableId);
+
+                        if (pendingSchemaChangeEvents.containsKey(finishedTableId)) {
+                            sourceRecords.add(pendingSchemaChangeEvents.remove(finishedTableId));
+                        } else {
+                            LOG.error(
+                                    "Met an unexpected OSC finish event. Current pending events: {}, Record: {}",
+                                    pendingSchemaChangeEvents,
+                                    event);
+                        }
                     }
                 }
                 if (shouldEmit(event.getRecord())) {
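The reader change above parks the ALTER that gh-ost/pt-osc apply to the ghost table and only emits it once the cut-over RENAME arrives, keyed by the original table id, so downstream schema evolution fires exactly once and against the right table. A self-contained sketch of that park-and-emit idea, with plain strings standing in for Debezium's `TableId` and `SourceRecord` (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class PendingDdlBuffer {
    private final Map<String, String> pending = new HashMap<>();

    // gh-ost/pt-osc run the ALTER on a ghost table first; remember it,
    // keyed by the original table it will eventually apply to.
    public void onGhostTableAlter(String originalTableId, String ddl) {
        pending.put(originalTableId, ddl);
    }

    // The final RENAME swaps the ghost and original tables; emit the parked
    // DDL now, attributed to the original table.
    public String onCutOverRename(String originalTableId) {
        return pending.remove(originalTableId); // null if nothing was parked
    }

    public static void main(String[] args) {
        PendingDdlBuffer buffer = new PendingDdlBuffer();
        buffer.onGhostTableAlter("db.customers", "ALTER TABLE customers ADD COLUMN ext INT");
        System.out.println(buffer.onCutOverRename("db.customers"));
    }
}
```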
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index e6848f1c4c8..d3d0916b466 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -49,6 +49,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -572,7 +573,7 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
             // DROP TRIGGER IF EXISTS `db`.`pt_osc_db_test_tb1_ins`
             //
             // Among all these, we only need the "ALTER" one that happens on the `_gho`/`_new`
-            // table.
+            // table, store it temporarily, and emit it when the RENAME TABLE event pops up.
             String ddl =
                     mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
                             .get(HistoryRecord.Fields.DDL_STATEMENTS)
@@ -590,8 +591,77 @@ public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
         }
     }
 
+    public static Optional<String> parseOnLineSchemaRenameEvent(SourceRecord record) {
+        if (!isSchemaChangeEvent(record)) {
+            return Optional.empty();
+        }
+        Struct value = (Struct) record.value();
+        ObjectMapper mapper = new ObjectMapper();
+
+        try {
+            String ddl =
+                    mapper.readTree(value.getString(HISTORY_RECORD_FIELD))
+                            .get(HistoryRecord.Fields.DDL_STATEMENTS)
+                            .asText()
+                            .toLowerCase();
+            if (ddl.startsWith("rename table") || ddl.startsWith("rename /* gh-ost */ table")) {
+                LOG.info("Checking if DDL might be an OSC renaming event... {}", ddl);
+                List<String> tableNames =
+                        Arrays.asList(
+                                value.getStruct(Envelope.FieldName.SOURCE)
+                                        .getString(TABLE_NAME_KEY)
+                                        .split(","));
+                if (tableNames.size() != 2) {
+                    LOG.info(
+                            "Table name {} is malformed, skipping it.",
+                            value.getStruct(Envelope.FieldName.SOURCE).getString(TABLE_NAME_KEY));
+                    return Optional.empty();
+                }
+
+                String renamedFromTableName =
+                        Collections.min(tableNames, Comparator.comparingInt(String::length));
+                String renamedToTableName =
+                        Collections.max(tableNames, Comparator.comparingInt(String::length));
+
+                LOG.info(
+                        "Treating the shorter table name {} as the renaming source.",
+                        renamedFromTableName);
+                LOG.info(
+                        "Treating the longer table name {} as the renaming target.",
+                        renamedToTableName);
+
+                if (OSC_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
+                    LOG.info(
+                            "Renaming target {} matches the OSC temporary table pattern, yielding {}.",
+                            renamedToTableName,
+                            renamedFromTableName);
+                    return Optional.of(renamedFromTableName);
+                }
+
+                if (RDS_OGT_TEMP_TABLE_ID_PATTERN.matcher(renamedToTableName).matches()) {
+                    LOG.info(
+                            "Renaming target {} matches the RDS temporary table pattern, yielding {}.",
+                            renamedToTableName,
+                            renamedFromTableName);
+                    return Optional.of(renamedFromTableName);
+                }
+
+                LOG.info(
+                        "Renaming target {} does not match any known temporary table pattern, skipping it.",
+                        renamedToTableName);
+            }
+            return Optional.empty();
+        } catch (JsonProcessingException e) {
+            return Optional.empty();
+        }
+    }
+
     private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");
+
+    private static final Pattern OSC_TEMP_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(del|old)$");
+
+    private static final Pattern RDS_OGT_TEMP_TABLE_ID_PATTERN =
+            Pattern.compile("^tp_\\d*_del_(.*)$");
+
     /** This utility method peels out gh-ost/pt-osc mangled tableId to the original one. */
     public static TableId peelTableId(TableId tableId) {
         Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
index ec9da16d141..25b3df8378b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlOnLineSchemaMigrationSourceITCase.java
@@ -174,7 +174,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception {
                         .serverId(getServerId())
                         .deserializer(new JsonDebeziumDeserializationSchema())
                         .serverTimeZone("UTC")
-                        .includeSchemaChanges(true) // output the schema changes as well
+                        .includeSchemaChanges(true)
+                        .parseOnLineSchemaChanges(true)
                         .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -334,7 +335,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception {
                         .serverId(getServerId())
                         .deserializer(new JsonDebeziumDeserializationSchema())
                         .serverTimeZone("UTC")
-                        .includeSchemaChanges(true) // output the schema changes as well
+                        .includeSchemaChanges(true)
+                        .parseOnLineSchemaChanges(true)
                         .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
index 8bc09a4808a..97e18f1127f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlOnLineSchemaMigrationTableITCase.java
@@ -177,7 +177,8 @@ void testGhOstSchemaMigrationFromScratch() throws Exception {
                         + " 'table-name' = '%s',"
                         + " 'scan.incremental.snapshot.enabled' = '%s',"
                         + " 'server-time-zone' = 'UTC',"
-                        + " 'server-id' = '%s'"
+                        + " 'server-id' = '%s',"
+                        + " 'scan.parse.online.schema.changes.enabled' = 'true'"
                         + ")",
                 MYSQL8_CONTAINER.getHost(),
                 MYSQL8_CONTAINER.getDatabasePort(),
@@ -346,7 +347,8 @@ void testPtOscSchemaMigrationFromScratch() throws Exception {
                         + " 'table-name' = '%s',"
                         + " 'scan.incremental.snapshot.enabled' = '%s',"
                         + " 'server-time-zone' = 'UTC',"
-                        + " 'server-id' = '%s'"
+                        + " 'server-id' = '%s',"
+                        + " 'scan.parse.online.schema.changes.enabled' = 'true'"
                         + ")",
                 MYSQL8_CONTAINER.getHost(),
                 MYSQL8_CONTAINER.getDatabasePort(),
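For reference, this is roughly how the new switch would look from a user's Flink SQL job, mirroring the `scan.parse.online.schema.changes.enabled` option the table ITCase above now sets. A hedged sketch only: hostname, credentials, database and table names are placeholders, and the unbounded `print()` is just for demonstration:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OscSqlExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE TABLE customers ("
                        + " id INT NOT NULL,"
                        + " name STRING,"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '3306',"
                        + " 'username' = 'flinkuser',"
                        + " 'password' = 'flinkpw',"
                        + " 'database-name' = 'customer',"
                        + " 'table-name' = 'customers',"
                        + " 'server-time-zone' = 'UTC',"
                        // the new switch: fold the ghost-table ALTER from gh-ost/pt-osc
                        // back onto the original table instead of discarding it
                        + " 'scan.parse.online.schema.changes.enabled' = 'true'"
                        + ")");
        tEnv.executeSql("SELECT * FROM customers").print();
    }
}
```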