diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 557a149a2d0..eb2e7229ab0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,18 +87,20 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously + *

Line 1449-1461 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared * EARLIEST/LATEST logic. * - *

Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + *

Line 1463-1469 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions * when checkpoint GTID has non-contiguous ranges. Delegates to {@link * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 1490 : Add more error details for some exceptions. + *

Line 1507 : Add more error details for some exceptions. * - *

Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

Line 954-966 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. + * + *

Line 1271-1277 : Unregister listeners to avoid client reuse interference. See FLINK-39315. */ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { @@ -1106,23 +1108,30 @@ public void execute( (event) -> handleRowsQuery(effectiveOffsetContext, event)); } - BinaryLogClient.EventListener listener; + BinaryLogClient.EventListener eventListener; if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) { - listener = (event) -> handleEvent(partition, effectiveOffsetContext, event); + eventListener = (event) -> handleEvent(partition, effectiveOffsetContext, event); } else { EventBuffer buffer = new EventBuffer( connectorConfig.bufferSizeForStreamingChangeEventSource(), this, context); - listener = (event) -> buffer.add(partition, effectiveOffsetContext, event); + eventListener = (event) -> buffer.add(partition, effectiveOffsetContext, event); } - client.registerEventListener(listener); - client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext)); - client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event)); - if (LOGGER.isDebugEnabled()) { - client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event)); + ReaderThreadLifecycleListener lifecycleListener = + new ReaderThreadLifecycleListener(effectiveOffsetContext); + BinaryLogClient.EventListener metricsEventListener = + (event) -> onEvent(effectiveOffsetContext, event); + BinaryLogClient.EventListener logEventListener = + LOGGER.isDebugEnabled() ? (event) -> logEvent(effectiveOffsetContext, event) : null; + + client.registerEventListener(eventListener); + client.registerLifecycleListener(lifecycleListener); + client.registerEventListener(metricsEventListener); + if (logEventListener != null) { + client.registerEventListener(logEventListener); } final boolean isGtidModeEnabled = connection.isGtidModeEnabled(); @@ -1258,6 +1267,14 @@ public void execute( while (context.isRunning()) { Thread.sleep(100); } + + // Unregister listeners to avoid client reuse interference (FLINK-39315) + client.unregisterEventListener(eventListener); + client.unregisterEventListener(metricsEventListener); + client.unregisterLifecycleListener(lifecycleListener); + if (logEventListener != null) { + client.unregisterEventListener(logEventListener); + } } finally { try { client.disconnect(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 6598ed12674..b41f6a077c4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -586,6 +586,21 @@ void testMultipleSplitsWithBackfill() throws Exception { "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"); mySqlConnection.commit(); } else if (split.splitId().equals(tableId + ":1")) { + // To verify that FLINK-39315 is fixed, generate sufficient binlog events, + // so that the MySqlBinlogSplitReadTask runs long enough to exercise the + // context-running checks in binlog reading backfill phase. + for (int i = 0; i < 100; i++) { + mySqlConnection.execute( + "UPDATE " + + tableId + + " SET address = 'Beijing' WHERE id = 106"); + mySqlConnection.commit(); + mySqlConnection.execute( + "UPDATE " + + tableId + + " SET address = 'Shanghai' WHERE id = 106"); + mySqlConnection.commit(); + } mySqlConnection.execute( "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106"); mySqlConnection.commit();