@@ -87,18 +87,20 @@
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
- * <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
+ * <p>Line 1449-1461 : Adjust GTID merging logic to support recovering from a job that previously
 * specified a starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
 * EARLIEST/LATEST logic.
 *
- * <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
+ * <p>Line 1463-1469 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
 * when checkpoint GTID has non-contiguous ranges. Delegates to {@link
 * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
 *
- * <p>Line 1490 : Add more error details for some exceptions.
+ * <p>Line 1507 : Add more error details for some exceptions.
 *
- * <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
+ * <p>Line 954-966 : Use iterator instead of index-based loop to avoid O(n²) complexity when
 * processing LinkedList rows in handleChange method. See FLINK-38846.
+ *
+ * <p>Line 1271-1277 : Unregister listeners to avoid client reuse interference. See FLINK-39315.
*/
public class MySqlStreamingChangeEventSource
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -1106,23 +1108,30 @@ public void execute(
(event) -> handleRowsQuery(effectiveOffsetContext, event));
}

-BinaryLogClient.EventListener listener;
+BinaryLogClient.EventListener eventListener;
if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) {
-listener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
+eventListener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
} else {
EventBuffer buffer =
new EventBuffer(
connectorConfig.bufferSizeForStreamingChangeEventSource(),
this,
context);
-listener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
+eventListener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
}
-client.registerEventListener(listener);

-client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext));
-client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event));
-if (LOGGER.isDebugEnabled()) {
-client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
+ReaderThreadLifecycleListener lifecycleListener =
+        new ReaderThreadLifecycleListener(effectiveOffsetContext);
+BinaryLogClient.EventListener metricsEventListener =
+        (event) -> onEvent(effectiveOffsetContext, event);
+BinaryLogClient.EventListener logEventListener =
+        LOGGER.isDebugEnabled() ? (event) -> logEvent(effectiveOffsetContext, event) : null;
+
+client.registerEventListener(eventListener);
+client.registerLifecycleListener(lifecycleListener);
+client.registerEventListener(metricsEventListener);
+if (logEventListener != null) {
+    client.registerEventListener(logEventListener);
}

final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
@@ -1258,6 +1267,14 @@ public void execute(
while (context.isRunning()) {
Thread.sleep(100);
}

+// Unregister listeners to avoid client reuse interference (FLINK-39315)
+client.unregisterEventListener(eventListener);
+client.unregisterEventListener(metricsEventListener);
+client.unregisterLifecycleListener(lifecycleListener);
+if (logEventListener != null) {
+    client.unregisterEventListener(logEventListener);
Comment on lines +1272 to +1276

Copilot AI Mar 24, 2026

If an exception occurs in the main try and any unregister* call throws in finally, the cleanup exception will replace the original failure, making the root cause harder to diagnose. Consider using the executionError variable to preserve the primary exception: record the original throwable, then wrap unregister* in a try/catch and attach cleanup failures via addSuppressed (or only throw cleanup failures when there was no primary error).

Suggested change:

-client.unregisterEventListener(eventListener);
-client.unregisterEventListener(metricsEventListener);
-client.unregisterLifecycleListener(lifecycleListener);
-if (logEventListener != null) {
-    client.unregisterEventListener(logEventListener);
+try {
+    client.unregisterEventListener(eventListener);
+} catch (Exception e) {
+    LOGGER.info("Exception while unregistering event listener", e);
+}
+try {
+    client.unregisterEventListener(metricsEventListener);
+} catch (Exception e) {
+    LOGGER.info("Exception while unregistering metrics event listener", e);
+}
+try {
+    client.unregisterLifecycleListener(lifecycleListener);
+} catch (Exception e) {
+    LOGGER.info("Exception while unregistering lifecycle listener", e);
+}
+if (logEventListener != null) {
+    try {
+        client.unregisterEventListener(logEventListener);
+    } catch (Exception e) {
+        LOGGER.info("Exception while unregistering log event listener", e);
+    }

Contributor Author

Thanks for the review. I agree that if `unregister*()` throws in `finally`, it can mask the original exception and make the root cause harder to diagnose.

After reconsideration, I moved the listener unregistration to the end of the normal execution path instead of the finally block. The reason is that the problematic case we want to avoid is cross-split reuse when the execution finishes normally; if an exception happens and we exit early, the task will fail and the BinaryLogClient will be recreated on recovery, so the listener accumulation issue should not be hit in that path.
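
For anyone weighing the two options, the suppressed-exception pattern the review describes can be sketched like this. This is a minimal standalone sketch with hypothetical names (`Listener`, `unregister`, `runWithCleanup`), not the connector's actual code; the point is only that a cleanup failure never replaces the body's exception:

```java
public class CleanupSketch {
    /** Hypothetical stand-in for a listener type. */
    public interface Listener {}

    /** Hypothetical stand-in for client.unregisterEventListener(...); may itself throw. */
    static void unregister(Listener l) {
        if (l == null) {
            throw new IllegalStateException("cleanup failed");
        }
    }

    /**
     * Runs the body, then unregisters listeners in a finally-style sweep.
     * A cleanup failure is attached to the primary exception via
     * addSuppressed, so the root cause stays visible; cleanup failures
     * are only thrown on their own when the body succeeded.
     */
    public static void runWithCleanup(Runnable body, Listener... listeners) throws Exception {
        Exception error = null;
        try {
            body.run();
        } catch (Exception e) {
            error = e;
        } finally {
            for (Listener l : listeners) {
                try {
                    unregister(l);
                } catch (Exception cleanup) {
                    if (error != null) {
                        error.addSuppressed(cleanup);
                    } else {
                        error = cleanup;
                    }
                }
            }
        }
        if (error != null) {
            throw error;
        }
    }
}
```

Either approach keeps the root cause primary; moving the unregistration to the normal-path end simply avoids the bookkeeping entirely.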

+}
} finally {
try {
client.disconnect();
@@ -586,6 +586,21 @@ void testMultipleSplitsWithBackfill() throws Exception {
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":1")) {
+// To verify that FLINK-39315 is fixed, generate sufficient binlog events
+// so that the MySqlBinlogSplitReadTask runs long enough to exercise the
+// context-running checks in the binlog reading backfill phase.
+for (int i = 0; i < 100; i++) {
+    mySqlConnection.execute(
+            "UPDATE "
+                    + tableId
+                    + " SET address = 'Beijing' WHERE id = 106");
+    mySqlConnection.commit();
+    mySqlConnection.execute(
+            "UPDATE "
+                    + tableId
+                    + " SET address = 'Shanghai' WHERE id = 106");
+    mySqlConnection.commit();
+}
Comment on lines +589 to +603

Copilot AI Mar 24, 2026

This adds 2000 updates with 2000 commits, which can significantly slow down the unit test and increase flakiness on slower CI environments. Consider batching commits (e.g., commit every N updates or run the loop in a single transaction), reducing iterations to the minimum that still reproduces the issue, or deriving the iteration count from a constant/system property so CI can tune it if needed.

Contributor Author

@chengcongchina Mar 25, 2026

I reduced the loop to 100 iterations, which is still sufficient to reproduce the issue when the fix is commented out. This should make the unit test significantly faster and less flaky on CI.
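
If CI environments still need different event volumes, the count could also be made tunable instead of hard-coded, along the lines the review suggested. A minimal sketch, assuming a hypothetical property name (`flink.cdc.test.binlog.iterations` is not part of this PR):

```java
public class BinlogLoadConfig {
    // Hypothetical property name; not part of the PR.
    static final String PROP = "flink.cdc.test.binlog.iterations";

    /**
     * Reads the loop count from -Dflink.cdc.test.binlog.iterations,
     * falling back to the 100 iterations the test uses by default.
     */
    public static int iterations() {
        return Integer.getInteger(PROP, 100);
    }

    public static void main(String[] args) {
        for (int i = 0; i < iterations(); i++) {
            // ... generate one Beijing/Shanghai update pair per iteration ...
        }
        System.out.println("iterations = " + iterations());
    }
}
```

With this, a slow CI runner could pass `-Dflink.cdc.test.binlog.iterations=20` without touching the test source.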

mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
mySqlConnection.commit();