Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ limitations under the License.
<name>flink-connector-mysql-cdc</name>
<packaging>jar</packaging>

<properties>
<mockito.version>3.12.4</mockito.version>
</properties>

<dependencies>

<!-- Debezium dependencies -->
Expand Down Expand Up @@ -183,6 +187,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,70 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Utils for handling GTIDs. */
public class GtidUtils {

/**
* Fixes old channels' GTID ranges by filling prefix gaps using server GTID intervals.
*
* <p>This is the shared logic between EARLIEST and LATEST modes. For UUIDs present in the
* checkpoint, non-contiguous GTID ranges are corrected via {@link #fixRestoredGtidSet} to avoid
* MySQL replaying pre-checkpoint transactions.
*
* @param availableServerGtidSet the relevant (filtered) server GTID set
* @param purgedServerGtid the GTID set already purged from the MySQL server
* @param checkpointGtidSet the GTID set restored from checkpoint
* @return the fixed GTID set for old channels
*/
public static GtidSet fixOldChannelsGtidSet(
GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet checkpointGtidSet) {
return fixRestoredGtidSet(
mergeGtidSetInto(
availableServerGtidSet.retainAll(
uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
purgedServerGtid),
checkpointGtidSet);
}

/**
* Computes the merged GTID set for the LATEST new-channel-position mode.
*
* <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via
* {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in checkpoint), the server's full
* GTID is used to skip all history.
*
* @param availableServerGtidSet the GTID set currently available on the MySQL server
* @param purgedServerGtid the GTID set already purged from the MySQL server
* @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied)
* @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null
* @return the merged GTID set suitable for binlog subscription
*/
public static GtidSet computeLatestModeGtidSet(
GtidSet availableServerGtidSet,
GtidSet purgedServerGtid,
GtidSet checkpointGtidSet,
Predicate<String> gtidSourceFilter) {
final GtidSet relevantAvailableServerGtidSet =
(gtidSourceFilter != null)
? availableServerGtidSet.retainAll(gtidSourceFilter)
: availableServerGtidSet;

// Step 1: Fix old channels' GTID ranges
GtidSet fixedOldChannelsGtid =
fixOldChannelsGtidSet(
relevantAvailableServerGtidSet, purgedServerGtid, checkpointGtidSet);

// Step 2: For new channels, use server's full GTID to skip all history
GtidSet newChannelsGtid =
relevantAvailableServerGtidSet.retainAll(
uuid -> checkpointGtidSet.forServerWithId(uuid) == null);

// Step 3: Merge fixed old channels + new channels
return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
}
Comment thread
Hisoka-X marked this conversation as resolved.

/**
* This method corrects the GTID set that has been restored from a state or checkpoint using the
* GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,17 @@
* Copied from Debezium project(1.9.8.Final) to fix
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
*
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
* specifying starting offset on start.
* <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
* EARLIEST/LATEST logic.
*
* <p>Line 1485 : Add more error details for some exceptions.
* <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
*
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* <p>Line 1490 : Add more error details for some exceptions.
*
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
* processing LinkedList rows in handleChange method. See FLINK-38846.
*/
public class MySqlStreamingChangeEventSource
Expand Down Expand Up @@ -1416,7 +1421,6 @@ public GtidSet filterGtidSet(
GtidSet mergedGtidSet;

if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
final GtidSet knownGtidSet = filteredGtidSet;
LOGGER.info("Using first available positions for new GTID channels");
final GtidSet relevantAvailableServerGtidSet =
(gtidSourceFilter != null)
Expand All @@ -1436,14 +1440,16 @@ public GtidSet filterGtidSet(
// recorded offset in the checkpoint, and the available GTID for other MySQL instances
// should be completed.
mergedGtidSet =
GtidUtils.fixRestoredGtidSet(
GtidUtils.mergeGtidSetInto(
relevantAvailableServerGtidSet.retainAll(
uuid -> knownGtidSet.forServerWithId(uuid) != null),
purgedServerGtid),
filteredGtidSet);
GtidUtils.fixOldChannelsGtidSet(
relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet);
} else {
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
LOGGER.info("Using latest positions for new GTID channels");
mergedGtidSet =
GtidUtils.computeLatestModeGtidSet(
availableServerGtidSet,
purgedServerGtid,
filteredGtidSet,
gtidSourceFilter);
}

LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.debezium.connector.mysql;

import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

/**
* Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST
* mode fix (FLINK-39149) cannot regress.
*/
class FilterGtidSetTest {

@Test
void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
MySqlStreamingChangeEventSource source =
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);

MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
GtidSet purgedServerGtid = new GtidSet("");

GtidSet result =
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

assertThat(result.toString()).contains("aaa-111:1-8000");
assertThat(result.toString()).contains("bbb-222:1-3000");
}

@Test
void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
MySqlStreamingChangeEventSource source =
createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc);

MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
GtidSet availableServerGtidSet =
new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
GtidSet purgedServerGtid = new GtidSet("");

GtidSet result =
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

assertThat(result.toString()).contains("aaa-111:1-8000");
assertThat(result.toString()).contains("bbb-222:1-2000");
assertThat(result.toString()).doesNotContain("ccc-333");
}

@Test
void testFilterGtidSetEarliestModeNotAffected() throws Exception {
MySqlStreamingChangeEventSource source =
createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);

MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
GtidSet purgedServerGtid = new GtidSet("");

GtidSet result =
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

assertThat(result.toString()).contains("aaa-111:1-8000");
assertThat(result.forServerWithId("bbb-222")).isNull();
}

@Test
void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
MySqlStreamingChangeEventSource source =
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);

MySqlOffsetContext offsetContext = createOffsetContext(null);
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
GtidSet purgedServerGtid = new GtidSet("");

GtidSet result =
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);

assertThat(result).isNull();
}

private static MySqlStreamingChangeEventSource createSourceWithConfig(
GtidNewChannelPosition channelPosition, Predicate<String> gtidSourceFilter)
throws Exception {
MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class);
when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);

MySqlStreamingChangeEventSource source =
Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS);

Field configField =
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
configField.setAccessible(true);
configField.set(source, mockConfig);

return source;
}

private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class);
when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
return offsetContext;
}
}
Loading
Loading