Skip to content

Commit 5a919c2

Browse files
committed
[FLINK-39149][mysql-cdc] Implement LATEST mode GTID merging to prevent replaying pre-checkpoint transactions with non-contiguous ranges
1 parent b5a7e4d commit 5a919c2

2 files changed

Lines changed: 40 additions & 97 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ limitations under the License.
2929
<name>flink-connector-mysql-cdc</name>
3030
<packaging>jar</packaging>
3131

32+
<properties>
33+
<mockito.version>3.12.4</mockito.version>
34+
</properties>
35+
3236
<dependencies>
3337

3438
<!-- Debezium dependencies -->
@@ -183,6 +187,13 @@ limitations under the License.
183187
<scope>test</scope>
184188
</dependency>
185189

190+
<dependency>
191+
<groupId>org.mockito</groupId>
192+
<artifactId>mockito-core</artifactId>
193+
<version>${mockito.version}</version>
194+
<scope>test</scope>
195+
</dependency>
196+
186197
<dependency>
187198
<groupId>org.apache.commons</groupId>
188199
<artifactId>commons-lang3</artifactId>

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/FilterGtidSetTest.java

Lines changed: 29 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -17,57 +17,45 @@
1717

1818
package io.debezium.connector.mysql;
1919

20-
import io.debezium.config.Configuration;
2120
import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition;
22-
import io.debezium.jdbc.JdbcConfiguration;
2321
import org.junit.jupiter.api.Test;
22+
import org.mockito.Mockito;
2423

2524
import java.lang.reflect.Field;
26-
import java.util.HashMap;
27-
import java.util.Map;
25+
import java.util.function.Predicate;
2826

2927
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.mockito.Mockito.when;
3029

3130
/**
3231
* Integration test for {@link MySqlStreamingChangeEventSource#filterGtidSet} to ensure the LATEST
33-
* mode fix (FLINK-39149) cannot regress. This test directly invokes the production method via
34-
* reflection, bypassing the heavy constructor dependencies.
32+
* mode fix (FLINK-39149) cannot regress.
3533
*/
3634
class FilterGtidSetTest {
3735

38-
/**
39-
* Verifies that filterGtidSet() in LATEST mode fixes non-contiguous checkpoint GTIDs for old
40-
* channels and uses server's full GTID for new channels.
41-
*
42-
* <p>This is the core FLINK-39149 bug scenario: checkpoint has "aaa-111:5000-8000" (gap
43-
* 1-4999), and without the fix, MySQL would replay transactions 1-4999.
44-
*/
4536
@Test
4637
void testFilterGtidSetLatestModeFixesNonContiguousGtid() throws Exception {
47-
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null);
38+
MySqlStreamingChangeEventSource source =
39+
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
4840

49-
MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000");
41+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
5042
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
5143
GtidSet purgedServerGtid = new GtidSet("");
5244

5345
GtidSet result =
5446
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
5547

56-
// Old channel aaa-111: gap should be filled from :1 to 8000
5748
assertThat(result.toString()).contains("aaa-111:1-8000");
58-
// New channel bbb-222: should use server's full GTID
5949
assertThat(result.toString()).contains("bbb-222:1-3000");
6050
}
6151

62-
/**
63-
* Verifies that filterGtidSet() in LATEST mode with gtidSourceFilter excludes filtered UUIDs.
64-
*/
6552
@Test
6653
void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
67-
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", "ccc-333");
54+
Predicate<String> excludeCcc = uuid -> !uuid.equals("ccc-333");
55+
MySqlStreamingChangeEventSource source =
56+
createSourceWithConfig(GtidNewChannelPosition.LATEST, excludeCcc);
6857

69-
MySqlOffsetContext offsetContext =
70-
createOffsetContext(source, "aaa-111:5000-8000,bbb-222:1-2000");
58+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000,bbb-222:1-2000");
7159
GtidSet availableServerGtidSet =
7260
new GtidSet("aaa-111:1-10000,bbb-222:1-3000,ccc-333:1-5000");
7361
GtidSet purgedServerGtid = new GtidSet("");
@@ -77,37 +65,31 @@ void testFilterGtidSetLatestModeWithSourceFilter() throws Exception {
7765

7866
assertThat(result.toString()).contains("aaa-111:1-8000");
7967
assertThat(result.toString()).contains("bbb-222:1-2000");
80-
// ccc-333 should be excluded
8168
assertThat(result.toString()).doesNotContain("ccc-333");
8269
}
8370

84-
/**
85-
* Verifies that filterGtidSet() in EARLIEST mode still works correctly (no regression from
86-
* LATEST mode fix).
87-
*/
8871
@Test
8972
void testFilterGtidSetEarliestModeNotAffected() throws Exception {
90-
MySqlStreamingChangeEventSource source = createSourceWithConfig("earliest", null);
73+
MySqlStreamingChangeEventSource source =
74+
createSourceWithConfig(GtidNewChannelPosition.EARLIEST, null);
9175

92-
MySqlOffsetContext offsetContext = createOffsetContext(source, "aaa-111:5000-8000");
76+
MySqlOffsetContext offsetContext = createOffsetContext("aaa-111:5000-8000");
9377
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000,bbb-222:1-3000");
9478
GtidSet purgedServerGtid = new GtidSet("");
9579

9680
GtidSet result =
9781
source.filterGtidSet(offsetContext, availableServerGtidSet, purgedServerGtid);
9882

99-
// EARLIEST mode: old channel should be fixed
10083
assertThat(result.toString()).contains("aaa-111:1-8000");
101-
// EARLIEST mode does NOT add new channels (different from LATEST)
10284
assertThat(result.forServerWithId("bbb-222")).isNull();
10385
}
10486

105-
/** Verifies that filterGtidSet() returns null when offsetContext has no GTID. */
10687
@Test
10788
void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
108-
MySqlStreamingChangeEventSource source = createSourceWithConfig("latest", null);
89+
MySqlStreamingChangeEventSource source =
90+
createSourceWithConfig(GtidNewChannelPosition.LATEST, null);
10991

110-
MySqlOffsetContext offsetContext = createOffsetContext(source, null);
92+
MySqlOffsetContext offsetContext = createOffsetContext(null);
11193
GtidSet availableServerGtidSet = new GtidSet("aaa-111:1-10000");
11294
GtidSet purgedServerGtid = new GtidSet("");
11395

@@ -117,77 +99,27 @@ void testFilterGtidSetReturnsNullWhenNoGtid() throws Exception {
11799
assertThat(result).isNull();
118100
}
119101

120-
// ---- Helper methods ----
121-
122-
/**
123-
* Creates a MySqlStreamingChangeEventSource via Unsafe (bypassing constructor) and injects the
124-
* connectorConfig field via reflection.
125-
*/
126-
@SuppressWarnings("restriction")
127102
private static MySqlStreamingChangeEventSource createSourceWithConfig(
128-
String gtidNewChannelPosition, String gtidSourceExcludes) throws Exception {
129-
// Build Debezium Configuration
130-
JdbcConfiguration.Builder builder =
131-
JdbcConfiguration.create().with("database.server.name", "test_server");
132-
if (gtidNewChannelPosition != null) {
133-
builder = builder.with("gtid.new.channel.position", gtidNewChannelPosition);
134-
}
135-
if (gtidSourceExcludes != null) {
136-
builder = builder.with("gtid.source.excludes", gtidSourceExcludes);
137-
}
138-
Configuration dezConf = builder.build();
139-
MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(dezConf);
140-
141-
// Verify config is as expected
142-
if ("latest".equalsIgnoreCase(gtidNewChannelPosition)) {
143-
assertThat(connectorConfig.gtidNewChannelPosition())
144-
.isEqualTo(GtidNewChannelPosition.LATEST);
145-
} else if ("earliest".equalsIgnoreCase(gtidNewChannelPosition)) {
146-
assertThat(connectorConfig.gtidNewChannelPosition())
147-
.isEqualTo(GtidNewChannelPosition.EARLIEST);
148-
}
149-
150-
// Create instance via Unsafe to bypass heavy constructor
151-
sun.misc.Unsafe unsafe = getUnsafe();
103+
GtidNewChannelPosition channelPosition, Predicate<String> gtidSourceFilter)
104+
throws Exception {
105+
MySqlConnectorConfig mockConfig = Mockito.mock(MySqlConnectorConfig.class);
106+
when(mockConfig.gtidNewChannelPosition()).thenReturn(channelPosition);
107+
when(mockConfig.gtidSourceFilter()).thenReturn(gtidSourceFilter);
108+
152109
MySqlStreamingChangeEventSource source =
153-
(MySqlStreamingChangeEventSource)
154-
unsafe.allocateInstance(MySqlStreamingChangeEventSource.class);
110+
Mockito.mock(MySqlStreamingChangeEventSource.class, Mockito.CALLS_REAL_METHODS);
155111

156-
// Inject connectorConfig via reflection
157112
Field configField =
158113
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
159114
configField.setAccessible(true);
160-
configField.set(source, connectorConfig);
115+
configField.set(source, mockConfig);
161116

162117
return source;
163118
}
164119

165-
private static MySqlOffsetContext createOffsetContext(
166-
MySqlStreamingChangeEventSource source, String gtidSetStr) throws Exception {
167-
Field configField =
168-
MySqlStreamingChangeEventSource.class.getDeclaredField("connectorConfig");
169-
configField.setAccessible(true);
170-
MySqlConnectorConfig config = (MySqlConnectorConfig) configField.get(source);
171-
172-
if (gtidSetStr == null) {
173-
// Return an offset context without GTID (gtidSet() returns null)
174-
Map<String, Object> offsetMap = new HashMap<>();
175-
offsetMap.put("file", "mysql-bin.000001");
176-
offsetMap.put("pos", 4L);
177-
return new MySqlOffsetContext.Loader(config).load(offsetMap);
178-
}
179-
180-
Map<String, Object> offsetMap = new HashMap<>();
181-
offsetMap.put("file", "mysql-bin.000001");
182-
offsetMap.put("pos", 4L);
183-
offsetMap.put("gtids", gtidSetStr);
184-
return new MySqlOffsetContext.Loader(config).load(offsetMap);
185-
}
186-
187-
@SuppressWarnings("restriction")
188-
private static sun.misc.Unsafe getUnsafe() throws Exception {
189-
Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
190-
unsafeField.setAccessible(true);
191-
return (sun.misc.Unsafe) unsafeField.get(null);
120+
private static MySqlOffsetContext createOffsetContext(String gtidSetStr) {
121+
MySqlOffsetContext offsetContext = Mockito.mock(MySqlOffsetContext.class);
122+
when(offsetContext.gtidSet()).thenReturn(gtidSetStr);
123+
return offsetContext;
192124
}
193125
}

0 commit comments

Comments
 (0)