From 00e3b60d6177d69022fe7830e2c4387448a9e9cc Mon Sep 17 00:00:00 2001 From: mrart Date: Sun, 4 Jan 2026 08:49:47 +0800 Subject: [PATCH 1/4] [FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID [FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID [FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID [FLINK-38840][postgres] Postgres YAML connector supports emitting complete Table ID --- .../connectors/flink-sources/postgres-cdc.md | 12 +++++ .../pipeline-connectors/postgres.md | 14 ++++- .../connectors/flink-sources/postgres-cdc.md | 12 +++++ .../pipeline-connectors/postgres.md | 12 +++++ .../factory/PostgresDataSourceFactory.java | 5 ++ .../postgres/source/PostgresDataSource.java | 12 +++-- .../source/PostgresDataSourceOptions.java | 9 ++++ .../source/PostgresEventDeserializer.java | 27 +++++++++- .../reader/PostgresPipelineRecordEmitter.java | 51 +++++++++---------- .../postgres/utils/PostgresSchemaUtils.java | 20 ++++++-- .../PostgresDataSourceFactoryTest.java | 12 ++--- .../source/PostgresPipelineITCaseTest.java | 14 ++--- .../connection/PostgresConnection.java | 4 ++ .../source/PostgresSourceBuilder.java | 6 +++ .../source/config/PostgresSourceConfig.java | 14 ++++- .../config/PostgresSourceConfigFactory.java | 11 +++- .../source/config/PostgresSourceOptions.java | 9 ++++ .../fetch/PostgresSourceFetchTaskContext.java | 26 ++++++---- .../source/utils/CustomPostgresSchema.java | 2 +- 19 files changed, 207 insertions(+), 65 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index f0e0de525c4..741e08d65a8 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -292,6 +292,18 @@ Connector Options (2) Table list (regex or predefined list) should only match the parent table name, if table 
list matches both parent and child tables, snapshot data will be read twice. + + table-id.include-database + optional + false + Boolean + + 是否在生成的 Table ID 中包含数据库名称。
+ 如果设置为 true,Table ID 的格式为 (数据库, 模式, 表)。
+ 如果设置为 false,Table ID 的格式为 (模式, 表)。
+ 默认值为 false。 + + diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md index 50206761a19..32270b40e71 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md @@ -1,4 +1,4 @@ ---- +--- title: "Postgres" weight: 2 type: docs @@ -270,6 +270,18 @@ pipeline: 此为实验性选项,默认值为 false。 + + table-id.include-database + optional + false + Boolean + + 是否在生成的 Table ID 中包含数据库名称。
+ 如果设置为 true,Table ID 的格式为 (数据库, 模式, 表)。
+ 如果设置为 false,Table ID 的格式为 (模式, 表)。
+ 默认值为 false。 + + diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 533dea16c10..5d5f170f36f 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -289,6 +289,18 @@ SELECT * FROM shipments; (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. + + table-id.include-database + optional + false + Boolean + + Whether to include the database name in the generated Table ID.
+ If set to true, the Table ID will be in the format (database, schema, table).
+ If set to false, the Table ID will be in the format (schema, table).
+ Defaults to false. + + diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md index 03dd4e5a314..fa5ce5b105f 100644 --- a/docs/content/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md @@ -262,6 +262,18 @@ pipeline: Experimental option, defaults to false. + + table-id.include-database + optional + false + Boolean + + Whether to include the database name in the generated Table ID.
+ If set to true, the Table ID will be in the format (database, schema, table).
+ If set to false, the Table ID will be in the format (schema, table).
+ Defaults to false. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java index 918d479f6df..5e6c446c5d7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -80,6 +80,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE; import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; @@ -129,6 +130,7 @@ public DataSource createDataSource(Context context) { int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); + boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -169,6 +171,7 @@ public DataSource createDataSource(Context context) { .skipSnapshotBackfill(skipSnapshotBackfill) .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) + .includeDatabaseInTableId(tableIdIncludeDatabase) .getConfigFactory(); List tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null); @@ -197,6 +200,7 @@ public DataSource createDataSource(Context context) { String metadataList = config.get(METADATA_LIST); List readableMetadataList = listReadableMetadata(metadataList); + // Create a custom PostgresDataSource that passes the includeDatabaseInTableId flag return new PostgresDataSource(configFactory, readableMetadataList); } @@ -257,6 +261,7 @@ public Set> optionalOptions() { options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY); options.add(METADATA_LIST); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + options.add(TABLE_ID_INCLUDE_DATABASE); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java index 767fb5fc33a..cb697280357 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java @@ -65,14 +65,20 @@ public PostgresDataSource( @Override public EventSourceProvider getEventSourceProvider() { + String databaseName = postgresSourceConfig.getDatabaseList().get(0); + boolean 
includeDatabaseInTableId = postgresSourceConfig.isIncludeDatabaseInTableId(); DebeziumEventDeserializationSchema deserializer = - new PostgresEventDeserializer(DebeziumChangelogMode.ALL, readableMetadataList); + new PostgresEventDeserializer( + DebeziumChangelogMode.ALL, + readableMetadataList, + includeDatabaseInTableId, + databaseName); PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory(); PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig); PostgresSourceBuilder.PostgresIncrementalSource source = - new PostgresPipelineSource<>( + new PostgresPipelineSource( configFactory, deserializer, postgresOffsetFactory, @@ -112,7 +118,7 @@ public PostgresPipelineSource( @Override protected RecordEmitter createRecordEmitter( SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) { - return new PostgresPipelineRecordEmitter<>( + return new PostgresPipelineRecordEmitter( deserializationSchema, sourceReaderMetrics, this.sourceConfig, diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java index ec51de8b680..7e9ac4b0c58 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -264,4 +264,13 @@ public class PostgresDataSourceOptions { .defaultValue(false) .withDescription( "Whether to assign the unbounded chunks first during snapshot reading phase. 
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false."); + + public static final ConfigOption TABLE_ID_INCLUDE_DATABASE = + ConfigOptions.key("table-id.include-database") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to include database in the generated Table ID. " + + "If set to true, the Table ID will be in the format (database, schema, table). " + + "If set to false, the Table ID will be in the format (schema, table). Defaults to false."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java index a37c35c5216..c31dbc73b7f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -50,18 +50,37 @@ public class PostgresEventDeserializer extends DebeziumEventDeserializationSchem private static final long serialVersionUID = 1L; private List readableMetadataList; + private final boolean includeDatabaseInTableId; + private final String databaseName; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) { - super(new PostgresSchemaDataTypeInference(), changelogMode); + this(changelogMode, new ArrayList<>(), false, null); } public PostgresEventDeserializer( DebeziumChangelogMode changelogMode, List readableMetadataList) 
{ + this(changelogMode, readableMetadataList, false, null); + } + + public PostgresEventDeserializer( + DebeziumChangelogMode changelogMode, + List readableMetadataList, + boolean includeDatabaseInTableId) { + this(changelogMode, readableMetadataList, includeDatabaseInTableId, null); + } + + public PostgresEventDeserializer( + DebeziumChangelogMode changelogMode, + List readableMetadataList, + boolean includeDatabaseInTableId, + String databaseName) { super(new PostgresSchemaDataTypeInference(), changelogMode); this.readableMetadataList = readableMetadataList; + this.includeDatabaseInTableId = includeDatabaseInTableId; + this.databaseName = databaseName; } @Override @@ -87,7 +106,11 @@ protected boolean isSchemaChangeRecord(SourceRecord record) { @Override protected TableId getTableId(SourceRecord record) { String[] parts = record.topic().split("\\."); - return TableId.tableId(parts[1], parts[2]); + if (includeDatabaseInTableId && databaseName != null) { + return TableId.tableId(databaseName, parts[1], parts[2]); + } else { + return TableId.tableId(parts[1], parts[2]); + } } @Override diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java index 02761d8f311..23dfe3c4767 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -61,6 +61,7 @@ import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent; import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord; import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent; +import static org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId; /** The {@link RecordEmitter} implementation for PostgreSQL pipeline connector. */ public class PostgresPipelineRecordEmitter extends IncrementalSourceRecordEmitter { @@ -73,6 +74,7 @@ public class PostgresPipelineRecordEmitter extends IncrementalSourceRecordEmi // Used when startup mode is not initial private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true; private boolean isBounded = false; + private boolean includeDatabaseInTableId = false; private final Map createTableEventCache; @@ -88,6 +90,7 @@ public PostgresPipelineRecordEmitter( sourceConfig.isIncludeSchemaChanges(), offsetFactory); this.sourceConfig = sourceConfig; + this.includeDatabaseInTableId = sourceConfig.isIncludeDatabaseInTableId(); this.postgresDialect = postgresDialect; this.alreadySendCreateTableTables = new HashSet<>(); this.createTableEventCache = @@ -103,10 +106,17 @@ public void applySplit(SourceSplitBase split) { // TableSchemas in SnapshotSplit only contains one table. 
createTableEventCache.putAll(generateCreateTableEvent(sourceConfig)); } else { - for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) { + for (Map.Entry entry : + split.getTableSchemas().entrySet()) { + TableId tableId = + entry.getKey(); // Use the TableId from the map key which contains full info + TableChanges.TableChange tableChange = entry.getValue(); CreateTableEvent createTableEvent = new CreateTableEvent( - toCdcTableId(tableChange.getId()), + toCdcTableId( + tableId, + sourceConfig.getDatabaseList().get(0), + includeDatabaseInTableId), buildSchemaFromTable(tableChange.getTable())); ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema) .applyChangeEvent(createTableEvent); @@ -128,10 +138,8 @@ protected void processElement( } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) { TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId(); if (!alreadySendCreateTableTables.contains(tableId)) { - try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) { - sendCreateTableEvent(jdbc, tableId, (SourceOutput) output); - alreadySendCreateTableTables.add(tableId); - } + sendCreateTableEvent(tableId, (SourceOutput) output); + alreadySendCreateTableTables.add(tableId); } } else { boolean isDataChangeRecord = isDataChangeRecord(element); @@ -189,21 +197,8 @@ private Schema buildSchemaFromTable(Table table) { return tableBuilder.build(); } - private void sendCreateTableEvent( - PostgresConnection jdbc, TableId tableId, SourceOutput output) { - Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc); - output.collect( - new CreateTableEvent( - org.apache.flink.cdc.common.event.TableId.tableId( - tableId.schema(), tableId.table()), - schema)); - } - - private org.apache.flink.cdc.common.event.TableId toCdcTableId( - io.debezium.relational.TableId dbzTableId) { - String schemaName = - dbzTableId.catalog() == null ? 
dbzTableId.schema() : dbzTableId.catalog(); - return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table()); + private void sendCreateTableEvent(TableId tableId, SourceOutput output) { + output.collect(getCreateTableEvent(sourceConfig, tableId)); } private CreateTableEvent getCreateTableEvent( @@ -211,13 +206,15 @@ private CreateTableEvent getCreateTableEvent( try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) { Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc); return new CreateTableEvent( - org.apache.flink.cdc.common.event.TableId.tableId( - tableId.schema(), tableId.table()), + toCdcTableId( + tableId, + sourceConfig.getDatabaseList().get(0), + includeDatabaseInTableId), schema); } } - private TableId getTableId(SourceRecord dataRecord) { + public static TableId getTableId(SourceRecord dataRecord) { Struct value = (Struct) dataRecord.value(); Struct source = value.getStruct(Envelope.FieldName.SOURCE); Field field = source.schema().field(SCHEMA_NAME_KEY); @@ -244,8 +241,10 @@ private Map generateCreateTableEvent( createTableEventCache.put( tableId, new CreateTableEvent( - org.apache.flink.cdc.common.event.TableId.tableId( - tableId.schema(), tableId.table()), + toCdcTableId( + tableId, + this.sourceConfig.getDatabaseList().get(0), + includeDatabaseInTableId), schema)); } return createTableEventCache; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java index 0f3b0580644..93a7099ac39 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java @@ -208,12 +208,26 @@ public static Column toColumn( public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { return new io.debezium.relational.TableId( - tableId.getSchemaName(), null, tableId.getTableName()); + tableId.getNamespace(), tableId.getSchemaName(), tableId.getTableName()); } public static org.apache.flink.cdc.common.event.TableId toCdcTableId( io.debezium.relational.TableId dbzTableId) { - return org.apache.flink.cdc.common.event.TableId.tableId( - dbzTableId.schema(), dbzTableId.table()); + return toCdcTableId(dbzTableId, null, false); + } + + public static org.apache.flink.cdc.common.event.TableId toCdcTableId( + io.debezium.relational.TableId dbzTableId, + String databaseName, + boolean includeDatabaseInTableId) { + String schema = dbzTableId.schema(); + String table = dbzTableId.table(); + if (includeDatabaseInTableId && databaseName != null) { + return org.apache.flink.cdc.common.event.TableId.tableId(databaseName, schema, table); + } else if (schema != null) { + return org.apache.flink.cdc.common.event.TableId.tableId(schema, table); + } else { + return org.apache.flink.cdc.common.event.TableId.tableId(table); + } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java index 473c2d61ced..32f5da64587 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java +++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java @@ -89,7 +89,7 @@ public void testCreateDataSource() { PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); assertThat(dataSource.getPostgresSourceConfig().getTableList()) - .isEqualTo(Arrays.asList("inventory.products")); + .isEqualTo(Arrays.asList("postgres.inventory.products")); } @Test @@ -131,12 +131,12 @@ public void testExcludeTable() { new ArrayList<>(dataSource.getPostgresSourceConfig().getTableList()); Collections.sort(actualTableList); assertThat(actualTableList) - .isNotEqualTo(Collections.singletonList("inventory.orders")) + .isNotEqualTo(Collections.singletonList("postgres.inventory.orders")) .isEqualTo( Arrays.asList( - "inventory.customers", - "inventory.multi_max_table", - "inventory.products")); + "postgres.inventory.customers", + "postgres.inventory.multi_max_table", + "postgres.inventory.products")); } @Test @@ -254,7 +254,7 @@ public void testPrefixRequireOption() { PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); assertThat(dataSource.getPostgresSourceConfig().getTableList()) - .isEqualTo(Arrays.asList("inventory.products")); + .isEqualTo(Arrays.asList("postgres.inventory.products")); } @Test diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index bd6c46e2548..2adc54fa19a 100644 --- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -365,6 +365,7 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E sourceConfiguration.set( PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED, unboundedChunkFirst); + sourceConfiguration.set(PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, true); Factory.Context context = new FactoryHelper.DefaultContext( @@ -384,7 +385,8 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E new EventTypeInfo()) .executeAndCollect(); - TableId tableId = TableId.tableId("inventory", "products"); + TableId tableId = + TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products"); CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId); // generate snapshot data @@ -582,16 +584,6 @@ private static List fetchResultsExcept(Iterator iter, int size, T side return result; } - // Helper method to create a temporary directory for savepoint - private Path createTempSavepointDir() throws Exception { - return Files.createTempDirectory("postgres-savepoint"); - } - - // Helper method to execute the job and create a savepoint - private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception { - return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get(); - } - private List getSnapshotExpected(TableId tableId) { RowType rowType = RowType.of( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 69ae4c45497..dfea7f9aaa0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -111,6 +111,8 @@ public class PostgresConnection extends JdbcConnection { private final TypeRegistry typeRegistry; private final PostgresDefaultValueConverter defaultValueConverter; + private final JdbcConfiguration config; + /** * Creates a Postgres connection using the supplied configuration. If necessary this connection * is able to resolve data type mappings. Such a connection requires a {@link @@ -152,6 +154,7 @@ public PostgresConnection( null, "\"", "\""); + this.config = config; if (Objects.isNull(valueConverterBuilder)) { this.typeRegistry = null; @@ -182,6 +185,7 @@ public PostgresConnection( null, "\"", "\""); + this.config = config.getJdbcConfig(); if (Objects.isNull(typeRegistry)) { this.typeRegistry = null; this.defaultValueConverter = null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java index bbd65b5e9ab..0f550ca96b3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -305,6 +305,12 @@ public PostgresSourceBuilder includePartitionedTables(boolean includePartitio return this; } + /** Whether to include database in the generated Table ID. */ + public PostgresSourceBuilder includeDatabaseInTableId(boolean includeDatabaseInTableId) { + this.configFactory.setIncludeDatabaseInTableId(includeDatabaseInTableId); + return this; + } + /** * Build the {@link PostgresIncrementalSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 30271612800..4402219a647 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -39,6 +39,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int subtaskId; private final int lsnCommitCheckpointsDelay; private final boolean includePartitionedTables; + private final boolean includeDatabaseInTableId; public PostgresSourceConfig( int subtaskId, @@ -69,7 +70,8 @@ public PostgresSourceConfig( boolean isScanNewlyAddedTableEnabled, int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, - boolean includePartitionedTables) { + boolean includePartitionedTables, + boolean includeDatabaseInTableId) { super( startupOptions, databaseList, @@ -100,6 +102,7 @@ public PostgresSourceConfig( this.subtaskId = 
subtaskId; this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.includePartitionedTables = includePartitionedTables; + this.includeDatabaseInTableId = includeDatabaseInTableId; } /** @@ -148,4 +151,13 @@ public String getJdbcUrl() { public PostgresConnectorConfig getDbzConnectorConfig() { return new PostgresConnectorConfig(getDbzConfiguration()); } + + /** + * Returns whether to include database in the generated Table ID. + * + * @return whether to include database in the generated Table ID + */ + public boolean isIncludeDatabaseInTableId() { + return includeDatabaseInTableId; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 670d4f37a56..847b1547461 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -54,6 +54,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private boolean includePartitionedTables; + private boolean includeDatabaseInTableId = + PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue(); + /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. 
*/ @Override public PostgresSourceConfig create(int subtaskId) { @@ -136,7 +139,8 @@ public PostgresSourceConfig create(int subtaskId) { scanNewlyAddedTableEnabled, lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, - includePartitionedTables); + includePartitionedTables, + includeDatabaseInTableId); } /** @@ -189,4 +193,9 @@ public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) { public void setIncludePartitionedTables(boolean includePartitionedTables) { this.includePartitionedTables = includePartitionedTables; } + + /** Set whether to include database in the generated Table ID. */ + public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) { + this.includeDatabaseInTableId = includeDatabaseInTableId; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java index f498c264532..db04ac14237 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java @@ -97,4 +97,13 @@ public class PostgresSourceOptions extends JdbcSourceOptions { + "If enabled:\n" + "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n" + "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice."); + + public static final ConfigOption TABLE_ID_INCLUDE_DATABASE = + ConfigOptions.key("table-id.include-database") + 
.booleanType() + .defaultValue(false) + .withDescription( + "Whether to include database in the generated Table ID.\n" + + "If set to true, the Table ID will be in the format (database, schema, table).\n" + + "If set to false, the Table ID will be in the format (schema, table). Defaults to false."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java index b3b3e778d16..49b62cb3977 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java @@ -132,9 +132,8 @@ public void configure(SourceSplitBase sourceSplitBase) { .edit() .with( "table.include.list", - ((SnapshotSplit) sourceSplitBase) - .getTableId() - .toString()) + getTableList( + ((SnapshotSplit) sourceSplitBase).getTableId())) .with( SLOT_NAME.name(), ((PostgresSourceConfig) sourceConfig) @@ -151,13 +150,13 @@ public void configure(SourceSplitBase sourceSplitBase) { // when backfilled split, only current table schema should be scan builder.with( "table.include.list", - sourceSplitBase - .asStreamSplit() - .getTableSchemas() - .keySet() - .iterator() - .next() - .toString()); + getTableList( + sourceSplitBase + .asStreamSplit() + .getTableSchemas() + .keySet() + .iterator() + .next())); } dbzConfig = @@ -385,4 +384,11 @@ private boolean isBackFillSplit(SourceSplitBase sourceSplitBase) { && !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase( sourceSplitBase.asStreamSplit().splitId()); } + 
+ /** Formats {@code tableId} for the "table.include.list" setting: returns "schema.table", or only the table name when the schema is null or empty. No other component of the id is used. */ private String getTableList(TableId tableId) { + if (tableId.schema() == null || tableId.schema().isEmpty()) { + return tableId.table(); + } + return tableId.schema() + "." + tableId.table(); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java index 714baf5dde7..b0025f62542 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java @@ -131,7 +131,7 @@ private List readTableSchema(List tableIds) throws SQLExce offsetContext, dbzConfig.databaseName(), tableId.schema(), - null, + tableId.table(), table, true); From 66e0d11c99d0d6498d9cb95faf3cca1c86985c00 Mon Sep 17 00:00:00 2001 From: mrart Date: Sun, 4 Jan 2026 21:06:19 +0800 Subject: [PATCH 2/4] fixed test. 
--- .../factory/PostgresDataSourceFactoryTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java index 32f5da64587..473c2d61ced 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java @@ -89,7 +89,7 @@ public void testCreateDataSource() { PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); assertThat(dataSource.getPostgresSourceConfig().getTableList()) - .isEqualTo(Arrays.asList("postgres.inventory.products")); + .isEqualTo(Arrays.asList("inventory.products")); } @Test @@ -131,12 +131,12 @@ public void testExcludeTable() { new ArrayList<>(dataSource.getPostgresSourceConfig().getTableList()); Collections.sort(actualTableList); assertThat(actualTableList) - .isNotEqualTo(Collections.singletonList("postgres.inventory.orders")) + .isNotEqualTo(Collections.singletonList("inventory.orders")) .isEqualTo( Arrays.asList( - "postgres.inventory.customers", - "postgres.inventory.multi_max_table", - "postgres.inventory.products")); + "inventory.customers", + "inventory.multi_max_table", + "inventory.products")); } @Test @@ -254,7 +254,7 @@ public void testPrefixRequireOption() { PostgresDataSourceFactory factory = new PostgresDataSourceFactory(); 
PostgresDataSource dataSource = (PostgresDataSource) factory.createDataSource(context); assertThat(dataSource.getPostgresSourceConfig().getTableList()) - .isEqualTo(Arrays.asList("postgres.inventory.products")); + .isEqualTo(Arrays.asList("inventory.products")); } @Test From 81a28ed5934086afcbc3a535f5c43250d186e4dc Mon Sep 17 00:00:00 2001 From: mrart Date: Mon, 5 Jan 2026 09:15:24 +0800 Subject: [PATCH 3/4] review self. --- .../docs/connectors/pipeline-connectors/postgres.md | 2 +- .../cdc/connectors/postgres/source/PostgresDataSource.java | 4 ++-- .../postgres/source/reader/PostgresPipelineRecordEmitter.java | 2 +- .../connector/postgresql/connection/PostgresConnection.java | 4 ---- .../postgres/source/utils/CustomPostgresSchema.java | 2 +- 5 files changed, 5 insertions(+), 9 deletions(-) diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md index 32270b40e71..561b1efac3c 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md @@ -1,4 +1,4 @@ -q--- +--- title: "Postgres" weight: 2 type: docs diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java index cb697280357..6084df1f36f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java @@ -78,7 +78,7 @@ public EventSourceProvider 
getEventSourceProvider() { PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig); PostgresSourceBuilder.PostgresIncrementalSource source = - new PostgresPipelineSource( + new PostgresPipelineSource<>( configFactory, deserializer, postgresOffsetFactory, @@ -118,7 +118,7 @@ public PostgresPipelineSource( @Override protected RecordEmitter createRecordEmitter( SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) { - return new PostgresPipelineRecordEmitter( + return new PostgresPipelineRecordEmitter<>( deserializationSchema, sourceReaderMetrics, this.sourceConfig, diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java index 23dfe3c4767..dd862354c25 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -214,7 +214,7 @@ private CreateTableEvent getCreateTableEvent( } } - public static TableId getTableId(SourceRecord dataRecord) { + private TableId getTableId(SourceRecord dataRecord) { Struct value = (Struct) dataRecord.value(); Struct source = value.getStruct(Envelope.FieldName.SOURCE); Field field = source.schema().field(SCHEMA_NAME_KEY); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index dfea7f9aaa0..69ae4c45497 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -111,8 +111,6 @@ public class PostgresConnection extends JdbcConnection { private final TypeRegistry typeRegistry; private final PostgresDefaultValueConverter defaultValueConverter; - private final JdbcConfiguration config; - /** * Creates a Postgres connection using the supplied configuration. If necessary this connection * is able to resolve data type mappings. Such a connection requires a {@link @@ -154,7 +152,6 @@ public PostgresConnection( null, "\"", "\""); - this.config = config; if (Objects.isNull(valueConverterBuilder)) { this.typeRegistry = null; @@ -185,7 +182,6 @@ public PostgresConnection( null, "\"", "\""); - this.config = config.getJdbcConfig(); if (Objects.isNull(typeRegistry)) { this.typeRegistry = null; this.defaultValueConverter = null; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java index b0025f62542..714baf5dde7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java @@ -131,7 +131,7 @@ private List readTableSchema(List tableIds) throws SQLExce offsetContext, dbzConfig.databaseName(), tableId.schema(), - tableId.table(), + null, table, true); From 0c3e4477fb898f489d80615d198a115c0eeb137b Mon Sep 17 00:00:00 2001 From: mrart Date: Mon, 5 Jan 2026 20:26:29 +0800 Subject: [PATCH 4/4] rm docs on postgres cdc --- .../connectors/flink-sources/postgres-cdc.md | 12 -------- .../connectors/flink-sources/postgres-cdc.md | 12 -------- .../source/PostgresPipelineITCaseTest.java | 30 ++++++++++++++----- 3 files changed, 23 insertions(+), 31 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md index 741e08d65a8..f0e0de525c4 100644 --- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md @@ -292,18 +292,6 @@ Connector Options (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. - - table-id.include-database - optional - false - Boolean - - 是否在生成的 Table ID 中包含数据库名称。
- 如果设置为 true,Table ID 的格式为 (数据库, 模式, 表)。
- 如果设置为 false,Table ID 的格式为 (模式, 表)。
- 默认值为 false。 - - diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md index 5d5f170f36f..533dea16c10 100644 --- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md +++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md @@ -289,18 +289,6 @@ SELECT * FROM shipments; (2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice. - - table-id.include-database - optional - false - Boolean - - Whether to include database in the generated Table ID.
- If set to true, the Table ID will be in the format (database, schema, table).
- If set to false, the Table ID will be in the format (schema, table).
- Defaults to false. - - diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java index 2adc54fa19a..b2aef9eed9c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java @@ -57,7 +57,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; import static org.testcontainers.containers.PostgreSQLContainer.POSTGRESQL_PORT; @@ -103,6 +105,14 @@ public void after() throws SQLException { inventoryDatabase.removeSlot(slotName); } + /** Argument source for {@code testInitialStartupModeWithOpts}: yields all four (unboundedChunkFirst, isTableIdIncludeDatabase) boolean combinations. */ static Stream<Arguments> provideParameters() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(false, false), + Arguments.of(true, false), + Arguments.of(false, true)); + } + @Test public void testInitialStartupMode() throws Exception { inventoryDatabase.createAndInitialize(); @@ -341,9 +351,10 @@ private CollectResultIterator addCollector( return iterator; } - @ParameterizedTest(name = "unboundedChunkFirst: {0}") - @ValueSource(booleans 
= {true, false}) - public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception { + @ParameterizedTest + @MethodSource("provideParameters") + public void testInitialStartupModeWithOpts( + boolean unboundedChunkFirst, boolean isTableIdIncludeDatabase) throws Exception { inventoryDatabase.createAndInitialize(); org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration = new org.apache.flink.cdc.common.configuration.Configuration(); @@ -365,7 +376,8 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E sourceConfiguration.set( PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED, unboundedChunkFirst); - sourceConfiguration.set(PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, true); + sourceConfiguration.set( + PostgresDataSourceOptions.TABLE_ID_INCLUDE_DATABASE, isTableIdIncludeDatabase); Factory.Context context = new FactoryHelper.DefaultContext( @@ -385,8 +397,12 @@ public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws E new EventTypeInfo()) .executeAndCollect(); - TableId tableId = - TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products"); + TableId tableId; + if (isTableIdIncludeDatabase) { + tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "inventory", "products"); + } else { + tableId = TableId.tableId("inventory", "products"); + } CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId); // generate snapshot data