diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index ceecca942a9..b195d463346 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -94,6 +94,10 @@
*
*
Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
+ *
+ *
Line 106-109, 631, 641-646, 1578-1583 : Add REPLACE INTO filtering in handleQueryEvent to
+ * prevent pt-table-checksum STATEMENT-format DML from being parsed as DDL. Sync from Debezium PR
+ * #7004 (DBZ-9428).
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource {
@@ -103,6 +107,11 @@ public class MySqlStreamingChangeEventSource
private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
+ private static final String DML_INSERT_PREFIX = "INSERT ";
+ private static final String DML_UPDATE_PREFIX = "UPDATE ";
+ private static final String DML_DELETE_PREFIX = "DELETE ";
+ private static final String DML_REPLACE_PREFIX = "REPLACE ";
+
private final EnumMap> eventHandlers =
new EnumMap<>(EventType.class);
private final BinaryLogClient client;
@@ -623,7 +632,7 @@ protected void handleQueryEvent(
return;
}
- String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
+ String upperCasedStatementBegin = Strings.getBegin(sql, 8).toUpperCase();
if (upperCasedStatementBegin.startsWith("XA ")) {
// This is an XA transaction, and we currently ignore these and do nothing ...
@@ -633,13 +642,10 @@ protected void handleQueryEvent(
LOGGER.debug("DDL '{}' was filtered out of processing", sql);
return;
}
- if (upperCasedStatementBegin.equals("INSERT ")
- || upperCasedStatementBegin.equals("UPDATE ")
- || upperCasedStatementBegin.equals("DELETE ")) {
+ if (isDmlStatement(upperCasedStatementBegin)) {
LOGGER.warn(
- "Received DML '"
- + sql
- + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
+ "Received DML '{}' for processing, binlog probably contains events generated with statement or mixed based replication format",
+ sql);
return;
}
if (sql.equalsIgnoreCase("ROLLBACK")) {
@@ -1573,6 +1579,13 @@ public void onEventDeserializationFailure(BinaryLogClient client, Exception ex)
}
}
+ /**
+  * Tells whether the upper-cased beginning of a statement is the start of a DML statement
+  * (INSERT, UPDATE, DELETE or REPLACE).
+  *
+  * <p>The keyword has to be followed by a whitespace character. Any whitespace is accepted, not
+  * only a single space, so a statement written with a tab or a line break right after the
+  * keyword is recognised as DML as well.
+  *
+  * @param upperCasedStatementBegin the leading characters of the SQL statement, already
+  *     upper-cased by the caller
+  * @return {@code true} if the statement begins with one of the DML keywords
+  */
+ private static boolean isDmlStatement(String upperCasedStatementBegin) {
+     return startsWithKeyword(upperCasedStatementBegin, DML_INSERT_PREFIX)
+             || startsWithKeyword(upperCasedStatementBegin, DML_UPDATE_PREFIX)
+             || startsWithKeyword(upperCasedStatementBegin, DML_DELETE_PREFIX)
+             || startsWithKeyword(upperCasedStatementBegin, DML_REPLACE_PREFIX);
+ }
+
+ /** Checks that {@code statementBegin} starts with the keyword of {@code prefix} plus whitespace. */
+ private static boolean startsWithKeyword(String statementBegin, String prefix) {
+     // Every DML_*_PREFIX constant is the keyword followed by one trailing space: compare the
+     // keyword part exactly and accept any whitespace character in place of that space.
+     int keywordLength = prefix.length() - 1;
+     return statementBegin.length() > keywordLength
+             && statementBegin.regionMatches(0, prefix, 0, keywordLength)
+             && Character.isWhitespace(statementBegin.charAt(keywordLength));
+ }
+
@FunctionalInterface
private interface TableIdProvider {
TableId getTableId(E data);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index e7fd2c4cbb9..63b8b7ad8c8 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -143,6 +143,7 @@ public void after() throws Exception {
binaryLogClient.disconnect();
}
customerDatabase.dropDatabase();
+ customerDatabaseNoGtid.dropDatabase();
}
@Test
@@ -526,6 +527,68 @@ void testReadBinlogWithoutGtidFromLatestOffset() throws Exception {
reader.close();
}
+ /**
+  * Verifies that DML written to the binlog in STATEMENT format (simulating pt-table-checksum,
+  * which is emulated here by switching the session to {@code binlog_format = 'STATEMENT'}) is
+  * not emitted by the binlog reader, and that a following ROW-format insert is still read.
+  */
+ @Test
+ void testDmlStatementFilteringWithStatementBinlogFormat() throws Exception {
+     customerDatabaseNoGtid.createAndInitialize();
+     MySqlSourceConfig sourceConfig =
+             getConfig(
+                     MYSQL_CONTAINER_NOGTID,
+                     customerDatabaseNoGtid,
+                     StartupOptions.latest(),
+                     new String[] {"customers"});
+     binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+     mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+
+     // Create reader and submit splits before any of the DML below is issued
+     MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
+     BinlogSplitReader reader = createBinlogReader(sourceConfig);
+     try {
+         reader.submitSplit(split);
+
+         // Simulate pt-table-checksum: use an independent JDBC connection to set session-level
+         // binlog_format to STATEMENT, then execute DML statements that will appear as
+         // QueryEvents in binlog. These should be filtered by isDmlStatement() in
+         // handleQueryEvent().
+         String qualifiedTableName = customerDatabaseNoGtid.qualifiedTableName("customers");
+         try (Connection conn = customerDatabaseNoGtid.getJdbcConnection();
+                 Statement stmt = conn.createStatement()) {
+             stmt.execute("SET SESSION binlog_format = 'STATEMENT'");
+             // REPLACE INTO - the key DML that was missing from the filter before DBZ-9428
+             stmt.execute(
+                     "REPLACE INTO "
+                             + qualifiedTableName
+                             + " VALUES(103, 'user_3', 'Shanghai', '123567891234')");
+             // Other DML statements in STATEMENT mode
+             stmt.execute(
+                     "INSERT INTO "
+                             + qualifiedTableName
+                             + " VALUES(9999, 'pt_checksum_user', 'TestCity', '000000000000')");
+             stmt.execute(
+                     "UPDATE " + qualifiedTableName + " SET address = 'PtCity' WHERE id = 9999");
+             stmt.execute("DELETE FROM " + qualifiedTableName + " WHERE id = 9999");
+             // Restore ROW mode for subsequent normal DML
+             stmt.execute("SET SESSION binlog_format = 'ROW'");
+         }
+
+         // Execute a normal ROW-mode DML to verify the reader is still healthy
+         mySqlConnection.execute(
+                 "INSERT INTO "
+                         + qualifiedTableName
+                         + " VALUES(2001, 'user_22', 'Shanghai', '123567891234')");
+         mySqlConnection.commit();
+
+         final DataType dataType =
+                 DataTypes.ROW(
+                         DataTypes.FIELD("id", DataTypes.BIGINT()),
+                         DataTypes.FIELD("name", DataTypes.STRING()),
+                         DataTypes.FIELD("address", DataTypes.STRING()),
+                         DataTypes.FIELD("phone_number", DataTypes.STRING()));
+         // Only the ROW-mode INSERT should be captured; STATEMENT-mode DMLs are filtered out.
+         // Any STATEMENT-mode change that slipped through would be read first and fail the
+         // ordered comparison below.
+         String[] expected = new String[] {"+I[2001, user_22, Shanghai, 123567891234]"};
+         List<String> actual = readBinlogSplits(dataType, reader, expected.length);
+         assertEqualsInOrder(Arrays.asList(expected), actual);
+     } finally {
+         // Close the reader even when a statement or the assertion above fails, so a failing
+         // run does not leave it open for the tests that follow.
+         reader.close();
+     }
+ }
+
@Test
void testReadBinlogFromEarliestOffset() throws Exception {
customerDatabase.createAndInitialize();