Skip to content

Commit e01a36f

Browse files
committed
[FLINK-38218] Extract duplicate split ID check
1 parent ee9d6e1 commit e01a36f

1 file changed

Lines changed: 16 additions & 11 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/split/MySqlBinlogSplit.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,7 @@ public MySqlBinlogSplit(
6363
boolean isSuspended) {
6464
super(splitId);
6565

66-
Set<String> seenSplitIds = new HashSet<>();
67-
for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) {
68-
if (seenSplitIds.contains(splitInfo.getSplitId())) {
69-
throw new IllegalArgumentException(
70-
String.format(
71-
"Found duplicate split ID %s in finished snapshot split infos",
72-
splitInfo.getSplitId()));
73-
}
74-
75-
seenSplitIds.add(splitInfo.getSplitId());
76-
}
66+
ensureNoDuplicates(finishedSnapshotSplitInfos);
7767

7868
this.startingOffset = startingOffset;
7969
this.endingOffset = endingOffset;
@@ -101,6 +91,21 @@ public MySqlBinlogSplit(
10191
false);
10292
}
10393

94+
private static void ensureNoDuplicates(
95+
List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos) {
96+
Set<String> seenSplitIds = new HashSet<>();
97+
for (FinishedSnapshotSplitInfo splitInfo : finishedSnapshotSplitInfos) {
98+
if (seenSplitIds.contains(splitInfo.getSplitId())) {
99+
throw new IllegalArgumentException(
100+
String.format(
101+
"Found duplicate split ID %s in finished snapshot split infos",
102+
splitInfo.getSplitId()));
103+
}
104+
105+
seenSplitIds.add(splitInfo.getSplitId());
106+
}
107+
}
108+
104109
public BinlogOffset getStartingOffset() {
105110
return startingOffset;
106111
}

0 commit comments

Comments
 (0)