Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -646,85 +646,104 @@ public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();

CloseableIterator<Event> events =
DataStreamSource<Event> source =
testEnv.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo());

// Collect events and verify data
List<Event> collectedEvents = new ArrayList<>();
int expectedDataCount = 3; // We inserted 3 rows
int dataCount = 0;
int maxEvents = 10; // Safety limit

while (events.hasNext() && collectedEvents.size() < maxEvents) {
Event event = events.next();
collectedEvents.add(event);
if (event instanceof DataChangeEvent) {
dataCount++;
if (dataCount >= expectedDataCount) {
break;
TypeSerializer<Event> serializer =
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
CheckpointedCollectResultBuffer<Event> resultBuffer =
new CheckpointedCollectResultBuffer<>(serializer);
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(testEnv, source, resultBuffer, serializer, accumulatorName);

JobClient jobClient = testEnv.executeAsync("testDatabaseNameWithHyphen");
iterator.setJobClient(jobClient);

try {
// Collect events and verify data
List<Event> collectedEvents = new ArrayList<>();
int expectedDataCount = 3; // We inserted 3 rows
int dataCount = 0;
int maxEvents = 10; // Safety limit

while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
Event event = iterator.next();
collectedEvents.add(event);
if (event instanceof DataChangeEvent) {
dataCount++;
if (dataCount >= expectedDataCount) {
break;
}
}
}
}
events.close();

// Verify we received CreateTableEvent and DataChangeEvents
assertThat(collectedEvents).isNotEmpty();
// Verify we received CreateTableEvent and DataChangeEvents
assertThat(collectedEvents).isNotEmpty();

// Check for CreateTableEvent
long createTableEventCount =
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
// Check for CreateTableEvent
long createTableEventCount =
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);

// Check for DataChangeEvents (INSERT events from snapshot)
List<DataChangeEvent> dataChangeEvents =
collectedEvents.stream()
.filter(e -> e instanceof DataChangeEvent)
.map(e -> (DataChangeEvent) e)
.collect(Collectors.toList());
// Check for DataChangeEvents (INSERT events from snapshot)
List<DataChangeEvent> dataChangeEvents =
collectedEvents.stream()
.filter(e -> e instanceof DataChangeEvent)
.map(e -> (DataChangeEvent) e)
.collect(Collectors.toList());

assertThat(dataChangeEvents).hasSize(expectedDataCount);

// Verify the table ID in events
for (DataChangeEvent dce : dataChangeEvents) {
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
}
assertThat(dataChangeEvents).hasSize(expectedDataCount);

// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
List<Integer> actualIds =
dataChangeEvents.stream()
.map(
dce -> {
RecordData after = dce.after();
return after.getInt(0); // id column
})
.sorted()
.collect(Collectors.toList());
assertThat(actualIds).containsExactly(1, 2, 3);
// Verify the table ID in events
for (DataChangeEvent dce : dataChangeEvents) {
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
}

// Cleanup - first drop replication slot, then terminate connections and drop database
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// Drop replication slot first (it was created during CDC connection)
// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
List<Integer> actualIds =
dataChangeEvents.stream()
.map(
dce -> {
RecordData after = dce.after();
return after.getInt(0); // id column
})
.sorted()
.collect(Collectors.toList());
assertThat(actualIds).containsExactly(1, 2, 3);
} finally {
// Cancel the job with a bounded wait so cleanup always runs
try {
statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
} catch (SQLException e) {
// Ignore if slot doesn't exist
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
iterator.close();
jobClient.cancel().get();
} catch (Exception e) {
LOG.warn("Failed to cancel job: {}", e.getMessage());
}

// Wait for the job to fully stop and release the replication slot
Thread.sleep(3000);

Comment thread
lvyanquan marked this conversation as resolved.
// Cleanup - drop replication slot, terminate connections and drop database
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
try {
statement.execute(
String.format("SELECT pg_drop_replication_slot('%s')", slotName));
} catch (SQLException e) {
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
}
Comment thread
lvyanquan marked this conversation as resolved.
statement.execute(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ hyphenDbName
+ "'");
Thread.sleep(500);
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
// Terminate all connections to the database
statement.execute(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ hyphenDbName
+ "'");
// Small delay to ensure connections are terminated
Thread.sleep(500);
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
}

Expand Down
Loading