Skip to content

Commit a8c6cc6

Browse files
authored
[Improve] Improve read table schema in cdc connector (#6702)
1 parent 6f74663 commit a8c6cc6

File tree

3 files changed

+43
-37
lines changed
  • seatunnel-connectors-v2/connector-cdc
    • connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils
    • connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils
    • connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils

3 files changed

+43
-37
lines changed

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,12 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
5757
TableChange schema = schemasByTableId.get(tableId);
5858
if (schema == null) {
5959
schema = readTableSchema(jdbc, tableId);
60-
schemasByTableId.put(tableId, schema);
6160
}
6261
return schema;
6362
}
6463

6564
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
6665
OracleConnection oracleConnection = (OracleConnection) jdbc;
67-
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
6866
Tables tables = new Tables();
6967

7068
try {
@@ -75,22 +73,27 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
7573
connectorConfig.getTableFilters().dataCollectionFilter(),
7674
null,
7775
false);
78-
79-
Table table =
80-
CatalogTableUtils.mergeCatalogTableConfig(
81-
tables.forTable(tableId), tableMap.get(tableId));
82-
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
83-
tableChangeMap.put(tableId, tableChange);
76+
for (TableId id : tables.tableIds()) {
77+
if (tableMap.containsKey(id)) {
78+
Table table =
79+
CatalogTableUtils.mergeCatalogTableConfig(
80+
tables.forTable(id), tableMap.get(id));
81+
TableChanges.TableChange tableChange =
82+
new TableChanges.TableChange(
83+
TableChanges.TableChangeType.CREATE, table);
84+
schemasByTableId.put(id, tableChange);
85+
}
86+
}
8487
} catch (SQLException e) {
8588
throw new SeaTunnelException(
8689
String.format("Failed to read schema for table %s ", tableId), e);
8790
}
8891

89-
if (!tableChangeMap.containsKey(tableId)) {
92+
if (!schemasByTableId.containsKey(tableId)) {
9093
throw new SeaTunnelException(
9194
String.format("Can't obtain schema for table %s ", tableId));
9295
}
9396

94-
return tableChangeMap.get(tableId);
97+
return schemasByTableId.get(tableId);
9598
}
9699
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java

+17-16
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.debezium.relational.history.TableChanges;
3131

3232
import java.sql.SQLException;
33-
import java.util.HashMap;
3433
import java.util.Map;
3534
import java.util.concurrent.ConcurrentHashMap;
3635

@@ -52,44 +51,46 @@ public TableChanges.TableChange getTableSchema(JdbcConnection jdbc, TableId tabl
5251
TableChanges.TableChange schema = schemasByTableId.get(tableId);
5352
if (schema == null) {
5453
schema = readTableSchema(jdbc, tableId);
55-
schemasByTableId.put(tableId, schema);
5654
}
5755
return schema;
5856
}
5957

6058
private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
61-
62-
CatalogTable catalogTable = tableMap.get(tableId);
6359
// Because the catalog is null in the postgresConnection.readSchema method
64-
tableId = new TableId(null, tableId.schema(), tableId.table());
60+
TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(), tableId.table());
6561

6662
PostgresConnection postgresConnection = (PostgresConnection) jdbc;
67-
final Map<TableId, TableChanges.TableChange> tableChangeMap = new HashMap<>();
6863
Tables tables = new Tables();
6964
try {
7065
postgresConnection.readSchema(
7166
tables,
72-
tableId.catalog(),
73-
tableId.schema(),
67+
tableIdWithoutCatalog.catalog(),
68+
tableIdWithoutCatalog.schema(),
7469
connectorConfig.getTableFilters().dataCollectionFilter(),
7570
null,
7671
false);
77-
Table table =
78-
CatalogTableUtils.mergeCatalogTableConfig(
79-
tables.forTable(tableId), catalogTable);
80-
TableChanges.TableChange tableChange =
81-
new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
82-
tableChangeMap.put(tableId, tableChange);
72+
for (TableId id : tables.tableIds()) {
73+
TableId idWithCatalog = new TableId(tableId.catalog(), id.schema(), id.table());
74+
if (tableMap.containsKey(idWithCatalog)) {
75+
Table table =
76+
CatalogTableUtils.mergeCatalogTableConfig(
77+
tables.forTable(id), tableMap.get(idWithCatalog));
78+
TableChanges.TableChange tableChange =
79+
new TableChanges.TableChange(
80+
TableChanges.TableChangeType.CREATE, table);
81+
schemasByTableId.put(idWithCatalog, tableChange);
82+
}
83+
}
8384
} catch (SQLException e) {
8485
throw new SeaTunnelException(
8586
String.format("Failed to read schema for table %s ", tableId), e);
8687
}
8788

88-
if (!tableChangeMap.containsKey(tableId)) {
89+
if (!schemasByTableId.containsKey(tableId)) {
8990
throw new SeaTunnelException(
9091
String.format("Can't obtain schema for table %s ", tableId));
9192
}
9293

93-
return tableChangeMap.get(tableId);
94+
return schemasByTableId.get(tableId);
9495
}
9596
}

Diff for: seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.debezium.relational.history.TableChanges.TableChange;
3232

3333
import java.sql.SQLException;
34-
import java.util.HashMap;
3534
import java.util.Map;
3635
import java.util.concurrent.ConcurrentHashMap;
3736

@@ -54,15 +53,12 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
5453
TableChange schema = schemasByTableId.get(tableId);
5554
if (schema == null) {
5655
schema = readTableSchema(jdbc, tableId);
57-
schemasByTableId.put(tableId, schema);
5856
}
5957
return schema;
6058
}
6159

6260
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
6361
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
64-
65-
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
6662
Tables tables = new Tables();
6763
try {
6864
sqlServerConnection.readSchema(
@@ -72,21 +68,27 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
7268
connectorConfig.getTableFilters().dataCollectionFilter(),
7369
null,
7470
false);
75-
Table table =
76-
CatalogTableUtils.mergeCatalogTableConfig(
77-
tables.forTable(tableId), tableMap.get(tableId));
78-
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
79-
tableChangeMap.put(tableId, tableChange);
71+
for (TableId id : tables.tableIds()) {
72+
if (tableMap.containsKey(id)) {
73+
Table table =
74+
CatalogTableUtils.mergeCatalogTableConfig(
75+
tables.forTable(id), tableMap.get(id));
76+
TableChanges.TableChange tableChange =
77+
new TableChanges.TableChange(
78+
TableChanges.TableChangeType.CREATE, table);
79+
schemasByTableId.put(id, tableChange);
80+
}
81+
}
8082
} catch (SQLException e) {
8183
throw new SeaTunnelException(
8284
String.format("Failed to read schema for table %s ", tableId), e);
8385
}
8486

85-
if (!tableChangeMap.containsKey(tableId)) {
87+
if (!schemasByTableId.containsKey(tableId)) {
8688
throw new SeaTunnelException(
8789
String.format("Can't obtain schema for table %s ", tableId));
8890
}
8991

90-
return tableChangeMap.get(tableId);
92+
return schemasByTableId.get(tableId);
9193
}
9294
}

0 commit comments

Comments
 (0)