Skip to content

Commit 5366137

Browse files
committed
[FLINK-39621] Fix incremental-snapshot based sources may get stuck in binlog-backfill stage
1 parent b94d7b2 commit 5366137

3 files changed

Lines changed: 115 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceStreamFetcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
9090
configureFilter();
9191
taskContext.configure(currentStreamSplit);
9292
this.queue = taskContext.getQueue();
93+
startReadTask();
9394
executorService.submit(
9495
() -> {
9596
try {
@@ -282,6 +283,10 @@ private void configureFilter() {
282283
this.pureStreamPhaseTables.clear();
283284
}
284285

286+
public void startReadTask() {
287+
this.currentTaskRunning = true;
288+
}
289+
285290
public void stopReadTask() throws Exception {
286291
this.currentTaskRunning = false;
287292

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,57 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
243243
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
244244
}
245245

246+
@Test
247+
void testMultipleSplitsWithBackfill() throws Exception {
248+
customDatabase.createAndInitialize();
249+
250+
TestTableId tableId = new TestTableId(schemaName, tableName);
251+
PostgresSourceConfigFactory sourceConfigFactory =
252+
getMockPostgresSourceConfigFactory(
253+
customDatabase, schemaName, tableName, null, 4, false);
254+
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
255+
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));
256+
257+
SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
258+
snapshotHooks.setPreHighWatermarkAction(
259+
(postgresSourceConfig, split) -> {
260+
try (PostgresConnection conn = postgresDialect.openJdbcConnection()) {
261+
conn.execute(
262+
"UPDATE "
263+
+ tableId.toSql()
264+
+ " SET address = 'Beijing' WHERE \"Id\" = 103");
265+
conn.commit();
266+
}
267+
});
268+
269+
final DataType dataType =
270+
DataTypes.ROW(
271+
DataTypes.FIELD("Id", DataTypes.BIGINT()),
272+
DataTypes.FIELD("Name", DataTypes.STRING()),
273+
DataTypes.FIELD("address", DataTypes.STRING()),
274+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
275+
276+
PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
277+
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
278+
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);
279+
280+
List<String> actual =
281+
readTableSnapshotSplits(
282+
reOrderSnapshotSplits(snapshotSplits),
283+
postgresSourceFetchTaskContext,
284+
snapshotSplits.size(),
285+
dataType,
286+
snapshotHooks);
287+
288+
// Verify the ScanFetcher can successfully process all splits without getting stuck
289+
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
290+
// when reusing a stopped ScanFetcher for the next split).
291+
// The preHighWatermark hook forces backfill phase for each split by making
292+
// highWatermark > lowWatermark.
293+
assertThat(actual).hasSize(21);
294+
assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
295+
}
296+
246297
@Test
247298
void testSnapshotFetchSize() throws Exception {
248299
customDatabase.createAndInitialize();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/fetch/SqlServerScanFetchTaskTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,65 @@ void testInsertDataInSnapshotScan() throws Exception {
202202
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
203203
}
204204

205+
@Test
206+
void testMultipleSplitsWithBackfill() throws Exception {
207+
String databaseName = "customer";
208+
String tableName = "dbo.customers";
209+
210+
initializeSqlServerTable(databaseName);
211+
212+
SqlServerSourceConfigFactory sourceConfigFactory =
213+
getConfigFactory(databaseName, new String[] {tableName}, 4);
214+
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
215+
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
216+
217+
String tableId = databaseName + "." + tableName;
218+
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
219+
hooks.setPreHighWatermarkAction(
220+
(config, split) -> {
221+
executeSql(
222+
(SqlServerSourceConfig) config,
223+
new String[] {
224+
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"
225+
});
226+
try {
227+
Thread.sleep(10 * 1000);
228+
} catch (InterruptedException e) {
229+
throw new RuntimeException(e);
230+
}
231+
});
232+
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
233+
new SqlServerSourceFetchTaskContext(
234+
sourceConfig,
235+
sqlServerDialect,
236+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
237+
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));
238+
239+
final DataType dataType =
240+
DataTypes.ROW(
241+
DataTypes.FIELD("id", DataTypes.BIGINT()),
242+
DataTypes.FIELD("name", DataTypes.STRING()),
243+
DataTypes.FIELD("address", DataTypes.STRING()),
244+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
245+
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
246+
247+
List<String> actual =
248+
readTableSnapshotSplits(
249+
reOrderSnapshotSplits(snapshotSplits),
250+
sqlServerSourceFetchTaskContext,
251+
snapshotSplits.size(),
252+
dataType,
253+
hooks);
254+
255+
// Verify the ScanFetcher can successfully process all splits without getting stuck
256+
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
257+
// when reusing a stopped ScanFetcher for the next split).
258+
// The preHighWatermark hook forces backfill phase for each split by making
259+
// highWatermark > lowWatermark.
260+
Assertions.assertThat(actual).hasSize(21);
261+
Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
262+
}
263+
205264
@Test
206265
void testDateTimePrimaryKey() throws Exception {
207266
String databaseName = "pk";

0 commit comments

Comments
 (0)