Skip to content

Commit ad002bb

Browse files
authored
Merge pull request #501 from tapdata/develop
Develop
2 parents 60fb193 + b6df249 commit ad002bb

File tree

64 files changed

+5122
-74
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+5122
-74
lines changed

connectors-common/debezium-bucket/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.debezium.jdbc;
77

88
import java.lang.reflect.InvocationTargetException;
9+
import java.net.URLEncoder;
910
import java.sql.CallableStatement;
1011
import java.sql.Connection;
1112
import java.sql.DatabaseMetaData;
@@ -293,8 +294,12 @@ private static String findAndReplace(String url, String name, Properties props,
293294
}
294295

295296
if (value != null) {
296-
// And replace the variable ...
297-
url = url.replaceAll("\\$\\{" + name + "\\}", value);
297+
if ("dbname".equals(name)) {
298+
url = url.replaceAll("\\$\\{" + name + "\\}", URLEncoder.encode(value));
299+
} else {
300+
// And replace the variable ...
301+
url = url.replaceAll("\\$\\{" + name + "\\}", value);
302+
}
298303
}
299304
}
300305
}

connectors-common/mysql-core/src/main/java/io/tapdata/connector/mysql/MysqlJdbcContextV2.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.ZoneId;
1919
import java.util.*;
2020
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.stream.Collectors;
2122

2223
public class MysqlJdbcContextV2 extends JdbcContext {
2324

@@ -57,26 +58,26 @@ public TimeZone queryTimeZone() throws SQLException {
5758

5859
@Override
5960
protected String queryAllTablesSql(String schema, List<String> tableNames) {
60-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
61-
return String.format(MYSQL_ALL_TABLE, StringKit.escape(schema, "'"), tableSql);
61+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'\\")).collect(Collectors.joining("','")) + "')" : "";
62+
return String.format(MYSQL_ALL_TABLE, StringKit.escape(schema, "'\\"), tableSql);
6263
}
6364

6465
@Override
6566
protected String queryAllColumnsSql(String schema, List<String> tableNames) {
66-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
67-
return String.format(MYSQL_ALL_COLUMN, StringKit.escape(schema, "'"), tableSql);
67+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'\\")).collect(Collectors.joining("','")) + "')" : "";
68+
return String.format(MYSQL_ALL_COLUMN, StringKit.escape(schema, "'\\"), tableSql);
6869
}
6970

7071
@Override
7172
protected String queryAllIndexesSql(String schema, List<String> tableNames) {
72-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
73-
return String.format(MYSQL_ALL_INDEX, StringKit.escape(schema, "'"), tableSql);
73+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND TABLE_NAME IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'\\")).collect(Collectors.joining("','")) + "')" : "";
74+
return String.format(MYSQL_ALL_INDEX, StringKit.escape(schema, "'\\"), tableSql);
7475
}
7576

7677
@Override
7778
protected String queryAllForeignKeysSql(String schema, List<String> tableNames) {
78-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND k.TABLE_NAME IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
79-
return String.format(MYSQL_ALL_FOREIGN_KEY, StringKit.escape(schema, "'"), tableSql);
79+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND k.TABLE_NAME IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'\\")).collect(Collectors.joining("','")) + "')" : "";
80+
return String.format(MYSQL_ALL_FOREIGN_KEY, StringKit.escape(schema, "'\\"), tableSql);
8081
}
8182

8283
public DataMap getTableInfo(String tableName) {
@@ -85,7 +86,7 @@ public DataMap getTableInfo(String tableName) {
8586
list.add("TABLE_ROWS");
8687
list.add("DATA_LENGTH");
8788
try {
88-
query(String.format(GET_TABLE_INFO_SQL, StringKit.escape(getConfig().getDatabase(), "'"), StringKit.escape(tableName, "'")), resultSet -> {
89+
query(String.format(GET_TABLE_INFO_SQL, StringKit.escape(getConfig().getDatabase(), "'\\"), StringKit.escape(tableName, "'\\")), resultSet -> {
8990
while (resultSet.next()) {
9091
dataMap.putAll(DbKit.getRowFromResultSet(resultSet, list));
9192
}

connectors-common/mysql-core/src/test/java/io/tapdata/connector/mysql/MysqlReaderTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,14 +315,14 @@ void init() {
315315
void testNormal() throws IOException {
316316
try (MockedStatic<StringCompressUtil> scu = mockStatic(StringCompressUtil.class)) {
317317
when(stateMap.get(MysqlReader.MYSQL_SCHEMA_HISTORY)).thenReturn("xxx");
318-
scu.when(() -> StringCompressUtil.uncompress(anyString())).thenReturn("{\"key\": [\"1\"]}");
318+
scu.when(() -> StringCompressUtil.uncompress(any())).thenReturn("{\"key\": [\"1\"]}");
319319
Assertions.assertDoesNotThrow(() -> reader.initMysqlSchemaHistory(tapConnectorContext));
320320
}
321321
}
322322
@Test
323323
void testNotString() {
324324
try (MockedStatic<StringCompressUtil> scu = mockStatic(StringCompressUtil.class)) {
325-
scu.when(() -> StringCompressUtil.uncompress(anyString())).thenReturn("{\"key\": 1}");
325+
scu.when(() -> StringCompressUtil.uncompress(any())).thenReturn("{\"key\": 1}");
326326
when(stateMap.get(MysqlReader.MYSQL_SCHEMA_HISTORY)).thenReturn(100);
327327
Assertions.assertDoesNotThrow(() -> reader.initMysqlSchemaHistory(tapConnectorContext));
328328
}
@@ -331,7 +331,7 @@ void testNotString() {
331331
@Test
332332
void testException() {
333333
try (MockedStatic<StringCompressUtil> scu = mockStatic(StringCompressUtil.class)) {
334-
scu.when(() -> StringCompressUtil.uncompress(anyString())).thenAnswer(a -> {
334+
scu.when(() -> StringCompressUtil.uncompress(any())).thenAnswer(a -> {
335335
throw new IOException("");
336336
});
337337
when(stateMap.get(MysqlReader.MYSQL_SCHEMA_HISTORY)).thenReturn("123456789");

connectors-common/mysql-core/src/test/java/io/tapdata/connector/mysql/util/StringCompressUtilTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class StringCompressUtilTest {
1010
void compressJson() {
1111
String str = "{}";
1212
Assertions.assertDoesNotThrow(() -> {
13-
String compress = StringCompressUtil.compress(str);
13+
byte[] compress = StringCompressUtil.compress(str);
1414
String uncompress = StringCompressUtil.uncompress(compress);
1515
Assertions.assertEquals(str, uncompress);
1616
});
@@ -19,7 +19,7 @@ void compressJson() {
1919
@Test
2020
void compressNull() {
2121
Assertions.assertDoesNotThrow(() -> {
22-
String compress = StringCompressUtil.compress(null);
22+
byte[] compress = StringCompressUtil.compress(null);
2323
String uncompress = StringCompressUtil.uncompress(compress);
2424
Assertions.assertNull(uncompress);
2525
});
@@ -29,7 +29,7 @@ void compressNull() {
2929
void compressEmpty() {
3030
String str = "";
3131
Assertions.assertDoesNotThrow(() -> {
32-
String compress = StringCompressUtil.compress(str);
32+
byte[] compress = StringCompressUtil.compress(str);
3333
String uncompress = StringCompressUtil.uncompress(compress);
3434
Assertions.assertEquals(str, uncompress);
3535
});

connectors-common/postgres-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
2222
<debezium.version>1.5.4.Final</debezium.version>
2323
<postgres.core.version>1.0-SNAPSHOT</postgres.core.version>
24-
<tapdata.pdk.api.verison>1.4.4-SNAPSHOT</tapdata.pdk.api.verison>
24+
<tapdata.pdk.api.verison>1.4.5-SNAPSHOT</tapdata.pdk.api.verison>
2525

2626
</properties>
2727
<dependencyManagement>

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/PostgresJdbcContext.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.List;
1818
import java.util.TimeZone;
1919
import java.util.concurrent.atomic.AtomicReference;
20+
import java.util.stream.Collectors;
2021

2122
public class PostgresJdbcContext extends JdbcContext {
2223

@@ -69,7 +70,7 @@ public Long queryTimestamp() throws SQLException {
6970

7071
@Override
7172
protected String queryAllTablesSql(String schema, List<String> tableNames) {
72-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND t.table_name IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
73+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND t.table_name IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'")).collect(Collectors.joining("','")) + "')" : "";
7374
if (Integer.parseInt(postgresVersion) < 100000) {
7475
return String.format(PG_ALL_TABLE_LOWER_VERSION, StringKit.escape(getConfig().getDatabase(), "'"), StringKit.escape(schema, "'"), tableSql);
7576
}
@@ -78,7 +79,7 @@ protected String queryAllTablesSql(String schema, List<String> tableNames) {
7879

7980
@Override
8081
protected String queryAllColumnsSql(String schema, List<String> tableNames) {
81-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND table_name IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
82+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND table_name IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'")).collect(Collectors.joining("','")) + "')" : "";
8283
if (Integer.parseInt(postgresVersion) < 100000) {
8384
return String.format(PG_ALL_COLUMN_LOWER_VERSION, StringKit.escape(schema, "'"), StringKit.escape(getConfig().getDatabase(), "'"), StringKit.escape(schema, "'"), tableSql);
8485
}
@@ -87,13 +88,13 @@ protected String queryAllColumnsSql(String schema, List<String> tableNames) {
8788

8889
@Override
8990
protected String queryAllIndexesSql(String schema, List<String> tableNames) {
90-
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND table_name IN (" + StringKit.joinString(tableNames, "'", ",") + ")" : "";
91+
String tableSql = EmptyKit.isNotEmpty(tableNames) ? "AND table_name IN ('" + tableNames.stream().map(v -> StringKit.escape(v, "'")).collect(Collectors.joining("','")) + "')" : "";
9192
return String.format(PG_ALL_INDEX, StringKit.escape(getConfig().getDatabase(), "'"), StringKit.escape(schema, "'"), tableSql);
9293
}
9394

9495
@Override
9596
protected String queryAllForeignKeysSql(String schema, List<String> tableNames) {
96-
return String.format(PG_ALL_FOREIGN_KEY, StringKit.escape(getConfig().getSchema(), "'"), EmptyKit.isEmpty(tableNames) ? "" : " and pc.relname in (" + StringKit.joinString(tableNames, "'", ",") + ")");
97+
return String.format(PG_ALL_FOREIGN_KEY, StringKit.escape(getConfig().getSchema(), "'"), EmptyKit.isEmpty(tableNames) ? "" : " and pc.relname in ('" + tableNames.stream().map(v -> StringKit.escape(v, "'")).collect(Collectors.joining("','")) + "')");
9798
}
9899

99100
public DataMap getTableInfo(String tableName) {

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/PostgresSqlMaker.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,17 @@ protected void buildAutoIncDefinition(StringBuilder builder, TapField tapField)
7070
} else {
7171
startValue = 1;
7272
}
73+
long incrementValue;
74+
if (EmptyKit.isNotNull(tapField.getAutoIncrementValue())) {
75+
incrementValue = tapField.getAutoIncrementValue();
76+
} else {
77+
incrementValue = 1;
78+
}
7379
builder.append("GENERATED BY DEFAULT AS IDENTITY (START WITH ")
7480
.append(startValue)
75-
.append(" INCREMENT BY 1 CACHE ")
81+
.append(" INCREMENT BY ")
82+
.append(incrementValue)
83+
.append(" CACHE ")
7684
.append(autoIncCacheValue)
7785
.append(") ");
7886
}
@@ -131,17 +139,31 @@ public void buildOrderClause(StringBuilder builder, TapAdvanceFilter filter) {
131139
if (null != collate) {
132140
return getOrderByFieldClauseWithCollate(v, collate);
133141
} else {
134-
return v.toString(String.valueOf(escapeChar));
142+
StringBuilder sb = new StringBuilder();
143+
sb.append(escapeChar).append(v.getKey()).append(escapeChar).append(" ").append(v.getSort() == ASCENDING ? "ASC" : "DESC");
144+
sb.append(buildNullSortClause(v));
145+
return sb.toString();
135146
}
136147
}).collect(Collectors.joining(", "))).append(' ');
137148
}
138149
}
139150

151+
private static String buildNullSortClause(SortOn v) {
152+
if (v.getNullSort() == 1) {
153+
return ' ' + "NULLS" + ' ' + "FIRST";
154+
} else if (v.getNullSort() == 2) {
155+
return ' ' + "NULLS" + ' ' + "LAST";
156+
} else {
157+
return "";
158+
}
159+
}
160+
140161
protected String getOrderByFieldClauseWithCollate(SortOn sortOn, Collate collate) {
141162
StringBuilder sb = new StringBuilder();
142163
sb.append(escapeChar).append(sortOn.getKey()).append(escapeChar);
143164
sb.append(' ').append(buildCollate(collate.getCollateName())).append(' ');
144165
sb.append((sortOn.getSort() == ASCENDING ? "ASC" : "DESC"));
166+
sb.append(buildNullSortClause(sortOn));
145167
return sb.toString();
146168
}
147169

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/config/PostgresDebeziumConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public Configuration create() {
8787
// .with("offset.storage.file.filename", "d:/cdc/offset/" + slotName + ".dat") //path must be changed with requirement
8888
.with("offset.flush.interval.ms", 60000)
8989
.with("name", slotName + "-postgres-connector")
90-
.with("database.server.name", URLEncoder.encode(postgresConfig.getDatabase()))
90+
.with("database.server.name", URLEncoder.encode(postgresConfig.getDatabase()).replace("*", ""))
9191
.with("database.hostname", postgresConfig.getHost())
9292
.with("database.port", postgresConfig.getPort())
9393
.with("database.user", postgresConfig.getUser())

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.io.File;
1212
import java.io.Serializable;
13+
import java.net.URLEncoder;
1314
import java.time.ZoneId;
1415
import java.util.Map;
1516
import java.util.Properties;
@@ -47,6 +48,7 @@ public class CommonDbConfig implements Serializable {
4748
private long autoIncJumpValue = 1000000L;
4849
private long autoIncCacheValue = 100;
4950
private Boolean applyDefault = false;
51+
private Boolean applyForeignKey = false;
5052
private Integer writeThreadSize = 15;
5153
protected String timezone = "+00:00";
5254
protected ZoneId zoneId;
@@ -85,7 +87,7 @@ public String getDatabaseUrl() {
8587
if (EmptyKit.isNotEmpty(this.getExtParams()) && !this.getExtParams().startsWith("?") && !this.getExtParams().startsWith(":")) {
8688
this.setExtParams("?" + this.getExtParams());
8789
}
88-
return String.format(this.getDatabaseUrlPattern(), this.getHost(), this.getPort(), this.getDatabase(), this.getExtParams());
90+
return String.format(this.getDatabaseUrlPattern(), this.getHost(), this.getPort(), URLEncoder.encode(this.getDatabase()), this.getExtParams());
8991
}
9092

9193
public CommonDbConfig load(String json) {
@@ -322,6 +324,14 @@ public void setApplyDefault(Boolean applyDefault) {
322324
this.applyDefault = applyDefault;
323325
}
324326

327+
public Boolean getApplyForeignKey() {
328+
return applyForeignKey;
329+
}
330+
331+
public void setApplyForeignKey(Boolean applyForeignKey) {
332+
this.applyForeignKey = applyForeignKey;
333+
}
334+
325335
public Integer getWriteThreadSize() {
326336
return writeThreadSize;
327337
}

connectors-common/sql-core/src/main/java/io/tapdata/common/CommonDbConnector.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ protected void createIndex(TapConnectorContext connectorContext, TapTable tapTab
450450
}
451451

452452
protected void createConstraint(TapConnectorContext connectorContext, TapTable tapTable, TapCreateConstraintEvent createConstraintEvent, boolean create) {
453+
if (!Boolean.TRUE.equals(commonDbConfig.getApplyForeignKey())) {
454+
return;
455+
}
453456
List<TapConstraint> constraintList = createConstraintEvent.getConstraintList();
454457
if (EmptyKit.isNotEmpty(constraintList)) {
455458
List<String> constraintSqlList = new ArrayList<>();
@@ -748,7 +751,7 @@ protected String getHashSplitModConditions(TapTable tapTable, int maxSplit, int
748751

749752
protected String getBatchReadSelectSql(TapTable tapTable) {
750753
String columns = tapTable.getNameFieldMap().keySet().stream().map(c -> commonDbConfig.getEscapeChar() + StringKit.escape(c, commonDbConfig.getEscapeChar()) + commonDbConfig.getEscapeChar()).collect(Collectors.joining(","));
751-
return String.format("SELECT %s FROM " + getSchemaAndTable(tapTable.getId()), columns);
754+
return "SELECT " + columns + " FROM " + getSchemaAndTable(tapTable.getId());
752755
}
753756

754757
protected void batchReadWithHashSplit(TapConnectorContext tapConnectorContext, TapTable tapTable, Object offsetState, int eventBatchSize, BiConsumer<List<TapEvent>, Object> eventsOffsetConsumer) throws Throwable {

0 commit comments

Comments
 (0)