diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml index 96366a9af91..5d9b2f774bc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml @@ -29,6 +29,10 @@ limitations under the License. flink-connector-mysql-cdc jar + + 3.12.4 + + @@ -183,6 +187,13 @@ limitations under the License. test + + org.mockito + mockito-core + ${mockito.version} + test + + org.apache.commons commons-lang3 diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java index f6c4987e74d..5956de9b312 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java @@ -23,10 +23,70 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Predicate; /** Utils for handling GTIDs. */ public class GtidUtils { + /** + * Fixes old channels' GTID ranges by filling prefix gaps using server GTID intervals. + * + *

This is the shared logic between EARLIEST and LATEST modes. For UUIDs present in the + * checkpoint, non-contiguous GTID ranges are corrected via {@link #fixRestoredGtidSet} to avoid + * MySQL replaying pre-checkpoint transactions. + * + * @param availableServerGtidSet the relevant (filtered) server GTID set + * @param purgedServerGtid the GTID set already purged from the MySQL server + * @param checkpointGtidSet the GTID set restored from checkpoint + * @return the fixed GTID set for old channels + */ + public static GtidSet fixOldChannelsGtidSet( + GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet checkpointGtidSet) { + return fixRestoredGtidSet( + mergeGtidSetInto( + availableServerGtidSet.retainAll( + uuid -> checkpointGtidSet.forServerWithId(uuid) != null), + purgedServerGtid), + checkpointGtidSet); + } + + /** + * Computes the merged GTID set for the LATEST new-channel-position mode. + * + *

For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via + * {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in checkpoint), the server's full + * GTID is used to skip all history. + * + * @param availableServerGtidSet the GTID set currently available on the MySQL server + * @param purgedServerGtid the GTID set already purged from the MySQL server + * @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied) + * @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null + * @return the merged GTID set suitable for binlog subscription + */ + public static GtidSet computeLatestModeGtidSet( + GtidSet availableServerGtidSet, + GtidSet purgedServerGtid, + GtidSet checkpointGtidSet, + Predicate gtidSourceFilter) { + final GtidSet relevantAvailableServerGtidSet = + (gtidSourceFilter != null) + ? availableServerGtidSet.retainAll(gtidSourceFilter) + : availableServerGtidSet; + + // Step 1: Fix old channels' GTID ranges + GtidSet fixedOldChannelsGtid = + fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, checkpointGtidSet); + + // Step 2: For new channels, use server's full GTID to skip all history + GtidSet newChannelsGtid = + relevantAvailableServerGtidSet.retainAll( + uuid -> checkpointGtidSet.forServerWithId(uuid) == null); + + // Step 3: Merge fixed old channels + new channels + return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid); + } + /** * This method corrects the GTID set that has been restored from a state or checkpoint using the * GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index ceecca942a9..557a149a2d0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,12 +87,17 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously - * specifying starting offset on start. + *

Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously + * specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared + * EARLIEST/LATEST logic. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + * when checkpoint GTID has non-contiguous ranges. Delegates to {@link + * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

Line 1490 : Add more error details for some exceptions. + * + *

Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. */ public class MySqlStreamingChangeEventSource @@ -1416,7 +1421,6 @@ public GtidSet filterGtidSet( GtidSet mergedGtidSet; if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) { - final GtidSet knownGtidSet = filteredGtidSet; LOGGER.info("Using first available positions for new GTID channels"); final GtidSet relevantAvailableServerGtidSet = (gtidSourceFilter != null) @@ -1436,14 +1440,16 @@ public GtidSet filterGtidSet( // recorded offset in the checkpoint, and the available GTID for other MySQL instances // should be completed. mergedGtidSet = - GtidUtils.fixRestoredGtidSet( - GtidUtils.mergeGtidSetInto( - relevantAvailableServerGtidSet.retainAll( - uuid -> knownGtidSet.forServerWithId(uuid) != null), - purgedServerGtid), - filteredGtidSet); + GtidUtils.fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet); } else { - mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + LOGGER.info("Using latest positions for new GTID channels"); + mergedGtidSet = + GtidUtils.computeLatestModeGtidSet( + availableServerGtidSet, + purgedServerGtid, + filteredGtidSet, + gtidSourceFilter); } LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java new file mode 100644 index 00000000000..d50f2eea05b --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql; + +import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.function.Predicate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +/** + * Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST + * mode fix (FLINK-39149) cannot regress. + */ +class FilterGtidSetTest { + + @Test + void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception { + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, null); + + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000"); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result.toString()).contains("aaa-111:1-8000"); + assertThat(result.toString()).contains("bbb-222:1-3000"); + } + + @Test + void testFilterGtidSetLatestModeWithSourceFilter() throws Exception { + Predicate excludeCcc = uuid -> !uuid.equals("ccc-333"); + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc); + + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000"); + GtidSet availableServerGtidSet = + new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result.toString()).contains("aaa-111:1-8000"); + assertThat(result.toString()).contains("bbb-222:1-2000"); + assertThat(result.toString()).doesNotContain("ccc-333"); + } + + @Test + void testFilterGtidSetEarliestModeNotAffected() throws Exception { + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null); + + MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000"); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result.toString()).contains("aaa-111:1-8000"); + assertThat(result.forServerWithId("bbb-222")).isNull(); + } + + @Test + void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception { + MySqlStreamingChangeEventSource source = + createSourceWithConfig(GtidNewChannelPosition.LATEST, null); + + MySqlOffsetContext offsetContext = createOffsetContext(null); + GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000"); + GtidSet purgedServerGtid = new GtidSet(""); + + GtidSet result = + source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid); + + assertThat(result).isNull(); + } + + private static MySqlStreamingChangeEventSource createSourceWithConfig( + GtidNewChannelPosition channelPosition, Predicate gtidSourceFilter) + throws Exception { + MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class); + when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition); + when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter); + + MySqlStreamingChangeEventSource source = + Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS); + + Field configField = + MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig"); + configField.setAccessible(true); + configField.set(source, mockConfig); + + return source; + } + + private static MySqlOffsetContext createOffsetContext(String gtidSetStr) { + MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class); + when(offsetContext.gtidSet()).thenReturn(gtidSetStr); + return offsetContext; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java index 88d4b8aba44..f0dbe6cd46f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/GtidUtilsTest.java @@ -22,8 +22,10 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import java.util.function.Predicate; import java.util.stream.Stream; +import static io.debezium.connector.mysql.GtidUtils.computeLatestModeGtidSet; import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet; import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto; import static org.assertj.core.api.Assertions.assertThat; @@ -83,6 +85,75 @@ private static Stream gtidSetsProvider() { "A:1-20:30-35:45-50:60-65:75-80")); } + /** Tests {@link GtidUtils#computeLatestModeGtidSet} for FLINK-39149. */ + @ParameterizedTest(name = "{0}") + @MethodSource("latestModeGtidSetsProvider") + void testLatestModeGtidMerge( + String description, + String serverGtidStr, + String checkpointGtidStr, + String expectedMergedStr) { + GtidSet serverGtidSet = new GtidSet(serverGtidStr); + GtidSet checkpointGtidSet = new GtidSet(checkpointGtidStr); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet(serverGtidSet, new GtidSet(""), checkpointGtidSet, null); + + assertThat(mergedGtidSet).hasToString(expectedMergedStr); + + // Verify MySQL would not replay pre-checkpoint transactions + GtidSet transactionsToSend = serverGtidSet.subtract(mergedGtidSet); + for (GtidSet.UUIDSet uuidSet : checkpointGtidSet.getUUIDSets()) { + String uuid = uuidSet.getUUID(); + long earliestCheckpointTx = + uuidSet.getIntervals().stream() + .mapToLong(GtidSet.Interval::getStart) + .min() + .orElse(1); + if (earliestCheckpointTx > 1) { + GtidSet.UUIDSet toSendUuidSet = transactionsToSend.forServerWithId(uuid); + if (toSendUuidSet != null) { + for (GtidSet.Interval interval : toSendUuidSet.getIntervals()) { + assertThat(interval.getStart()) + .as( + "Should not replay pre-checkpoint transactions for UUID %s", + uuid) + .isGreaterThan(earliestCheckpointTx); + } + } + } + } + } + + private static Stream latestModeGtidSetsProvider() { + return Stream.of( + Arguments.of( + "Old channel with non-contiguous GTID, new channel present", + "aaa-111:1-10000,bbb-222:1-3000", + "aaa-111:5000-8000", + "aaa-111:1-8000,bbb-222:1-3000"), + Arguments.of( + "Mixed old channels (contiguous and non-contiguous) with new channel", + "aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000", + "aaa-111:5000-8000,bbb-222:1-2000", + "aaa-111:1-8000,bbb-222:1-2000,ccc-333:1-5000"), + Arguments.of( + "All old channels, no new channels", + "aaa-111:1-10000,bbb-222:1-3000", + "aaa-111:1-8000,bbb-222:1-2000", + "aaa-111:1-8000,bbb-222:1-2000"), + Arguments.of( + "Contiguous checkpoint GTID, no regression", + "aaa-111:1-10000,bbb-222:1-3000", + "aaa-111:1-8000", + "aaa-111:1-8000,bbb-222:1-3000"), + Arguments.of( + "Only new channels, checkpoint has unknown UUID", + "aaa-111:1-10000,bbb-222:1-3000", + "xxx-999:1-500", + "aaa-111:1-10000,bbb-222:1-3000,xxx-999:1-500")); + } + @Test void testMergingGtidSets() { GtidSet base = new GtidSet("A:1-100"); @@ -96,4 +167,65 @@ void testMergingGtidSets() { toMerge = new GtidSet("A:1-10,C:1-10"); assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-100,C:1-10"); } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with {@code gtidSourceFilter}. */ + @Test + void testLatestModeGtidMergeWithSourceFilter() { + GtidSet availableServerGtidSet = + new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000,bbb-222:1-2000"); + Predicate gtidSourceFilter = uuid -> !uuid.equals("ccc-333"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, + new GtidSet(""), + checkpointGtidSet, + gtidSourceFilter); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:1-8000"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-2000"); + assertThat(mergedGtidSet.toString()).doesNotContain("ccc-333"); + } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with purged GTID. */ + @Test + void testLatestModeGtidMergeWithPurgedGtid() { + GtidSet availableServerGtidSet = new GtidSet("aaa-111:50-10000,bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet("aaa-111:1-49"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:50-8000"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000"); + + // Verify no pre-checkpoint replay + GtidSet transactionsToSend = availableServerGtidSet.subtract(mergedGtidSet); + GtidSet.UUIDSet aaaToSend = transactionsToSend.forServerWithId("aaa-111"); + if (aaaToSend != null) { + for (GtidSet.Interval interval : aaaToSend.getIntervals()) { + assertThat(interval.getStart()) + .as("Should not request pre-checkpoint transactions") + .isGreaterThanOrEqualTo(8001); + } + } + } + + /** Tests {@link GtidUtils#computeLatestModeGtidSet} with a completely purged UUID. */ + @Test + void testLatestModeGtidMergeWithFullyPurgedChannel() { + GtidSet availableServerGtidSet = new GtidSet("bbb-222:1-3000"); + GtidSet purgedServerGtid = new GtidSet("aaa-111:1-500"); + GtidSet checkpointGtidSet = new GtidSet("aaa-111:200-400"); + + GtidSet mergedGtidSet = + computeLatestModeGtidSet( + availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null); + + assertThat(mergedGtidSet.toString()).contains("aaa-111:1-400"); + assertThat(mergedGtidSet.toString()).contains("bbb-222:1-3000"); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 279d6fb3a3b..14b3d96f147 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -89,12 +89,17 @@ *

Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously - * specifying starting offset on start. + *

Line 1438-1449 : Adjust GTID merging logic to support recovering from job which previously + * specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared + * EARLIEST/LATEST logic. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1450-1458 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions + * when checkpoint GTID has non-contiguous ranges. Delegates to {@link + * GtidUtils#computeLatestModeGtidSet}. See FLINK-39149. * - *

Line 951-964 : Use iterator instead of index-based loop to avoid O(n²) complexity when + *

Line 1496 : Add more error details for some exceptions. + * + *

Line 956-968 : Use iterator instead of index-based loop to avoid O(n²) complexity when * processing LinkedList rows in handleChange method. See FLINK-38846. */ public class MySqlStreamingChangeEventSource @@ -1421,7 +1426,6 @@ public GtidSet filterGtidSet( GtidSet mergedGtidSet; if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) { - final GtidSet knownGtidSet = filteredGtidSet; LOGGER.info("Using first available positions for new GTID channels"); final GtidSet relevantAvailableServerGtidSet = (gtidSourceFilter != null) @@ -1441,14 +1445,16 @@ public GtidSet filterGtidSet( // recorded offset in the checkpoint, and the available GTID for other MySQL instances // should be completed. mergedGtidSet = - GtidUtils.fixRestoredGtidSet( - GtidUtils.mergeGtidSetInto( - relevantAvailableServerGtidSet.retainAll( - uuid -> knownGtidSet.forServerWithId(uuid) != null), - purgedServerGtid), - filteredGtidSet); + GtidUtils.fixOldChannelsGtidSet( + relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet); } else { - mergedGtidSet = availableServerGtidSet.with(filteredGtidSet); + LOGGER.info("Using latest positions for new GTID channels"); + mergedGtidSet = + GtidUtils.computeLatestModeGtidSet( + availableServerGtidSet, + purgedServerGtid, + filteredGtidSet, + gtidSourceFilter); } LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);