Skip to content

Commit 708fb51

Browse files
committed
[postgres] Fix O(N²) JDBC metadata lookups in CustomPostgresSchema during snapshot
1 parent 9e075ca commit 708fb51

1 file changed

Lines changed: 22 additions & 12 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42-
import java.util.Objects;
4342

4443
/** A CustomPostgresSchema similar to PostgresSchema with customization. */
4544
public class CustomPostgresSchema {
@@ -82,7 +81,7 @@ public Map<TableId, TableChange> getTableSchema(List<TableId> tableIds) {
8281

8382
if (!unMatchTableIds.isEmpty()) {
8483
try {
85-
readTableSchema(tableIds);
84+
readTableSchema(unMatchTableIds);
8685
} catch (SQLException e) {
8786
throw new FlinkRuntimeException("Failed to read table schema", e);
8887
}
@@ -119,26 +118,37 @@ private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLExce
119118
throw new FlinkRuntimeException("Failed to read schema", e);
120119
}
121120

122-
for (TableId tableId : tableIds) {
123-
Table table = Objects.requireNonNull(tables.forTable(tableId));
124-
// set the events to populate proper sourceInfo into offsetContext
125-
offsetContext.event(tableId, Instant.now());
126-
127-
// TODO: check whether we always set isFromSnapshot = true
121+
// Cache all tables discovered by readSchema to avoid redundant full scans on future calls.
122+
for (TableId discoveredId : tables.tableIds()) {
123+
if (this.schemasByTableId.containsKey(discoveredId)) {
124+
continue;
125+
}
126+
Table table = tables.forTable(discoveredId);
127+
if (table == null) {
128+
continue;
129+
}
130+
offsetContext.event(discoveredId, Instant.now());
128131
SchemaChangeEvent schemaChangeEvent =
129132
SchemaChangeEvent.ofCreate(
130133
partition,
131134
offsetContext,
132135
dbzConfig.databaseName(),
133-
tableId.schema(),
136+
discoveredId.schema(),
134137
null,
135138
table,
136139
true);
137-
138140
for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
139-
this.schemasByTableId.put(tableId, tableChange);
141+
this.schemasByTableId.put(discoveredId, tableChange);
142+
}
143+
}
144+
145+
for (TableId tableId : tableIds) {
146+
TableChange cached = this.schemasByTableId.get(tableId);
147+
if (cached == null) {
148+
throw new FlinkRuntimeException(
149+
String.format("Failed to read table schema of table %s", tableId));
140150
}
141-
tableChanges.add(this.schemasByTableId.get(tableId));
151+
tableChanges.add(cached);
142152
}
143153
return tableChanges;
144154
}

0 commit comments

Comments
 (0)