Skip to content

Commit 1c59744

Browse files
authored
[FLINK-38512][postgres] Update database name validation to allow hyphens (apache#4258)
1 parent aac7d8d commit 1c59744

2 files changed

Lines changed: 150 additions & 3 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,14 @@ private Optional<String> getValidateDatabaseName(String tables) {
404404
private boolean isValidPostgresDbName(String dbName) {
405405
// PostgreSQL database name conventions:
406406
// 1. Length does not exceed 63 characters
407-
// 2. Can contain letters, numbers, underscores, and dollar signs
408-
// 3. Cannot start with a dollar sign
407+
// 2. Can contain letters, numbers, underscores, dollar signs, and hyphens
408+
// 3. Must start with a letter, underscore, or dollar sign (cannot start with a hyphen)
409+
// Note: While SQL identifiers have strict rules, database names created
410+
// via createdb command can contain hyphens (e.g., createdb foo-bar)
409411
if (dbName == null || dbName.length() > 63) {
410412
return false;
411413
}
412-
if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*")) {
414+
if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$-]*")) {
413415
return false;
414416
}
415417
return true;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2121
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
2222
import org.apache.flink.api.common.typeutils.TypeSerializer;
23+
import org.apache.flink.cdc.common.data.RecordData;
2324
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2425
import org.apache.flink.cdc.common.event.CreateTableEvent;
2526
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -68,6 +69,7 @@
6869
import java.sql.SQLException;
6970
import java.sql.Statement;
7071
import java.util.ArrayList;
72+
import java.util.Arrays;
7173
import java.util.Collections;
7274
import java.util.HashMap;
7375
import java.util.Iterator;
@@ -583,6 +585,149 @@ public void testSnapshotOnlyMode() throws Exception {
583585
.isEqualTo(String.format("Replication slot \"%s\" does not exist", slotName));
584586
}
585587

588+
@Test
589+
public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
590+
// Create a real database with hyphen to verify full CDC sync works
591+
// This test verifies the fix for FLINK-38512
592+
String hyphenDbName = "test-db-with-hyphen";
593+
594+
// Create the database with hyphen (need to quote the name)
595+
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
596+
Statement statement = connection.createStatement()) {
597+
// Drop if exists and create new database with hyphen in name
598+
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
599+
statement.execute("CREATE DATABASE \"" + hyphenDbName + "\"");
600+
}
601+
602+
// Connect to the new database and create a table with data
603+
String jdbcUrl =
604+
String.format(
605+
"jdbc:postgresql://%s:%d/%s",
606+
POSTGRES_CONTAINER.getHost(),
607+
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
608+
hyphenDbName);
609+
try (Connection connection =
610+
java.sql.DriverManager.getConnection(jdbcUrl, TEST_USER, TEST_PASSWORD);
611+
Statement statement = connection.createStatement()) {
612+
statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(100))");
613+
statement.execute(
614+
"INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')");
615+
}
616+
617+
// Create PostgresDataSource using PostgresSourceConfigFactory
618+
PostgresSourceConfigFactory configFactory =
619+
(PostgresSourceConfigFactory)
620+
new PostgresSourceConfigFactory()
621+
.hostname(POSTGRES_CONTAINER.getHost())
622+
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
623+
.username(TEST_USER)
624+
.password(TEST_PASSWORD)
625+
.databaseList(hyphenDbName)
626+
.tableList("public.test_table")
627+
.startupOptions(StartupOptions.initial())
628+
.serverTimeZone("UTC");
629+
configFactory.database(hyphenDbName);
630+
configFactory.slotName(slotName);
631+
configFactory.decodingPluginName("pgoutput");
632+
633+
PostgresDataSource dataSource = new PostgresDataSource(configFactory);
634+
635+
// Verify the configuration works
636+
assertThat(dataSource.getPostgresSourceConfig().getTableList())
637+
.isEqualTo(Arrays.asList("public.test_table"));
638+
assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName);
639+
640+
// Now actually read data using the Flink streaming API
641+
StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment();
642+
testEnv.setParallelism(1);
643+
testEnv.enableCheckpointing(1000);
644+
testEnv.setRestartStrategy(RestartStrategies.noRestart());
645+
646+
FlinkSourceProvider sourceProvider =
647+
(FlinkSourceProvider) dataSource.getEventSourceProvider();
648+
649+
CloseableIterator<Event> events =
650+
testEnv.fromSource(
651+
sourceProvider.getSource(),
652+
WatermarkStrategy.noWatermarks(),
653+
PostgresDataSourceFactory.IDENTIFIER,
654+
new EventTypeInfo())
655+
.executeAndCollect();
656+
657+
// Collect events and verify data
658+
List<Event> collectedEvents = new ArrayList<>();
659+
int expectedDataCount = 3; // We inserted 3 rows
660+
int dataCount = 0;
661+
int maxEvents = 10; // Safety limit
662+
663+
while (events.hasNext() && collectedEvents.size() < maxEvents) {
664+
Event event = events.next();
665+
collectedEvents.add(event);
666+
if (event instanceof DataChangeEvent) {
667+
dataCount++;
668+
if (dataCount >= expectedDataCount) {
669+
break;
670+
}
671+
}
672+
}
673+
events.close();
674+
675+
// Verify we received CreateTableEvent and DataChangeEvents
676+
assertThat(collectedEvents).isNotEmpty();
677+
678+
// Check for CreateTableEvent
679+
long createTableEventCount =
680+
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
681+
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);
682+
683+
// Check for DataChangeEvents (INSERT events from snapshot)
684+
List<DataChangeEvent> dataChangeEvents =
685+
collectedEvents.stream()
686+
.filter(e -> e instanceof DataChangeEvent)
687+
.map(e -> (DataChangeEvent) e)
688+
.collect(Collectors.toList());
689+
690+
assertThat(dataChangeEvents).hasSize(expectedDataCount);
691+
692+
// Verify the table ID in events
693+
for (DataChangeEvent dce : dataChangeEvents) {
694+
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
695+
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
696+
}
697+
698+
// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
699+
List<Integer> actualIds =
700+
dataChangeEvents.stream()
701+
.map(
702+
dce -> {
703+
RecordData after = dce.after();
704+
return after.getInt(0); // id column
705+
})
706+
.sorted()
707+
.collect(Collectors.toList());
708+
assertThat(actualIds).containsExactly(1, 2, 3);
709+
710+
// Cleanup - first drop replication slot, then terminate connections and drop database
711+
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
712+
Statement statement = connection.createStatement()) {
713+
// Drop replication slot first (it was created during CDC connection)
714+
try {
715+
statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
716+
} catch (SQLException e) {
717+
// Ignore if slot doesn't exist
718+
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
719+
}
720+
// Terminate all connections to the database
721+
statement.execute(
722+
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
723+
+ hyphenDbName
724+
+ "'");
725+
// Small delay to ensure connections are terminated
726+
Thread.sleep(500);
727+
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
728+
}
729+
}
730+
586731
private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
587732
List<T> result = new ArrayList<>(size);
588733
List<T> sideResults = new ArrayList<>();

0 commit comments

Comments
 (0)