Skip to content

Commit c4822b0

Browse files
Merge branch 'develop' into feat_TAP-5816_Support-export-and-repair-sql
# Conflicts: # connectors-common/postgres-core/pom.xml
2 parents 3de2132 + 6c4eead commit c4822b0

File tree

44 files changed

+2413
-56
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2413
-56
lines changed

connectors-common/connector-core/src/main/java/io/tapdata/kit/HttpKit.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.tapdata.kit;
22

3+
import java.io.ByteArrayOutputStream;
34
import java.io.IOException;
5+
import java.io.InputStream;
6+
import java.io.OutputStream;
7+
import java.net.Socket;
48
import java.net.URI;
59
import java.net.URLEncoder;
610
import java.net.http.HttpClient;
@@ -96,4 +100,52 @@ private static String buildFormData(Map<String, String> formData) {
96100
private static String encode(String value) {
97101
return URLEncoder.encode(value, StandardCharsets.UTF_8);
98102
}
103+
104+
public static String sendHttp09Request(String host, int port, String data) throws IOException {
105+
106+
try (Socket socket = new Socket(host, port)) {
107+
socket.setSoTimeout(10000); // 10秒超时
108+
109+
// 使用OutputStream直接发送,避免PrintWriter的自动换行问题
110+
OutputStream out = socket.getOutputStream();
111+
InputStream in = socket.getInputStream();
112+
113+
// 构建HTTP/0.9请求
114+
// 注意:真正的HTTP/0.9格式非常简单,可能不需要所有头部
115+
String request = "POST /\r\n" +
116+
"Content-Type: application/json\r\n" +
117+
"Content-Length: " + data.length() + "\r\n" +
118+
"\r\n" +
119+
data;
120+
121+
// 发送请求
122+
out.write(request.getBytes(StandardCharsets.UTF_8));
123+
out.flush();
124+
125+
// 读取响应
126+
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
127+
byte[] buffer = new byte[1024];
128+
int bytesRead;
129+
130+
// 设置一个简单的超时机制
131+
long startTime = System.currentTimeMillis();
132+
while (System.currentTimeMillis() - startTime < 5000) { // 5秒超时
133+
if (in.available() > 0) {
134+
bytesRead = in.read(buffer);
135+
if (bytesRead > 0) {
136+
responseBuffer.write(buffer, 0, bytesRead);
137+
}
138+
} else {
139+
try {
140+
Thread.sleep(100);
141+
} catch (InterruptedException e) {
142+
Thread.currentThread().interrupt();
143+
break;
144+
}
145+
}
146+
}
147+
148+
return responseBuffer.toString(StandardCharsets.UTF_8);
149+
}
150+
}
99151
}

connectors-common/file-connector-core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
<packaging>jar</packaging>
1414

1515
<properties>
16-
<tapdata.api.version>2.0.0-SNAPSHOT</tapdata.api.version>
17-
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
16+
<tapdata.api.version>2.0.1-SNAPSHOT</tapdata.api.version>
17+
<tapdata.pdk.api.version>2.0.1-SNAPSHOT</tapdata.pdk.api.version>
1818
<java.version>8</java.version>
1919
</properties>
2020

connectors-common/file-connector-core/src/main/java/io/tapdata/common/FileConnector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public int tableCount(TapConnectionContext connectionContext) {
106106
return 1;
107107
}
108108

109-
protected long batchCount(TapConnectorContext tapConnectorContext, TapTable tapTable) {
109+
protected long batchCount(TapConnectorContext tapConnectorContext, TapTable tapTable) throws Exception {
110110
return 0;
111111
}
112112

@@ -262,4 +262,8 @@ protected void initMergeCacheFilesThread() {
262262
});
263263
}
264264

265+
protected String correctPath(String path) {
266+
return path.endsWith("/") ? path : (path + "/");
267+
}
268+
265269
}

connectors-common/postgres-core/pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
<debezium.version>1.5.4.Final</debezium.version>
2323
<postgres.core.version>1.0-SNAPSHOT</postgres.core.version>
2424
<tapdata.pdk.api.verison>2.0.1-SNAPSHOT</tapdata.pdk.api.verison>
25-
25+
<fastjson.version>1.2.83</fastjson.version>
2626
</properties>
2727
<dependencyManagement>
2828
<dependencies>
@@ -57,6 +57,11 @@
5757
<artifactId>commons-lang3</artifactId>
5858
<version>${commons.lang3.version}</version>
5959
</dependency>
60+
<dependency>
61+
<groupId>com.alibaba</groupId>
62+
<artifactId>fastjson</artifactId>
63+
<version>${fastjson.version}</version>
64+
</dependency>
6065
</dependencies>
6166
</dependencyManagement>
6267

@@ -92,6 +97,10 @@
9297
<groupId>org.apache.commons</groupId>
9398
<artifactId>commons-lang3</artifactId>
9499
</dependency>
100+
<dependency>
101+
<groupId>com.alibaba</groupId>
102+
<artifactId>fastjson</artifactId>
103+
</dependency>
95104
<dependency>
96105
<groupId>io.debezium</groupId>
97106
<artifactId>debezium-embedded</artifactId>

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public abstract class AbstractWalLogMiner {
4040
protected boolean filterSchema;
4141
private Map<String, String> dataTypeMap;
4242
protected final AtomicReference<Throwable> threadException = new AtomicReference<>();
43-
private final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser();
43+
protected final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser();
4444
protected final PostgresConfig postgresConfig;
4545
protected boolean withSchema;
4646
protected Map<String, List<String>> schemaTableMap;

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCDCSQLParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public ResultDO from(String sql, Boolean undo) {
4141
default:
4242
break;
4343
}
44-
throw sr.ex("SQL must start with 'INSERT' or 'DELETE' or 'UPDATE'");
44+
return null;
4545
}
4646

4747
protected ResultDO updateBuild(SQLReader sr) {
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.tapdata.connector.postgres.cdc;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import io.tapdata.common.sqlparser.ResultDO;
5+
import io.tapdata.connector.postgres.PostgresJdbcContext;
6+
import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
7+
import io.tapdata.entity.event.TapEvent;
8+
import io.tapdata.entity.logger.Log;
9+
import io.tapdata.entity.simplify.TapSimplify;
10+
import io.tapdata.kit.EmptyKit;
11+
import io.tapdata.kit.HttpKit;
12+
13+
import java.util.ArrayList;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
import java.util.function.Supplier;
18+
19+
import static io.tapdata.entity.simplify.TapSimplify.list;
20+
21+
public class WalPgtoMiner extends AbstractWalLogMiner {
22+
23+
public WalPgtoMiner(PostgresJdbcContext postgresJdbcContext, Log tapLogger) {
24+
super(postgresJdbcContext, tapLogger);
25+
}
26+
27+
@Override
28+
public AbstractWalLogMiner offset(Object offsetState) {
29+
return this;
30+
}
31+
32+
@Override
33+
public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
34+
while (isAlive.get()) {
35+
String json = HttpKit.sendHttp09Request(postgresConfig.getPgtoHost(), postgresConfig.getPgtoPort(), "seek");
36+
AtomicReference<List<TapEvent>> tapEvents = new AtomicReference<>(list());
37+
extractJsonObjects(json).forEach(str -> {
38+
JSONObject jsonObject = TapSimplify.fromJson(str, JSONObject.class);
39+
String sql = jsonObject.getString("sql");
40+
ResultDO resultDO = sqlParser.from(sql, false);
41+
if (EmptyKit.isNull(resultDO)) {
42+
return;
43+
}
44+
String schema = resultDO.getSchema();
45+
String tableName = resultDO.getTableName();
46+
String op;
47+
switch (resultDO.getOp()) {
48+
case INSERT:
49+
op = "1";
50+
break;
51+
case UPDATE:
52+
op = "2";
53+
break;
54+
case DELETE:
55+
op = "3";
56+
break;
57+
default:
58+
op = null;
59+
}
60+
if (!postgresConfig.getSchema().equals(schema) || !tableList.contains(tableName)) {
61+
return;
62+
}
63+
NormalRedo redo = new NormalRedo();
64+
redo.setSqlRedo(sql);
65+
redo.setOperation(op);
66+
for (Map.Entry<String, Object> entry : resultDO.getData().entrySet()) {
67+
parseKeyAndValue(tableName, entry);
68+
}
69+
redo.setRedoRecord(resultDO.getData());
70+
redo.setNameSpace(schema);
71+
redo.setTableName(tableName);
72+
redo.setTimestamp(System.currentTimeMillis());
73+
tapEvents.get().add(createEvent(redo));
74+
if (tapEvents.get().size() >= recordSize) {
75+
consumer.accept(tapEvents.get(), new PostgresOffset());
76+
tapEvents.set(new ArrayList<>());
77+
}
78+
});
79+
if (EmptyKit.isNotEmpty(tapEvents.get())) {
80+
consumer.accept(tapEvents.get(), new PostgresOffset());
81+
}
82+
}
83+
}
84+
85+
private List<String> extractJsonObjects(String input) {
86+
List<String> jsonObjects = new ArrayList<>();
87+
StringBuilder jsonBuilder = null;
88+
int braceCount = 0;
89+
boolean insideJson = false;
90+
91+
for (char c : input.toCharArray()) {
92+
if (c == '{') {
93+
if (!insideJson) {
94+
// 开始新的JSON对象
95+
jsonBuilder = new StringBuilder();
96+
insideJson = true;
97+
}
98+
braceCount++;
99+
}
100+
101+
if (insideJson) {
102+
jsonBuilder.append(c);
103+
104+
if (c == '}') {
105+
braceCount--;
106+
// 当所有大括号都闭合时
107+
if (braceCount == 0) {
108+
jsonObjects.add(jsonBuilder.toString());
109+
insideJson = false;
110+
}
111+
}
112+
}
113+
}
114+
115+
return jsonObjects;
116+
}
117+
}

connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/config/PostgresConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class PostgresConfig extends CommonDbConfig implements Serializable {
2727
private Integer maximumQueueSize = 8000;
2828
private List<String> distributedKey = new ArrayList<>();
2929
private Boolean isPartition = false;
30+
private String pgtoHost;
31+
private int pgtoPort;
3032

3133
//customize
3234
public PostgresConfig() {
@@ -132,4 +134,20 @@ public Boolean getIsPartition() {
132134
public void setIsPartition(Boolean isPartition) {
133135
this.isPartition = isPartition;
134136
}
137+
138+
public String getPgtoHost() {
139+
return pgtoHost;
140+
}
141+
142+
public void setPgtoHost(String pgtoHost) {
143+
this.pgtoHost = pgtoHost;
144+
}
145+
146+
public int getPgtoPort() {
147+
return pgtoPort;
148+
}
149+
150+
public void setPgtoPort(int pgtoPort) {
151+
this.pgtoPort = pgtoPort;
152+
}
135153
}

connectors-common/sql-core/src/main/java/io/tapdata/common/sqlparser/ResultDO.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,22 @@ public void setSchema(String schema) {
2626
this.schema = schema;
2727
}
2828

29+
public String getTableName() {
30+
return tableName;
31+
}
32+
2933
public void setTableName(String tableName) {
3034
this.tableName = tableName;
3135
}
3236

37+
public Operate getOp() {
38+
return op;
39+
}
40+
41+
public void setOp(Operate op) {
42+
this.op = op;
43+
}
44+
3345
public Map<String, Object> getData() {
3446
return data;
3547
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.tapdata.common;
2+
3+
import io.tapdata.entity.codec.TapCodecsRegistry;
4+
import io.tapdata.entity.schema.TapField;
5+
import io.tapdata.entity.schema.TapTable;
6+
import io.tapdata.pdk.apis.context.TapConnectionContext;
7+
import io.tapdata.pdk.apis.entity.ConnectionOptions;
8+
import io.tapdata.pdk.apis.entity.TestItem;
9+
import io.tapdata.pdk.apis.functions.ConnectorFunctions;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
import org.springframework.test.util.ReflectionTestUtils;
13+
14+
import java.util.function.Consumer;
15+
16+
public class CommonDbConnectorTest {
17+
18+
CommonDbConnector common;
19+
TapTable tapTable;
20+
21+
@BeforeEach
22+
void init(){
23+
common = new CommonDbConnector() {
24+
@Override
25+
public ConnectionOptions connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {
26+
return null;
27+
}
28+
29+
@Override
30+
public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecsRegistry codecRegistry) {
31+
32+
}
33+
34+
@Override
35+
public void onStart(TapConnectionContext connectionContext) {
36+
37+
}
38+
39+
@Override
40+
public void onStop(TapConnectionContext connectionContext) {
41+
42+
}
43+
};
44+
CommonDbConfig config = new CommonDbConfig();
45+
config.setMaxIndexNameLength(32);
46+
ReflectionTestUtils.setField(common, "commonDbConfig", config);
47+
48+
tapTable = new TapTable("test");
49+
tapTable.add(new TapField("a1", "int"));
50+
tapTable.add(new TapField("a2", "varchar(20)"));
51+
tapTable.add(new TapField("a3", "varchar(20)"));
52+
}
53+
54+
@Test
55+
public void testGetCreateIndexSql() {
56+
// common.getCreateIndexSql()
57+
}
58+
}

0 commit comments

Comments
 (0)