Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.tapdata.kit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
Expand Down Expand Up @@ -96,4 +100,52 @@ private static String buildFormData(Map<String, String> formData) {
private static String encode(String value) {
return URLEncoder.encode(value, StandardCharsets.UTF_8);
}

public static String sendHttp09Request(String host, int port, String data) throws IOException {

try (Socket socket = new Socket(host, port)) {
socket.setSoTimeout(10000); // 10秒超时

// 使用OutputStream直接发送,避免PrintWriter的自动换行问题
OutputStream out = socket.getOutputStream();
InputStream in = socket.getInputStream();

// 构建HTTP/0.9请求
// 注意:真正的HTTP/0.9格式非常简单,可能不需要所有头部
String request = "POST /\r\n" +
"Content-Type: application/json\r\n" +
"Content-Length: " + data.length() + "\r\n" +
"\r\n" +
data;

// 发送请求
out.write(request.getBytes(StandardCharsets.UTF_8));
out.flush();

// 读取响应
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;

// 设置一个简单的超时机制
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < 5000) { // 5秒超时
if (in.available() > 0) {
bytesRead = in.read(buffer);
if (bytesRead > 0) {
responseBuffer.write(buffer, 0, bytesRead);
}
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

return responseBuffer.toString(StandardCharsets.UTF_8);
}
}
}
4 changes: 2 additions & 2 deletions connectors-common/file-connector-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<packaging>jar</packaging>

<properties>
<tapdata.api.version>2.0.0-SNAPSHOT</tapdata.api.version>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.api.version>2.0.1-SNAPSHOT</tapdata.api.version>
<tapdata.pdk.api.version>2.0.1-SNAPSHOT</tapdata.pdk.api.version>
<java.version>8</java.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public int tableCount(TapConnectionContext connectionContext) {
return 1;
}

protected long batchCount(TapConnectorContext tapConnectorContext, TapTable tapTable) {
protected long batchCount(TapConnectorContext tapConnectorContext, TapTable tapTable) throws Exception {
return 0;
}

Expand Down Expand Up @@ -262,4 +262,8 @@ protected void initMergeCacheFilesThread() {
});
}

protected String correctPath(String path) {
return path.endsWith("/") ? path : (path + "/");
}

}
11 changes: 10 additions & 1 deletion connectors-common/postgres-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<debezium.version>1.5.4.Final</debezium.version>
<postgres.core.version>1.0-SNAPSHOT</postgres.core.version>
<tapdata.pdk.api.verison>2.0.0-SNAPSHOT</tapdata.pdk.api.verison>

<fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -57,6 +57,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -92,6 +97,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public abstract class AbstractWalLogMiner {
protected boolean filterSchema;
private Map<String, String> dataTypeMap;
protected final AtomicReference<Throwable> threadException = new AtomicReference<>();
private final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser();
protected final PostgresCDCSQLParser sqlParser = new PostgresCDCSQLParser();
protected final PostgresConfig postgresConfig;
protected boolean withSchema;
protected Map<String, List<String>> schemaTableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public ResultDO from(String sql, Boolean undo) {
default:
break;
}
throw sr.ex("SQL must start with 'INSERT' or 'DELETE' or 'UPDATE'");
return null;
}

protected ResultDO updateBuild(SQLReader sr) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.tapdata.connector.postgres.cdc;

import com.alibaba.fastjson.JSONObject;
import io.tapdata.common.sqlparser.ResultDO;
import io.tapdata.connector.postgres.PostgresJdbcContext;
import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.HttpKit;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static io.tapdata.entity.simplify.TapSimplify.list;

public class WalPgtoMiner extends AbstractWalLogMiner {

public WalPgtoMiner(PostgresJdbcContext postgresJdbcContext, Log tapLogger) {
super(postgresJdbcContext, tapLogger);
}

@Override
public AbstractWalLogMiner offset(Object offsetState) {
return this;
}

@Override
public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
while (isAlive.get()) {
String json = HttpKit.sendHttp09Request(postgresConfig.getPgtoHost(), postgresConfig.getPgtoPort(), "seek");
AtomicReference<List<TapEvent>> tapEvents = new AtomicReference<>(list());
extractJsonObjects(json).forEach(str -> {
JSONObject jsonObject = TapSimplify.fromJson(str, JSONObject.class);
String sql = jsonObject.getString("sql");
ResultDO resultDO = sqlParser.from(sql, false);
if (EmptyKit.isNull(resultDO)) {
return;
}
String schema = resultDO.getSchema();
String tableName = resultDO.getTableName();
String op;
switch (resultDO.getOp()) {
case INSERT:
op = "1";
break;
case UPDATE:
op = "2";
break;
case DELETE:
op = "3";
break;
default:
op = null;
}
if (!postgresConfig.getSchema().equals(schema) || !tableList.contains(tableName)) {
return;
}
NormalRedo redo = new NormalRedo();
redo.setSqlRedo(sql);
redo.setOperation(op);
for (Map.Entry<String, Object> entry : resultDO.getData().entrySet()) {
parseKeyAndValue(tableName, entry);
}
redo.setRedoRecord(resultDO.getData());
redo.setNameSpace(schema);
redo.setTableName(tableName);
redo.setTimestamp(System.currentTimeMillis());
tapEvents.get().add(createEvent(redo));
if (tapEvents.get().size() >= recordSize) {
consumer.accept(tapEvents.get(), new PostgresOffset());
tapEvents.set(new ArrayList<>());
}
});
if (EmptyKit.isNotEmpty(tapEvents.get())) {
consumer.accept(tapEvents.get(), new PostgresOffset());
}
}
}

private List<String> extractJsonObjects(String input) {
List<String> jsonObjects = new ArrayList<>();
StringBuilder jsonBuilder = null;
int braceCount = 0;
boolean insideJson = false;

for (char c : input.toCharArray()) {
if (c == '{') {
if (!insideJson) {
// 开始新的JSON对象
jsonBuilder = new StringBuilder();
insideJson = true;
}
braceCount++;
}

if (insideJson) {
jsonBuilder.append(c);

if (c == '}') {
braceCount--;
// 当所有大括号都闭合时
if (braceCount == 0) {
jsonObjects.add(jsonBuilder.toString());
insideJson = false;
}
}
}
}

return jsonObjects;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class PostgresConfig extends CommonDbConfig implements Serializable {
private Integer maximumQueueSize = 8000;
private List<String> distributedKey = new ArrayList<>();
private Boolean isPartition = false;
private String pgtoHost;
private int pgtoPort;

//customize
public PostgresConfig() {
Expand Down Expand Up @@ -132,4 +134,20 @@ public Boolean getIsPartition() {
public void setIsPartition(Boolean isPartition) {
this.isPartition = isPartition;
}

public String getPgtoHost() {
return pgtoHost;
}

public void setPgtoHost(String pgtoHost) {
this.pgtoHost = pgtoHost;
}

public int getPgtoPort() {
return pgtoPort;
}

public void setPgtoPort(int pgtoPort) {
this.pgtoPort = pgtoPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,22 @@ public void setSchema(String schema) {
this.schema = schema;
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public Operate getOp() {
return op;
}

public void setOp(Operate op) {
this.op = op;
}

public Map<String, Object> getData() {
return data;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.tapdata.common;

import io.tapdata.entity.codec.TapCodecsRegistry;
import io.tapdata.entity.schema.TapField;
import io.tapdata.entity.schema.TapTable;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.entity.ConnectionOptions;
import io.tapdata.pdk.apis.entity.TestItem;
import io.tapdata.pdk.apis.functions.ConnectorFunctions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.function.Consumer;

public class CommonDbConnectorTest {

CommonDbConnector common;
TapTable tapTable;

@BeforeEach
void init(){
common = new CommonDbConnector() {
@Override
public ConnectionOptions connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {
return null;
}

@Override
public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecsRegistry codecRegistry) {

}

@Override
public void onStart(TapConnectionContext connectionContext) {

}

@Override
public void onStop(TapConnectionContext connectionContext) {

}
};
CommonDbConfig config = new CommonDbConfig();
config.setMaxIndexNameLength(32);
ReflectionTestUtils.setField(common, "commonDbConfig", config);

tapTable = new TapTable("test");
tapTable.add(new TapField("a1", "int"));
tapTable.add(new TapField("a2", "varchar(20)"));
tapTable.add(new TapField("a3", "varchar(20)"));
}

@Test
public void testGetCreateIndexSql() {
// common.getCreateIndexSql()
}
}
2 changes: 1 addition & 1 deletion connectors/csv-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<packaging>jar</packaging>

<properties>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.1-SNAPSHOT</tapdata.pdk.api.version>
<java.version>8</java.version>
</properties>

Expand Down
2 changes: 1 addition & 1 deletion connectors/excel-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<packaging>jar</packaging>

<properties>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.1-SNAPSHOT</tapdata.pdk.api.version>
<java.version>8</java.version>
</properties>

Expand Down
Loading
Loading