diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index b7b23a18e0e..e3549237065 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -153,7 +153,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { executorService.execute( () -> { try { - currentTaskRunning = true; + startCurrentTask(); final SnapshotSplitChangeEventSourceContextImpl sourceContext = new SnapshotSplitChangeEventSourceContextImpl(); @@ -432,6 +432,11 @@ public void close() { } } + private void startCurrentTask() { + currentTaskRunning = true; + changeEventSourceContext.startChangeEventSource(); + } + private void stopCurrentTask() { currentTaskRunning = false; changeEventSourceContext.stopChangeEventSource(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java index bc9c9f981cb..96c5eb6b38e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java @@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext private volatile boolean isRunning = true; + public void startChangeEventSource() { + isRunning = true; + } + public void stopChangeEventSource() { isRunning = false; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 15713861879..6598ed12674 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -38,8 +38,8 @@ import io.debezium.relational.TableId; import org.apache.kafka.connect.source.SourceRecord; import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.sql.SQLException; @@ -64,18 +64,17 @@ class SnapshotSplitReaderTest extends MySqlSourceTestBase { private static BinaryLogClient binaryLogClient; private static MySqlConnection mySqlConnection; - @BeforeAll - public static void init() { + @BeforeEach + public void beforeEach() { customerDatabase.createAndInitialize(); - customer3_0Database.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {"customers"}, 10); binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()); mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig); } - @AfterAll - public static void afterClass() throws Exception { + @AfterEach + public void afterEach() throws Exception { if (mySqlConnection != null) { mySqlConnection.close(); } @@ -83,6 +82,7 @@ public static void afterClass() throws Exception { if (binaryLogClient != null) { binaryLogClient.disconnect(); } + customerDatabase.dropDatabase(); } @Test @@ -113,6 +113,7 @@ void testReadSingleSnapshotSplit() throws Exception { @Test void testReadSingleSnapshotSplitWithDotName() throws Exception { + customer3_0Database.createAndInitialize(); MySqlSourceConfig sourceConfig = getConfig(customer3_0Database, new String[] {"customers3.0"}, 4); BinaryLogClient binaryLogClient = @@ -146,6 +147,9 @@ void testReadSingleSnapshotSplitWithDotName() throws Exception { }; List actual = readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1, dataType); + mySqlConnection.close(); + binaryLogClient.disconnect(); + assertEqualsInAnyOrder(Arrays.asList(expected), actual); } @@ -565,6 +569,65 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception { assertEqualsInAnyOrder(Arrays.asList(expected), actual); } + @Test + void testMultipleSplitsWithBackfill() throws Exception { + String tableName = "customers_even_dist"; + String tableId = customerDatabase.getDatabaseName() + "." + tableName; + MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 4); + StatefulTaskContext statefulTaskContext = + new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection); + + // Hook to make highWatermark > lowWatermark, enforcing backfill phase + SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks(); + snapshotHooks.setPreHighWatermarkAction( + (mySqlConnection, split) -> { + if (split.splitId().equals(tableId + ":0")) { + mySqlConnection.execute( + "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"); + mySqlConnection.commit(); + } else if (split.splitId().equals(tableId + ":1")) { + mySqlConnection.execute( + "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106"); + mySqlConnection.commit(); + } else if (split.splitId().equals(tableId + ":2")) { + mySqlConnection.execute( + "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 109"); + mySqlConnection.commit(); + } + }); + + final DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.BIGINT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("address", DataTypes.STRING()), + DataTypes.FIELD("phone_number", DataTypes.STRING())); + List mySqlSplits = getMySqlSplits(sourceConfig); + + String[] expected = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Beijing, 123567891234]", + "+I[104, user_4, Shanghai, 123567891234]", + "+I[105, user_5, Shanghai, 123567891234]", + "+I[106, user_6, Beijing, 123567891234]", + "+I[107, user_7, Shanghai, 123567891234]", + "+I[108, user_8, Shanghai, 123567891234]", + "+I[109, user_9, Beijing, 123567891234]", + "+I[110, user_10, Shanghai, 123567891234]" + }; + + List actual = + readTableSnapshotSplits( + mySqlSplits, + statefulTaskContext, + mySqlSplits.size(), + dataType, + snapshotHooks); + assertEqualsInAnyOrder(Arrays.asList(expected), actual); + } + private List readTableSnapshotSplits( List mySqlSplits, StatefulTaskContext statefulTaskContext,