Skip to content

Commit b15f7db

Browse files
committed
update
1 parent 5a919c2 commit b15f7db

3 files changed

Lines changed: 40 additions & 30 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/GtidUtils.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,34 @@
2828
/** Utils for handling GTIDs. */
2929
public class GtidUtils {
3030

31+
/**
32+
* Fixes old channels' GTID ranges by filling prefix gaps using server GTID intervals.
33+
*
34+
* <p>This is the shared logic between EARLIEST and LATEST modes. For UUIDs present in the
35+
* checkpoint, non-contiguous GTID ranges are corrected via {@link #fixRestoredGtidSet} to avoid
36+
* MySQL replaying pre-checkpoint transactions.
37+
*
38+
* @param availableServerGtidSet the relevant (filtered) server GTID set
39+
* @param purgedServerGtid the GTID set already purged from the MySQL server
40+
* @param checkpointGtidSet the GTID set restored from checkpoint
41+
* @return the fixed GTID set for old channels
42+
*/
43+
public static GtidSet fixOldChannelsGtidSet(
44+
GtidSet availableServerGtidSet, GtidSet purgedServerGtid, GtidSet checkpointGtidSet) {
45+
return fixRestoredGtidSet(
46+
mergeGtidSetInto(
47+
availableServerGtidSet.retainAll(
48+
uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
49+
purgedServerGtid),
50+
checkpointGtidSet);
51+
}
52+
3153
/**
3254
* Computes the merged GTID set for the LATEST new-channel-position mode.
3355
*
3456
* <p>For old channels (UUIDs present in checkpoint), non-contiguous GTID ranges are fixed via
35-
* {@link #fixRestoredGtidSet} to avoid MySQL replaying pre-checkpoint transactions. For new
36-
* channels (UUIDs not in checkpoint), the server's full GTID is used to skip all history.
57+
* {@link #fixOldChannelsGtidSet}. For new channels (UUIDs not in checkpoint), the server's full
58+
* GTID is used to skip all history.
3759
*
3860
* @param availableServerGtidSet the GTID set currently available on the MySQL server
3961
* @param purgedServerGtid the GTID set already purged from the MySQL server
@@ -53,12 +75,8 @@ public static GtidSet computeLatestModeGtidSet(
5375

5476
// Step 1: Fix old channels' GTID ranges
5577
GtidSet fixedOldChannelsGtid =
56-
fixRestoredGtidSet(
57-
mergeGtidSetInto(
58-
relevantAvailableServerGtidSet.retainAll(
59-
uuid -> checkpointGtidSet.forServerWithId(uuid) != null),
60-
purgedServerGtid),
61-
checkpointGtidSet);
78+
fixOldChannelsGtidSet(
79+
relevantAvailableServerGtidSet, purgedServerGtid, checkpointGtidSet);
6280

6381
// Step 2: For new channels, use server's full GTID to skip all history
6482
GtidSet newChannelsGtid =

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,15 @@
8787
* Copied from Debezium project(1.9.8.Final) to fix
8888
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
8989
*
90-
* <p>Line 1432-1448 : Adjust GTID merging logic to support recovering from job which previously
91-
* specifying starting offset on start.
90+
* <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
91+
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
92+
* EARLIEST/LATEST logic.
9293
*
93-
* <p>Line 1449-1457 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
94+
* <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
9495
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
9596
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
9697
*
97-
* <p>Line 1508 : Add more error details for some exceptions.
98+
* <p>Line 1490 : Add more error details for some exceptions.
9899
*
99100
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
100101
* processing LinkedList rows in handleChange method. See FLINK-38846.
@@ -1420,7 +1421,6 @@ public GtidSet filterGtidSet(
14201421
GtidSet mergedGtidSet;
14211422

14221423
if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
1423-
final GtidSet knownGtidSet = filteredGtidSet;
14241424
LOGGER.info("Using first available positions for new GTID channels");
14251425
final GtidSet relevantAvailableServerGtidSet =
14261426
(gtidSourceFilter != null)
@@ -1440,12 +1440,8 @@ public GtidSet filterGtidSet(
14401440
// recorded offset in the checkpoint, and the available GTID for other MySQL instances
14411441
// should be completed.
14421442
mergedGtidSet =
1443-
GtidUtils.fixRestoredGtidSet(
1444-
GtidUtils.mergeGtidSetInto(
1445-
relevantAvailableServerGtidSet.retainAll(
1446-
uuid -> knownGtidSet.forServerWithId(uuid) != null),
1447-
purgedServerGtid),
1448-
filteredGtidSet);
1443+
GtidUtils.fixOldChannelsGtidSet(
1444+
relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet);
14491445
} else {
14501446
LOGGER.info("Using latest positions for new GTID channels");
14511447
mergedGtidSet =

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@
8989
* <p>Copied from Debezium project(1.9.8.Final) to fix
9090
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
9191
*
92-
* <p>Line 1437-1453 : Adjust GTID merging logic to support recovering from job which previously
93-
* specifying starting offset on start.
92+
* <p>Line 1438-1449 : Adjust GTID merging logic to support recovering from job which previously
93+
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
94+
* EARLIEST/LATEST logic.
9495
*
95-
* <p>Line 1454-1462 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
96+
* <p>Line 1450-1458 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
9697
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
9798
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
9899
*
99-
* <p>Line 1513 : Add more error details for some exceptions.
100+
* <p>Line 1496 : Add more error details for some exceptions.
100101
*
101102
* <p>Line 956-968 : Use iterator instead of index-based loop to avoid O(n²) complexity when
102103
* processing LinkedList rows in handleChange method. See FLINK-38846.
@@ -1425,7 +1426,6 @@ public GtidSet filterGtidSet(
14251426
GtidSet mergedGtidSet;
14261427

14271428
if (connectorConfig.gtidNewChannelPosition() == GtidNewChannelPosition.EARLIEST) {
1428-
final GtidSet knownGtidSet = filteredGtidSet;
14291429
LOGGER.info("Using first available positions for new GTID channels");
14301430
final GtidSet relevantAvailableServerGtidSet =
14311431
(gtidSourceFilter != null)
@@ -1445,12 +1445,8 @@ public GtidSet filterGtidSet(
14451445
// recorded offset in the checkpoint, and the available GTID for other MySQL instances
14461446
// should be completed.
14471447
mergedGtidSet =
1448-
GtidUtils.fixRestoredGtidSet(
1449-
GtidUtils.mergeGtidSetInto(
1450-
relevantAvailableServerGtidSet.retainAll(
1451-
uuid -> knownGtidSet.forServerWithId(uuid) != null),
1452-
purgedServerGtid),
1453-
filteredGtidSet);
1448+
GtidUtils.fixOldChannelsGtidSet(
1449+
relevantAvailableServerGtidSet, purgedServerGtid, filteredGtidSet);
14541450
} else {
14551451
LOGGER.info("Using latest positions for new GTID channels");
14561452
mergedGtidSet =

0 commit comments

Comments
 (0)