Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -646,85 +646,100 @@ public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();

CloseableIterator<Event> events =
DataStreamSource<Event> source =
testEnv.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo());

TypeSerializer<Event> serializer =
source.getTransformation().getOutputType().createSerializer(testEnv.getConfig());
CheckpointedCollectResultBuffer<Event> resultBuffer =
new CheckpointedCollectResultBuffer<>(serializer);
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(testEnv, source, resultBuffer, serializer, accumulatorName);

// Collect events and verify data
List<Event> collectedEvents = new ArrayList<>();
int expectedDataCount = 3; // We inserted 3 rows
int dataCount = 0;
int maxEvents = 10; // Safety limit

while (events.hasNext() && collectedEvents.size() < maxEvents) {
Event event = events.next();
collectedEvents.add(event);
if (event instanceof DataChangeEvent) {
dataCount++;
if (dataCount >= expectedDataCount) {
break;
JobClient jobClient = testEnv.executeAsync("testDatabaseNameWithHyphen");
iterator.setJobClient(jobClient);

try {
// Collect events and verify data
List<Event> collectedEvents = new ArrayList<>();
int expectedDataCount = 3; // We inserted 3 rows
int dataCount = 0;
int maxEvents = 10; // Safety limit

while (iterator.hasNext() && collectedEvents.size() < maxEvents) {
Event event = iterator.next();
collectedEvents.add(event);
if (event instanceof DataChangeEvent) {
dataCount++;
if (dataCount >= expectedDataCount) {
break;
}
}
}
}
events.close();

// Verify we received CreateTableEvent and DataChangeEvents
assertThat(collectedEvents).isNotEmpty();

// Check for CreateTableEvent
long createTableEventCount =
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
// Verify we received CreateTableEvent and DataChangeEvents
assertThat(collectedEvents).isNotEmpty();

// Check for DataChangeEvents (INSERT events from snapshot)
List<DataChangeEvent> dataChangeEvents =
collectedEvents.stream()
.filter(e -> e instanceof DataChangeEvent)
.map(e -> (DataChangeEvent) e)
.collect(Collectors.toList());
// Check for CreateTableEvent
long createTableEventCount =
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);

assertThat(dataChangeEvents).hasSize(expectedDataCount);
// Check for DataChangeEvents (INSERT events from snapshot)
List<DataChangeEvent> dataChangeEvents =
collectedEvents.stream()
.filter(e -> e instanceof DataChangeEvent)
.map(e -> (DataChangeEvent) e)
.collect(Collectors.toList());

// Verify the table ID in events
for (DataChangeEvent dce : dataChangeEvents) {
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
}
assertThat(dataChangeEvents).hasSize(expectedDataCount);

// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
List<Integer> actualIds =
dataChangeEvents.stream()
.map(
dce -> {
RecordData after = dce.after();
return after.getInt(0); // id column
})
.sorted()
.collect(Collectors.toList());
assertThat(actualIds).containsExactly(1, 2, 3);
// Verify the table ID in events
for (DataChangeEvent dce : dataChangeEvents) {
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
}

// Cleanup - first drop replication slot, then terminate connections and drop database
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// Drop replication slot first (it was created during CDC connection)
try {
statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
} catch (SQLException e) {
// Ignore if slot doesn't exist
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
List<Integer> actualIds =
dataChangeEvents.stream()
.map(
dce -> {
RecordData after = dce.after();
return after.getInt(0); // id column
})
.sorted()
.collect(Collectors.toList());
assertThat(actualIds).containsExactly(1, 2, 3);
} finally {
// Cancel the job first to release the replication slot
iterator.close();
jobClient.cancel().get();

Comment thread
Hisoka-X marked this conversation as resolved.
Outdated
// Wait for the job to fully stop and release the replication slot
Thread.sleep(3000);

Comment thread
lvyanquan marked this conversation as resolved.
// Cleanup - drop replication slot, terminate connections and drop database
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
try {
statement.execute(
String.format("SELECT pg_drop_replication_slot('%s')", slotName));
} catch (SQLException e) {
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
}
Comment thread
lvyanquan marked this conversation as resolved.
statement.execute(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ hyphenDbName
+ "'");
Thread.sleep(500);
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
// Terminate all connections to the database
statement.execute(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ hyphenDbName
+ "'");
// Small delay to ensure connections are terminated
Thread.sleep(500);
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
}

Expand Down
Loading