Skip to content

Commit e620531

Browse files
authored
Merge pull request #540 from tapdata/develop
Develop
2 parents 5e930e7 + e973152 commit e620531

File tree

14 files changed

+297
-42
lines changed

14 files changed

+297
-42
lines changed

connectors-common/connector-core/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,15 @@
7676
</execution>
7777
</executions>
7878
</plugin>
79-
<!-- <plugin>-->
79+
<plugin>
80+
<groupId>org.apache.maven.plugins</groupId>
81+
<artifactId>maven-compiler-plugin</artifactId>
82+
<configuration>
83+
<source>17</source>
84+
<target>17</target>
85+
</configuration>
86+
</plugin>
87+
<!-- <plugin>-->
8088
<!-- <groupId>org.apache.maven.plugins</groupId>-->
8189
<!-- <artifactId>maven-source-plugin</artifactId>-->
8290
<!-- <executions>-->

connectors-common/connector-core/src/main/java/io/tapdata/kit/DbKit.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.tapdata.kit;
22

33
import io.tapdata.entity.logger.TapLogger;
4+
import io.tapdata.entity.schema.TapConstraint;
5+
import io.tapdata.entity.schema.TapConstraintMapping;
46
import io.tapdata.entity.schema.TapIndex;
57
import io.tapdata.entity.schema.TapIndexField;
68
import io.tapdata.entity.simplify.TapSimplify;
@@ -152,18 +154,37 @@ public static boolean ignoreCreateIndex(TapIndex exists, TapIndex created) {
152154
if (!exists.isUnique() && created.isUnique()) {
153155
return false;
154156
}
155-
return exists.getIndexFields().stream().map(TapIndexField::getName).collect(Collectors.toList())
157+
return exists.getIndexFields().stream().map(TapIndexField::getName).toList()
156158
.equals(created.getIndexFields().stream().map(TapIndexField::getName).collect(Collectors.toList()));
157159
}
158160

159161
public static String buildIndexName(String table) {
160162
return "IDX_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
161163
}
162164

165+
public static String buildIndexName(String table, TapIndex index, int maxLength) {
166+
if (EmptyKit.isNotBlank(index.getName()) && index.getName().length() <= maxLength) {
167+
return index.getName();
168+
}
169+
String indexName = table + "_" + index.getIndexFields().stream().map(TapIndexField::getName).collect(Collectors.joining("_"));
170+
if (indexName.length() + 4 <= maxLength) {
171+
return "IDX_" + indexName;
172+
}
173+
return "IDX_" + indexName.substring(0, maxLength - 12) + UUID.randomUUID().toString().replaceAll("-", "").substring(24);
174+
}
175+
163176
public static String buildForeignKeyName(String table) {
164177
return "FK_" + table.substring(Math.max(table.length() - 10, 0)) + UUID.randomUUID().toString().replaceAll("-", "").substring(20);
165178
}
166179

180+
public static String buildForeignKeyName(String table, TapConstraint constraint, int maxLength) {
181+
String foreignKeyName = table + "_" + constraint.getMappingFields().stream().map(TapConstraintMapping::getForeignKey).collect(Collectors.joining("_"));
182+
if (foreignKeyName.length() + 3 <= maxLength) {
183+
return "FK_" + foreignKeyName;
184+
}
185+
return "FK_" + foreignKeyName.substring(0, maxLength - 11) + UUID.randomUUID().toString().replaceAll("-", "").substring(24);
186+
}
187+
167188
public static <T> List<List<T>> splitToPieces(List<T> data, int eachPieceSize) {
168189
if (EmptyKit.isEmpty(data)) {
169190
return new ArrayList<>();
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.tapdata.kit;
2+
3+
import java.io.IOException;
4+
import java.net.URI;
5+
import java.net.URLEncoder;
6+
import java.net.http.HttpClient;
7+
import java.net.http.HttpRequest;
8+
import java.net.http.HttpResponse;
9+
import java.nio.charset.StandardCharsets;
10+
import java.time.Duration;
11+
import java.util.Map;
12+
13+
public final class HttpKit {
14+
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
15+
.connectTimeout(Duration.ofSeconds(10))
16+
.followRedirects(HttpClient.Redirect.NORMAL)
17+
.build();
18+
19+
private HttpKit() {
20+
}
21+
22+
// GET请求(带查询参数)
23+
public static String get(String url, Map<String, String> params) {
24+
HttpRequest request = HttpRequest.newBuilder()
25+
.uri(buildUriWithParams(url, params))
26+
.timeout(Duration.ofSeconds(10))
27+
.GET()
28+
.build();
29+
return sendRequest(request);
30+
}
31+
32+
// POST表单请求(x-www-form-urlencoded)
33+
public static String postForm(String url, Map<String, String> formData) {
34+
String body = buildFormData(formData);
35+
HttpRequest request = HttpRequest.newBuilder()
36+
.uri(URI.create(url))
37+
.header("Content-Type", "application/x-www-form-urlencoded")
38+
.timeout(Duration.ofSeconds(10))
39+
.POST(HttpRequest.BodyPublishers.ofString(body))
40+
.build();
41+
return sendRequest(request);
42+
}
43+
44+
// POST JSON请求
45+
public static String postJson(String url, String json) {
46+
HttpRequest request = HttpRequest.newBuilder()
47+
.uri(URI.create(url))
48+
.header("Content-Type", "application/json")
49+
.timeout(Duration.ofSeconds(10))
50+
.POST(HttpRequest.BodyPublishers.ofString(json))
51+
.build();
52+
return sendRequest(request);
53+
}
54+
55+
// 通用请求发送方法
56+
private static String sendRequest(HttpRequest request) {
57+
try {
58+
HttpResponse<String> response = HTTP_CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
59+
if (response.statusCode() >= 200 && response.statusCode() < 300) {
60+
return response.body();
61+
} else {
62+
throw new RuntimeException("HTTP Error: " + response.statusCode() + " - " + response.body());
63+
}
64+
} catch (IOException | InterruptedException e) {
65+
throw new RuntimeException("HTTP Request Failed: " + e.getMessage(), e);
66+
}
67+
}
68+
69+
// 构建带查询参数的URI
70+
private static URI buildUriWithParams(String url, Map<String, String> params) {
71+
if (params == null || params.isEmpty()) return URI.create(url);
72+
73+
StringBuilder query = new StringBuilder();
74+
params.forEach((key, value) -> {
75+
if (!query.isEmpty()) query.append('&');
76+
query.append(encode(key)).append('=').append(encode(value));
77+
});
78+
79+
String baseUrl = url.contains("?") ? url + "&" : url + "?";
80+
return URI.create(baseUrl + query);
81+
}
82+
83+
// 构建表单请求体
84+
private static String buildFormData(Map<String, String> formData) {
85+
if (formData == null || formData.isEmpty()) return "";
86+
87+
StringBuilder body = new StringBuilder();
88+
formData.forEach((key, value) -> {
89+
if (!body.isEmpty()) body.append('&');
90+
body.append(encode(key)).append('=').append(encode(value));
91+
});
92+
return body.toString();
93+
}
94+
95+
// URL编码工具方法
96+
private static String encode(String value) {
97+
return URLEncoder.encode(value, StandardCharsets.UTF_8);
98+
}
99+
}

connectors-common/debezium-bucket/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import java.sql.SQLException;
1313
import java.sql.Statement;
1414
import java.time.Duration;
15-
import java.util.Objects;
16-
import java.util.Optional;
15+
import java.util.*;
1716
import java.util.concurrent.atomic.AtomicLong;
1817

18+
import io.debezium.relational.*;
1919
import org.apache.kafka.connect.errors.ConnectException;
2020
import org.postgresql.core.BaseConnection;
2121
import org.postgresql.replication.LogSequenceNumber;
@@ -32,10 +32,6 @@
3232
import io.debezium.connector.postgresql.spi.SlotState;
3333
import io.debezium.jdbc.JdbcConfiguration;
3434
import io.debezium.jdbc.JdbcConnection;
35-
import io.debezium.relational.Column;
36-
import io.debezium.relational.ColumnEditor;
37-
import io.debezium.relational.TableId;
38-
import io.debezium.relational.Tables;
3935
import io.debezium.util.Clock;
4036
import io.debezium.util.Metronome;
4137

@@ -509,4 +505,97 @@ public TypeRegistry getTypeRegistry() {
509505
Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
510506
return typeRegistry;
511507
}
508+
509+
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
510+
Tables.TableFilter tableFilter, Tables.ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
511+
throws SQLException {
512+
// Before we make any changes, get the copy of the set of table IDs ...
513+
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
514+
515+
// Read the metadata for the table columns ...
516+
DatabaseMetaData metadata = connection().getMetaData();
517+
518+
// Find regular and materialized views as they cannot be snapshotted
519+
final Set<TableId> viewIds = new HashSet<>();
520+
final Set<TableId> tableIds = new HashSet<>();
521+
522+
int totalTables = 0;
523+
try (final ResultSet rs = metadata.getTables(databaseCatalog, schemaNamePattern, null,
524+
new String[]{ "VIEW", "MATERIALIZED VIEW", "TABLE" })) {
525+
while (rs.next()) {
526+
final String schemaName = rs.getString(2);
527+
final String tableName = rs.getString(3);
528+
final String tableType = rs.getString(4);
529+
if ("TABLE".equals(tableType)) {
530+
totalTables++;
531+
TableId tableId = new TableId(null, schemaName, tableName);
532+
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
533+
tableIds.add(tableId);
534+
}
535+
}
536+
else {
537+
TableId tableId = new TableId(null, schemaName, tableName);
538+
viewIds.add(tableId);
539+
}
540+
}
541+
}
542+
543+
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
544+
545+
if (totalTables == tableIds.size() || config.getBoolean(RelationalDatabaseConnectorConfig.SNAPSHOT_FULL_COLUMN_SCAN_FORCE)) {
546+
columnsByTable = getColumnsDetails(databaseCatalog, schemaNamePattern, null, tableFilter, columnFilter, metadata, viewIds);
547+
}
548+
else {
549+
for (TableId includeTable : tableIds) {
550+
Map<TableId, List<Column>> cols = getColumnsDetails(databaseCatalog, schemaNamePattern, includeTable.table(), tableFilter,
551+
columnFilter, metadata, viewIds);
552+
columnsByTable.putAll(cols);
553+
}
554+
}
555+
556+
// Read the metadata for the primary keys ...
557+
for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
558+
// First get the primary key information, which must be done for *each* table ...
559+
List<String> pkColumnNames = readPrimaryKeyOrUniqueIndexNames(metadata, tableEntry.getKey());
560+
561+
// Then define the table ...
562+
List<Column> columns = tableEntry.getValue();
563+
Collections.sort(columns);
564+
String defaultCharsetName = null; // JDBC does not expose character sets
565+
tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, defaultCharsetName);
566+
}
567+
568+
if (removeTablesNotFoundInJdbc) {
569+
// Remove any definitions for tables that were not found in the database metadata ...
570+
tableIdsBefore.removeAll(columnsByTable.keySet());
571+
tableIdsBefore.forEach(tables::removeTable);
572+
}
573+
}
574+
575+
protected Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, String schemaNamePattern,
576+
String tableName, Tables.TableFilter tableFilter, Tables.ColumnNameFilter columnFilter, DatabaseMetaData metadata,
577+
final Set<TableId> viewIds)
578+
throws SQLException {
579+
Map<TableId, List<Column>> columnsByTable = new HashMap<>();
580+
try (ResultSet columnMetadata = metadata.getColumns(databaseCatalog, schemaNamePattern, tableName, null)) {
581+
while (columnMetadata.next()) {
582+
String schemaName = columnMetadata.getString(2);
583+
String metaTableName = columnMetadata.getString(3);
584+
TableId tableId = new TableId(null, schemaName, metaTableName);
585+
586+
// exclude views and non-captured tables
587+
if (viewIds.contains(tableId) ||
588+
(tableFilter != null && !tableFilter.isIncluded(tableId))) {
589+
continue;
590+
}
591+
592+
// add all included columns
593+
readTableColumn(columnMetadata, tableId, columnFilter).ifPresent(column -> {
594+
columnsByTable.computeIfAbsent(tableId, t -> new ArrayList<>())
595+
.add(column.create());
596+
});
597+
}
598+
}
599+
return columnsByTable;
600+
}
512601
}

connectors-common/debezium-bucket/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ private static String findAndReplace(String url, String name, Properties props,
306306
return url;
307307
}
308308

309-
private final Configuration config;
309+
protected final Configuration config;
310310
private final ConnectionFactory factory;
311311
private final Operations initialOps;
312312
private volatile Connection conn;
@@ -1209,7 +1209,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
12091209
}
12101210
}
12111211

1212-
private Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, String schemaNamePattern,
1212+
protected Map<TableId, List<Column>> getColumnsDetails(String databaseCatalog, String schemaNamePattern,
12131213
String tableName, TableFilter tableFilter, ColumnNameFilter columnFilter, DatabaseMetaData metadata,
12141214
final Set<TableId> viewIds)
12151215
throws SQLException {

connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/config/MysqlConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public MysqlConfig() {
1616
setDbType("mysql");
1717
setEscapeChar('`');
1818
setJdbcDriver("com.mysql.cj.jdbc.Driver");
19+
setMaxIndexNameLength(64);
1920
}
2021

2122
private static final Map<String, String> DEFAULT_PROPERTIES = new HashMap<String, String>() {{

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/config/PostgresConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class PostgresConfig extends CommonDbConfig implements Serializable {
3232
public PostgresConfig() {
3333
setDbType("postgresql");
3434
setJdbcDriver("org.postgresql.Driver");
35+
setMaxIndexNameLength(63);
3536
}
3637

3738
@Override

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public class CommonDbConfig implements Serializable {
6565
private String sslKeyPassword;
6666
protected String sslRandomPath;
6767

68+
private int maxIndexNameLength = 30;
69+
6870
//pattern for jdbc-url
6971
public String getDatabaseUrlPattern() {
7072
// last %s reserved for extend params
@@ -435,4 +437,12 @@ public String getSslRandomPath() {
435437
public void setSslRandomPath(String sslRandomPath) {
436438
this.sslRandomPath = sslRandomPath;
437439
}
440+
441+
public int getMaxIndexNameLength() {
442+
return maxIndexNameLength;
443+
}
444+
445+
public void setMaxIndexNameLength(int maxIndexNameLength) {
446+
this.maxIndexNameLength = maxIndexNameLength;
447+
}
438448
}

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ protected void createIndex(TapConnectorContext connectorContext, TapTable tapTab
434434
if (!exceptionCollector.violateIndexName(e)) {
435435
tapLogger.warn("Create index failed {}, please execute it manually [{}]", e.getMessage(), sql);
436436
} else {
437-
String rename = i.getName() + "_" + UUID.randomUUID().toString().replaceAll("-", "").substring(28);
437+
String rename = i.getName().substring(0, i.getName().length() - 5) + "_" + UUID.randomUUID().toString().replaceAll("-", "").substring(28);
438438
tapLogger.warn("Create index failed {}, rename {} to {} and retry ...", e.getMessage(), i.getName(), rename);
439439
i.setName(rename);
440440
sql = getCreateIndexSql(tapTable, i);
@@ -446,8 +446,8 @@ protected void createIndex(TapConnectorContext connectorContext, TapTable tapTab
446446
}
447447
}
448448
});
449-
List<String> afterUniqueAutoIncrementSql =getAfterUniqueAutoIncrementFields(tapTable, indexList);
450-
if(EmptyKit.isNotEmpty(afterUniqueAutoIncrementSql)){
449+
List<String> afterUniqueAutoIncrementSql = getAfterUniqueAutoIncrementFields(tapTable, indexList);
450+
if (EmptyKit.isNotEmpty(afterUniqueAutoIncrementSql)) {
451451
afterUniqueAutoIncrementSql.forEach(sql -> {
452452
try {
453453
jdbcContext.execute(sql);
@@ -646,13 +646,9 @@ protected String getCreateIndexSql(TapTable tapTable, TapIndex tapIndex) {
646646
sb.append("unique ");
647647
}
648648
sb.append("index ");
649-
if (EmptyKit.isNotBlank(tapIndex.getName())) {
650-
sb.append(escapeChar).append(tapIndex.getName()).append(escapeChar);
651-
} else {
652-
String indexName = DbKit.buildIndexName(tapTable.getId());
653-
tapIndex.setName(indexName);
654-
sb.append(escapeChar).append(indexName).append(escapeChar);
655-
}
649+
String indexName = DbKit.buildIndexName(tapTable.getId(), tapIndex, commonDbConfig.getMaxIndexNameLength());
650+
tapIndex.setName(indexName);
651+
sb.append(escapeChar).append(indexName).append(escapeChar);
656652
sb.append(" on ").append(getSchemaAndTable(tapTable.getId())).append('(')
657653
.append(tapIndex.getIndexFields().stream().map(f -> escapeChar + f.getName() + escapeChar + " " + (f.getFieldAsc() ? "asc" : "desc"))
658654
.collect(Collectors.joining(","))).append(')');
@@ -666,7 +662,7 @@ protected String getCreateConstraintSql(TapTable tapTable, TapConstraint tapCons
666662
if (EmptyKit.isNotBlank(tapConstraint.getName())) {
667663
sb.append(escapeChar).append(tapConstraint.getName()).append(escapeChar);
668664
} else {
669-
sb.append(escapeChar).append(DbKit.buildForeignKeyName(tapTable.getId())).append(escapeChar);
665+
sb.append(escapeChar).append(DbKit.buildForeignKeyName(tapTable.getId(), tapConstraint, 32)).append(escapeChar);
670666
}
671667
sb.append(" foreign key (").append(escapeChar).append(tapConstraint.getMappingFields().stream().map(TapConstraintMapping::getForeignKey).collect(Collectors.joining(escapeChar + "," + escapeChar))).append(escapeChar).append(") references ")
672668
.append(getSchemaAndTable(tapConstraint.getReferencesTableName())).append('(').append(escapeChar).append(tapConstraint.getMappingFields().stream().map(TapConstraintMapping::getReferenceKey).collect(Collectors.joining(escapeChar + "," + escapeChar))).append(escapeChar).append(')');
@@ -975,7 +971,8 @@ protected long countByAdvanceFilterV2(TapConnectorContext connectorContext, TapT
975971
});
976972
return count.get();
977973
}
978-
protected List<String> getAfterUniqueAutoIncrementFields(TapTable tapTable,List<TapIndex> indexList) {
974+
975+
protected List<String> getAfterUniqueAutoIncrementFields(TapTable tapTable, List<TapIndex> indexList) {
979976
return new ArrayList<>();
980977
}
981978

0 commit comments

Comments
 (0)