Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ limitations under the License.
<name>flink-connector-mysql-cdc</name>
<packaging>jar</packaging>

<properties>
<mockito.version>3.12.4</mockito.version>
</properties>

<dependencies>

<!-- Debezium dependencies -->
Expand Down Expand Up @@ -183,6 +187,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,52 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Utils for handling GTIDs. */
public class GtidUtils {

/**
 * Computes the merged GTID set for the LATEST new-channel-position mode.
 *
 * <p>Channels are split by whether their UUID is present in {@code checkpointGtidSet}:
 *
 * <ul>
 *   <li><b>Old channels</b> (UUID present in checkpoint): the server's GTID ranges, merged with
 *       {@code purgedServerGtid}, are passed through {@link #fixRestoredGtidSet} together with
 *       the checkpoint so that non-contiguous checkpoint ranges do not make MySQL replay
 *       pre-checkpoint transactions.
 *   <li><b>New channels</b> (UUID not in checkpoint): the server's full GTID range is used so
 *       that all of their history is skipped.
 * </ul>
 *
 * <p>When both partial results contain an entry for the same UUID, the new-channel entry wins.
 *
 * @param availableServerGtidSet the GTID set currently available on the MySQL server
 * @param purgedServerGtid the GTID set already purged from the MySQL server
 * @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied)
 * @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null
 * @return the merged GTID set suitable for binlog subscription
 */
public static GtidSet computeLatestModeGtidSet(
        GtidSet availableServerGtidSet,
        GtidSet purgedServerGtid,
        GtidSet checkpointGtidSet,
        Predicate<String> gtidSourceFilter) {
    final GtidSet relevantAvailableServerGtidSet =
            (gtidSourceFilter != null)
                    ? availableServerGtidSet.retainAll(gtidSourceFilter)
                    : availableServerGtidSet;

    // A channel is "old" when the checkpoint already tracks its UUID.
    final Predicate<String> trackedByCheckpoint =
            uuid -> checkpointGtidSet.forServerWithId(uuid) != null;

    // Step 1: Fix old channels' GTID ranges
    final GtidSet oldChannelsOnServer =
            mergeGtidSetInto(
                    relevantAvailableServerGtidSet.retainAll(trackedByCheckpoint),
                    purgedServerGtid);
    final GtidSet fixedOldChannelsGtid =
            fixRestoredGtidSet(oldChannelsOnServer, checkpointGtidSet);

    // Step 2: For new channels, use server's full GTID to skip all history
    final GtidSet newChannelsGtid =
            relevantAvailableServerGtidSet.retainAll(trackedByCheckpoint.negate());

    // Step 3: Merge new channels + fixed old channels.
    // mergeGtidSetInto keeps the base's entry when a UUID exists on both sides. The purged set
    // is merged unfiltered in step 1, so fixedOldChannelsGtid may carry a purged-only entry for
    // a UUID that is actually a new channel. Using the new channels as the base ensures their
    // full server range is kept instead of being shadowed by that shorter purged range, which
    // would otherwise make MySQL replay the new channel's history.
    return mergeGtidSetInto(newChannelsGtid, fixedOldChannelsGtid);
}
Comment thread
Hisoka-X marked this conversation as resolved.

/**
* This method corrects the GTID set that has been restored from a state or checkpoint using the
* GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,16 @@
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
* <p>Line 1432-1448 : Adjust GTID merging logic to support recovering from job which previously
* specifying starting offset on start.
*
* <p>Line 1485 : Add more error details for some exceptions.
* <p>Line 1449-1457 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
*
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* <p>Line 1508 : Add more error details for some exceptions.
*
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
Expand Down Expand Up @@ -1443,7 +1447,13 @@ public GtidSet filterGtidSet(
purgedServerGtid),
filteredGtidSet);
} else {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
LOGGER.info("Using latest positions for new GTID channels");
mergedGtidSet =
GtidUtils.computeLatestModeGtidSet(
availableServerGtidSet,
purgedServerGtid,
filteredGtidSet,
gtidSourceFilter);
}

LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.debezium.connector.mysql;

import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

/**
 * Unit test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST mode
 * fix (FLINK-39149) cannot regress.
 *
 * <p>No MySQL server is involved: the source under test is a Mockito partial mock ({@code
 * CALLS_REAL_METHODS}) whose {@code connectorConfig} field is injected via reflection, and the
 * offset context and connector config are plain mocks.
 *
 * <p>Expected GTID sets are asserted with exact string equality rather than substring matching,
 * so unexpected extra intervals or extra source UUIDs in the result fail the test.
 */
class FilterGtidSetTest {

    @Test
    void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
        MySqlStreamingChangeEventSource source =
                createSourceWithConfig(GtidNewChannelPosition.LATEST, null);

        // Checkpoint starts at 5000 (non-contiguous from 1); bbb-222 is a new channel.
        MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
        GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
        GtidSet purgedServerGtid = new GtidSet("");

        GtidSet result =
                source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

        assertThat(result).hasToString("aaa-111:1-8000,bbb-222:1-3000");
    }

    @Test
    void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
        Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
        MySqlStreamingChangeEventSource source =
                createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc);

        MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
        GtidSet availableServerGtidSet =
                new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
        GtidSet purgedServerGtid = new GtidSet("");

        GtidSet result =
                source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

        // The exact match also proves that the filtered-out ccc-333 channel is absent.
        assertThat(result).hasToString("aaa-111:1-8000,bbb-222:1-2000");
    }

    @Test
    void testFilterGtidSetEarliestModeNotAffected() throws Exception {
        MySqlStreamingChangeEventSource source =
                createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);

        MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
        GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
        GtidSet purgedServerGtid = new GtidSet("");

        GtidSet result =
                source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

        // In EARLIEST mode the new channel bbb-222 must not appear in the merged set.
        assertThat(result).hasToString("aaa-111:1-8000");
        assertThat(result.forServerWithId("bbb-222")).isNull();
    }

    @Test
    void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
        MySqlStreamingChangeEventSource source =
                createSourceWithConfig(GtidNewChannelPosition.LATEST, null);

        MySqlOffsetContext offsetContext = createOffsetContext(null);
        GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
        GtidSet purgedServerGtid = new GtidSet("");

        GtidSet result =
                source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

        assertThat(result).isNull();
    }

    /**
     * Builds a partial mock of the streaming source whose {@code connectorConfig} field points to
     * a mocked config returning the given channel position and source filter.
     *
     * @param channelPosition value returned by the mocked {@code gtidNewChannelPosition()}
     * @param gtidSourceFilter value returned by the mocked {@code gtidSourceFilter()}; may be null
     * @return a source on which real methods are invoked
     * @throws Exception if the {@code connectorConfig} field cannot be found or set reflectively
     */
    private static MySqlStreamingChangeEventSource createSourceWithConfig(
            GtidNewChannelPosition channelPosition, Predicate<String> gtidSourceFilter)
            throws Exception {
        MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class);
        when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
        when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);

        MySqlStreamingChangeEventSource source =
                Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS);

        // Inject the mocked config directly into the field of the partial mock.
        Field configField =
                MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
        configField.setAccessible(true);
        configField.set(source, mockConfig);

        return source;
    }

    /**
     * Creates a mocked offset context whose {@code gtidSet()} returns the given string.
     *
     * @param gtidSetStr the checkpointed GTID set string; may be null
     * @return the mocked offset context
     */
    private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
        MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class);
        when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
        return offsetContext;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.function.Predicate;
import java.util.stream.Stream;

import static io.debezium.connector.mysql.GtidUtils.computeLatestModeGtidSet;
import static io.debezium.connector.mysql.GtidUtils.fixRestoredGtidSet;
import static io.debezium.connector.mysql.GtidUtils.mergeGtidSetInto;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -83,6 +85,75 @@ private static Stream<Arguments> gtidSetsProvider() {
"A:1-20:30-35:45-50:60-65:75-80"));
}

/**
 * Tests {@link GtidUtils#computeLatestModeGtidSet} for FLINK-39149.
 *
 * <p>Each case first checks the exact merged GTID set, then checks that whatever the server
 * would still have to send for a checkpointed channel starts after the earliest checkpointed
 * transaction of that channel.
 */
@ParameterizedTest(name = "{0}")
@MethodSource("latestModeGtidSetsProvider")
void testLatestModeGtidMerge(
        String description,
        String serverGtidStr,
        String checkpointGtidStr,
        String expectedMergedStr) {
    final GtidSet server = new GtidSet(serverGtidStr);
    final GtidSet checkpoint = new GtidSet(checkpointGtidStr);
    final GtidSet merged = computeLatestModeGtidSet(server, new GtidSet(""), checkpoint, null);

    assertThat(merged).hasToString(expectedMergedStr);

    // Verify MySQL would not replay pre-checkpoint transactions
    final GtidSet pending = server.subtract(merged);
    for (GtidSet.UUIDSet checkpointChannel : checkpoint.getUUIDSets()) {
        final String uuid = checkpointChannel.getUUID();

        // Smallest interval start of this channel; defaults to 1 when there are no intervals.
        boolean sawInterval = false;
        long firstCheckpointTx = 1;
        for (GtidSet.Interval checkpointInterval : checkpointChannel.getIntervals()) {
            final long start = checkpointInterval.getStart();
            if (!sawInterval || start < firstCheckpointTx) {
                firstCheckpointTx = start;
                sawInterval = true;
            }
        }

        // Only checkpoints that do not begin at transaction 1 are checked.
        if (firstCheckpointTx <= 1) {
            continue;
        }

        final GtidSet.UUIDSet pendingChannel = pending.forServerWithId(uuid);
        if (pendingChannel == null) {
            continue;
        }

        for (GtidSet.Interval pendingInterval : pendingChannel.getIntervals()) {
            assertThat(pendingInterval.getStart())
                    .as("Should not replay pre-checkpoint transactions for UUID %s", uuid)
                    .isGreaterThan(firstCheckpointTx);
        }
    }
}

/**
 * Supplies the cases for {@code testLatestModeGtidMerge}.
 *
 * <p>Each argument tuple is: description, server GTID set, checkpoint GTID set, expected merged
 * GTID set.
 */
private static Stream<Arguments> latestModeGtidSetsProvider() {
    final Stream.Builder<Arguments> cases = Stream.builder();

    // aaa-111 is checkpointed from 5000 only; bbb-222 is unknown to the checkpoint.
    cases.add(
            Arguments.of(
                    "Old channel with non-contiguous GTID, new channel present",
                    "aaa-111:1-10000,bbb-222:1-3000",
                    "aaa-111:5000-8000",
                    "aaa-111:1-8000,bbb-222:1-3000"));

    // Two checkpointed channels (one starting at 5000, one at 1) plus the new ccc-333.
    cases.add(
            Arguments.of(
                    "Mixed old channels (contiguous and non-contiguous) with new channel",
                    "aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000",
                    "aaa-111:5000-8000,bbb-222:1-2000",
                    "aaa-111:1-8000,bbb-222:1-2000,ccc-333:1-5000"));

    // Every server channel is already present in the checkpoint.
    cases.add(
            Arguments.of(
                    "All old channels, no new channels",
                    "aaa-111:1-10000,bbb-222:1-3000",
                    "aaa-111:1-8000,bbb-222:1-2000",
                    "aaa-111:1-8000,bbb-222:1-2000"));

    // Checkpoint range already starts at 1.
    cases.add(
            Arguments.of(
                    "Contiguous checkpoint GTID, no regression",
                    "aaa-111:1-10000,bbb-222:1-3000",
                    "aaa-111:1-8000",
                    "aaa-111:1-8000,bbb-222:1-3000"));

    // The checkpoint's only UUID does not exist on the server.
    cases.add(
            Arguments.of(
                    "Only new channels, checkpoint has unknown UUID",
                    "aaa-111:1-10000,bbb-222:1-3000",
                    "xxx-999:1-500",
                    "aaa-111:1-10000,bbb-222:1-3000,xxx-999:1-500"));

    return cases.build();
}

@Test
void testMergingGtidSets() {
GtidSet base = new GtidSet("A:1-100");
Expand All @@ -96,4 +167,65 @@ void testMergingGtidSets() {
toMerge = new GtidSet("A:1-10,C:1-10");
assertThat(mergeGtidSetInto(base, toMerge)).hasToString("A:1-100,B:1-100,C:1-10");
}

/**
 * Tests {@link GtidUtils#computeLatestModeGtidSet} with {@code gtidSourceFilter}.
 *
 * <p>The filter rejects {@code ccc-333}, so that server channel must not appear in the result
 * even though it is absent from the checkpoint.
 */
@Test
void testLatestModeGtidMergeWithSourceFilter() {
    GtidSet availableServerGtidSet =
            new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
    GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000,bbb-222:1-2000");
    Predicate<String> gtidSourceFilter = uuid -> !uuid.equals("ccc-333");

    GtidSet mergedGtidSet =
            computeLatestModeGtidSet(
                    availableServerGtidSet,
                    new GtidSet(""),
                    checkpointGtidSet,
                    gtidSourceFilter);

    // Exact match: rules out extra intervals on aaa-111/bbb-222 and any ccc-333 entry, which
    // substring checks would let through.
    assertThat(mergedGtidSet).hasToString("aaa-111:1-8000,bbb-222:1-2000");
}

/**
 * Tests {@link GtidUtils#computeLatestModeGtidSet} with purged GTID.
 *
 * <p>The server's {@code aaa-111} range starts at 50 and transactions 1-49 are reported as
 * purged; the checkpoint covers 5000-8000 only.
 */
@Test
void testLatestModeGtidMergeWithPurgedGtid() {
    GtidSet availableServerGtidSet = new GtidSet("aaa-111:50-10000,bbb-222:1-3000");
    GtidSet purgedServerGtid = new GtidSet("aaa-111:1-49");
    GtidSet checkpointGtidSet = new GtidSet("aaa-111:5000-8000");

    GtidSet mergedGtidSet =
            computeLatestModeGtidSet(
                    availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null);

    // Exact match: a substring check on "aaa-111:50-8000" would also accept a result with
    // additional trailing intervals such as "aaa-111:50-8000:9000-10000".
    assertThat(mergedGtidSet).hasToString("aaa-111:50-8000,bbb-222:1-3000");

    // Verify no pre-checkpoint replay
    GtidSet transactionsToSend = availableServerGtidSet.subtract(mergedGtidSet);
    GtidSet.UUIDSet aaaToSend = transactionsToSend.forServerWithId("aaa-111");
    if (aaaToSend != null) {
        for (GtidSet.Interval interval : aaaToSend.getIntervals()) {
            assertThat(interval.getStart())
                    .as("Should not request pre-checkpoint transactions")
                    .isGreaterThanOrEqualTo(8001);
        }
    }
}

/**
 * Tests {@link GtidUtils#computeLatestModeGtidSet} with a completely purged UUID.
 *
 * <p>{@code aaa-111} appears only in the purged set and in the checkpoint, not in the available
 * server set; {@code bbb-222} is a new channel.
 */
@Test
void testLatestModeGtidMergeWithFullyPurgedChannel() {
    GtidSet availableServerGtidSet = new GtidSet("bbb-222:1-3000");
    GtidSet purgedServerGtid = new GtidSet("aaa-111:1-500");
    GtidSet checkpointGtidSet = new GtidSet("aaa-111:200-400");

    GtidSet mergedGtidSet =
            computeLatestModeGtidSet(
                    availableServerGtidSet, purgedServerGtid, checkpointGtidSet, null);

    // Exact match: "aaa-111:1-400" is a textual prefix of e.g. "aaa-111:1-4000", so a
    // substring check could pass on a wrong upper bound.
    assertThat(mergedGtidSet).hasToString("aaa-111:1-400,bbb-222:1-3000");
}
}
Loading
Loading