Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
configureFilter();
taskContext.configure(currentStreamSplit);
this.queue = taskContext.getQueue();
startReadTask();
executorService.submit(
() -> {
try {
Expand Down Expand Up @@ -282,6 +283,10 @@ private void configureFilter() {
this.pureStreamPhaseTables.clear();
}

public void startReadTask() {
this.currentTaskRunning = true;
Comment thread
loserwang1024 marked this conversation as resolved.
Outdated
}

public void stopReadTask() throws Exception {
this.currentTaskRunning = false;
Comment thread
yuxiqian marked this conversation as resolved.
Outdated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,57 @@ void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
void testMultipleSplitsWithBackfill() throws Exception {
customDatabase.createAndInitialize();

TestTableId tableId = new TestTableId(schemaName, tableName);
PostgresSourceConfigFactory sourceConfigFactory =
getMockPostgresSourceConfigFactory(
customDatabase, schemaName, tableName, null, 4, false);
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect postgresDialect = new PostgresDialect(sourceConfigFactory.create(0));

SnapshotPhaseHooks snapshotHooks = new SnapshotPhaseHooks();
snapshotHooks.setPreHighWatermarkAction(
(postgresSourceConfig, split) -> {
try (PostgresConnection conn = postgresDialect.openJdbcConnection()) {
conn.execute(
"UPDATE "
+ tableId.toSql()
+ " SET address = 'Beijing' WHERE \"Id\" = 103");
conn.commit();
}
});

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("Id", DataTypes.BIGINT()),
DataTypes.FIELD("Name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));

PostgresSourceFetchTaskContext postgresSourceFetchTaskContext =
new PostgresSourceFetchTaskContext(sourceConfig, postgresDialect);
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, postgresDialect);

List<String> actual =
readTableSnapshotSplits(
reOrderSnapshotSplits(snapshotSplits),
postgresSourceFetchTaskContext,
snapshotSplits.size(),
dataType,
snapshotHooks);

// Verify the ScanFetcher can successfully process all splits without getting stuck
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
// when reusing a stopped ScanFetcher for the next split).
// The preHighWatermark hook forces backfill phase for each split by making
// highWatermark > lowWatermark.
assertThat(actual).hasSize(21);
assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
}

@Test
void testSnapshotFetchSize() throws Exception {
customDatabase.createAndInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,65 @@ void testInsertDataInSnapshotScan() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
void testMultipleSplitsWithBackfill() throws Exception {
String databaseName = "customer";
String tableName = "dbo.customers";

initializeSqlServerTable(databaseName);

SqlServerSourceConfigFactory sourceConfigFactory =
getConfigFactory(databaseName, new String[] {tableName}, 4);
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);

String tableId = databaseName + "." + tableName;
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
hooks.setPreHighWatermarkAction(
(config, split) -> {
executeSql(
(SqlServerSourceConfig) config,
new String[] {
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103"
});
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext =
new SqlServerSourceFetchTaskContext(
sourceConfig,
sqlServerDialect,
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()),
createSqlServerConnection(sourceConfig.getDbzConnectorConfig()));

final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);

List<String> actual =
readTableSnapshotSplits(
reOrderSnapshotSplits(snapshotSplits),
sqlServerSourceFetchTaskContext,
snapshotSplits.size(),
dataType,
hooks);

// Verify the ScanFetcher can successfully process all splits without getting stuck
// (the FLINK-39207 bug would cause the reader to appear finished/stuck
// when reusing a stopped ScanFetcher for the next split).
// The preHighWatermark hook forces backfill phase for each split by making
// highWatermark > lowWatermark.
Assertions.assertThat(actual).hasSize(21);
Assertions.assertThat(actual).contains("+I[103, user_3, Beijing, 123567891234]");
}

@Test
void testDateTimePrimaryKey() throws Exception {
String databaseName = "pk";
Expand Down
Loading