Skip to content

Commit 874594c

Browse files
committed
fix: postgres pgoutput part publication with partition root table
1 parent 860181a commit 874594c

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,15 @@ protected void initPublication() {
171171
Set<TableId> tablesToCapture = determineCapturedTables();
172172
Set<TableId> existsTables = new HashSet<>();
173173
String catalog = connection().getCatalog();
174-
try (ResultSet resultSet = stmt.executeQuery(String.format("select * from pg_publication_tables where pubname='%s'", publicationName))) {
174+
try (ResultSet resultSet = stmt.executeQuery(String.format("select schemaname, tablename from pg_publication_tables pt join pg_publication pp\n" +
175+
" on pt.pubname = pp.pubname\n" +
176+
"where pt.pubname='%s' and pp.pubviaroot='false'\n" +
177+
"union all\n" +
178+
"select schemaname, pc.relname from pg_publication_tables pt join pg_publication pp\n" +
179+
" on pt.pubname = pp.pubname join pg_class pc on pc.oid in (SELECT inhrelid\n" +
180+
" FROM pg_inherits\n" +
181+
" WHERE inhparent = pt.tablename::regclass)\n" +
182+
"where pt.pubname='%s' and pp.pubviaroot='true'", publicationName, publicationName))) {
175183
while (resultSet.next()) {
176184
existsTables.add(new TableId(catalog, resultSet.getString("schemaname"), resultSet.getString("tablename")));
177185
}

connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -861,7 +861,7 @@ private void createAllPublicationIfNotExist() throws SQLException {
861861
}
862862

863863
private void createCustomPublicationIfNotExist(List<String> tableList) {
864-
String sql = String.format("CREATE PUBLICATION %s FOR TABLE %s", slotName, tableList.stream().map(this::getSchemaAndTable).collect(Collectors.joining(", ")));
864+
String sql = String.format("CREATE PUBLICATION %s FOR TABLE %s %s", slotName, tableList.stream().map(this::getSchemaAndTable).collect(Collectors.joining(", ")), postgresConfig.getPartitionRoot() ? "WITH (publish_via_partition_root = true)" : "");
865865
try {
866866
tapLogger.info("Create publication sql: {}", sql);
867867
postgresJdbcContext.execute(sql);

0 commit comments

Comments
 (0)