Skip to content

Conversation

dybyte
Copy link
Contributor

@dybyte dybyte commented Oct 8, 2025

Purpose of this pull request

  • Exclude tasks and pipelines in CLOSED/FINISHED state from checkpoint, statistics, and trigger operations to improve system stability and performance.
  • Updated logic in CheckpointCoordinator.java and PhysicalPlanGenerator.java to filter out tasks and pipelines no longer active.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests ensuring CLOSED/FINISHED tasks and pipelines are properly excluded from processing.

Check list

@github-actions github-actions bot added the Zeta label Oct 8, 2025
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a e2e test case with savepoint and restore for cdc job? This is a very important and useful change. Thanks @dybyte !

@dybyte
Copy link
Contributor Author

dybyte commented Oct 10, 2025

Could you add a e2e test case with savepoint and restore for cdc job?

public void testRestoreTaskWhenBinlogDelete(TestContainer container)
throws InterruptedException, IOException {
// Clear related content to ensure that multiple operations are not affected
clearTable(MYSQL_DATABASE, SINK_TABLE);
// execute task
Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
"/mysqlcdc_to_mysql_with_binlog_delete.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
// wait for data written to sink
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
query(getSourceQuerySQL(MYSQL_DATABASE, SOURCE_TABLE)),
query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE)));
});
// flush binary logs
executeSql("flush binary logs");
// wait a moment for binlog heartbeat event
TimeUnit.SECONDS.sleep(60);
// pause task
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
// purge binary logs
List<List<Object>> masterStatus = query("show master status");
String binlogName = masterStatus.get(0).get(0).toString();
executeSql("purge binary logs to '" + binlogName + "'");
// restore task
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
"/mysqlcdc_to_mysql_with_binlog_delete.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
await().atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() -> {
String jobStatus = container.getJobStatus(String.valueOf(jobId));
Assertions.assertEquals("RUNNING", jobStatus);
});
// write data again, check no problem
upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE);
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
query(getSourceQuerySQL(MYSQL_DATABASE, SOURCE_TABLE)),
query(getSinkQuerySQL(MYSQL_DATABASE, SINK_TABLE)));
});
// check job status is not failed
await().pollDelay(20, TimeUnit.SECONDS)
.atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"RUNNING", container.getJobStatus(String.valueOf(jobId))));
// cancel task
Assertions.assertEquals(0, container.cancelJob(String.valueOf(jobId)).getExitCode());
}

I believe this test already covers savepoint and restore for CDC jobs to some extent. If you have any specific scenarios or additional cases in mind, could you please let me know?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants