From b5a7e4d320cb0941aa944c91de2c9095eff89ed1 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 25 Feb 2026 11:29:32 +0800 Subject: [PATCH 1/3] [FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges --- .../debezium/connector/mysql/GtidUtils.java | 42 ++++ .../MySqlStreamingChangeEventSource.java | 18 +- .../connector/mysql/FilterGtidSetTest.java | 193 ++++++++++++++++++ .../connector/mysql/GtidUtilsTest.java | 132 ++++++++++++ .../MySqlStreamingChangeEventSource.java | 18 +- 5 files changed, 395 insertions(+), 8 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index f6c4987e74d..9c05084c0f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -23,10 +23,52 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; /** Utils for handling GTIDs. */ public class GtidUtils { + /** + * Computes the merged GTID set for the LATEST new-channel-position mode. + * + *

For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via + * {@link #fixRestoredGtidSet} to avoid MySQL replaying pre-checkpoint transactions. For new + * channels (UUIDs not in checkpoint), the server's full GTID is used to skip all history. + * + * @param availableServerGtidSet the GTID set currently available on the MySQL server + * @param purgedServerGtid the GTID set already purged from the MySQL server + * @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied) + * @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null + * @return the merged GTID set suitable for binlog subscription + */ + public static GtidSet computeLatestModeGtidSet( + GtidSet availableServerGtidSet, + GtidSet purgedServerGtid, + GtidSet checkpointGtidSet, + Predicate gtidSourceFilter) { + final GtidSet relevantAvailableServerGtidSet = + (gtidSourceFilter != null) + ? availableServerGtidSet.retainAll(gtidSourceFilter) + : availableServerGtidSet; + + // Step 1: Fix old channels' GTID ranges + GtidSet fixedOldChannelsGtid = + fixRestoredGtidSet( + mergeGtidSetInto( + relevantAvailableServerGtidSet.retainAll( + uuid -> checkpointGtidSet.forServerWithId(uuid) != null), + purgedServerGtid), + checkpointGtidSet); + + // Step 2: For new channels, use server's full GTID to skip all history + GtidSet newChannelsGtid = + relevantAvailableServerGtidSet.retainAll( + uuid -> checkpointGtidSet.forServerWithId(uuid) == null); + + // Step 3: Merge fixed old channels + new channels + return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid); + } + /** * This method corrects the GTID set that has been restored from a state or checkpoint using the * GTID set fetched from the server via SHOW MASTER STATUS. 
During the correction process, the diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index ceecca942a9..c44c34c6e5a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,12 +87,16 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously + *

Line 1432-1448 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1449-1457 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + * when checkpoint GTID has non-contiguous ranges. Delegates to {@link + * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

Line 1508 : Add more error details for some exceptions. + * + *

Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. */ public class MySqlStreamingChangeEventSource @@ -1443,7 +1447,13 @@ public GtidSet filterGtidSet( purgedServerGtid), filteredGtidSet); } else { - mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + LOGGER.info("Using latest positions for new GTID channels"); + mergedGtidSet = + GtidUtils.computeLatestModeGtidSet( + availableServerGtidSet, + purgedServerGtid, + filteredGtidSet, + gtidSourceFilter); } LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java new file mode 100644 index 00000000000..763b8e36799 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.debezium.connector.mysql; + +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; +import io.debezium.jdbc.JdbcConfiguration; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST + * mode fix (FLINK-39149) cannot regress. This test directly invokes the production method via + * reflection, bypassing the heavy constructor dependencies. + */ +class FilterGtidSetTest { + + /** + * Verifies that filterGtidSet() in LATEST mode fixes non-contiguous checkpoint GTIDs for old + * channels and uses server's full GTID for new channels. + * + *

This is the core FLINK-39149 bug scenario: checkpoint has "aaa-111:5000-8000" (gap + * 1-4999), and without the fix, MySQL would replay transactions 1-4999. + */ + @Test + void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception { + MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null); + + MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000"); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + // Old channel aaa-111: gap should be filled from :1 to 8000 + assertThat(result.toString()).contains("aaa-111:1-8000"); + // New channel bbb-222: should use server's full GTID + assertThat(result.toString()).contains("bbb-222:1-3000"); + } + + /** + * Verifies that filterGtidSet() in LATEST mode with gtidSourceFilter excludes filtered UUIDs. + */ + @Test + void testFilterGtidSetLatestModeWithSourceFilter() throws Exception { + MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", "ccc-333"); + + MySqlOffsetContext offsetContext = + createOffsetContext(source, "aaa-111:5000-8000,bbb-222:1-2000"); + GtidSet availableServerGtidSet = + new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result.toString()).contains("aaa-111:1-8000"); + assertThat(result.toString()).contains("bbb-222:1-2000"); + // ccc-333 should be excluded + assertThat(result.toString()).doesNotContain("ccc-333"); + } + + /** + * Verifies that filterGtidSet() in EARLIEST mode still works correctly (no regression from + * LATEST mode fix). 
+ */ + @Test + void testFilterGtidSetEarliestModeNotAffected() throws Exception { + MySqlStreamingChangeEventSource source = createSourceWithConfig("earliest", null); + + MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000"); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + // EARLIEST mode: old channel should be fixed + assertThat(result.toString()).contains("aaa-111:1-8000"); + // EARLIEST mode does NOT add new channels (different from LATEST) + assertThat(result.forServerWithId("bbb-222")).isNull(); + } + + /** Verifies that filterGtidSet() returns null when offsetContext has no GTID. */ + @Test + void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception { + MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null); + + MySqlOffsetContext offsetContext = createOffsetContext(source, null); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result).isNull(); + } + + // ---- Helper methods ---- + + /** + * Creates a MySqlStreamingChangeEventSource via Unsafe (bypassing constructor) and injects the + * connectorConfig field via reflection. 
+ */ + @SuppressWarnings("restriction") + private static MySqlStreamingChangeEventSource createSourceWithConfig( + String gtidNewChannelPosition, String gtidSourceExcludes) throws Exception { + // Build Debezium Configuration + JdbcConfiguration.Builder builder = + JdbcConfiguration.create().with("database.server.name", "test_server"); + if (gtidNewChannelPosition != null) { + builder = builder.with("gtid.new.channel.position", gtidNewChannelPosition); + } + if (gtidSourceExcludes != null) { + builder = builder.with("gtid.source.excludes", gtidSourceExcludes); + } + Configuration dezConf = builder.build(); + MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dezConf); + + // Verify config is as expected + if ("latest".equalsIgnoreCase(gtidNewChannelPosition)) { + assertThat(connectorConfig.gtidNewChannelPosition()) + .isEqualTo(GtidNewChannelPosition.LATEST); + } else if ("earliest".equalsIgnoreCase(gtidNewChannelPosition)) { + assertThat(connectorConfig.gtidNewChannelPosition()) + .isEqualTo(GtidNewChannelPosition.EARLIEST); + } + + // Create instance via Unsafe to bypass heavy constructor + sun.misc.Unsafe unsafe = getUnsafe(); + MySqlStreamingChangeEventSource source = + (MySqlStreamingChangeEventSource) + unsafe.allocateInstance(MySqlStreamingChangeEventSource.class); + + // Inject connectorConfig via reflection + Field configField = + MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig"); + configField.setAccessible(true); + configField.set(source, connectorConfig); + + return source; + } + + private static MySqlOffsetContext createOffsetContext( + MySqlStreamingChangeEventSource source, String gtidSetStr) throws Exception { + Field configField = + MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig"); + configField.setAccessible(true); + MySqlConnectorConfig config = (MySqlConnectorConfig) configField.get(source); + + if (gtidSetStr == null) { + // Return an offset context without GTID (gtidSet() returns 
null) + Map offsetMap = new HashMap<>(); + offsetMap.put("file", "mysql-bin.000001"); + offsetMap.put("pos", 4L); + return new MySqlOffsetContext.Loader(config).load(offsetMap); + } + + Map offsetMap = new HashMap<>(); + offsetMap.put("file", "mysql-bin.000001"); + offsetMap.put("pos", 4L); + offsetMap.put("gtids", gtidSetStr); + return new MySqlOffsetContext.Loader(config).load(offsetMap); + } + + @SuppressWarnings("restriction") + private static sun.misc.Unsafe getUnsafe() throws Exception { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + return (sun.misc.Unsafe) unsafeField.get(null); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index 88d4b8aba44..f0dbe6cd46f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -22,8 +22,10 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.util.function.Predicate; import java.util.stream.Stream; +import static io.debezium.connector.mysql.GtidUtils.computeLatestModeGtidSet; import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; import static org.assertj.core.api.Assertions.assertThat; @@ -83,6 +85,75 @@ private static Stream gtidSetsProvider() { "A:1-20:30-35:45-50:60-65:75-80")); } + /** Tests {@link GtidUtils#computeLatestModeGtidSet} for FLINK-39149. 
*/ + @ParameterizedTest(name = "{0}") + @MethodSource("latestModeGtidSetsProvider") + void testLatestModeGtidMerge( + String description, + String serverGtidStr, + String checkpointGtidStr, + String expectedMergedStr) { + GtidSet serverGtidSet = new GtidSet(serverGtidStr); + GtidSet checkpointGtidSet = new GtidSet(checkpointGtidStr); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet(serverGtidSet, new GtidSet(""), checkpointGtidSet, null); + + assertThat(mergedGtidSet).hasToString(expectedMergedStr); + + // Verify MySQL would not replay pre-checkpoint transactions + GtidSet transactionsToSend = serverGtidSet.subtract(mergedGtidSet); + for (GtidSet.UUIDSet uuidSet : checkpointGtidSet.getUUIDSets()) { + String uuid = uuidSet.getUUID(); + long earliestCheckpointTx = + uuidSet.getIntervals().stream() + .mapToLong(GtidSet.Interval::getStart) + .min() + .orElse(1); + if (earliestCheckpointTx > 1) { + GtidSet.UUIDSet toSendUuidSet = transactionsToSend.forServerWithId(uuid); + if (toSendUuidSet != null) { + for (GtidSet.Interval interval : toSendUuidSet.getIntervals()) { + assertThat(interval.getStart()) + .as( + "Should not replay pre-checkpoint transactions for UUID %s", + uuid) + .isGreaterThan(earliestCheckpointTx); + } + } + } + } + } + + private static Stream latestModeGtidSetsProvider() { + return Stream.of( + Arguments.of( + "Old channel with non-contiguous GTID, new channel present", + "aaa-111:1-10000,bbb-222:1-3000", + "aaa-111:5000-8000", + "aaa-111:1-8000,bbb-222:1-3000"), + Arguments.of( + "Mixed old channels (contiguous and non-contiguous) with new channel", + "aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000", + "aaa-111:5000-8000,bbb-222:1-2000", + "aaa-111:1-8000,bbb-222:1-2000,ccc-333:1-5000"), + Arguments.of( + "All old channels, no new channels", + "aaa-111:1-10000,bbb-222:1-3000", + "aaa-111:1-8000,bbb-222:1-2000", + "aaa-111:1-8000,bbb-222:1-2000"), + Arguments.of( + "Contiguous checkpoint GTID, no regression", + "aaa-111:1-10000,bbb-222:1-3000", + 
"aaa-111:1-8000", + "aaa-111:1-8000,bbb-222:1-3000"), + Arguments.of( + "Only new channels, checkpoint has unknown UUID", + "aaa-111:1-10000,bbb-222:1-3000", + "xxx-999:1-500", + "aaa-111:1-10000,bbb-222:1-3000,xxx-999:1-500")); + } + @Test void testMergingGtidSets() { GtidSet base = new GtidSet("A:1-100"); @@ -96,4 +167,65 @@ void testMergingGtidSets() { toMerge = new GtidSet("A:1-10,C:1-10"); assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-100,C:1-10"); } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with {@code gtidSourceFilter}. */ + @Test + void testLatestModeGtidMergeWithSourceFilter() { + GtidSet availableServerGtidSet = + new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000,bbb-222:1-2000"); + Predicate gtidSourceFilter = uuid -> !uuid.equals("ccc-333"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, + new GtidSet(""), + checkpointGtidSet, + gtidSourceFilter); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:1-8000"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-2000"); + assertThat(mergedGtidSet.toString()).doesNotContain("ccc-333"); + } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with purged GTID. 
*/ + @Test + void testLatestModeGtidMergeWithPurgedGtid() { + GtidSet availableServerGtidSet = new GtidSet("aaa-111:50-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet("aaa-111:1-49"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:50-8000"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000"); + + // Verify no pre-checkpoint replay + GtidSet transactionsToSend = availableServerGtidSet.subtract(mergedGtidSet); + GtidSet.UUIDSet aaaToSend = transactionsToSend.forServerWithId("aaa-111"); + if (aaaToSend != null) { + for (GtidSet.Interval interval : aaaToSend.getIntervals()) { + assertThat(interval.getStart()) + .as("Should not request pre-checkpoint transactions") + .isGreaterThanOrEqualTo(8001); + } + } + } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with a completely purged UUID. 
*/ + @Test + void testLatestModeGtidMergeWithFullyPurgedChannel() { + GtidSet availableServerGtidSet = new GtidSet("bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet("aaa-111:1-500"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:200-400"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:1-400"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000"); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 279d6fb3a3b..ec84202fcb0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -89,12 +89,16 @@ *

Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously + *

Line 1437-1453 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1454-1462 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + * when checkpoint GTID has non-contiguous ranges. Delegates to {@link + * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 951-964 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

Line 1513 : Add more error details for some exceptions. + * + *

Line 956-968 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. */ public class MySqlStreamingChangeEventSource @@ -1448,7 +1452,13 @@ public GtidSet filterGtidSet( purgedServerGtid), filteredGtidSet); } else { - mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + LOGGER.info("Using latest positions for new GTID channels"); + mergedGtidSet = + GtidUtils.computeLatestModeGtidSet( + availableServerGtidSet, + purgedServerGtid, + filteredGtidSet, + gtidSourceFilter); } LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet); From 5a919c21717b6fb1727e56236d038525d6dec2d0 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 25 Feb 2026 11:43:46 +0800 Subject: [PATCH 2/3] [FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges --- .../flink-connector-mysql-cdc/pom.xml | 11 ++ .../connector/mysql/FilterGtidSetTest.java | 126 ++++-------------- 2 files changed, 40 insertions(+), 97 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 96366a9af91..5d9b2f774bc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -29,6 +29,10 @@ limitations under the License. flink-connector-mysql-cdc jar + + 3.12.4 + + @@ -183,6 +187,13 @@ limitations under the License. 
test + + org.mockito + mockito-core + ${mockito.version} + test + + org.apache.commons commons-lang3 diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java index 763b8e36799..d50f2eea05b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java @@ -17,57 +17,45 @@ package io.debezium.connector.mysql; -import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; -import io.debezium.jdbc.JdbcConfiguration; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import java.lang.reflect.Field; -import java.util.HashMap; -import java.util.Map; +import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; /** * Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST - * mode fix (FLINK-39149) cannot regress. This test directly invokes the production method via - * reflection, bypassing the heavy constructor dependencies. + * mode fix (FLINK-39149) cannot regress. */ class FilterGtidSetTest { - /** - * Verifies that filterGtidSet() in LATEST mode fixes non-contiguous checkpoint GTIDs for old - * channels and uses server's full GTID for new channels. - * - *

This is the core FLINK-39149 bug scenario: checkpoint has "aaa-111:5000-8000" (gap - * 1-4999), and without the fix, MySQL would replay transactions 1-4999. - */ @Test void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception { - MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null); + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, null); - MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000"); + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000"); GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); GtidSet purgedServerGtid = new GtidSet(""); GtidSet result = source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); - // Old channel aaa-111: gap should be filled from :1 to 8000 assertThat(result.toString()).contains("aaa-111:1-8000"); - // New channel bbb-222: should use server's full GTID assertThat(result.toString()).contains("bbb-222:1-3000"); } - /** - * Verifies that filterGtidSet() in LATEST mode with gtidSourceFilter excludes filtered UUIDs. 
- */ @Test void testFilterGtidSetLatestModeWithSourceFilter() throws Exception { - MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", "ccc-333"); + Predicate excludeCcc = uuid -> !uuid.equals("ccc-333"); + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc); - MySqlOffsetContext offsetContext = - createOffsetContext(source, "aaa-111:5000-8000,bbb-222:1-2000"); + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000"); GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000"); GtidSet purgedServerGtid = new GtidSet(""); @@ -77,37 +65,31 @@ void testFilterGtidSetLatestModeWithSourceFilter() throws Exception { assertThat(result.toString()).contains("aaa-111:1-8000"); assertThat(result.toString()).contains("bbb-222:1-2000"); - // ccc-333 should be excluded assertThat(result.toString()).doesNotContain("ccc-333"); } - /** - * Verifies that filterGtidSet() in EARLIEST mode still works correctly (no regression from - * LATEST mode fix). 
- */ @Test void testFilterGtidSetEarliestModeNotAffected() throws Exception { - MySqlStreamingChangeEventSource source = createSourceWithConfig("earliest", null); + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null); - MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000"); + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000"); GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); GtidSet purgedServerGtid = new GtidSet(""); GtidSet result = source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); - // EARLIEST mode: old channel should be fixed assertThat(result.toString()).contains("aaa-111:1-8000"); - // EARLIEST mode does NOT add new channels (different from LATEST) assertThat(result.forServerWithId("bbb-222")).isNull(); } - /** Verifies that filterGtidSet() returns null when offsetContext has no GTID. */ @Test void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception { - MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null); + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, null); - MySqlOffsetContext offsetContext = createOffsetContext(source, null); + MySqlOffsetContext offsetContext = createOffsetContext(null); GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000"); GtidSet purgedServerGtid = new GtidSet(""); @@ -117,77 +99,27 @@ void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception { assertThat(result).isNull(); } - // ---- Helper methods ---- - - /** - * Creates a MySqlStreamingChangeEventSource via Unsafe (bypassing constructor) and injects the - * connectorConfig field via reflection. 
- */ - @SuppressWarnings("restriction") private static MySqlStreamingChangeEventSource createSourceWithConfig( - String gtidNewChannelPosition, String gtidSourceExcludes) throws Exception { - // Build Debezium Configuration - JdbcConfiguration.Builder builder = - JdbcConfiguration.create().with("database.server.name", "test_server"); - if (gtidNewChannelPosition != null) { - builder = builder.with("gtid.new.channel.position", gtidNewChannelPosition); - } - if (gtidSourceExcludes != null) { - builder = builder.with("gtid.source.excludes", gtidSourceExcludes); - } - Configuration dezConf = builder.build(); - MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dezConf); - - // Verify config is as expected - if ("latest".equalsIgnoreCase(gtidNewChannelPosition)) { - assertThat(connectorConfig.gtidNewChannelPosition()) - .isEqualTo(GtidNewChannelPosition.LATEST); - } else if ("earliest".equalsIgnoreCase(gtidNewChannelPosition)) { - assertThat(connectorConfig.gtidNewChannelPosition()) - .isEqualTo(GtidNewChannelPosition.EARLIEST); - } - - // Create instance via Unsafe to bypass heavy constructor - sun.misc.Unsafe unsafe = getUnsafe(); + GtidNewChannelPosition channelPosition, Predicate gtidSourceFilter) + throws Exception { + MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class); + when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition); + when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter); + MySqlStreamingChangeEventSource source = - (MySqlStreamingChangeEventSource) - unsafe.allocateInstance(MySqlStreamingChangeEventSource.class); + Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS); - // Inject connectorConfig via reflection Field configField = MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig"); configField.setAccessible(true); - configField.set(source, connectorConfig); + configField.set(source, mockConfig); return source; } - private static 
MySqlOffsetContext createOffsetContext( - MySqlStreamingChangeEventSource source, String gtidSetStr) throws Exception { - Field configField = - MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig"); - configField.setAccessible(true); - MySqlConnectorConfig config = (MySqlConnectorConfig) configField.get(source); - - if (gtidSetStr == null) { - // Return an offset context without GTID (gtidSet() returns null) - Map offsetMap = new HashMap<>(); - offsetMap.put("file", "mysql-bin.000001"); - offsetMap.put("pos", 4L); - return new MySqlOffsetContext.Loader(config).load(offsetMap); - } - - Map offsetMap = new HashMap<>(); - offsetMap.put("file", "mysql-bin.000001"); - offsetMap.put("pos", 4L); - offsetMap.put("gtids", gtidSetStr); - return new MySqlOffsetContext.Loader(config).load(offsetMap); - } - - @SuppressWarnings("restriction") - private static sun.misc.Unsafe getUnsafe() throws Exception { - Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - return (sun.misc.Unsafe) unsafeField.get(null); + private static MySqlOffsetContext createOffsetContext(String gtidSetStr) { + MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class); + when(offsetContext.gtidSet()).thenReturn(gtidSetStr); + return offsetContext; } } From b15f7db86c76f7902ecb967ab5c3b895e9b1378e Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 13 Mar 2026 14:23:23 +0800 Subject: [PATCH 3/3] update --- .../debezium/connector/mysql/GtidUtils.java | 34 ++++++++++++++----- .../MySqlStreamingChangeEventSource.java | 18 ++++------ .../MySqlStreamingChangeEventSource.java | 18 ++++------ 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index 
9c05084c0f8..5956de9b312 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -28,12 +28,34 @@ /** Utils for handling GTIDs. */ public class GtidUtils { + /** + * Fixes old channels' GTID ranges by filling prefix gaps using server GTID intervals. + * + *

This is the shared logic between EARLIEST and LATEST modes. For UUIDs present in the + * checkpoint, non-contiguous GTID ranges are corrected via {@link #fixRestoredGtidSet} to avoid + * MySQL replaying pre-checkpoint transactions. + * + * @param availableServerGtidSet the relevant (filtered) server GTID set + * @param purgedServerGtid the GTID set already purged from the MySQL server + * @param checkpointGtidSet the GTID set restored from checkpoint + * @return the fixed GTID set for old channels + */ + public static GtidSet fixOldChannelsGtidSet( + GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet checkpointGtidSet) { + return fixRestoredGtidSet( + mergeGtidSetInto( + availableServerGtidSet.retainAll( + uuid -> checkpointGtidSet.forServerWithId(uuid) != null), + purgedServerGtid), + checkpointGtidSet); + } + /** * Computes the merged GTID set for the LATEST new-channel-position mode. * *

For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via - * {@link #fixRestoredGtidSet} to avoid MySQL replaying pre-checkpoint transactions. For new - * channels (UUIDs not in checkpoint), the server's full GTID is used to skip all history. + * {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in checkpoint), the server's full + * GTID is used to skip all history. * * @param availableServerGtidSet the GTID set currently available on the MySQL server * @param purgedServerGtid the GTID set already purged from the MySQL server @@ -53,12 +75,8 @@ public static GtidSet computeLatestModeGtidSet( // Step 1: Fix old channels' GTID ranges GtidSet fixedOldChannelsGtid = - fixRestoredGtidSet( - mergeGtidSetInto( - relevantAvailableServerGtidSet.retainAll( - uuid -> checkpointGtidSet.forServerWithId(uuid) != null), - purgedServerGtid), - checkpointGtidSet); + fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, checkpointGtidSet); // Step 2: For new channels, use server's full GTID to skip all history GtidSet newChannelsGtid = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index c44c34c6e5a..557a149a2d0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,14 +87,15 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1432-1448 : Adjust GTID merging logic to support recovering from job which previously - * specifying starting offset on start. + *

Line 1432-1443 : Adjust GTID merging logic to support recovering from a job that previously + * specified a starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared + * EARLIEST/LATEST logic. * - *

Line 1449-1457 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + *

Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions * when checkpoint GTID has non-contiguous ranges. Delegates to {@link * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 1508 : Add more error details for some exceptions. + *

Line 1490 : Add more error details for some exceptions. * *

Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. @@ -1420,7 +1421,6 @@ public GtidSet filterGtidSet( GtidSet mergedGtidSet; if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) { - final GtidSet knownGtidSet = filteredGtidSet; LOGGER.info("Using first available positions for new GTID channels"); final GtidSet relevantAvailableServerGtidSet = (gtidSourceFilter != null) @@ -1440,12 +1440,8 @@ public GtidSet filterGtidSet( // recorded offset in the checkpoint, and the available GTID for other MySQL instances // should be completed. mergedGtidSet = - GtidUtils.fixRestoredGtidSet( - GtidUtils.mergeGtidSetInto( - relevantAvailableServerGtidSet.retainAll( - uuid -> knownGtidSet.forServerWithId(uuid) != null), - purgedServerGtid), - filteredGtidSet); + GtidUtils.fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet); } else { LOGGER.info("Using latest positions for new GTID channels"); mergedGtidSet = diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index ec84202fcb0..14b3d96f147 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -89,14 +89,15 @@ *

Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1437-1453 : Adjust GTID merging logic to support recovering from job which previously - * specifying starting offset on start. + *

Line 1438-1449 : Adjust GTID merging logic to support recovering from a job that previously + * specified a starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared + * EARLIEST/LATEST logic. * - *

Line 1454-1462 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + *

Line 1450-1458 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions * when checkpoint GTID has non-contiguous ranges. Delegates to {@link * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 1513 : Add more error details for some exceptions. + *

Line 1496 : Add more error details for some exceptions. * *

Line 956-968 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. @@ -1425,7 +1426,6 @@ public GtidSet filterGtidSet( GtidSet mergedGtidSet; if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) { - final GtidSet knownGtidSet = filteredGtidSet; LOGGER.info("Using first available positions for new GTID channels"); final GtidSet relevantAvailableServerGtidSet = (gtidSourceFilter != null) @@ -1445,12 +1445,8 @@ public GtidSet filterGtidSet( // recorded offset in the checkpoint, and the available GTID for other MySQL instances // should be completed. mergedGtidSet = - GtidUtils.fixRestoredGtidSet( - GtidUtils.mergeGtidSetInto( - relevantAvailableServerGtidSet.retainAll( - uuid -> knownGtidSet.forServerWithId(uuid) != null), - purgedServerGtid), - filteredGtidSet); + GtidUtils.fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet); } else { LOGGER.info("Using latest positions for new GTID channels"); mergedGtidSet =