@@ -646,85 +646,104 @@ public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
646646 FlinkSourceProvider sourceProvider =
647647 (FlinkSourceProvider ) dataSource .getEventSourceProvider ();
648648
649- CloseableIterator <Event > events =
649+ DataStreamSource <Event > source =
650650 testEnv .fromSource (
651- sourceProvider .getSource (),
652- WatermarkStrategy .noWatermarks (),
653- PostgresDataSourceFactory .IDENTIFIER ,
654- new EventTypeInfo ())
655- .executeAndCollect ();
651+ sourceProvider .getSource (),
652+ WatermarkStrategy .noWatermarks (),
653+ PostgresDataSourceFactory .IDENTIFIER ,
654+ new EventTypeInfo ());
656655
657- // Collect events and verify data
658- List <Event > collectedEvents = new ArrayList <>();
659- int expectedDataCount = 3 ; // We inserted 3 rows
660- int dataCount = 0 ;
661- int maxEvents = 10 ; // Safety limit
662-
663- while (events .hasNext () && collectedEvents .size () < maxEvents ) {
664- Event event = events .next ();
665- collectedEvents .add (event );
666- if (event instanceof DataChangeEvent ) {
667- dataCount ++;
668- if (dataCount >= expectedDataCount ) {
669- break ;
656+ TypeSerializer <Event > serializer =
657+ source .getTransformation ().getOutputType ().createSerializer (testEnv .getConfig ());
658+ CheckpointedCollectResultBuffer <Event > resultBuffer =
659+ new CheckpointedCollectResultBuffer <>(serializer );
660+ String accumulatorName = "dataStreamCollect_" + UUID .randomUUID ();
661+ CollectResultIterator <Event > iterator =
662+ addCollector (testEnv , source , resultBuffer , serializer , accumulatorName );
663+
664+ JobClient jobClient = testEnv .executeAsync ("testDatabaseNameWithHyphen" );
665+ iterator .setJobClient (jobClient );
666+
667+ try {
668+ // Collect events and verify data
669+ List <Event > collectedEvents = new ArrayList <>();
670+ int expectedDataCount = 3 ; // We inserted 3 rows
671+ int dataCount = 0 ;
672+ int maxEvents = 10 ; // Safety limit
673+
674+ while (iterator .hasNext () && collectedEvents .size () < maxEvents ) {
675+ Event event = iterator .next ();
676+ collectedEvents .add (event );
677+ if (event instanceof DataChangeEvent ) {
678+ dataCount ++;
679+ if (dataCount >= expectedDataCount ) {
680+ break ;
681+ }
670682 }
671683 }
672- }
673- events .close ();
674684
675- // Verify we received CreateTableEvent and DataChangeEvents
676- assertThat (collectedEvents ).isNotEmpty ();
685+ // Verify we received CreateTableEvent and DataChangeEvents
686+ assertThat (collectedEvents ).isNotEmpty ();
677687
678- // Check for CreateTableEvent
679- long createTableEventCount =
680- collectedEvents .stream ().filter (e -> e instanceof CreateTableEvent ).count ();
681- assertThat (createTableEventCount ).isGreaterThanOrEqualTo (1 );
688+ // Check for CreateTableEvent
689+ long createTableEventCount =
690+ collectedEvents .stream ().filter (e -> e instanceof CreateTableEvent ).count ();
691+ assertThat (createTableEventCount ).isGreaterThanOrEqualTo (1 );
682692
683- // Check for DataChangeEvents (INSERT events from snapshot)
684- List <DataChangeEvent > dataChangeEvents =
685- collectedEvents .stream ()
686- .filter (e -> e instanceof DataChangeEvent )
687- .map (e -> (DataChangeEvent ) e )
688- .collect (Collectors .toList ());
693+ // Check for DataChangeEvents (INSERT events from snapshot)
694+ List <DataChangeEvent > dataChangeEvents =
695+ collectedEvents .stream ()
696+ .filter (e -> e instanceof DataChangeEvent )
697+ .map (e -> (DataChangeEvent ) e )
698+ .collect (Collectors .toList ());
689699
690- assertThat (dataChangeEvents ).hasSize (expectedDataCount );
691-
692- // Verify the table ID in events
693- for (DataChangeEvent dce : dataChangeEvents ) {
694- assertThat (dce .tableId ().getSchemaName ()).isEqualTo ("public" );
695- assertThat (dce .tableId ().getTableName ()).isEqualTo ("test_table" );
696- }
700+ assertThat (dataChangeEvents ).hasSize (expectedDataCount );
697701
698- // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
699- List <Integer > actualIds =
700- dataChangeEvents .stream ()
701- .map (
702- dce -> {
703- RecordData after = dce .after ();
704- return after .getInt (0 ); // id column
705- })
706- .sorted ()
707- .collect (Collectors .toList ());
708- assertThat (actualIds ).containsExactly (1 , 2 , 3 );
702+ // Verify the table ID in events
703+ for (DataChangeEvent dce : dataChangeEvents ) {
704+ assertThat (dce .tableId ().getSchemaName ()).isEqualTo ("public" );
705+ assertThat (dce .tableId ().getTableName ()).isEqualTo ("test_table" );
706+ }
709707
710- // Cleanup - first drop replication slot, then terminate connections and drop database
711- try (Connection connection = getJdbcConnection (POSTGRES_CONTAINER );
712- Statement statement = connection .createStatement ()) {
713- // Drop replication slot first (it was created during CDC connection)
708+ // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
709+ List <Integer > actualIds =
710+ dataChangeEvents .stream ()
711+ .map (
712+ dce -> {
713+ RecordData after = dce .after ();
714+ return after .getInt (0 ); // id column
715+ })
716+ .sorted ()
717+ .collect (Collectors .toList ());
718+ assertThat (actualIds ).containsExactly (1 , 2 , 3 );
719+ } finally {
720+ // Cancel the job with a bounded wait so cleanup always runs
714721 try {
715- statement .execute (String .format ("SELECT pg_drop_replication_slot('%s')" , slotName ));
716- } catch (SQLException e ) {
717- // Ignore if slot doesn't exist
718- LOG .warn ("Failed to drop replication slot: {}" , e .getMessage ());
722+ iterator .close ();
723+ jobClient .cancel ().get ();
724+ } catch (Exception e ) {
725+ LOG .warn ("Failed to cancel job: {}" , e .getMessage ());
726+ }
727+
728+ // Wait for the job to fully stop and release the replication slot
729+ Thread .sleep (3000 );
730+
731+ // Cleanup - drop replication slot, terminate connections and drop database
732+ try (Connection connection = getJdbcConnection (POSTGRES_CONTAINER );
733+ Statement statement = connection .createStatement ()) {
734+ try {
735+ statement .execute (
736+ String .format ("SELECT pg_drop_replication_slot('%s')" , slotName ));
737+ } catch (SQLException e ) {
738+ LOG .warn ("Failed to drop replication slot: {}" , e .getMessage ());
739+ }
740+ statement .execute (
741+ "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
742+ + hyphenDbName
743+ + "'" );
744+ Thread .sleep (500 );
745+ statement .execute ("DROP DATABASE IF EXISTS \" " + hyphenDbName + "\" " );
719746 }
720- // Terminate all connections to the database
721- statement .execute (
722- "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
723- + hyphenDbName
724- + "'" );
725- // Small delay to ensure connections are terminated
726- Thread .sleep (500 );
727- statement .execute ("DROP DATABASE IF EXISTS \" " + hyphenDbName + "\" " );
728747 }
729748 }
730749
0 commit comments