Skip to content

Commit 913ad73

Browse files
committed
Enhancement: Add e2e integration and integration tests for oceanbase-cdc connector
1 parent a42c0b5 commit 913ad73

4 files changed

Lines changed: 19 additions & 18 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public void testCharset(String testName, String[] snapshotExpected, String[] bin
162162
+ " table_name STRING,\n"
163163
+ " primary key(table_id) not enforced"
164164
+ ") WITH ("
165-
+ " 'connector' = 'oceanbase-cdc',"
165+
+ " 'connector' = 'mysql-cdc',"
166166
+ " 'hostname' = '%s',"
167167
+ " 'port' = '%s',"
168168
+ " 'username' = '%s',"

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
6565

6666
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
6767
private static final String DDL_FILE = "oceanbase_ddl_test";
68-
private static final String DEFAULT_TEST_DATABASE = "customer_" + getRandomSuffix();
6968
protected static final int DEFAULT_PARALLELISM = 4;
69+
private String testDatabase = "customer_" + getRandomSuffix();
7070

7171
private final List<String> firstPartBinlogEvents =
7272
Arrays.asList(
@@ -105,15 +105,16 @@ public static Stream<Arguments> parameters() {
105105

106106
@BeforeEach
107107
public void setup() throws InterruptedException {
108+
testDatabase = "customer_" + getRandomSuffix();
108109
initializeOceanBaseTables(
109110
DDL_FILE,
110-
DEFAULT_TEST_DATABASE,
111+
testDatabase,
111112
s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers")));
112113
}
113114

114115
@AfterEach
115116
public void clean() {
116-
dropDatabase(DEFAULT_TEST_DATABASE);
117+
dropDatabase(testDatabase);
117118
}
118119

119120
// Failover tests
@@ -309,7 +310,7 @@ private void testMySqlParallelSource(
309310
? ""
310311
: ", primary key (id) not enforced")
311312
+ ") WITH ("
312-
+ " 'connector' = 'oceanbase-cdc',"
313+
+ " 'connector' = 'mysql-cdc',"
313314
+ " 'scan.incremental.snapshot.enabled' = 'true',"
314315
+ " 'hostname' = '%s',"
315316
+ " 'port' = '%s',"
@@ -328,7 +329,7 @@ private void testMySqlParallelSource(
328329
getPort(),
329330
getUserName(),
330331
getPassword(),
331-
DEFAULT_TEST_DATABASE,
332+
testDatabase,
332333
getTableNameRegex(captureCustomerTables),
333334
scanStartupMode,
334335
skipSnapshotBackfill,
@@ -416,7 +417,7 @@ private void checkBinlogData(
416417
JobID jobId = tableResult.getJobClient().get().getJobID();
417418

418419
for (String tableId : captureCustomerTables) {
419-
makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
420+
makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
420421
}
421422

422423
// wait for the binlog reading
@@ -431,7 +432,7 @@ private void checkBinlogData(
431432
waitUntilJobRunning(tableResult);
432433
}
433434
for (String tableId : captureCustomerTables) {
434-
makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
435+
makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
435436
}
436437

437438
List<String> expectedBinlogData = new ArrayList<>();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void testSingleKey() throws Exception {
7373
+ " create_time TIMESTAMP,"
7474
+ " primary key (id) not enforced"
7575
+ ") WITH ("
76-
+ " 'connector' = 'oceanbase-cdc',"
76+
+ " 'connector' = 'mysql-cdc',"
7777
+ " 'scan.incremental.snapshot.enabled' = 'true',"
7878
+ " 'hostname' = '%s',"
7979
+ " 'port' = '%s',"
@@ -225,7 +225,7 @@ public void testFullTypesDdl() throws Exception {
225225
+ " geometrycollection_c STRING,\n"
226226
+ " primary key (`id`) not enforced"
227227
+ ") WITH ("
228-
+ " 'connector' = 'oceanbase-cdc',"
228+
+ " 'connector' = 'mysql-cdc',"
229229
+ " 'scan.incremental.snapshot.enabled' = 'true',"
230230
+ " 'hostname' = '%s',"
231231
+ " 'port' = '%s',"
@@ -279,7 +279,7 @@ public void testMultiKeys() throws Exception {
279279
+ " create_time TIMESTAMP,"
280280
+ " primary key (id,order_id) not enforced"
281281
+ ") WITH ("
282-
+ " 'connector' = 'oceanbase-cdc',"
282+
+ " 'connector' = 'mysql-cdc',"
283283
+ " 'scan.incremental.snapshot.enabled' = 'true',"
284284
+ " 'hostname' = '%s',"
285285
+ " 'port' = '%s',"

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
6262

6363
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
6464
private static final String DDL_FILE = "oceanbase_ddl_test";
65-
private static String DEFAULT_TEST_DATABASE = "customer_" + getRandomSuffix();
6665
protected static final int DEFAULT_PARALLELISM = 4;
66+
private String testDatabase = "customer_" + getRandomSuffix();
6767

6868
private final List<String> firstPartBinlogEvents =
6969
Arrays.asList(
@@ -102,16 +102,16 @@ public static Stream<Arguments> parameters() {
102102

103103
@BeforeEach
104104
public void setup() throws InterruptedException {
105-
DEFAULT_TEST_DATABASE = "customer_" + getRandomSuffix();
105+
testDatabase = "customer_" + getRandomSuffix();
106106
initializeOceanBaseTables(
107107
DDL_FILE,
108-
DEFAULT_TEST_DATABASE,
108+
testDatabase,
109109
s -> !StringUtils.isNullOrWhitespaceOnly(s) && (s.contains("customers")));
110110
}
111111

112112
@AfterEach
113113
public void clean() {
114-
dropDatabase(DEFAULT_TEST_DATABASE);
114+
dropDatabase(testDatabase);
115115
}
116116

117117
// Failover tests
@@ -326,7 +326,7 @@ private void testMySqlParallelSource(
326326
getPort(),
327327
getUserName(),
328328
getPassword(),
329-
DEFAULT_TEST_DATABASE,
329+
testDatabase,
330330
getTableNameRegex(captureCustomerTables),
331331
scanStartupMode,
332332
skipSnapshotBackfill,
@@ -414,7 +414,7 @@ private void checkBinlogData(
414414
JobID jobId = tableResult.getJobClient().get().getJobID();
415415

416416
for (String tableId : captureCustomerTables) {
417-
makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
417+
makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
418418
}
419419

420420
// wait for the binlog reading
@@ -429,7 +429,7 @@ private void checkBinlogData(
429429
waitUntilJobRunning(tableResult);
430430
}
431431
for (String tableId : captureCustomerTables) {
432-
makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + '.' + tableId);
432+
makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' + tableId);
433433
}
434434

435435
List<String> expectedBinlogData = new ArrayList<>();

0 commit comments

Comments
 (0)