1616 */
1717package org .apache .seatunnel .e2e .connector .redis ;
1818
19+ import org .apache .seatunnel .shade .com .fasterxml .jackson .databind .node .ObjectNode ;
20+
1921import org .apache .seatunnel .api .table .type .ArrayType ;
2022import org .apache .seatunnel .api .table .type .BasicType ;
2123import org .apache .seatunnel .api .table .type .DecimalType ;
2527import org .apache .seatunnel .api .table .type .SeaTunnelDataType ;
2628import org .apache .seatunnel .api .table .type .SeaTunnelRow ;
2729import org .apache .seatunnel .api .table .type .SeaTunnelRowType ;
30+ import org .apache .seatunnel .common .utils .JsonUtils ;
2831import org .apache .seatunnel .e2e .common .TestResource ;
2932import org .apache .seatunnel .e2e .common .TestSuiteBase ;
33+ import org .apache .seatunnel .e2e .common .container .EngineType ;
3034import org .apache .seatunnel .e2e .common .container .TestContainer ;
35+ import org .apache .seatunnel .e2e .common .junit .DisabledOnContainer ;
3136import org .apache .seatunnel .format .json .JsonSerializationSchema ;
3237
3338import org .junit .jupiter .api .AfterAll ;
3439import org .junit .jupiter .api .Assertions ;
3540import org .junit .jupiter .api .BeforeAll ;
3641import org .junit .jupiter .api .TestTemplate ;
42+ import org .junit .jupiter .api .condition .DisabledOnOs ;
43+ import org .junit .jupiter .api .condition .OS ;
3744import org .testcontainers .containers .Container ;
3845import org .testcontainers .containers .GenericContainer ;
3946import org .testcontainers .containers .output .Slf4jLogConsumer ;
5259import java .time .LocalDate ;
5360import java .time .LocalDateTime ;
5461import java .util .ArrayList ;
62+ import java .util .Arrays ;
5563import java .util .Collections ;
5664import java .util .HashMap ;
5765import java .util .List ;
5866import java .util .Map ;
5967import java .util .Objects ;
68+ import java .util .concurrent .CompletableFuture ;
69+ import java .util .concurrent .TimeUnit ;
70+ import java .util .regex .Matcher ;
71+ import java .util .regex .Pattern ;
72+ import java .util .stream .Collectors ;
6073import java .util .stream .Stream ;
6174
75+ import static org .awaitility .Awaitility .await ;
76+
6277@ Slf4j
6378public abstract class RedisTestCaseTemplateIT extends TestSuiteBase implements TestResource {
6479
@@ -492,7 +507,7 @@ public void testFakeToRedisDeleteSetTest(TestContainer container)
492507 }
493508
494509 @ TestTemplate
495- public void testMysqlCdcToRedisDeleteZSetTest (TestContainer container )
510+ public void testFakeToToRedisDeleteZSetTest (TestContainer container )
496511 throws IOException , InterruptedException {
497512 Container .ExecResult execResult =
498513 container .executeJob ("/fake-to-redis-test-delete-zset.conf" );
@@ -501,6 +516,69 @@ public void testMysqlCdcToRedisDeleteZSetTest(TestContainer container)
501516 jedis .del ("zset_check" );
502517 }
503518
519+ @ TestTemplate
520+ @ DisabledOnContainer (
521+ value = {},
522+ type = {EngineType .SPARK , EngineType .FLINK },
523+ disabledReason = "Only support for seatunnel" )
524+ @ DisabledOnOs (OS .WINDOWS )
525+ public void testFakeToRedisInRealTimeTest (TestContainer container )
526+ throws IOException , InterruptedException {
527+ CompletableFuture .supplyAsync (
528+ () -> {
529+ try {
530+ container .executeJob ("/fake-to-redis-test-in-real-time.conf" );
531+ } catch (Exception e ) {
532+ log .error ("Commit task exception :" + e .getMessage ());
533+ throw new RuntimeException (e );
534+ }
535+ return null ;
536+ });
537+ await ().atMost (60000 , TimeUnit .MILLISECONDS )
538+ .untilAsserted (
539+ () -> {
540+ Assertions .assertEquals (3 , jedis .llen ("list_check" ));
541+ });
542+ jedis .del ("list_check" );
543+ // Get the task id
544+ Container .ExecResult execResult = container .executeBaseCommand (new String [] {"-l" });
545+ String regex = "(\\ d+)\\ s+" ;
546+ Pattern pattern = Pattern .compile (regex );
547+ List <String > runningJobId =
548+ Arrays .stream (execResult .getStdout ().toString ().split ("\n " ))
549+ .filter (s -> s .contains ("fake-to-redis-test-in-real-time" ))
550+ .map (
551+ s -> {
552+ Matcher matcher = pattern .matcher (s );
553+ return matcher .find () ? matcher .group (1 ) : null ;
554+ })
555+ .filter (jobId -> jobId != null )
556+ .collect (Collectors .toList ());
557+ Assertions .assertEquals (1 , runningJobId .size ());
558+ // Verify that the status is Running
559+ for (String jobId : runningJobId ) {
560+ Container .ExecResult execResult1 =
561+ container .executeBaseCommand (new String [] {"-j" , jobId });
562+ String stdout = execResult1 .getStdout ();
563+ ObjectNode jsonNodes = JsonUtils .parseObject (stdout );
564+ Assertions .assertEquals (jsonNodes .get ("jobStatus" ).asText (), "RUNNING" );
565+ }
566+ // Execute cancellation task
567+ String [] batchCancelCommand =
568+ Stream .concat (Arrays .stream (new String [] {"-can" }), runningJobId .stream ())
569+ .toArray (String []::new );
570+ Assertions .assertEquals (0 , container .executeBaseCommand (batchCancelCommand ).getExitCode ());
571+
572+ // Verify whether the cancellation is successful
573+ for (String jobId : runningJobId ) {
574+ Container .ExecResult execResult1 =
575+ container .executeBaseCommand (new String [] {"-j" , jobId });
576+ String stdout = execResult1 .getStdout ();
577+ ObjectNode jsonNodes = JsonUtils .parseObject (stdout );
578+ Assertions .assertEquals (jsonNodes .get ("jobStatus" ).asText (), "CANCELED" );
579+ }
580+ }
581+
504582 @ TestTemplate
505583 public void testFakeToRedisNormalKeyIsNullTest (TestContainer container )
506584 throws IOException , InterruptedException {
0 commit comments