Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions connectors-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
<properties>
<connector.file.name>${project.artifactId}-v${project.version}</connector.file.name>
<java.version>8</java.version>
<tapdata.pdk.runner.verison>2.0-SNAPSHOT</tapdata.pdk.runner.verison>
<tapdata.api.verison>2.0.0-SNAPSHOT</tapdata.api.verison>
<tapdata.pdk.api.verison>2.0.0-SNAPSHOT</tapdata.pdk.api.verison>
<tapdata.pdk.runner.verison>2.1-SNAPSHOT</tapdata.pdk.runner.verison>
<tapdata.api.verison>2.0.1-SNAPSHOT</tapdata.api.verison>
<tapdata.pdk.api.verison>2.0.1-SNAPSHOT</tapdata.pdk.api.verison>
<tapdata.pdk.connector.core.version>1.0-SNAPSHOT</tapdata.pdk.connector.core.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>
<junit.platform.version>1.8.1</junit.platform.version>
Expand Down
2 changes: 1 addition & 1 deletion connectors-common/postgres-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<sql.core.version>1.0-SNAPSHOT</sql.core.version>
<debezium.version>1.5.4.Final</debezium.version>
<postgres.core.version>1.0-SNAPSHOT</postgres.core.version>
<tapdata.pdk.api.verison>2.0.0-SNAPSHOT</tapdata.pdk.api.verison>
<tapdata.pdk.api.verison>2.0.1-SNAPSHOT</tapdata.pdk.api.verison>
<fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,45 @@ protected void upsert(Map<String, Object> after, WriteListResult<TapRecordEvent>
}
}

@Override
public String getUpsertSql(Map<String, Object> after) throws SQLException {
String sql = getUpsertSql();
for (String key : allColumn) {
sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key)));
}
for (String key : updatedColumn) {
sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key)));
}
return sql;
}

private String formatValueForSql(Object value, String dataType) {
if (dataType.startsWith("bit")) {
if (value instanceof Boolean) {
value = ((Boolean) value) ? "1" : "0";
}else{
value = String.valueOf(value);
}
}
if(value instanceof byte[]){
return "'\\\\x" + StringKit.convertToHexString((byte[]) value) + "'";
}

if (dataType.endsWith("with time zone") && value instanceof LocalDateTime) {
Timestamp timestamp = Timestamp.valueOf(((LocalDateTime) value));
timestamp.setTime(timestamp.getTime() + TimeZone.getDefault().getRawOffset());
value = timestamp;
}
if (value instanceof Boolean && dataType.contains("int")) {
value = Boolean.TRUE.equals(value) ? 1 : 0;
}
if (value instanceof Boolean && dataType.contains("boolean")) {
value = Boolean.TRUE.equals(value) ? "true" : "false";
}
return object2String(value);
}


protected String getUpsertSql() {
return "INSERT INTO " + getSchemaAndTable() + " ("
+ allColumn.stream().map(this::quoteAndEscape).collect(Collectors.joining(", ")) + ") " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ protected void upsert(Map<String, Object> after, WriteListResult<TapRecordEvent>
throw new UnsupportedOperationException("upsert is not supported");
}

public String getUpsertSql(Map<String, Object> after) throws SQLException {
throw new UnsupportedOperationException("upsert is not supported");
}

//插入唯一键冲突时忽略
protected void insertIgnore(Map<String, Object> after, WriteListResult<TapRecordEvent> listResult) throws SQLException {
throw new UnsupportedOperationException("insertIgnore is not supported");
Expand Down Expand Up @@ -512,4 +516,15 @@ protected String object2String(Object obj) {
}
return result;
}

protected String byteArrayToHexString(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return "";
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02X", b));
}
return "0x"+ sb;
}
}
2 changes: 1 addition & 1 deletion connectors/postgres-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<properties>
<java.version>8</java.version>
<postgres.driver.version>42.7.5</postgres.driver.version>
<tapdata.pdk.api.version>2.0.0-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.1-SNAPSHOT</tapdata.pdk.api.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
import io.tapdata.connector.postgres.config.PostgresConfig;
import io.tapdata.connector.postgres.ddl.PostgresDDLSqlGenerator;
import io.tapdata.connector.postgres.dml.PostgresRecordWriter;
import io.tapdata.connector.postgres.dml.PostgresWriteRecorder;
import io.tapdata.connector.postgres.error.PostgresErrorCode;
import io.tapdata.connector.postgres.exception.PostgresExceptionCollector;
import io.tapdata.connector.postgres.partition.PostgresPartitionContext;
import io.tapdata.connector.postgres.partition.TableType;
import io.tapdata.entity.TapConstraintException;
import io.tapdata.entity.codec.TapCodecsRegistry;
import io.tapdata.entity.error.CoreException;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.ddl.constraint.TapCreateConstraintEvent;
import io.tapdata.entity.event.ddl.index.TapCreateIndexEvent;
import io.tapdata.entity.event.ddl.table.*;
import io.tapdata.entity.event.dml.TapInsertRecordEvent;
import io.tapdata.entity.event.dml.TapRecordEvent;
import io.tapdata.entity.schema.TapConstraint;
import io.tapdata.entity.schema.TapField;
Expand Down Expand Up @@ -241,6 +244,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportQueryHashByAdvanceFilterFunction(this::queryTableHash);
connectorFunctions.supportQueryPartitionTablesByParentName(this::discoverPartitionInfoByParentName);
connectorFunctions.supportStreamReadMultiConnectionFunction(this::streamReadMultiConnection);
connectorFunctions.supportExportEventSqlFunction(this::exportEventSql);

}

Expand Down Expand Up @@ -956,4 +960,12 @@ protected void createConstraint(TapConnectorContext connectorContext, TapTable t
}
}
}

public String exportEventSql(TapConnectorContext connectorContext, TapEvent tapEvent, TapTable table) throws SQLException {
if(tapEvent instanceof TapInsertRecordEvent){
PostgresWriteRecorder postgresWriter = new PostgresWriteRecorder(null, table, jdbcContext.getConfig().getSchema());
return postgresWriter.getUpsertSql(((TapInsertRecordEvent)tapEvent).getAfter());
}
return null;
}
}
Loading