|
20 | 20 | import org.apache.flink.api.common.eventtime.WatermarkStrategy; |
21 | 21 | import org.apache.flink.api.common.restartstrategy.RestartStrategies; |
22 | 22 | import org.apache.flink.api.common.typeutils.TypeSerializer; |
| 23 | +import org.apache.flink.cdc.common.data.RecordData; |
23 | 24 | import org.apache.flink.cdc.common.data.binary.BinaryStringData; |
24 | 25 | import org.apache.flink.cdc.common.event.CreateTableEvent; |
25 | 26 | import org.apache.flink.cdc.common.event.DataChangeEvent; |
|
68 | 69 | import java.sql.SQLException; |
69 | 70 | import java.sql.Statement; |
70 | 71 | import java.util.ArrayList; |
| 72 | +import java.util.Arrays; |
71 | 73 | import java.util.Collections; |
72 | 74 | import java.util.HashMap; |
73 | 75 | import java.util.Iterator; |
@@ -583,6 +585,149 @@ public void testSnapshotOnlyMode() throws Exception { |
583 | 585 | .isEqualTo(String.format("Replication slot \"%s\" does not exist", slotName)); |
584 | 586 | } |
585 | 587 |
|
| 588 | + @Test |
| 589 | + public void testDatabaseNameWithHyphenEndToEnd() throws Exception { |
| 590 | + // Create a real database with hyphen to verify full CDC sync works |
| 591 | + // This test verifies the fix for FLINK-38512 |
| 592 | + String hyphenDbName = "test-db-with-hyphen"; |
| 593 | + |
| 594 | + // Create the database with hyphen (need to quote the name) |
| 595 | + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); |
| 596 | + Statement statement = connection.createStatement()) { |
| 597 | + // Drop if exists and create new database with hyphen in name |
| 598 | + statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\""); |
| 599 | + statement.execute("CREATE DATABASE \"" + hyphenDbName + "\""); |
| 600 | + } |
| 601 | + |
| 602 | + // Connect to the new database and create a table with data |
| 603 | + String jdbcUrl = |
| 604 | + String.format( |
| 605 | + "jdbc:postgresql://%s:%d/%s", |
| 606 | + POSTGRES_CONTAINER.getHost(), |
| 607 | + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), |
| 608 | + hyphenDbName); |
| 609 | + try (Connection connection = |
| 610 | + java.sql.DriverManager.getConnection(jdbcUrl, TEST_USER, TEST_PASSWORD); |
| 611 | + Statement statement = connection.createStatement()) { |
| 612 | + statement.execute("CREATE TABLE test_table (id INT PRIMARY KEY, name VARCHAR(100))"); |
| 613 | + statement.execute( |
| 614 | + "INSERT INTO test_table VALUES (1, 'test1'), (2, 'test2'), (3, 'test3')"); |
| 615 | + } |
| 616 | + |
| 617 | + // Create PostgresDataSource using PostgresSourceConfigFactory |
| 618 | + PostgresSourceConfigFactory configFactory = |
| 619 | + (PostgresSourceConfigFactory) |
| 620 | + new PostgresSourceConfigFactory() |
| 621 | + .hostname(POSTGRES_CONTAINER.getHost()) |
| 622 | + .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)) |
| 623 | + .username(TEST_USER) |
| 624 | + .password(TEST_PASSWORD) |
| 625 | + .databaseList(hyphenDbName) |
| 626 | + .tableList("public.test_table") |
| 627 | + .startupOptions(StartupOptions.initial()) |
| 628 | + .serverTimeZone("UTC"); |
| 629 | + configFactory.database(hyphenDbName); |
| 630 | + configFactory.slotName(slotName); |
| 631 | + configFactory.decodingPluginName("pgoutput"); |
| 632 | + |
| 633 | + PostgresDataSource dataSource = new PostgresDataSource(configFactory); |
| 634 | + |
| 635 | + // Verify the configuration works |
| 636 | + assertThat(dataSource.getPostgresSourceConfig().getTableList()) |
| 637 | + .isEqualTo(Arrays.asList("public.test_table")); |
| 638 | + assertThat(dataSource.getPostgresSourceConfig().getDatabaseList()).contains(hyphenDbName); |
| 639 | + |
| 640 | + // Now actually read data using the Flink streaming API |
| 641 | + StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment(); |
| 642 | + testEnv.setParallelism(1); |
| 643 | + testEnv.enableCheckpointing(1000); |
| 644 | + testEnv.setRestartStrategy(RestartStrategies.noRestart()); |
| 645 | + |
| 646 | + FlinkSourceProvider sourceProvider = |
| 647 | + (FlinkSourceProvider) dataSource.getEventSourceProvider(); |
| 648 | + |
| 649 | + CloseableIterator<Event> events = |
| 650 | + testEnv.fromSource( |
| 651 | + sourceProvider.getSource(), |
| 652 | + WatermarkStrategy.noWatermarks(), |
| 653 | + PostgresDataSourceFactory.IDENTIFIER, |
| 654 | + new EventTypeInfo()) |
| 655 | + .executeAndCollect(); |
| 656 | + |
| 657 | + // Collect events and verify data |
| 658 | + List<Event> collectedEvents = new ArrayList<>(); |
| 659 | + int expectedDataCount = 3; // We inserted 3 rows |
| 660 | + int dataCount = 0; |
| 661 | + int maxEvents = 10; // Safety limit |
| 662 | + |
| 663 | + while (events.hasNext() && collectedEvents.size() < maxEvents) { |
| 664 | + Event event = events.next(); |
| 665 | + collectedEvents.add(event); |
| 666 | + if (event instanceof DataChangeEvent) { |
| 667 | + dataCount++; |
| 668 | + if (dataCount >= expectedDataCount) { |
| 669 | + break; |
| 670 | + } |
| 671 | + } |
| 672 | + } |
| 673 | + events.close(); |
| 674 | + |
| 675 | + // Verify we received CreateTableEvent and DataChangeEvents |
| 676 | + assertThat(collectedEvents).isNotEmpty(); |
| 677 | + |
| 678 | + // Check for CreateTableEvent |
| 679 | + long createTableEventCount = |
| 680 | + collectedEvents.stream().filter(e -> e instanceof CreateTableEvent).count(); |
| 681 | + assertThat(createTableEventCount).isGreaterThanOrEqualTo(1); |
| 682 | + |
| 683 | + // Check for DataChangeEvents (INSERT events from snapshot) |
| 684 | + List<DataChangeEvent> dataChangeEvents = |
| 685 | + collectedEvents.stream() |
| 686 | + .filter(e -> e instanceof DataChangeEvent) |
| 687 | + .map(e -> (DataChangeEvent) e) |
| 688 | + .collect(Collectors.toList()); |
| 689 | + |
| 690 | + assertThat(dataChangeEvents).hasSize(expectedDataCount); |
| 691 | + |
| 692 | + // Verify the table ID in events |
| 693 | + for (DataChangeEvent dce : dataChangeEvents) { |
| 694 | + assertThat(dce.tableId().getSchemaName()).isEqualTo("public"); |
| 695 | + assertThat(dce.tableId().getTableName()).isEqualTo("test_table"); |
| 696 | + } |
| 697 | + |
| 698 | + // Verify the data content - we should have 3 INSERT events with ids 1, 2, 3 |
| 699 | + List<Integer> actualIds = |
| 700 | + dataChangeEvents.stream() |
| 701 | + .map( |
| 702 | + dce -> { |
| 703 | + RecordData after = dce.after(); |
| 704 | + return after.getInt(0); // id column |
| 705 | + }) |
| 706 | + .sorted() |
| 707 | + .collect(Collectors.toList()); |
| 708 | + assertThat(actualIds).containsExactly(1, 2, 3); |
| 709 | + |
| 710 | + // Cleanup - first drop replication slot, then terminate connections and drop database |
| 711 | + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); |
| 712 | + Statement statement = connection.createStatement()) { |
| 713 | + // Drop replication slot first (it was created during CDC connection) |
| 714 | + try { |
| 715 | + statement.execute(String.format("SELECT pg_drop_replication_slot('%s')", slotName)); |
| 716 | + } catch (SQLException e) { |
| 717 | + // Ignore if slot doesn't exist |
| 718 | + LOG.warn("Failed to drop replication slot: {}", e.getMessage()); |
| 719 | + } |
| 720 | + // Terminate all connections to the database |
| 721 | + statement.execute( |
| 722 | + "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '" |
| 723 | + + hyphenDbName |
| 724 | + + "'"); |
| 725 | + // Small delay to ensure connections are terminated |
| 726 | + Thread.sleep(500); |
| 727 | + statement.execute("DROP DATABASE IF EXISTS \"" + hyphenDbName + "\""); |
| 728 | + } |
| 729 | + } |
| 730 | + |
586 | 731 | private static <T> List<T> fetchResultsExcept(Iterator<T> iter, int size, T sideEvent) { |
587 | 732 | List<T> result = new ArrayList<>(size); |
588 | 733 | List<T> sideResults = new ArrayList<>(); |
|
0 commit comments