Skip to content

Commit 31d68ac

Browse files
authored
[chore][test] Fix flaky postgres pipeline test case (#4293)
1 parent 69dae39 commit 31d68ac

1 file changed

Lines changed: 86 additions & 67 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java

Lines changed: 86 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -646,85 +646,104 @@ public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
646646
FlinkSourceProvider sourceProvider =
647647
(FlinkSourceProvider) dataSource.getEventSourceProvider();
648648

649-
CloseableIterator<Event> events =
649+
DataStreamSource<Event> source =
650650
testEnv.fromSource(
651-
sourceProvider.getSource(),
652-
WatermarkStrategy.noWatermarks(),
653-
PostgresDataSourceFactory.IDENTIFIER,
654-
new EventTypeInfo())
655-
.executeAndCollect();
651+
sourceProvider.getSource(),
652+
WatermarkStrategy.noWatermarks(),
653+
PostgresDataSourceFactory.IDENTIFIER,
654+
new EventTypeInfo());
656655

657-
// Collect events and verify data
658-
List<Event> collectedEvents = new ArrayList<>();
659-
int expectedDataCount = 3; // We inserted 3 rows
660-
int dataCount = 0;
661-
int maxEvents = 10; // Safety limit
662-
663-
while (events.hasNext() && collectedEvents.size() < maxEvents) {
664-
Event event = events.next();
665-
collectedEvents.add(event);
666-
if (event instanceof DataChangeEvent) {
667-
dataCount++;
668-
if (dataCount >= expectedDataCount) {
669-
break;
656+
TypeSerializer<Event> serializer =
657+
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
658+
CheckpointedCollectResultBuffer<Event> resultBuffer =
659+
new CheckpointedCollectResultBuffer<>(serializer);
660+
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
661+
CollectResultIterator<Event> iterator =
662+
addCollector(testEnv, source, resultBuffer, serializer, accumulatorName);
663+
664+
JobClient jobClient = testEnv.executeAsync("testDatabaseNameWithHyphen");
665+
iterator.setJobClient(jobClient);
666+
667+
try {
668+
// Collect events and verify data
669+
List<Event> collectedEvents = new ArrayList<>();
670+
int expectedDataCount = 3; // We inserted 3 rows
671+
int dataCount = 0;
672+
int maxEvents = 10; // Safety limit
673+
674+
while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
675+
Event event = iterator.next();
676+
collectedEvents.add(event);
677+
if (event instanceof DataChangeEvent) {
678+
dataCount++;
679+
if (dataCount >= expectedDataCount) {
680+
break;
681+
}
670682
}
671683
}
672-
}
673-
events.close();
674684

675-
// Verify we received CreateTableEvent and DataChangeEvents
676-
assertThat(collectedEvents).isNotEmpty();
685+
// Verify we received CreateTableEvent and DataChangeEvents
686+
assertThat(collectedEvents).isNotEmpty();
677687

678-
// Check for CreateTableEvent
679-
long createTableEventCount =
680-
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
681-
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
688+
// Check for CreateTableEvent
689+
long createTableEventCount =
690+
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
691+
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
682692

683-
// Check for DataChangeEvents (INSERT events from snapshot)
684-
List<DataChangeEvent> dataChangeEvents =
685-
collectedEvents.stream()
686-
.filter(e -> e instanceof DataChangeEvent)
687-
.map(e -> (DataChangeEvent) e)
688-
.collect(Collectors.toList());
693+
// Check for DataChangeEvents (INSERT events from snapshot)
694+
List<DataChangeEvent> dataChangeEvents =
695+
collectedEvents.stream()
696+
.filter(e -> e instanceof DataChangeEvent)
697+
.map(e -> (DataChangeEvent) e)
698+
.collect(Collectors.toList());
689699

690-
assertThat(dataChangeEvents).hasSize(expectedDataCount);
691-
692-
// Verify the table ID in events
693-
for (DataChangeEvent dce : dataChangeEvents) {
694-
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
695-
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
696-
}
700+
assertThat(dataChangeEvents).hasSize(expectedDataCount);
697701

698-
// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
699-
List<Integer> actualIds =
700-
dataChangeEvents.stream()
701-
.map(
702-
dce -> {
703-
RecordData after = dce.after();
704-
return after.getInt(0); // id column
705-
})
706-
.sorted()
707-
.collect(Collectors.toList());
708-
assertThat(actualIds).containsExactly(1, 2, 3);
702+
// Verify the table ID in events
703+
for (DataChangeEvent dce : dataChangeEvents) {
704+
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
705+
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
706+
}
709707

710-
// Cleanup - first drop replication slot, then terminate connections and drop database
711-
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
712-
Statement statement = connection.createStatement()) {
713-
// Drop replication slot first (it was created during CDC connection)
708+
// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
709+
List<Integer> actualIds =
710+
dataChangeEvents.stream()
711+
.map(
712+
dce -> {
713+
RecordData after = dce.after();
714+
return after.getInt(0); // id column
715+
})
716+
.sorted()
717+
.collect(Collectors.toList());
718+
assertThat(actualIds).containsExactly(1, 2, 3);
719+
} finally {
720+
// Cancel the job with a bounded wait so cleanup always runs
714721
try {
715-
statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
716-
} catch (SQLException e) {
717-
// Ignore if slot doesn't exist
718-
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
722+
iterator.close();
723+
jobClient.cancel().get();
724+
} catch (Exception e) {
725+
LOG.warn("Failed to cancel job: {}", e.getMessage());
726+
}
727+
728+
// Wait for the job to fully stop and release the replication slot
729+
Thread.sleep(3000);
730+
731+
// Cleanup - drop replication slot, terminate connections and drop database
732+
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
733+
Statement statement = connection.createStatement()) {
734+
try {
735+
statement.execute(
736+
String.format("SELECT pg_drop_replication_slot('%s')", slotName));
737+
} catch (SQLException e) {
738+
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
739+
}
740+
statement.execute(
741+
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
742+
+ hyphenDbName
743+
+ "'");
744+
Thread.sleep(500);
745+
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
719746
}
720-
// Terminate all connections to the database
721-
statement.execute(
722-
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
723-
+ hyphenDbName
724-
+ "'");
725-
// Small delay to ensure connections are terminated
726-
Thread.sleep(500);
727-
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
728747
}
729748
}
730749

0 commit comments

Comments
 (0)