Skip to content

Commit db96202

Browse files
Hisoka-XThorneANN
authored andcommitted
[FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges (apache#4286)
1 parent edc012f commit db96202

6 files changed

Lines changed: 364 additions & 24 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ limitations under the License.
2929
<name>flink-connector-mysql-cdc</name>
3030
<packaging>jar</packaging>
3131

32+
<properties>
33+
<mockito.version>3.12.4</mockito.version>
34+
</properties>
35+
3236
<dependencies>
3337

3438
<!-- Debezium dependencies -->
@@ -195,6 +199,13 @@ limitations under the License.
195199
<scope>test</scope>
196200
</dependency>
197201

202+
<dependency>
203+
<groupId>org.mockito</groupId>
204+
<artifactId>mockito-core</artifactId>
205+
<version>${mockito.version}</version>
206+
<scope>test</scope>
207+
</dependency>
208+
198209
<dependency>
199210
<groupId>org.apache.commons</groupId>
200211
<artifactId>commons-lang3</artifactId>

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,70 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.function.Predicate;
2627

2728
/** Utils for handling GTIDs. */
2829
public class GtidUtils {
2930

31+
/**
32+
* Fixes old channels' GTID ranges by filling prefix gaps using server GTID intervals.
33+
*
34+
* <p>This is the shared logic between EARLIEST and LATEST modes. For UUIDs present in the
35+
* checkpoint, non-contiguous GTID ranges are corrected via {@link #fixRestoredGtidSet} to avoid
36+
* MySQL replaying pre-checkpoint transactions.
37+
*
38+
* @param availableServerGtidSet the relevant (filtered) server GTID set
39+
* @param purgedServerGtid the GTID set already purged from the MySQL server
40+
* @param checkpointGtidSet the GTID set restored from checkpoint
41+
* @return the fixed GTID set for old channels
42+
*/
43+
public static GtidSet fixOldChannelsGtidSet(
44+
GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet checkpointGtidSet) {
45+
return fixRestoredGtidSet(
46+
mergeGtidSetInto(
47+
availableServerGtidSet.retainAll(
48+
uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
49+
purgedServerGtid),
50+
checkpointGtidSet);
51+
}
52+
53+
/**
54+
* Computes the merged GTID set for the LATEST new-channel-position mode.
55+
*
56+
* <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via
57+
* {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in checkpoint), the server's full
58+
* GTID is used to skip all history.
59+
*
60+
* @param availableServerGtidSet the GTID set currently available on the MySQL server
61+
* @param purgedServerGtid the GTID set already purged from the MySQL server
62+
* @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied)
63+
* @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null
64+
* @return the merged GTID set suitable for binlog subscription
65+
*/
66+
public static GtidSet computeLatestModeGtidSet(
67+
GtidSet availableServerGtidSet,
68+
GtidSet purgedServerGtid,
69+
GtidSet checkpointGtidSet,
70+
Predicate<String> gtidSourceFilter) {
71+
final GtidSet relevantAvailableServerGtidSet =
72+
(gtidSourceFilter != null)
73+
? availableServerGtidSet.retainAll(gtidSourceFilter)
74+
: availableServerGtidSet;
75+
76+
// Step 1: Fix old channels' GTID ranges
77+
GtidSet fixedOldChannelsGtid =
78+
fixOldChannelsGtidSet(
79+
relevantAvailableServerGtidSet, purgedServerGtid, checkpointGtidSet);
80+
81+
// Step 2: For new channels, use server's full GTID to skip all history
82+
GtidSet newChannelsGtid =
83+
relevantAvailableServerGtidSet.retainAll(
84+
uuid -> checkpointGtidSet.forServerWithId(uuid) == null);
85+
86+
// Step 3: Merge fixed old channels + new channels
87+
return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
88+
}
89+
3090
/**
3191
* This method corrects the GTID set that has been restored from a state or checkpoint using the
3292
* GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,17 @@
8787
* Copied from Debezium project(1.9.8.Final) to fix
8888
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
8989
*
90-
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
91-
* specifying starting offset on start.
90+
* <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
91+
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
92+
* EARLIEST/LATEST logic.
9293
*
93-
* <p>Line 1485 : Add more error details for some exceptions.
94+
* <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
95+
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
96+
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
9497
*
95-
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
98+
* <p>Line 1490 : Add more error details for some exceptions.
99+
*
100+
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
96101
* processing LinkedList rows in handleChange method. See FLINK-38846.
97102
*/
98103
public class MySqlStreamingChangeEventSource
@@ -1416,7 +1421,6 @@ public GtidSet filterGtidSet(
14161421
GtidSet mergedGtidSet;
14171422

14181423
if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
1419-
final GtidSet knownGtidSet = filteredGtidSet;
14201424
LOGGER.info("Using first available positions for new GTID channels");
14211425
final GtidSet relevantAvailableServerGtidSet =
14221426
(gtidSourceFilter != null)
@@ -1436,14 +1440,16 @@ public GtidSet filterGtidSet(
14361440
// recorded offset in the checkpoint, and the available GTID for other MySQL instances
14371441
// should be completed.
14381442
mergedGtidSet =
1439-
GtidUtils.fixRestoredGtidSet(
1440-
GtidUtils.mergeGtidSetInto(
1441-
relevantAvailableServerGtidSet.retainAll(
1442-
uuid -> knownGtidSet.forServerWithId(uuid) != null),
1443-
purgedServerGtid),
1444-
filteredGtidSet);
1443+
GtidUtils.fixOldChannelsGtidSet(
1444+
relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet);
14451445
} else {
1446-
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
1446+
LOGGER.info("Using latest positions for new GTID channels");
1447+
mergedGtidSet =
1448+
GtidUtils.computeLatestModeGtidSet(
1449+
availableServerGtidSet,
1450+
purgedServerGtid,
1451+
filteredGtidSet,
1452+
gtidSourceFilter);
14471453
}
14481454

14491455
LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.mysql;
19+
20+
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
21+
import org.junit.jupiter.api.Test;
22+
import org.mockito.Mockito;
23+
24+
import java.lang.reflect.Field;
25+
import java.util.function.Predicate;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.mockito.Mockito.when;
29+
30+
/**
31+
* Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST
32+
* mode fix (FLINK-39149) cannot regress.
33+
*/
34+
class FilterGtidSetTest {
35+
36+
@Test
37+
void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
38+
MySqlStreamingChangeEventSource source =
39+
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
40+
41+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
42+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
43+
GtidSet purgedServerGtid = new GtidSet("");
44+
45+
GtidSet result =
46+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
47+
48+
assertThat(result.toString()).contains("aaa-111:1-8000");
49+
assertThat(result.toString()).contains("bbb-222:1-3000");
50+
}
51+
52+
@Test
53+
void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
54+
Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
55+
MySqlStreamingChangeEventSource source =
56+
createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc);
57+
58+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
59+
GtidSet availableServerGtidSet =
60+
new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
61+
GtidSet purgedServerGtid = new GtidSet("");
62+
63+
GtidSet result =
64+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
65+
66+
assertThat(result.toString()).contains("aaa-111:1-8000");
67+
assertThat(result.toString()).contains("bbb-222:1-2000");
68+
assertThat(result.toString()).doesNotContain("ccc-333");
69+
}
70+
71+
@Test
72+
void testFilterGtidSetEarliestModeNotAffected() throws Exception {
73+
MySqlStreamingChangeEventSource source =
74+
createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);
75+
76+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
77+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
78+
GtidSet purgedServerGtid = new GtidSet("");
79+
80+
GtidSet result =
81+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
82+
83+
assertThat(result.toString()).contains("aaa-111:1-8000");
84+
assertThat(result.forServerWithId("bbb-222")).isNull();
85+
}
86+
87+
@Test
88+
void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
89+
MySqlStreamingChangeEventSource source =
90+
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
91+
92+
MySqlOffsetContext offsetContext = createOffsetContext(null);
93+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
94+
GtidSet purgedServerGtid = new GtidSet("");
95+
96+
GtidSet result =
97+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
98+
99+
assertThat(result).isNull();
100+
}
101+
102+
private static MySqlStreamingChangeEventSource createSourceWithConfig(
103+
GtidNewChannelPosition channelPosition, Predicate<String> gtidSourceFilter)
104+
throws Exception {
105+
MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class);
106+
when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
107+
when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);
108+
109+
MySqlStreamingChangeEventSource source =
110+
Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS);
111+
112+
Field configField =
113+
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
114+
configField.setAccessible(true);
115+
configField.set(source, mockConfig);
116+
117+
return source;
118+
}
119+
120+
private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
121+
MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class);
122+
when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
123+
return offsetContext;
124+
}
125+
}

0 commit comments

Comments
 (0)