Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,14 @@ private Optional<String> getValidateDatabaseName(String tables) {
private boolean isValidPostgresDbName(String dbName) {
// PostgreSQL database name conventions:
// 1. Length does not exceed 63 characters
// 2. Can contain letters, numbers, underscores, and dollar signs
// 3. Cannot start with a dollar sign
// 2. Can contain letters, numbers, underscores, dollar signs, and hyphens
// 3. Must start with a letter, underscore, or dollar sign (cannot start with a hyphen)
// Note: While SQL identifiers have strict rules, database names created
// via createdb command can contain hyphens (e.g., createdb foo-bar)
if (dbName == null || dbName.length() > 63) {
return false;
}
if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$]*")) {
if (!dbName.matches("[a-zA-Z_$][a-zA-Z0-9_$-]*")) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
Expand Down Expand Up @@ -68,6 +69,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -583,6 +585,149 @@ public void testSnapshotOnlyMode() throws Exception {
.isEqualTo(String.format("Replication slot \"%s\" does not exist", slotName));
}

@Test
public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
// Create a real database with hyphen to verify full CDC sync works
// This test verifies the fix for FLINK-38512
String hyphenDbName = "test-db-with-hyphen";

// Create the database with hyphen (need to quote the name)
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// Drop if exists and create new database with hyphen in name
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
statement.execute("CREATE DATABASE \"" + hyphenDbName + "\"");
}

// Connect to the new database and create a table with data
String jdbcUrl =
String.format(
"jdbc:postgresql://%s:%d/%s",
POSTGRES_CONTAINER.getHost(),
POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
hyphenDbName);
try (Connection connection =
java.sql.DriverManager.getConnection(jdbcUrl, TEST_USER, TEST_PASSWORD);
Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(100))");
statement.execute(
"INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')");
}

// Create PostgresDataSource using PostgresSourceConfigFactory
PostgresSourceConfigFactory configFactory =
(PostgresSourceConfigFactory)
new PostgresSourceConfigFactory()
.hostname(POSTGRES_CONTAINER.getHost())
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(hyphenDbName)
.tableList("public.test_table")
.startupOptions(StartupOptions.initial())
.serverTimeZone("UTC");
configFactory.database(hyphenDbName);
configFactory.slotName(slotName);
configFactory.decodingPluginName("pgoutput");

PostgresDataSource dataSource = new PostgresDataSource(configFactory);

// Verify the configuration works
assertThat(dataSource.getPostgresSourceConfig().getTableList())
.isEqualTo(Arrays.asList("public.test_table"));
assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName);

// Now actually read data using the Flink streaming API
StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment();
testEnv.setParallelism(1);
testEnv.enableCheckpointing(1000);
testEnv.setRestartStrategy(RestartStrategies.noRestart());

FlinkSourceProvider sourceProvider =
(FlinkSourceProvider) dataSource.getEventSourceProvider();

CloseableIterator<Event> events =
testEnv.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
PostgresDataSourceFactory.IDENTIFIER,
new EventTypeInfo())
.executeAndCollect();

// Collect events and verify data
List<Event> collectedEvents = new ArrayList<>();
int expectedDataCount = 3; // We inserted 3 rows
int dataCount = 0;
int maxEvents = 10; // Safety limit

while (events.hasNext() && collectedEvents.size() < maxEvents) {
Event event = events.next();
collectedEvents.add(event);
if (event instanceof DataChangeEvent) {
dataCount++;
if (dataCount >= expectedDataCount) {
break;
}
}
}
events.close();

// Verify we received CreateTableEvent and DataChangeEvents
assertThat(collectedEvents).isNotEmpty();

// Check for CreateTableEvent
long createTableEventCount =
collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count();
assertThat(createTableEventCount).isGreaterThanOrEqualTo(1);

// Check for DataChangeEvents (INSERT events from snapshot)
List<DataChangeEvent> dataChangeEvents =
collectedEvents.stream()
.filter(e -> e instanceof DataChangeEvent)
.map(e -> (DataChangeEvent) e)
.collect(Collectors.toList());

assertThat(dataChangeEvents).hasSize(expectedDataCount);

// Verify the table ID in events
for (DataChangeEvent dce : dataChangeEvents) {
assertThat(dce.tableId().getSchemaName()).isEqualTo("public");
assertThat(dce.tableId().getTableName()).isEqualTo("test_table");
}

// Verify the data content - we should have 3 INSERT events with ids 1, 2, 3
List<Integer> actualIds =
dataChangeEvents.stream()
.map(
dce -> {
RecordData after = dce.after();
return after.getInt(0); // id column
})
.sorted()
.collect(Collectors.toList());
assertThat(actualIds).containsExactly(1, 2, 3);

// Cleanup - first drop replication slot, then terminate connections and drop database
try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
Statement statement = connection.createStatement()) {
// Drop replication slot first (it was created during CDC connection)
try {
statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName));
} catch (SQLException e) {
// Ignore if slot doesn't exist
LOG.warn("Failed to drop replication slot: {}", e.getMessage());
}
// Terminate all connections to the database
statement.execute(
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '"
+ hyphenDbName
+ "'");
// Small delay to ensure connections are terminated
Thread.sleep(500);
statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\"");
}
}

private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) {
List<T> result = new ArrayList<>(size);
List<T> sideResults = new ArrayList<>();
Expand Down