Skip to content

Commit 4efbc47

Browse files
[FLINK-39200][mysql] Fix mysql cdc could get stuck in backfill binlog reading when reuse snapshot split reader
1 parent 91ae677 commit 4efbc47

3 files changed

Lines changed: 59 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
154154
() -> {
155155
try {
156156
currentTaskRunning = true;
157+
changeEventSourceContext.startChangeEventSource();
157158
final SnapshotSplitChangeEventSourceContextImpl sourceContext =
158159
new SnapshotSplitChangeEventSourceContextImpl();
159160

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/StoppableChangeEventSourceContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public class StoppableChangeEventSourceContext
2828

2929
private volatile boolean isRunning = true;
3030

31+
public void startChangeEventSource() {
32+
isRunning = true;
33+
}
34+
3135
public void stopChangeEventSource() {
3236
isRunning = false;
3337
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,60 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
565565
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
566566
}
567567

568+
@Test
569+
void testMultipleSplitsWithBackfill() throws Exception {
570+
String tableName = "customers_even_dist";
571+
String tableId = customerDatabase.getDatabaseName() + "." + tableName;
572+
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 4);
573+
StatefulTaskContext statefulTaskContext =
574+
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
575+
576+
// Hook to make highWatermark > lowWatermark, enforcing backfill phase
577+
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
578+
snapshotHooks.setPreHighWatermarkAction(
579+
(mySqlConnection, split) -> {
580+
if (split.splitId().equals(tableId + ":0")) {
581+
mySqlConnection.execute(
582+
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
583+
mySqlConnection.commit();
584+
} else if (split.splitId().equals(tableId + ":1")) {
585+
mySqlConnection.execute(
586+
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
587+
mySqlConnection.commit();
588+
} else if (split.splitId().equals(tableId + ":2")) {
589+
mySqlConnection.execute(
590+
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 109");
591+
mySqlConnection.commit();
592+
}
593+
});
594+
595+
final DataType dataType =
596+
DataTypes.ROW(
597+
DataTypes.FIELD("id", DataTypes.BIGINT()),
598+
DataTypes.FIELD("name", DataTypes.STRING()),
599+
DataTypes.FIELD("address", DataTypes.STRING()),
600+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
601+
List<MySqlSplit> mySqlSplits = getMySqlSplits(sourceConfig);
602+
603+
String[] expected =
604+
new String[] {
605+
"+I[101, user_1, Shanghai, 123567891234]",
606+
"+I[102, user_2, Shanghai, 123567891234]",
607+
"+I[103, user_3, Beijing, 123567891234]",
608+
"+I[104, user_4, Shanghai, 123567891234]",
609+
"+I[105, user_5, Shanghai, 123567891234]",
610+
"+I[106, user_6, Beijing, 123567891234]",
611+
"+I[107, user_7, Shanghai, 123567891234]",
612+
"+I[108, user_8, Shanghai, 123567891234]",
613+
"+I[109, user_9, Beijing, 123567891234]",
614+
"+I[110, user_10, Shanghai, 123567891234]"
615+
};
616+
617+
List<String> actual = readTableSnapshotSplits(
618+
mySqlSplits, statefulTaskContext, mySqlSplits.size(), dataType, snapshotHooks);
619+
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
620+
}
621+
568622
private List<String> readTableSnapshotSplits(
569623
List<MySqlSplit> mySqlSplits,
570624
StatefulTaskContext statefulTaskContext,

0 commit comments

Comments
 (0)