diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index f8d249c4d68..578b6e0f8c6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -404,12 +404,14 @@ private Optional getValidateDatabaseName(String tables) { private boolean isValidPostgresDbName(String dbName) { // PostgreSQL database name conventions: // 1. Length does not exceed 63 characters - // 2. Can contain letters, numbers, underscores, and dollar signs - // 3. Cannot start with a dollar sign + // 2. Can contain letters, numbers, underscores, dollar signs, and hyphens + // 3. Must start with a letter, underscore, or dollar sign (cannot start with a hyphen) + // Note: While SQL identifiers have strict rules, database names created + // via createdb command can contain hyphens (e.g., createdb foo-bar) if (dbName == null || dbName.length() > 63) { return false; } - if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*")) { + if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$-]*")) { return false; } return true; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index b2aef9eed9c..40b87f5d4e0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DataChangeEvent; @@ -68,6 +69,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -583,6 +585,149 @@ public void testSnapshotOnlyMode() throws Exception { .isEqualTo(String.format("Replication slot \"%s\" does not exist", slotName)); } + @Test + public void testDatabaseNameWithHyphenEndToEnd() throws Exception { + // Create a real database with hyphen to verify full CDC sync works + // This test verifies the fix for FLINK-38512 + String hyphenDbName = "test-db-with-hyphen"; + + // Create the database with hyphen (need to quote the name) + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + // Drop if exists and create new database with hyphen in name + statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\""); + statement.execute("CREATE DATABASE \"" + hyphenDbName + "\""); + } + + // Connect to the new database and create a table with data + String jdbcUrl = + String.format( + "jdbc:postgresql://%s:%d/%s", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + hyphenDbName); + try (Connection connection = + java.sql.DriverManager.getConnection(jdbcUrl, TEST_USER, TEST_PASSWORD); + Statement statement = connection.createStatement()) { + statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(100))"); + statement.execute( + "INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')"); + } + + // Create PostgresDataSource using PostgresSourceConfigFactory + PostgresSourceConfigFactory configFactory = + (PostgresSourceConfigFactory) + new PostgresSourceConfigFactory() + .hostname(POSTGRES_CONTAINER.getHost()) + .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(hyphenDbName) + .tableList("public.test_table") + .startupOptions(StartupOptions.initial()) + .serverTimeZone("UTC"); + configFactory.database(hyphenDbName); + configFactory.slotName(slotName); + configFactory.decodingPluginName("pgoutput"); + + PostgresDataSource dataSource = new PostgresDataSource(configFactory); + + // Verify the configuration works + assertThat(dataSource.getPostgresSourceConfig().getTableList()) + .isEqualTo(Arrays.asList("public.test_table")); + assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName); + + // Now actually read data using the Flink streaming API + StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + testEnv.setParallelism(1); + testEnv.enableCheckpointing(1000); + testEnv.setRestartStrategy(RestartStrategies.noRestart()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) dataSource.getEventSourceProvider(); + + CloseableIterator events = + testEnv.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + PostgresDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + + // Collect events and verify data + List collectedEvents = new ArrayList<>(); + int expectedDataCount = 3; // We inserted 3 rows + int dataCount = 0; + int maxEvents = 10; // Safety limit + + while (events.hasNext() && collectedEvents.size() < maxEvents) { + Event event = events.next(); + collectedEvents.add(event); + if (event instanceof DataChangeEvent) { + dataCount++; + if (dataCount >= expectedDataCount) { + break; + } + } + } + events.close(); + + // Verify we received CreateTableEvent and DataChangeEvents + assertThat(collectedEvents).isNotEmpty(); + + // Check for CreateTableEvent + long createTableEventCount = + collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count(); + assertThat(createTableEventCount).isGreaterThanOrEqualTo(1); + + // Check for DataChangeEvents (INSERT events from snapshot) + List dataChangeEvents = + collectedEvents.stream() + .filter(e -> e instanceof DataChangeEvent) + .map(e -> (DataChangeEvent) e) + .collect(Collectors.toList()); + + assertThat(dataChangeEvents).hasSize(expectedDataCount); + + // Verify the table ID in events + for (DataChangeEvent dce : dataChangeEvents) { + assertThat(dce.tableId().getSchemaName()).isEqualTo("public"); + assertThat(dce.tableId().getTableName()).isEqualTo("test_table"); + } + + // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3 + List actualIds = + dataChangeEvents.stream() + .map( + dce -> { + RecordData after = dce.after(); + return after.getInt(0); // id column + }) + .sorted() + .collect(Collectors.toList()); + assertThat(actualIds).containsExactly(1, 2, 3); + + // Cleanup - first drop replication slot, then terminate connections and drop database + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + // Drop replication slot first (it was created during CDC connection) + try { + statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName)); + } catch (SQLException e) { + // Ignore if slot doesn't exist + LOG.warn("Failed to drop replication slot: {}", e.getMessage()); + } + // Terminate all connections to the database + statement.execute( + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '" + + hyphenDbName + + "'"); + // Small delay to ensure connections are terminated + Thread.sleep(500); + statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\""); + } + } + private static List fetchResultsExcept(Iterator iter, int size, T sideEvent) { List result = new ArrayList<>(size); List sideResults = new ArrayList<>();