From 0909dc459b2801321d4170feab3382978042a57c Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Thu, 30 Apr 2026 10:41:01 +0800 Subject: [PATCH 1/2] [hotfix][oceanbase] Fix connector identifier in OceanBaseFailoverITCase Change 'connector' = 'mysql-cdc' to 'connector' = 'oceanbase-cdc' in OceanBaseFailoverITCase to use the correct connector factory. This fixes failover test failures where the wrong connector was being exercised. --- .../flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java index 1887b0a855d..b2ce8d8f7bd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java @@ -344,7 +344,7 @@ private void testMySqlParallelSource( ? "" : ", primary key (id) not enforced") + ") WITH (" - + " 'connector' = 'mysql-cdc'," + + " 'connector' = 'oceanbase-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," + " 'hostname' = '%s'," + " 'port' = '%s'," From f13274a322bf37133ef1f7ae2946217ca101a17c Mon Sep 17 00:00:00 2001 From: yuanoOo Date: Thu, 30 Apr 2026 18:27:54 +0800 Subject: [PATCH 2/2] [hotfix][oceanbase] Stabilize failover snapshot collection Avoid consuming snapshot rows before triggering OceanBase failover so recovered snapshot rows are not mixed with pre-failover buffered results. Made-with: Cursor --- .../cdc/connectors/oceanbase/OceanBaseFailoverITCase.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java index b2ce8d8f7bd..7eb5d58c0d2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java @@ -435,11 +435,12 @@ private void checkSnapshotData( expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable)); } - CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); - // trigger failover after some snapshot splits read finished - if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) { + // Trigger failover before collecting rows to avoid mixing pre-failover buffered rows with + // replayed snapshot rows after recovery. + if (failoverPhase == FailoverPhase.SNAPSHOT) { + waitUntilJobRunning(tableResult); triggerFailover( failoverType, jobId, @@ -447,6 +448,7 @@ private void checkSnapshotData( () -> sleepMs(100)); } + CloseableIterator iterator = tableResult.collect(); assertEqualsInAnyOrder( expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size())); }