Skip to content

Commit 4bab367

Browse files
authored
[FLINK-38618][base] Fix offset error due to duplicated stream split during TM failover if startup mode is latest-offset
This closes #4169.
1 parent cfaf3cb commit 4bab367

4 files changed

Lines changed: 140 additions & 14 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
4141
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
4242
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
43+
import org.apache.flink.util.CollectionUtil;
4344
import org.apache.flink.util.FlinkRuntimeException;
4445

4546
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public void addSplitsBack(List<SourceSplitBase> splits, int subtaskId) {
126127
LOG.info("The enumerator adds add stream split back: {}", streamSplit);
127128
this.streamSplitTaskId = null;
128129
}
129-
splitAssigner.addSplits(splits);
130+
if (!CollectionUtil.isNullOrEmpty(splits)) {
131+
splitAssigner.addSplits(splits);
132+
}
130133
}
131134

132135
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
2626
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
2727
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
28-
import org.apache.flink.util.CollectionUtil;
2928

3029
import java.io.IOException;
3130
import java.util.ArrayList;
@@ -90,10 +89,8 @@ public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
9089

9190
@Override
9291
public void addSplits(Collection<MySqlSplit> splits) {
93-
if (!CollectionUtil.isNullOrEmpty(splits)) {
94-
// we don't store the split, but will re-create binlog split later
95-
isBinlogSplitAssigned = false;
96-
}
92+
// we don't store the split, but will re-create binlog split later
93+
isBinlogSplitAssigned = false;
9794
}
9895

9996
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
4141
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
4242
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
43+
import org.apache.flink.util.CollectionUtil;
4344
import org.apache.flink.util.FlinkRuntimeException;
4445

4546
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
@@ -126,7 +127,9 @@ public void addSplitsBack(List<MySqlSplit> splits, int subtaskId) {
126127
LOG.info("The enumerator adds add binlog split back: {}", binlogSplit);
127128
this.binlogSplitTaskId = null;
128129
}
129-
splitAssigner.addSplits(splits);
130+
if (!CollectionUtil.isNullOrEmpty(splits)) {
131+
splitAssigner.addSplits(splits);
132+
}
130133
}
131134

132135
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerSourceITCase.java

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import io.debezium.jdbc.JdbcConnection;
3939
import org.apache.commons.lang3.StringUtils;
40+
import org.assertj.core.api.Assertions;
4041
import org.junit.jupiter.api.Disabled;
4142
import org.junit.jupiter.api.Test;
4243
import org.junit.jupiter.api.Timeout;
@@ -48,11 +49,17 @@
4849
import java.util.Collections;
4950
import java.util.Iterator;
5051
import java.util.List;
52+
import java.util.concurrent.ExecutionException;
53+
import java.util.concurrent.ExecutorService;
54+
import java.util.concurrent.Executors;
55+
import java.util.concurrent.FutureTask;
5156
import java.util.concurrent.TimeUnit;
57+
import java.util.concurrent.TimeoutException;
5258
import java.util.function.Function;
5359
import java.util.stream.Collectors;
5460

5561
import static java.lang.String.format;
62+
import static org.apache.flink.api.common.JobStatus.RUNNING;
5663
import static org.apache.flink.table.api.DataTypes.BIGINT;
5764
import static org.apache.flink.table.api.DataTypes.STRING;
5865
import static org.apache.flink.table.catalog.Column.physical;
@@ -67,6 +74,7 @@ class SqlServerSourceITCase extends SqlServerSourceTestBase {
6774

6875
private static final int USE_POST_LOWWATERMARK_HOOK = 1;
6976
private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
77+
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
7078

7179
@Test
7280
void testReadSingleTableWithSingleParallelism() throws Exception {
@@ -111,6 +119,26 @@ void testJobManagerFailoverSingleParallelism() throws Exception {
111119
1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"dbo.customers"});
112120
}
113121

122+
@Test
123+
public void testJobManagerFailoverFromLatestOffset() throws Exception {
124+
testSqlServerParallelSource(
125+
DEFAULT_PARALLELISM,
126+
"latest-offset",
127+
FailoverType.JM,
128+
FailoverPhase.STREAM,
129+
new String[] {"dbo.customers"});
130+
}
131+
132+
@Test
133+
public void testTaskManagerFailoverFromLatestOffset() throws Exception {
134+
testSqlServerParallelSource(
135+
DEFAULT_PARALLELISM,
136+
"latest-offset",
137+
FailoverType.TM,
138+
FailoverPhase.STREAM,
139+
new String[] {"dbo.customers"});
140+
}
141+
114142
@Test
115143
void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
116144
testSqlServerParallelSource(
@@ -375,12 +403,14 @@ private void testSqlServerParallelSource(
375403

376404
private void testSqlServerParallelSource(
377405
int parallelism,
406+
String scanStartupMode,
378407
FailoverType failoverType,
379408
FailoverPhase failoverPhase,
380409
String[] captureCustomerTables)
381410
throws Exception {
382411
testSqlServerParallelSource(
383412
parallelism,
413+
scanStartupMode,
384414
failoverType,
385415
failoverPhase,
386416
captureCustomerTables,
@@ -393,6 +423,43 @@ private void testSqlServerParallelSource(
393423
int parallelism,
394424
FailoverType failoverType,
395425
FailoverPhase failoverPhase,
426+
String[] captureCustomerTables)
427+
throws Exception {
428+
testSqlServerParallelSource(
429+
parallelism,
430+
failoverType,
431+
failoverPhase,
432+
captureCustomerTables,
433+
false,
434+
RestartStrategies.fixedDelayRestart(1, 0),
435+
null);
436+
}
437+
438+
private void testSqlServerParallelSource(
439+
int parallelism,
440+
FailoverType failoverType,
441+
FailoverPhase failoverPhase,
442+
String[] captureCustomerTables,
443+
boolean skipSnapshotBackfill,
444+
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
445+
String chunkColumn)
446+
throws Exception {
447+
testSqlServerParallelSource(
448+
parallelism,
449+
DEFAULT_SCAN_STARTUP_MODE,
450+
failoverType,
451+
failoverPhase,
452+
captureCustomerTables,
453+
skipSnapshotBackfill,
454+
restartStrategyConfiguration,
455+
chunkColumn);
456+
}
457+
458+
private void testSqlServerParallelSource(
459+
int parallelism,
460+
String scanStartupMode,
461+
FailoverType failoverType,
462+
FailoverPhase failoverPhase,
396463
String[] captureCustomerTables,
397464
boolean skipSnapshotBackfill,
398465
RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration,
@@ -418,6 +485,7 @@ private void testSqlServerParallelSource(
418485
+ " phone_number STRING,"
419486
+ " primary key (id) not enforced"
420487
+ ") WITH ("
488+
+ " 'scan.startup.mode' = '%s',"
421489
+ " 'connector' = 'sqlserver-cdc',"
422490
+ " 'hostname' = '%s',"
423491
+ " 'port' = '%s',"
@@ -430,6 +498,7 @@ private void testSqlServerParallelSource(
430498
+ " 'scan.incremental.snapshot.backfill.skip' = '%s'"
431499
+ "%s"
432500
+ ")",
501+
scanStartupMode,
433502
MSSQL_SERVER_CONTAINER.getHost(),
434503
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
435504
MSSQL_SERVER_CONTAINER.getUsername(),
@@ -442,8 +511,26 @@ private void testSqlServerParallelSource(
442511
: ",'scan.incremental.snapshot.chunk.key-column'='"
443512
+ chunkColumn
444513
+ "'");
514+
tEnv.executeSql(sourceDDL);
515+
TableResult tableResult = tEnv.executeSql("select * from customers");
445516

446517
// first step: check the snapshot data
518+
if (DEFAULT_SCAN_STARTUP_MODE.equals(scanStartupMode)) {
519+
checkSnapshotData(tableResult, failoverType, failoverPhase, captureCustomerTables);
520+
}
521+
522+
// second step: check the binlog data
523+
checkBinlogData(tableResult, failoverType, failoverPhase, captureCustomerTables);
524+
525+
tableResult.getJobClient().get().cancel().get();
526+
}
527+
528+
private void checkSnapshotData(
529+
TableResult tableResult,
530+
FailoverType failoverType,
531+
FailoverPhase failoverPhase,
532+
String[] captureCustomerTables)
533+
throws Exception {
447534
String[] snapshotForSingleTable =
448535
new String[] {
449536
"+I[101, user_1, Shanghai, 123567891234]",
@@ -468,15 +555,15 @@ private void testSqlServerParallelSource(
468555
"+I[1019, user_20, Shanghai, 123567891234]",
469556
"+I[2000, user_21, Shanghai, 123567891234]"
470557
};
471-
tEnv.executeSql(sourceDDL);
472-
TableResult tableResult = tEnv.executeSql("select * from customers");
473-
CloseableIterator<Row> iterator = tableResult.collect();
474-
JobID jobId = tableResult.getJobClient().get().getJobID();
558+
475559
List<String> expectedSnapshotData = new ArrayList<>();
476560
for (int i = 0; i < captureCustomerTables.length; i++) {
477561
expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
478562
}
479563

564+
CloseableIterator<Row> iterator = tableResult.collect();
565+
JobID jobId = tableResult.getJobClient().get().getJobID();
566+
480567
// trigger failover after some snapshot splits read finished
481568
if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
482569
triggerFailover(
@@ -486,20 +573,35 @@ private void testSqlServerParallelSource(
486573
() -> sleepMs(100));
487574
}
488575

489-
LOG.info("snapshot data start");
490576
assertEqualsInAnyOrder(
491577
expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
578+
}
579+
580+
private void checkBinlogData(
581+
TableResult tableResult,
582+
FailoverType failoverType,
583+
FailoverPhase failoverPhase,
584+
String[] captureCustomerTables)
585+
throws Exception {
586+
String databaseName = "customer";
587+
waitUntilJobRunning(tableResult);
588+
CloseableIterator<Row> iterator = tableResult.collect();
589+
JobID jobId = tableResult.getJobClient().get().getJobID();
492590

493-
// second step: check the change stream data
494591
for (String tableId : captureCustomerTables) {
495592
makeFirstPartChangeStreamEvents(databaseName + "." + tableId);
496593
}
594+
595+
// wait for the binlog reading
596+
Thread.sleep(2000L);
597+
497598
if (failoverPhase == FailoverPhase.STREAM) {
498599
triggerFailover(
499600
failoverType,
500601
jobId,
501602
miniClusterResource.get().getMiniCluster(),
502603
() -> sleepMs(200));
604+
waitUntilJobRunning(tableResult);
503605
}
504606
for (String tableId : captureCustomerTables) {
505607
makeSecondPartBinlogEvents(databaseName + "." + tableId);
@@ -524,7 +626,28 @@ private void testSqlServerParallelSource(
524626
expectedBinlogData.addAll(Arrays.asList(binlogForSingleTable));
525627
}
526628
assertEqualsInAnyOrder(expectedBinlogData, fetchRows(iterator, expectedBinlogData.size()));
527-
tableResult.getJobClient().get().cancel().get();
629+
Assertions.assertThat(hasNextData(iterator)).isFalse();
630+
}
631+
632+
private void waitUntilJobRunning(TableResult tableResult)
633+
throws InterruptedException, ExecutionException {
634+
do {
635+
Thread.sleep(5000L);
636+
} while (tableResult.getJobClient().get().getJobStatus().get() != RUNNING);
637+
}
638+
639+
private boolean hasNextData(final CloseableIterator<?> iterator)
640+
throws InterruptedException, ExecutionException {
641+
ExecutorService executor = Executors.newSingleThreadExecutor();
642+
try {
643+
FutureTask<Boolean> future = new FutureTask(iterator::hasNext);
644+
executor.execute(future);
645+
return future.get(3, TimeUnit.SECONDS);
646+
} catch (TimeoutException e) {
647+
return false;
648+
} finally {
649+
executor.shutdown();
650+
}
528651
}
529652

530653
private void makeFirstPartChangeStreamEvents(String tableId) {

0 commit comments

Comments
 (0)