Skip to content

Commit 23e23d6

Browse files
authored
[FLINK-39372][connect/mysql] Fix comparison logic for binlog filename with various digits (#4358)
1 parent 209c0c6 commit 23e23d6

2 files changed

Lines changed: 58 additions & 30 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@ public int compareTo(BinlogOffset that) {
245245
if (this.getFilename() != null
246246
&& that.getFilename() != null
247247
&& this.getFilename().compareToIgnoreCase(that.getFilename()) != 0) {
248+
if (this.getFilename().length() != that.getFilename().length()) {
249+
return Integer.compare(this.getFilename().length(), that.getFilename().length());
250+
}
248251
return this.getFilename().compareToIgnoreCase(that.getFilename());
249252
}
250253

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void testCompareToWithGtidSet() {
3535
BinlogOffset.builder()
3636
.setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1))
3737
.build();
38-
assetCompareTo(offset1, offset2, 0);
38+
assertCompareTo(offset1, offset2, 0);
3939

4040
// The test uses GTID instead of position for comparison.
4141
offset1 =
@@ -48,7 +48,7 @@ public void testCompareToWithGtidSet() {
4848
.setGtidSet(String.join(",", PART_OF_GTID_SET_2, PART_OF_GTID_SET_1))
4949
.setBinlogFilePosition("binlog.001", 456)
5050
.build();
51-
assetCompareTo(offset1, offset2, 0);
51+
assertCompareTo(offset1, offset2, 0);
5252

5353
// Test different GTID sets where one contains another
5454
BinlogOffset offset3 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
@@ -58,8 +58,8 @@ public void testCompareToWithGtidSet() {
5858
.build();
5959

6060
// offset3 should be before offset4
61-
assetCompareTo(offset3, offset4, -1);
62-
assetCompareTo(offset4, offset3, 1);
61+
assertCompareTo(offset3, offset4, -1);
62+
assertCompareTo(offset4, offset3, 1);
6363

6464
// The test uses GTID instead of position for comparison.
6565
offset3 =
@@ -72,16 +72,16 @@ public void testCompareToWithGtidSet() {
7272
.setGtidSet("abcd:1-5") // Contains offset3's GTID set
7373
.setBinlogFilePosition("binlog.001", 23)
7474
.build();
75-
assetCompareTo(offset3, offset4, -1);
76-
assetCompareTo(offset4, offset3, 1);
75+
assertCompareTo(offset3, offset4, -1);
76+
assertCompareTo(offset4, offset3, 1);
7777

7878
// Test completely different GTID sets
7979
BinlogOffset offset5 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
8080
BinlogOffset offset6 = BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
8181

8282
// offsets don't contain each other, result is always 1
83-
assetCompareTo(offset5, offset6, 1);
84-
assetCompareTo(offset6, offset5, 1);
83+
assertCompareTo(offset5, offset6, 1);
84+
assertCompareTo(offset6, offset5, 1);
8585
}
8686

8787
@Test
@@ -92,8 +92,8 @@ public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() {
9292
BinlogOffset offset2 =
9393
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build();
9494

95-
assetCompareTo(offset1, offset2, -1);
96-
assetCompareTo(offset2, offset1, 1);
95+
assertCompareTo(offset1, offset2, -1);
96+
assertCompareTo(offset2, offset1, 1);
9797

9898
// Test same GTID and skip events but different skip rows
9999
BinlogOffset offset3 =
@@ -109,8 +109,8 @@ public void testCompareToWithGtidSetAndSkipEventsAndSkipRows() {
109109
.setSkipRows(20)
110110
.build();
111111

112-
assetCompareTo(offset3, offset4, -1);
113-
assetCompareTo(offset4, offset3, 1);
112+
assertCompareTo(offset3, offset4, -1);
113+
assertCompareTo(offset4, offset3, 1);
114114
}
115115

116116
@Test
@@ -125,8 +125,8 @@ public void testCompareToWithGtidSetExistence() {
125125
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 456).build();
126126

127127
// When one has GTID and another doesn't, the one without GTID is considered older
128-
assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
129-
assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
128+
assertCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
129+
assertCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
130130

131131
// Test the reverse scenario
132132
BinlogOffset offsetWithGtid2 =
@@ -137,8 +137,8 @@ public void testCompareToWithGtidSetExistence() {
137137
.setSkipEvents(5)
138138
.build();
139139

140-
assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
141-
assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
140+
assertCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
141+
assertCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
142142
}
143143

144144
@Test
@@ -148,23 +148,23 @@ public void testCompareToWithFilePosition() {
148148
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
149149
BinlogOffset offset2 =
150150
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
151-
assetCompareTo(offset1, offset2, 0);
151+
assertCompareTo(offset1, offset2, 0);
152152

153153
// Test different file names
154154
BinlogOffset offset3 =
155155
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 123).build();
156156
BinlogOffset offset4 =
157157
BinlogOffset.builder().setBinlogFilePosition("binlog.002", 123).build();
158-
assetCompareTo(offset3, offset4, -1);
159-
assetCompareTo(offset4, offset3, 1);
158+
assertCompareTo(offset3, offset4, -1);
159+
assertCompareTo(offset4, offset3, 1);
160160

161161
// Test different positions in same file
162162
BinlogOffset offset5 =
163163
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 100).build();
164164
BinlogOffset offset6 =
165165
BinlogOffset.builder().setBinlogFilePosition("binlog.001", 200).build();
166-
assetCompareTo(offset5, offset6, -1);
167-
assetCompareTo(offset6, offset5, 1);
166+
assertCompareTo(offset5, offset6, -1);
167+
assertCompareTo(offset6, offset5, 1);
168168
}
169169

170170
@Test
@@ -180,8 +180,8 @@ public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() {
180180
.setBinlogFilePosition("binlog.001", 123)
181181
.setSkipEvents(10)
182182
.build();
183-
assetCompareTo(offset1, offset2, -1);
184-
assetCompareTo(offset2, offset1, 1);
183+
assertCompareTo(offset1, offset2, -1);
184+
assertCompareTo(offset2, offset1, 1);
185185

186186
// Test with skip rows
187187
BinlogOffset offset3 =
@@ -196,8 +196,32 @@ public void testCompareToWithFilePositionAndSkipEventsAndSkipRows() {
196196
.setSkipEvents(5)
197197
.setSkipRows(20)
198198
.build();
199-
assetCompareTo(offset3, offset4, -1);
200-
assetCompareTo(offset4, offset3, 1);
199+
assertCompareTo(offset3, offset4, -1);
200+
assertCompareTo(offset4, offset3, 1);
201+
}
202+
203+
@Test
204+
public void testCompareToWithDifferentFilenameLength() {
205+
BinlogOffset offset1 =
206+
BinlogOffset.builder().setBinlogFilePosition("mysql.999", 123).build();
207+
BinlogOffset offset2 =
208+
BinlogOffset.builder().setBinlogFilePosition("mysql.1000", 123).build();
209+
assertCompareTo(offset1, offset2, -1);
210+
assertCompareTo(offset2, offset1, 1);
211+
212+
BinlogOffset offset3 =
213+
BinlogOffset.builder().setBinlogFilePosition("binlog.99", 100).build();
214+
BinlogOffset offset4 =
215+
BinlogOffset.builder().setBinlogFilePosition("binlog.100", 100).build();
216+
assertCompareTo(offset3, offset4, -1);
217+
assertCompareTo(offset4, offset3, 1);
218+
219+
BinlogOffset offset5 =
220+
BinlogOffset.builder().setBinlogFilePosition("mysql.9999", 50).build();
221+
BinlogOffset offset6 =
222+
BinlogOffset.builder().setBinlogFilePosition("mysql.10000", 50).build();
223+
assertCompareTo(offset5, offset6, -1);
224+
assertCompareTo(offset6, offset5, 1);
201225
}
202226

203227
@Test
@@ -217,8 +241,8 @@ public void testCompareToTimestampWithDifferentServerId() {
217241
.build();
218242

219243
// Should compare based on timestamp since server IDs are different
220-
assetCompareTo(offset1, offset2, -1);
221-
assetCompareTo(offset2, offset1, 1);
244+
assertCompareTo(offset1, offset2, -1);
245+
assertCompareTo(offset2, offset1, 1);
222246

223247
// Test same timestamps but different server IDs
224248
BinlogOffset offset3 =
@@ -239,11 +263,12 @@ public void testCompareToTimestampWithDifferentServerId() {
239263
// But since server IDs are different and timestamps are same, it will fall through to file
240264
// position comparison
241265
// Since file positions are same, it will compare skip events (default 0)
242-
assetCompareTo(offset3, offset4, 0);
266+
assertCompareTo(offset3, offset4, 0);
243267
}
244268

245-
private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2, int expected) {
269+
private void assertCompareTo(BinlogOffset offset1, BinlogOffset offset2, int expected) {
246270
int actual = offset1.compareTo(offset2);
247-
Assertions.assertThat(expected).isEqualTo(actual);
271+
// compareTo does not guarantee returning -1, 0, or 1. Just check the sign.
272+
Assertions.assertThat(Integer.signum(actual)).isEqualTo(expected);
248273
}
249274
}

0 commit comments

Comments
 (0)