Skip to content

Commit 1f3c286

Browse files
committed
[FLINK-39179][mysql] Add REPLACE INTO filtering in handleQueryEvent to fix pt-table-checksum DML parsing
1 parent 37b7c51 commit 1f3c286

2 files changed

Lines changed: 83 additions & 7 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@
9494
*
9595
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
9696
* processing LinkedList rows in handleChange method. See FLINK-38846.
97+
*
98+
* <p>Line 110-113, 635, 645-648, 1582-1587 : Add REPLACE INTO filtering in handleQueryEvent to
99+
* prevent pt-table-checksum STATEMENT-format DML from being parsed as DDL. Sync from Debezium PR
100+
* #7004 (DBZ-9428).
97101
*/
98102
public class MySqlStreamingChangeEventSource
99103
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -103,6 +107,11 @@ public class MySqlStreamingChangeEventSource
103107

104108
private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
105109

110+
// Upper-case DML keywords, each followed by a single space, that isDmlStatement() matches
// against the upper-cased beginning of a query event's SQL text. The longest one, "REPLACE ",
// is 8 characters, which is why handleQueryEvent takes an 8-character statement prefix.
private static final String DML_INSERT_PREFIX = "INSERT ";
111+
private static final String DML_UPDATE_PREFIX = "UPDATE ";
112+
private static final String DML_DELETE_PREFIX = "DELETE ";
113+
private static final String DML_REPLACE_PREFIX = "REPLACE ";
114+
106115
private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers =
107116
new EnumMap<>(EventType.class);
108117
private final BinaryLogClient client;
@@ -623,7 +632,7 @@ protected void handleQueryEvent(
623632
return;
624633
}
625634

626-
String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
635+
String upperCasedStatementBegin = Strings.getBegin(sql, 8).toUpperCase();
627636

628637
if (upperCasedStatementBegin.startsWith("XA ")) {
629638
// This is an XA transaction, and we currently ignore these and do nothing ...
@@ -633,13 +642,10 @@ protected void handleQueryEvent(
633642
LOGGER.debug("DDL '{}' was filtered out of processing", sql);
634643
return;
635644
}
636-
if (upperCasedStatementBegin.equals("INSERT ")
637-
|| upperCasedStatementBegin.equals("UPDATE ")
638-
|| upperCasedStatementBegin.equals("DELETE ")) {
645+
if (isDmlStatement(upperCasedStatementBegin)) {
639646
LOGGER.warn(
640-
"Received DML '"
641-
+ sql
642-
+ "' for processing, binlog probably contains events generated with statement or mixed based replication format");
647+
"Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
648+
sql);
643649
return;
644650
}
645651
if (sql.equalsIgnoreCase("ROLLBACK")) {
@@ -1573,6 +1579,13 @@ public void onEventDeserializationFailure(BinaryLogClient client, Exception ex)
15731579
}
15741580
}
15751581

1582+
private static boolean isDmlStatement(String upperCasedStatementBegin) {
1583+
return upperCasedStatementBegin.startsWith(DML_INSERT_PREFIX)
1584+
|| upperCasedStatementBegin.startsWith(DML_UPDATE_PREFIX)
1585+
|| upperCasedStatementBegin.startsWith(DML_DELETE_PREFIX)
1586+
|| upperCasedStatementBegin.startsWith(DML_REPLACE_PREFIX);
1587+
}
1588+
15761589
@FunctionalInterface
15771590
private interface TableIdProvider<E extends EventData> {
15781591
TableId getTableId(E data);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public void after() throws Exception {
143143
binaryLogClient.disconnect();
144144
}
145145
customerDatabase.dropDatabase();
146+
customerDatabaseNoGtid.dropDatabase();
146147
}
147148

148149
@Test
@@ -526,6 +527,68 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
526527
reader.close();
527528
}
528529

530+
@Test
531+
void testDmlStatementFilteringWithStatementBinlogFormat() throws Exception {
532+
customerDatabaseNoGtid.createAndInitialize();
533+
MySqlSourceConfig sourceConfig =
534+
getConfig(
535+
MYSQL_CONTAINER_NOGTID,
536+
customerDatabaseNoGtid,
537+
StartupOptions.latest(),
538+
new String[] {"customers"});
539+
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
540+
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
541+
542+
// Create reader and submit splits
543+
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
544+
BinlogSplitReader reader = createBinlogReader(sourceConfig);
545+
reader.submitSplit(split);
546+
547+
// Simulate pt-table-checksum: use an independent JDBC connection to set session-level
548+
// binlog_format to STATEMENT, then execute DML statements that will appear as QueryEvents
549+
// in binlog. These should be filtered by isDmlStatement() in handleQueryEvent().
550+
String qualifiedTableName = customerDatabaseNoGtid.qualifiedTableName("customers");
551+
try (Connection conn = customerDatabaseNoGtid.getJdbcConnection();
552+
Statement stmt = conn.createStatement()) {
553+
stmt.execute("SET SESSION binlog_format = 'STATEMENT'");
554+
// REPLACE INTO - the key DML that was missing from the filter before DBZ-9428
555+
stmt.execute(
556+
"REPLACE INTO "
557+
+ qualifiedTableName
558+
+ " VALUES(103, 'user_3', 'Shanghai', '123567891234')");
559+
// Other DML statements in STATEMENT mode
560+
stmt.execute(
561+
"INSERT INTO "
562+
+ qualifiedTableName
563+
+ " VALUES(9999, 'pt_checksum_user', 'TestCity', '000000000000')");
564+
stmt.execute(
565+
"UPDATE " + qualifiedTableName + " SET address = 'PtCity' WHERE id = 9999");
566+
stmt.execute("DELETE FROM " + qualifiedTableName + " WHERE id = 9999");
567+
// Restore ROW mode for subsequent normal DML
568+
stmt.execute("SET SESSION binlog_format = 'ROW'");
569+
}
570+
571+
// Execute a normal ROW-mode DML to verify the reader is still healthy
572+
mySqlConnection.execute(
573+
"INSERT INTO "
574+
+ qualifiedTableName
575+
+ " VALUES(2001, 'user_22', 'Shanghai', '123567891234')");
576+
mySqlConnection.commit();
577+
578+
final DataType dataType =
579+
DataTypes.ROW(
580+
DataTypes.FIELD("id", DataTypes.BIGINT()),
581+
DataTypes.FIELD("name", DataTypes.STRING()),
582+
DataTypes.FIELD("address", DataTypes.STRING()),
583+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
584+
// Only the ROW-mode INSERT should be captured; STATEMENT-mode DMLs are filtered out
585+
String[] expected = new String[] {"+I[2001, user_22, Shanghai, 123567891234]"};
586+
List<String> actual = readBinlogSplits(dataType, reader, expected.length);
587+
assertEqualsInOrder(Arrays.asList(expected), actual);
588+
589+
reader.close();
590+
}
591+
529592
@Test
530593
void testReadBinlogFromEarliestOffset() throws Exception {
531594
customerDatabase.createAndInitialize();

0 commit comments

Comments
 (0)