@@ -1106,23 +1106,30 @@ public void execute(
                     (event) -> handleRowsQuery(effectiveOffsetContext, event));
         }

-        BinaryLogClient.EventListener listener;
+        BinaryLogClient.EventListener eventListener;
         if (connectorConfig.bufferSizeForStreamingChangeEventSource() == 0) {
-            listener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
+            eventListener = (event) -> handleEvent(partition, effectiveOffsetContext, event);
         } else {
             EventBuffer buffer =
                     new EventBuffer(
                             connectorConfig.bufferSizeForStreamingChangeEventSource(),
                             this,
                             context);
-            listener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
+            eventListener = (event) -> buffer.add(partition, effectiveOffsetContext, event);
         }
-        client.registerEventListener(listener);

-        client.registerLifecycleListener(new ReaderThreadLifecycleListener(effectiveOffsetContext));
-        client.registerEventListener((event) -> onEvent(effectiveOffsetContext, event));
-        if (LOGGER.isDebugEnabled()) {
-            client.registerEventListener((event) -> logEvent(effectiveOffsetContext, event));
+        ReaderThreadLifecycleListener lifecycleListener =
+                new ReaderThreadLifecycleListener(effectiveOffsetContext);
+        BinaryLogClient.EventListener metricsEventListener =
+                (event) -> onEvent(effectiveOffsetContext, event);
+        BinaryLogClient.EventListener logEventListener =
+                LOGGER.isDebugEnabled() ? (event) -> logEvent(effectiveOffsetContext, event) : null;
+
+        client.registerEventListener(eventListener);
+        client.registerLifecycleListener(lifecycleListener);
+        client.registerEventListener(metricsEventListener);
+        if (logEventListener != null) {
+            client.registerEventListener(logEventListener);
         }

         final boolean isGtidModeEnabled = connection.isGtidModeEnabled();
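The hunk above turns the anonymous listeners into named variables so they can be unregistered later. A minimal, self-contained model of the listener accumulation that unregistration prevents — `Client`, `register`, and `dispatch` are illustrative stand-ins for the `BinaryLogClient` API, not the real connector code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerAccumulation {
    // Toy stand-in for a long-lived BinaryLogClient reused across splits.
    static class Client {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        void register(Consumer<String> l) { listeners.add(l); }
        void unregister(Consumer<String> l) { listeners.remove(l); }
        void dispatch(String event) {
            // Every registered listener sees every event.
            new ArrayList<>(listeners).forEach(l -> l.accept(event));
        }
    }

    static int handledWithoutUnregister(int splits) {
        Client client = new Client();
        int[] handled = new int[1];
        for (int split = 0; split < splits; split++) {
            // Each split registers a fresh lambda; nothing removes the old ones.
            client.register(event -> handled[0]++);
        }
        client.dispatch("binlog-event");
        return handled[0]; // one invocation per leaked listener
    }

    static int handledWithUnregister(int splits) {
        Client client = new Client();
        int[] handled = new int[1];
        for (int split = 0; split < splits; split++) {
            Consumer<String> listener = event -> handled[0]++;
            client.register(listener);
            client.unregister(listener); // clean up when the split finishes
        }
        client.register(event -> handled[0]++); // the current split's listener
        client.dispatch("binlog-event");
        return handled[0];
    }

    public static void main(String[] args) {
        System.out.println(handledWithoutUnregister(3)); // 3
        System.out.println(handledWithUnregister(3));    // 1
    }
}
```

Without cleanup, each reuse of the client adds another handler, so a single binlog event is processed once per leaked listener — which is why the patch keeps references to every listener it registers.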
@@ -1179,6 +1186,7 @@ public void execute(
                 // Only when we reach the first BEGIN event will we start to skip events ...
                 skipEvent = false;

+                Throwable executionError = null;
Copilot AI Mar 24, 2026

If an exception occurs in the main try and any unregister* call throws in finally, the cleanup exception will replace the original failure, making the root cause harder to diagnose. Consider using the executionError variable to preserve the primary exception: record the original throwable, then wrap unregister* in a try/catch and attach cleanup failures via addSuppressed (or only throw cleanup failures when there was no primary error).
Contributor Author

Useless, deleted.

                try {
                    // Start the log reader, which starts background threads ...
                    if (context.isRunning()) {
@@ -1264,6 +1272,13 @@ public void execute(
                 } catch (Exception e) {
                     LOGGER.info("Exception while stopping binary log client", e);
                 }
+
+                client.unregisterEventListener(eventListener);
+                client.unregisterEventListener(metricsEventListener);
+                client.unregisterLifecycleListener(lifecycleListener);
+                if (logEventListener != null) {
+                    client.unregisterEventListener(logEventListener);
Comment on lines +1272 to +1276

Copilot AI Mar 24, 2026

If an exception occurs in the main try and any unregister* call throws in finally, the cleanup exception will replace the original failure, making the root cause harder to diagnose. Consider using the executionError variable to preserve the primary exception: record the original throwable, then wrap unregister* in a try/catch and attach cleanup failures via addSuppressed (or only throw cleanup failures when there was no primary error).
Suggested change
-                client.unregisterEventListener(eventListener);
-                client.unregisterEventListener(metricsEventListener);
-                client.unregisterLifecycleListener(lifecycleListener);
-                if (logEventListener != null) {
-                    client.unregisterEventListener(logEventListener);
+                try {
+                    client.unregisterEventListener(eventListener);
+                } catch (Exception e) {
+                    LOGGER.info("Exception while unregistering event listener", e);
+                }
+                try {
+                    client.unregisterEventListener(metricsEventListener);
+                } catch (Exception e) {
+                    LOGGER.info("Exception while unregistering metrics event listener", e);
+                }
+                try {
+                    client.unregisterLifecycleListener(lifecycleListener);
+                } catch (Exception e) {
+                    LOGGER.info("Exception while unregistering lifecycle listener", e);
+                }
+                if (logEventListener != null) {
+                    try {
+                        client.unregisterEventListener(logEventListener);
+                    } catch (Exception e) {
+                        LOGGER.info("Exception while unregistering log event listener", e);
+                    }
Contributor Author
Thanks for the review. I agree that if unregister*() throws in finally, it can mask the original exception and make the root cause harder to diagnose.

After reconsideration, I moved the listener unregistration to the end of the normal execution path instead of the finally block. The reason is that the problematic case we want to avoid is cross-split reuse when the execution finishes normally; if an exception happens and we exit early, the task will fail and the BinaryLogClient will be recreated on recovery, so the listener accumulation issue should not be hit in that path.

}
}
}
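Copilot's suggestion is the standard `addSuppressed` pattern from the Java exception model. A self-contained sketch of the problem and the fix — method names and messages are illustrative, not taken from the connector:

```java
public class SuppressedDemo {

    static void cleanup() {
        throw new IllegalStateException("cleanup failed");
    }

    // Naive pattern: an exception thrown in finally replaces the primary one.
    static String naive() {
        try {
            try {
                throw new RuntimeException("primary failure");
            } finally {
                cleanup(); // this IllegalStateException wins; the primary is lost
            }
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    // Suggested pattern: record the primary, attach cleanup failures as suppressed.
    static String withSuppressed() {
        try {
            Throwable primary = null;
            try {
                throw new RuntimeException("primary failure");
            } catch (Throwable t) {
                primary = t;
                throw t;
            } finally {
                try {
                    cleanup();
                } catch (Throwable cleanupError) {
                    if (primary != null) {
                        primary.addSuppressed(cleanupError);
                    } else {
                        throw cleanupError;
                    }
                }
            }
        } catch (Exception e) {
            return e.getMessage() + " (suppressed: "
                    + e.getSuppressed()[0].getMessage() + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(naive());          // cleanup failed
        System.out.println(withSuppressed()); // primary failure (suppressed: cleanup failed)
    }
}
```

The author ultimately side-stepped the issue by unregistering on the normal path only, but this is the pattern the suggestion describes for when cleanup must run in finally.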

@@ -586,6 +586,21 @@ void testMultipleSplitsWithBackfill() throws Exception {
                             "UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
                     mySqlConnection.commit();
                 } else if (split.splitId().equals(tableId + ":1")) {
+                    // To verify that FLINK-39315 is fixed, generate sufficient binlog events,
+                    // so that the MySqlBinlogSplitReadTask runs long enough to exercise the
+                    // context-running checks in binlog reading backfill phase.
+                    for (int i = 0; i < 1000; i++) {
+                        mySqlConnection.execute(
+                                "UPDATE "
+                                        + tableId
+                                        + " SET address = 'Beijing' WHERE id = 106");
+                        mySqlConnection.commit();
+                        mySqlConnection.execute(
+                                "UPDATE "
+                                        + tableId
+                                        + " SET address = 'Shanghai' WHERE id = 106");
+                        mySqlConnection.commit();
+                    }
Comment on lines +589 to +603
Copilot AI Mar 24, 2026

This adds 2000 updates with 2000 commits, which can significantly slow down the unit test and increase flakiness on slower CI environments. Consider batching commits (e.g., commit every N updates or run the loop in a single transaction), reducing iterations to the minimum that still reproduces the issue, or deriving the iteration count from a constant/system property so CI can tune it if needed.
Contributor Author

@chengcongchina chengcongchina Mar 25, 2026
I reduced the loop to 100 iterations, which is still sufficient to reproduce the issue when the fix is commented out. This should make the unit test significantly faster and less flaky on CI.

mySqlConnection.execute(
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
mySqlConnection.commit();
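The commit batching Copilot suggests can be sketched as below. Everything here is hypothetical — `runBatched`, the stub `execute`/`commit` lambdas, and the batch size are illustrative; in the real test the calls would go through `mySqlConnection`:

```java
import java.util.function.Consumer;

public class BatchedCommits {
    // Commit once per batch of updates instead of after every statement,
    // reducing the number of binlog transactions the test produces.
    static int runBatched(int iterations, int batchSize,
                          Consumer<String> execute, Runnable commit) {
        int commits = 0;
        for (int i = 0; i < iterations; i++) {
            execute.accept("UPDATE <table> SET address = 'Beijing' WHERE id = 106");
            execute.accept("UPDATE <table> SET address = 'Shanghai' WHERE id = 106");
            if ((i + 1) % batchSize == 0) {
                commit.run(); // one binlog transaction per batch
                commits++;
            }
        }
        if (iterations % batchSize != 0) {
            commit.run(); // flush the trailing partial batch
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        // 100 iterations at batch size 20: 5 commits instead of 200.
        System.out.println(runBatched(100, 20, sql -> {}, () -> {}));
    }
}
```

Note the trade-off: fewer commits means fewer, larger binlog transactions, so a batched variant would need to be checked against the goal of keeping the read task busy long enough to hit the context-running checks.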