Skip to content

Commit b5a7e4d

Browse files
committed
[FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges
1 parent e779015 commit b5a7e4d

5 files changed

Lines changed: 395 additions & 8 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,52 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.function.Predicate;
2627

2728
/** Utils for handling GTIDs. */
2829
public class GtidUtils {
2930

31+
/**
32+
* Computes the merged GTID set for the LATEST new-channel-position mode.
33+
*
34+
* <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via
35+
* {@link #fixRestoredGtidSet} to avoid MySQL replaying pre-checkpoint transactions. For new
36+
* channels (UUIDs not in checkpoint), the server's full GTID is used to skip all history.
37+
*
38+
* @param availableServerGtidSet the GTID set currently available on the MySQL server
39+
* @param purgedServerGtid the GTID set already purged from the MySQL server
40+
* @param checkpointGtidSet the GTID set restored from checkpoint (after source filter applied)
41+
* @param gtidSourceFilter optional predicate to filter GTID source UUIDs; may be null
42+
* @return the merged GTID set suitable for binlog subscription
43+
*/
44+
public static GtidSet computeLatestModeGtidSet(
45+
GtidSet availableServerGtidSet,
46+
GtidSet purgedServerGtid,
47+
GtidSet checkpointGtidSet,
48+
Predicate<String> gtidSourceFilter) {
49+
final GtidSet relevantAvailableServerGtidSet =
50+
(gtidSourceFilter != null)
51+
? availableServerGtidSet.retainAll(gtidSourceFilter)
52+
: availableServerGtidSet;
53+
54+
// Step 1: Fix old channels' GTID ranges
55+
GtidSet fixedOldChannelsGtid =
56+
fixRestoredGtidSet(
57+
mergeGtidSetInto(
58+
relevantAvailableServerGtidSet.retainAll(
59+
uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
60+
purgedServerGtid),
61+
checkpointGtidSet);
62+
63+
// Step 2: For new channels, use server's full GTID to skip all history
64+
GtidSet newChannelsGtid =
65+
relevantAvailableServerGtidSet.retainAll(
66+
uuid -> checkpointGtidSet.forServerWithId(uuid) == null);
67+
68+
// Step 3: Merge fixed old channels + new channels
69+
return mergeGtidSetInto(fixedOldChannelsGtid, newChannelsGtid);
70+
}
71+
3072
/**
3173
* This method corrects the GTID set that has been restored from a state or checkpoint using the
3274
* GTID set fetched from the server via SHOW MASTER STATUS. During the correction process, the

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,16 @@
8787
* Copied from Debezium project(1.9.8.Final) to fix
8888
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
8989
*
90-
* <p>Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously
90+
* <p>Line 1432-1448 : Adjust GTID merging logic to support recovering from job which previously
9191
* specifying starting offset on start.
9292
*
93-
* <p>Line 1485 : Add more error details for some exceptions.
93+
* <p>Line 1449-1457 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
94+
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
95+
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
9496
*
95-
* <p>Line 947-958 : Use iterator instead of index-based loop to avoid O(n²) complexity when
97+
* <p>Line 1508 : Add more error details for some exceptions.
98+
*
99+
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
96100
* processing LinkedList rows in handleChange method. See FLINK-38846.
97101
*/
98102
public class MySqlStreamingChangeEventSource
@@ -1443,7 +1447,13 @@ public GtidSet filterGtidSet(
14431447
purgedServerGtid),
14441448
filteredGtidSet);
14451449
} else {
1446-
mergedGtidSet = availableServerGtidSet.with(filteredGtidSet);
1450+
LOGGER.info("Using latest positions for new GTID channels");
1451+
mergedGtidSet =
1452+
GtidUtils.computeLatestModeGtidSet(
1453+
availableServerGtidSet,
1454+
purgedServerGtid,
1455+
filteredGtidSet,
1456+
gtidSourceFilter);
14471457
}
14481458

14491459
LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.debezium.connector.mysql;
19+
20+
import io.debezium.config.Configuration;
21+
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
22+
import io.debezium.jdbc.JdbcConfiguration;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.lang.reflect.Field;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/**
32+
* Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST
33+
* mode fix (FLINK-39149) cannot regress. This test directly invokes the production method via
34+
* reflection, bypassing the heavy constructor dependencies.
35+
*/
36+
class FilterGtidSetTest {
37+
38+
/**
39+
* Verifies that filterGtidSet() in LATEST mode fixes non-contiguous checkpoint GTIDs for old
40+
* channels and uses server's full GTID for new channels.
41+
*
42+
* <p>This is the core FLINK-39149 bug scenario: checkpoint has "aaa-111:5000-8000" (gap
43+
* 1-4999), and without the fix, MySQL would replay transactions 1-4999.
44+
*/
45+
@Test
46+
void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
47+
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null);
48+
49+
MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000");
50+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
51+
GtidSet purgedServerGtid = new GtidSet("");
52+
53+
GtidSet result =
54+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
55+
56+
// Old channel aaa-111: gap should be filled from :1 to 8000
57+
assertThat(result.toString()).contains("aaa-111:1-8000");
58+
// New channel bbb-222: should use server's full GTID
59+
assertThat(result.toString()).contains("bbb-222:1-3000");
60+
}
61+
62+
/**
63+
* Verifies that filterGtidSet() in LATEST mode with gtidSourceFilter excludes filtered UUIDs.
64+
*/
65+
@Test
66+
void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
67+
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", "ccc-333");
68+
69+
MySqlOffsetContext offsetContext =
70+
createOffsetContext(source, "aaa-111:5000-8000,bbb-222:1-2000");
71+
GtidSet availableServerGtidSet =
72+
new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
73+
GtidSet purgedServerGtid = new GtidSet("");
74+
75+
GtidSet result =
76+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
77+
78+
assertThat(result.toString()).contains("aaa-111:1-8000");
79+
assertThat(result.toString()).contains("bbb-222:1-2000");
80+
// ccc-333 should be excluded
81+
assertThat(result.toString()).doesNotContain("ccc-333");
82+
}
83+
84+
/**
85+
* Verifies that filterGtidSet() in EARLIEST mode still works correctly (no regression from
86+
* LATEST mode fix).
87+
*/
88+
@Test
89+
void testFilterGtidSetEarliestModeNotAffected() throws Exception {
90+
MySqlStreamingChangeEventSource source = createSourceWithConfig("earliest", null);
91+
92+
MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000");
93+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
94+
GtidSet purgedServerGtid = new GtidSet("");
95+
96+
GtidSet result =
97+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
98+
99+
// EARLIEST mode: old channel should be fixed
100+
assertThat(result.toString()).contains("aaa-111:1-8000");
101+
// EARLIEST mode does NOT add new channels (different from LATEST)
102+
assertThat(result.forServerWithId("bbb-222")).isNull();
103+
}
104+
105+
/** Verifies that filterGtidSet() returns null when offsetContext has no GTID. */
106+
@Test
107+
void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
108+
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null);
109+
110+
MySqlOffsetContext offsetContext = createOffsetContext(source, null);
111+
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
112+
GtidSet purgedServerGtid = new GtidSet("");
113+
114+
GtidSet result =
115+
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
116+
117+
assertThat(result).isNull();
118+
}
119+
120+
// ---- Helper methods ----
121+
122+
/**
123+
* Creates a MySqlStreamingChangeEventSource via Unsafe (bypassing constructor) and injects the
124+
* connectorConfig field via reflection.
125+
*/
126+
@SuppressWarnings("restriction")
127+
private static MySqlStreamingChangeEventSource createSourceWithConfig(
128+
String gtidNewChannelPosition, String gtidSourceExcludes) throws Exception {
129+
// Build Debezium Configuration
130+
JdbcConfiguration.Builder builder =
131+
JdbcConfiguration.create().with("database.server.name", "test_server");
132+
if (gtidNewChannelPosition != null) {
133+
builder = builder.with("gtid.new.channel.position", gtidNewChannelPosition);
134+
}
135+
if (gtidSourceExcludes != null) {
136+
builder = builder.with("gtid.source.excludes", gtidSourceExcludes);
137+
}
138+
Configuration dezConf = builder.build();
139+
MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dezConf);
140+
141+
// Verify config is as expected
142+
if ("latest".equalsIgnoreCase(gtidNewChannelPosition)) {
143+
assertThat(connectorConfig.gtidNewChannelPosition())
144+
.isEqualTo(GtidNewChannelPosition.LATEST);
145+
} else if ("earliest".equalsIgnoreCase(gtidNewChannelPosition)) {
146+
assertThat(connectorConfig.gtidNewChannelPosition())
147+
.isEqualTo(GtidNewChannelPosition.EARLIEST);
148+
}
149+
150+
// Create instance via Unsafe to bypass heavy constructor
151+
sun.misc.Unsafe unsafe = getUnsafe();
152+
MySqlStreamingChangeEventSource source =
153+
(MySqlStreamingChangeEventSource)
154+
unsafe.allocateInstance(MySqlStreamingChangeEventSource.class);
155+
156+
// Inject connectorConfig via reflection
157+
Field configField =
158+
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
159+
configField.setAccessible(true);
160+
configField.set(source, connectorConfig);
161+
162+
return source;
163+
}
164+
165+
private static MySqlOffsetContext createOffsetContext(
166+
MySqlStreamingChangeEventSource source, String gtidSetStr) throws Exception {
167+
Field configField =
168+
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
169+
configField.setAccessible(true);
170+
MySqlConnectorConfig config = (MySqlConnectorConfig) configField.get(source);
171+
172+
if (gtidSetStr == null) {
173+
// Return an offset context without GTID (gtidSet() returns null)
174+
Map<String, Object> offsetMap = new HashMap<>();
175+
offsetMap.put("file", "mysql-bin.000001");
176+
offsetMap.put("pos", 4L);
177+
return new MySqlOffsetContext.Loader(config).load(offsetMap);
178+
}
179+
180+
Map<String, Object> offsetMap = new HashMap<>();
181+
offsetMap.put("file", "mysql-bin.000001");
182+
offsetMap.put("pos", 4L);
183+
offsetMap.put("gtids", gtidSetStr);
184+
return new MySqlOffsetContext.Loader(config).load(offsetMap);
185+
}
186+
187+
@SuppressWarnings("restriction")
188+
private static sun.misc.Unsafe getUnsafe() throws Exception {
189+
Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
190+
unsafeField.setAccessible(true);
191+
return (sun.misc.Unsafe) unsafeField.get(null);
192+
}
193+
}

0 commit comments

Comments
 (0)