diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java index 1887b0a855d..7eb5d58c0d2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java @@ -344,7 +344,7 @@ private void testMySqlParallelSource( ? "" : ", primary key (id) not enforced") + ") WITH (" - + " 'connector' = 'mysql-cdc'," + + " 'connector' = 'oceanbase-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," + " 'hostname' = '%s'," + " 'port' = '%s'," @@ -435,11 +435,12 @@ private void checkSnapshotData( expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); } - CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - // trigger failover after some snapshot splits read finished - if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { + // Trigger failover before collecting rows to avoid mixing pre-failover buffered rows with + // replayed snapshot rows after recovery. + if (failoverPhase == FailoverPhase.SNAPSHOT) { + waitUntilJobRunning(tableResult); triggerFailover( failoverType, jobId, @@ -447,6 +448,7 @@ private void checkSnapshotData( () -> sleepMs(100)); } + CloseableIterator iterator = tableResult.collect(); assertEqualsInAnyOrder( expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); }