Skip to content

Commit 204726e

Browse files
committed
fix: postgres wal2json catalog bug
1 parent 4a3121d commit 204726e

File tree

2 files changed

+70
-8
lines changed

2 files changed

+70
-8
lines changed

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import java.sql.SQLException;
1313
import java.sql.Statement;
1414
import java.time.Duration;
15-
import java.util.Objects;
16-
import java.util.Optional;
15+
import java.util.*;
1716
import java.util.concurrent.atomic.AtomicLong;
1817

18+
import io.debezium.relational.*;
1919
import org.apache.kafka.connect.errors.ConnectException;
2020
import org.postgresql.core.BaseConnection;
2121
import org.postgresql.replication.LogSequenceNumber;
@@ -32,10 +32,6 @@
3232
import io.debezium.connector.postgresql.spi.SlotState;
3333
import io.debezium.jdbc.JdbcConfiguration;
3434
import io.debezium.jdbc.JdbcConnection;
35-
import io.debezium.relational.Column;
36-
import io.debezium.relational.ColumnEditor;
37-
import io.debezium.relational.TableId;
38-
import io.debezium.relational.Tables;
3935
import io.debezium.util.Clock;
4036
import io.debezium.util.Metronome;
4137

@@ -509,4 +505,70 @@ public TypeRegistry getTypeRegistry() {
509505
Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
510506
return typeRegistry;
511507
}
508+
509+
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
510+
Tables.TableFilter tableFilter, Tables.ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
511+
throws SQLException {
512+
// Before we make any changes, get the copy of the set of table IDs ...
513+
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
514+
515+
// Read the metadata for the table columns ...
516+
DatabaseMetaData metadata = connection().getMetaData();
517+
518+
// Find regular and materialized views as they cannot be snapshotted
519+
final Set<TableId> viewIds = new HashSet<>();
520+
final Set<TableId> tableIds = new HashSet<>();
521+
522+
int totalTables = 0;
523+
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null,
524+
new String[]{ "VIEW", "MATERIALIZED VIEW", "TABLE" })) {
525+
while (rs.next()) {
526+
final String schemaName = rs.getString(2);
527+
final String tableName = rs.getString(3);
528+
final String tableType = rs.getString(4);
529+
if ("TABLE".equals(tableType)) {
530+
totalTables++;
531+
TableId tableId = new TableId(null, schemaName, tableName);
532+
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
533+
tableIds.add(tableId);
534+
}
535+
}
536+
else {
537+
TableId tableId = new TableId(null, schemaName, tableName);
538+
viewIds.add(tableId);
539+
}
540+
}
541+
}
542+
543+
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
544+
545+
if (totalTables == tableIds.size() || config.getBoolean(RelationalDatabaseConnectorConfig.SNAPSHOT_FULL_COLUMN_SCAN_FORCE)) {
546+
columnsByTable = getColumnsDetails(databaseCatalog, schemaNamePattern, null, tableFilter, columnFilter, metadata, viewIds);
547+
}
548+
else {
549+
for (TableId includeTable : tableIds) {
550+
Map<TableId, List<Column>> cols = getColumnsDetails(databaseCatalog, schemaNamePattern, includeTable.table(), tableFilter,
551+
columnFilter, metadata, viewIds);
552+
columnsByTable.putAll(cols);
553+
}
554+
}
555+
556+
// Read the metadata for the primary keys ...
557+
for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
558+
// First get the primary key information, which must be done for *each* table ...
559+
List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
560+
561+
// Then define the table ...
562+
List<Column> columns = tableEntry.getValue();
563+
Collections.sort(columns);
564+
String defaultCharsetName = null; // JDBC does not expose character sets
565+
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
566+
}
567+
568+
if (removeTablesNotFoundInJdbc) {
569+
// Remove any definitions for tables that were not found in the database metadata ...
570+
tableIdsBefore.removeAll(columnsByTable.keySet());
571+
tableIdsBefore.forEach(tables::removeTable);
572+
}
573+
}
512574
}

connectors-common/debezium-bucket/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private static String findAndReplace(String url, String name, Properties props,
306306
return url;
307307
}
308308

309-
private final Configuration config;
309+
protected final Configuration config;
310310
private final ConnectionFactory factory;
311311
private final Operations initialOps;
312312
private volatile Connection conn;
@@ -1209,7 +1209,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
12091209
}
12101210
}
12111211

1212-
private Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, String schemaNamePattern,
1212+
protected Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, String schemaNamePattern,
12131213
String tableName, TableFilter tableFilter, ColumnNameFilter columnFilter, DatabaseMetaData metadata,
12141214
final Set<TableId> viewIds)
12151215
throws SQLException {

0 commit comments

Comments
 (0)