Skip to content

Commit 13bd46b

Browse files
authored
[Improve][E2E] modify the method of obtaining JobId (#7880)
1 parent 26c528a commit 13bd46b

File tree

8 files changed

+64
-152
lines changed
  • seatunnel-e2e
    • seatunnel-connector-v2-e2e
      • connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql
      • connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres
      • connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle
      • connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres
      • connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb
      • connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon
    • seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util
    • seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e

8 files changed

+64
-152
lines changed

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java

+7-17
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.e2e.common.container.TestContainer;
2828
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2929
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
30+
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
3031

3132
import org.junit.jupiter.api.AfterAll;
3233
import org.junit.jupiter.api.Assertions;
@@ -49,8 +50,6 @@
4950
import java.util.List;
5051
import java.util.concurrent.CompletableFuture;
5152
import java.util.concurrent.TimeUnit;
52-
import java.util.regex.Matcher;
53-
import java.util.regex.Pattern;
5453
import java.util.stream.Stream;
5554

5655
import static org.awaitility.Awaitility.await;
@@ -329,11 +328,13 @@ public void testMultiTableWithRestore(TestContainer container)
329328
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
330329
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
331330

331+
Long jobId = JobIdGenerator.newJobId();
332332
CompletableFuture.supplyAsync(
333333
() -> {
334334
try {
335335
return container.executeJob(
336-
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
336+
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
337+
String.valueOf(jobId));
337338
} catch (Exception e) {
338339
log.error("Commit task exception :" + e.getMessage());
339340
throw new RuntimeException(e);
@@ -365,26 +366,15 @@ public void testMultiTableWithRestore(TestContainer container)
365366
.pollInterval(1000, TimeUnit.MILLISECONDS)
366367
.until(() -> getConnectionStatus("st_user_sink").size() == 1);
367368

368-
Pattern jobIdPattern =
369-
Pattern.compile(
370-
".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
371-
Pattern.DOTALL);
372-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
373-
String jobId;
374-
if (matcher.matches()) {
375-
jobId = matcher.group(1);
376-
} else {
377-
throw new RuntimeException("Can not find jobId");
378-
}
379-
380-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
369+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
381370

382371
// Restore job with add a new table
383372
CompletableFuture.supplyAsync(
384373
() -> {
385374
try {
386375
container.restoreJob(
387-
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId);
376+
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf",
377+
String.valueOf(jobId));
388378
} catch (Exception e) {
389379
log.error("Commit task exception :" + e.getMessage());
390380
throw new RuntimeException(e);

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java

+12-30
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.e2e.common.container.TestContainer;
2525
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2626
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
27+
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
2728

2829
import org.junit.jupiter.api.AfterAll;
2930
import org.junit.jupiter.api.Assertions;
@@ -288,12 +289,14 @@ public void testOpengaussCdcMultiTableE2e(TestContainer container) {
288289
disabledReason = "Currently SPARK and FLINK do not support restore")
289290
public void testMultiTableWithRestore(TestContainer container)
290291
throws IOException, InterruptedException {
292+
Long jobId = JobIdGenerator.newJobId();
291293
try {
292294
CompletableFuture.supplyAsync(
293295
() -> {
294296
try {
295297
return container.executeJob(
296-
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
298+
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
299+
String.valueOf(jobId));
297300
} catch (Exception e) {
298301
log.error("Commit task exception :" + e.getMessage());
299302
throw new RuntimeException(e);
@@ -319,27 +322,15 @@ public void testMultiTableWithRestore(TestContainer container)
319322
OPENGAUSS_SCHEMA,
320323
SINK_TABLE_1)))));
321324

322-
Pattern jobIdPattern =
323-
Pattern.compile(
324-
".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
325-
Pattern.DOTALL);
326-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
327-
String jobId;
328-
if (matcher.matches()) {
329-
jobId = matcher.group(1);
330-
} else {
331-
throw new RuntimeException("Can not find jobId");
332-
}
333-
334-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
325+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
335326

336327
// Restore job with add a new table
337328
CompletableFuture.supplyAsync(
338329
() -> {
339330
try {
340331
container.restoreJob(
341332
"/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf",
342-
jobId);
333+
String.valueOf(jobId));
343334
} catch (Exception e) {
344335
log.error("Commit task exception :" + e.getMessage());
345336
throw new RuntimeException(e);
@@ -397,12 +388,14 @@ public void testMultiTableWithRestore(TestContainer container)
397388
disabledReason = "Currently SPARK and FLINK do not support restore")
398389
public void testAddFiledWithRestore(TestContainer container)
399390
throws IOException, InterruptedException {
391+
Long jobId = JobIdGenerator.newJobId();
400392
try {
401393
CompletableFuture.supplyAsync(
402394
() -> {
403395
try {
404396
return container.executeJob(
405-
"/opengausscdc_to_opengauss_test_add_Filed.conf");
397+
"/opengausscdc_to_opengauss_test_add_Filed.conf",
398+
String.valueOf(jobId));
406399
} catch (Exception e) {
407400
log.error("Commit task exception :" + e.getMessage());
408401
throw new RuntimeException(e);
@@ -425,19 +418,7 @@ public void testAddFiledWithRestore(TestContainer container)
425418
OPENGAUSS_SCHEMA,
426419
SINK_TABLE_3)))));
427420

428-
Pattern jobIdPattern =
429-
Pattern.compile(
430-
".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
431-
Pattern.DOTALL);
432-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
433-
String jobId;
434-
if (matcher.matches()) {
435-
jobId = matcher.group(1);
436-
} else {
437-
throw new RuntimeException("Can not find jobId");
438-
}
439-
440-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
421+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
441422

442423
// add filed add insert source table data
443424
addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
@@ -449,7 +430,8 @@ public void testAddFiledWithRestore(TestContainer container)
449430
() -> {
450431
try {
451432
container.restoreJob(
452-
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
433+
"/opengausscdc_to_opengauss_test_add_Filed.conf",
434+
String.valueOf(jobId));
453435
} catch (Exception e) {
454436
log.error("Commit task exception :" + e.getMessage());
455437
throw new RuntimeException(e);

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java

+7-15
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.seatunnel.e2e.common.container.TestContainer;
2424
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2525
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
26+
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
2627

2728
import org.junit.jupiter.api.AfterAll;
2829
import org.junit.jupiter.api.Assertions;
@@ -390,11 +391,13 @@ public void testMultiTableWithRestore(TestContainer container)
390391
insertSourceTable(DATABASE, SOURCE_TABLE1);
391392
insertSourceTable(DATABASE, SOURCE_TABLE2);
392393

394+
Long jobId = JobIdGenerator.newJobId();
393395
CompletableFuture.supplyAsync(
394396
() -> {
395397
try {
396398
return container.executeJob(
397-
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf");
399+
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf",
400+
String.valueOf(jobId));
398401
} catch (Exception e) {
399402
log.error("Commit task exception :" + e.getMessage());
400403
throw new RuntimeException(e);
@@ -432,26 +435,15 @@ public void testMultiTableWithRestore(TestContainer container)
432435
getSourceQuerySQL(
433436
DATABASE, SINK_TABLE1)))));
434437

435-
Pattern jobIdPattern =
436-
Pattern.compile(
437-
".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
438-
Pattern.DOTALL);
439-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
440-
String jobId;
441-
if (matcher.matches()) {
442-
jobId = matcher.group(1);
443-
} else {
444-
throw new RuntimeException("Can not find jobId");
445-
}
446-
447-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
438+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
448439

449440
// Restore job with add a new table
450441
CompletableFuture.supplyAsync(
451442
() -> {
452443
try {
453444
container.restoreJob(
454-
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", jobId);
445+
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf",
446+
String.valueOf(jobId));
455447
} catch (Exception e) {
456448
log.error("Commit task exception :" + e.getMessage());
457449
throw new RuntimeException(e);

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java

+13-30
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.e2e.common.container.TestContainer;
2525
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2626
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
27+
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
2728

2829
import org.junit.jupiter.api.AfterAll;
2930
import org.junit.jupiter.api.Assertions;
@@ -274,12 +275,14 @@ public void testPostgresCdcMultiTableE2e(TestContainer container) {
274275
disabledReason = "Currently SPARK and FLINK do not support restore")
275276
public void testMultiTableWithRestore(TestContainer container)
276277
throws IOException, InterruptedException {
278+
Long jobId = JobIdGenerator.newJobId();
277279
try {
278280
CompletableFuture.supplyAsync(
279281
() -> {
280282
try {
281283
return container.executeJob(
282-
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf");
284+
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf",
285+
String.valueOf(jobId));
283286
} catch (Exception e) {
284287
log.error("Commit task exception :" + e.getMessage());
285288
throw new RuntimeException(e);
@@ -305,26 +308,15 @@ public void testMultiTableWithRestore(TestContainer container)
305308
POSTGRESQL_SCHEMA,
306309
SINK_TABLE_1)))));
307310

308-
Pattern jobIdPattern =
309-
Pattern.compile(
310-
".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
311-
Pattern.DOTALL);
312-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
313-
String jobId;
314-
if (matcher.matches()) {
315-
jobId = matcher.group(1);
316-
} else {
317-
throw new RuntimeException("Can not find jobId");
318-
}
319-
320-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
311+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
321312

322313
// Restore job with add a new table
323314
CompletableFuture.supplyAsync(
324315
() -> {
325316
try {
326317
container.restoreJob(
327-
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf", jobId);
318+
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf",
319+
String.valueOf(jobId));
328320
} catch (Exception e) {
329321
log.error("Commit task exception :" + e.getMessage());
330322
throw new RuntimeException(e);
@@ -382,12 +374,14 @@ public void testMultiTableWithRestore(TestContainer container)
382374
disabledReason = "Currently SPARK and FLINK do not support restore")
383375
public void testAddFiledWithRestore(TestContainer container)
384376
throws IOException, InterruptedException {
377+
Long jobId = JobIdGenerator.newJobId();
385378
try {
386379
CompletableFuture.supplyAsync(
387380
() -> {
388381
try {
389382
return container.executeJob(
390-
"/postgrescdc_to_postgres_test_add_Filed.conf");
383+
"/postgrescdc_to_postgres_test_add_Filed.conf",
384+
String.valueOf(jobId));
391385
} catch (Exception e) {
392386
log.error("Commit task exception :" + e.getMessage());
393387
throw new RuntimeException(e);
@@ -410,19 +404,7 @@ public void testAddFiledWithRestore(TestContainer container)
410404
POSTGRESQL_SCHEMA,
411405
SINK_TABLE_3)))));
412406

413-
Pattern jobIdPattern =
414-
Pattern.compile(
415-
".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*",
416-
Pattern.DOTALL);
417-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
418-
String jobId;
419-
if (matcher.matches()) {
420-
jobId = matcher.group(1);
421-
} else {
422-
throw new RuntimeException("Can not find jobId");
423-
}
424-
425-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
407+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
426408

427409
// add filed add insert source table data
428410
addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
@@ -434,7 +416,8 @@ public void testAddFiledWithRestore(TestContainer container)
434416
() -> {
435417
try {
436418
container.restoreJob(
437-
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
419+
"/postgrescdc_to_postgres_test_add_Filed.conf",
420+
String.valueOf(jobId));
438421
} catch (Exception e) {
439422
log.error("Commit task exception :" + e.getMessage());
440423
throw new RuntimeException(e);

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.e2e.common.container.TestContainer;
2525
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2626
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
27+
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
2728

2829
import org.junit.jupiter.api.AfterAll;
2930
import org.junit.jupiter.api.Assertions;
@@ -41,8 +42,6 @@
4142
import java.util.List;
4243
import java.util.concurrent.CompletableFuture;
4344
import java.util.concurrent.TimeUnit;
44-
import java.util.regex.Matcher;
45-
import java.util.regex.Pattern;
4645

4746
import static org.awaitility.Awaitility.await;
4847

@@ -167,11 +166,11 @@ public void testMultiTableWithRestore(TestContainer container)
167166
// Clear related content to ensure that multiple operations are not affected
168167
clearTable(TIDB_DATABASE, SOURCE_TABLE);
169168
clearTable(TIDB_DATABASE, SINK_TABLE);
170-
169+
Long jobId = JobIdGenerator.newJobId();
171170
CompletableFuture.supplyAsync(
172171
() -> {
173172
try {
174-
container.executeJob("/tidb/tidbcdc_to_tidb.conf");
173+
container.executeJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId));
175174
} catch (Exception e) {
176175
log.error("Commit task exception :" + e.getMessage());
177176
throw new RuntimeException(e);
@@ -192,24 +191,13 @@ public void testMultiTableWithRestore(TestContainer container)
192191
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
193192
});
194193

195-
Pattern jobIdPattern =
196-
Pattern.compile(
197-
".*Init JobMaster for Job tidbcdc_to_tidb.conf \\(([0-9]*)\\).*",
198-
Pattern.DOTALL);
199-
Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
200-
String jobId;
201-
if (matcher.matches()) {
202-
jobId = matcher.group(1);
203-
} else {
204-
throw new RuntimeException("Can not find jobId");
205-
}
206-
Assertions.assertEquals(0, container.savepointJob(jobId).getExitCode());
194+
Assertions.assertEquals(0, container.savepointJob(String.valueOf(jobId)).getExitCode());
207195

208196
// Restore job
209197
CompletableFuture.supplyAsync(
210198
() -> {
211199
try {
212-
container.restoreJob("/tidb/tidbcdc_to_tidb.conf", jobId);
200+
container.restoreJob("/tidb/tidbcdc_to_tidb.conf", String.valueOf(jobId));
213201
} catch (Exception e) {
214202
log.error("Commit task exception :" + e.getMessage());
215203
throw new RuntimeException(e);

0 commit comments

Comments
 (0)