From 3f5d6e991d85d1aef51768f3072ac9c6bb5cced2 Mon Sep 17 00:00:00 2001 From: SunXiaoWei <1206128610@qq.com> Date: Fri, 19 Dec 2025 14:26:15 +0800 Subject: [PATCH] [cdc-connector-mysql] Supports MySQL 8.4+ binary log status query command --- .../connector/mysql/MySqlConnection.java | 3 +- .../mysql/debezium/DebeziumUtils.java | 46 ++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 801e27d6b89..df3376d8d35 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -23,6 +23,7 @@ import io.debezium.relational.history.DatabaseHistory; import io.debezium.schema.DatabaseSchema; import io.debezium.util.Strings; +import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -275,7 +276,7 @@ public boolean isGtidModeEnabled() { public String knownGtidSet() { try { return queryAndMap( - "SHOW MASTER STATUS", + DebeziumUtils.getBinlogStatusCommand(this), rs -> { if (rs.next() && rs.getMetaData().getColumnCount() > 4) { return rs.getString( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 4e512a81c67..8670865f09d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -52,6 +52,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import java.util.function.Predicate; @@ -59,6 +60,9 @@ /** Utilities related to Debezium. */ public class DebeziumUtils { private static final String QUOTED_CHARACTER = "`"; + private static final String SQL_SELECT_VERSION = "SELECT VERSION()"; + private static final String SQL_SHOW_MASTER_STATUS = "SHOW MASTER STATUS"; + private static final String SQL_SHOW_BINARY_LOG_STATUS = "SHOW BINARY LOG STATUS"; private static final Logger LOG = LoggerFactory.getLogger(DebeziumUtils.class); @@ -118,7 +122,7 @@ public static MySqlDatabaseSchema createMySqlDatabaseSchema( /** Fetch current binlog offsets in MySql Server. */ public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) { - final String showMasterStmt = "SHOW MASTER STATUS"; + final String showMasterStmt = getBinlogStatusCommand(jdbc); try { return jdbc.queryAndMap( showMasterStmt, @@ -336,4 +340,44 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile } return binlogTimestamps.take(); } + + /** + * Get the appropriate SHOW BINLOG STATUS command based on MySQL version. + * MySQL 8.4+ uses SHOW BINARY LOG STATUS, earlier versions use SHOW MASTER STATUS. + * + * @param jdbc the JDBC connection + * @return the appropriate command string + */ + public static String getBinlogStatusCommand(JdbcConnection jdbc) { + try { + String version = jdbc.queryAndMap( + SQL_SELECT_VERSION, + rs -> rs.next() ? rs.getString(1) : ""); + + // Parse version numbers (major and minor) + Integer[] versionNumbers = Arrays.stream(version.split("\\.")) + .limit(2) + .map(s -> { + try { + // Handle version strings like "8.4.0-1.el8" + String numStr = s.split("-")[0]; + return Integer.parseInt(numStr); + } catch (NumberFormatException e) { + // If version number is not numeric, treat as 0 (fallback to old command) + LOG.warn("Failed to parse MySQL version component '{}', treating as 0", s); + return 0; + } + }) + .toArray(Integer[]::new); + + // MySQL 8.4+ removed SHOW MASTER STATUS, use SHOW BINARY LOG STATUS instead + boolean useBinaryLogStatus = (versionNumbers[0] == 8 && versionNumbers[1] >= 4) + || versionNumbers[0] > 8; + + return useBinaryLogStatus ? SQL_SHOW_BINARY_LOG_STATUS : SQL_SHOW_MASTER_STATUS; + } catch (Exception e) { + LOG.warn("Unexpected error while determining MySQL version, using SHOW MASTER STATUS", e); + return SQL_SHOW_MASTER_STATUS; + } + } }