diff --git a/connectors-common/pom.xml b/connectors-common/pom.xml index 7bc358079..03c451ac2 100644 --- a/connectors-common/pom.xml +++ b/connectors-common/pom.xml @@ -27,9 +27,9 @@ ${project.artifactId}-v${project.version} 8 - 2.0-SNAPSHOT - 2.0.0-SNAPSHOT - 2.0.0-SNAPSHOT + 2.1-SNAPSHOT + 2.0.1-SNAPSHOT + 2.0.1-SNAPSHOT 1.0-SNAPSHOT 5.8.1 1.8.1 diff --git a/connectors-common/postgres-core/pom.xml b/connectors-common/postgres-core/pom.xml index 6e86f5c87..fbb825a92 100644 --- a/connectors-common/postgres-core/pom.xml +++ b/connectors-common/postgres-core/pom.xml @@ -21,7 +21,7 @@ 1.0-SNAPSHOT 1.5.4.Final 1.0-SNAPSHOT - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT 1.2.83 diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java index 81415689e..c52c7fd00 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java @@ -255,6 +255,45 @@ protected void upsert(Map after, WriteListResult } } + @Override + public String getUpsertSql(Map after) throws SQLException { + String sql = getUpsertSql(); + for (String key : allColumn) { + sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key))); + } + for (String key : updatedColumn) { + sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key))); + } + return sql; + } + + private String formatValueForSql(Object value, String dataType) { + if (dataType.startsWith("bit")) { + if (value instanceof Boolean) { + value = ((Boolean) value) ? "1" : "0"; + }else{ + value = String.valueOf(value); + } + } + if(value instanceof byte[]){ + return "'\\\\x" + StringKit.convertToHexString((byte[]) value) + "'"; + } + + if (dataType.endsWith("with time zone") && value instanceof LocalDateTime) { + Timestamp timestamp = Timestamp.valueOf(((LocalDateTime) value)); + timestamp.setTime(timestamp.getTime() + TimeZone.getDefault().getRawOffset()); + value = timestamp; + } + if (value instanceof Boolean && dataType.contains("int")) { + value = Boolean.TRUE.equals(value) ? 1 : 0; + } + if (value instanceof Boolean && dataType.contains("boolean")) { + value = Boolean.TRUE.equals(value) ? "true" : "false"; + } + return object2String(value); + } + + protected String getUpsertSql() { return "INSERT INTO " + getSchemaAndTable() + " (" + allColumn.stream().map(this::quoteAndEscape).collect(Collectors.joining(", ")) + ") " + diff --git a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java index 78e86da41..d8bdad837 100644 --- a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java +++ b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java @@ -297,6 +297,10 @@ protected void upsert(Map after, WriteListResult throw new UnsupportedOperationException("upsert is not supported"); } + public String getUpsertSql(Map after) throws SQLException { + throw new UnsupportedOperationException("upsert is not supported"); + } + //插入唯一键冲突时忽略 protected void insertIgnore(Map after, WriteListResult listResult) throws SQLException { throw new UnsupportedOperationException("insertIgnore is not supported"); @@ -512,4 +516,15 @@ protected String object2String(Object obj) { } return result; } + + protected String byteArrayToHexString(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X", b)); + } + return "0x"+ sb; + } } diff --git a/connectors/postgres-connector/pom.xml b/connectors/postgres-connector/pom.xml index 9073e7140..8a170a760 100644 --- a/connectors/postgres-connector/pom.xml +++ b/connectors/postgres-connector/pom.xml @@ -16,7 +16,7 @@ 8 42.7.5 - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index ba1c17360..3bc17944c 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -11,6 +11,7 @@ import io.tapdata.connector.postgres.config.PostgresConfig; import io.tapdata.connector.postgres.ddl.PostgresDDLSqlGenerator; import io.tapdata.connector.postgres.dml.PostgresRecordWriter; +import io.tapdata.connector.postgres.dml.PostgresWriteRecorder; import io.tapdata.connector.postgres.error.PostgresErrorCode; import io.tapdata.connector.postgres.exception.PostgresExceptionCollector; import io.tapdata.connector.postgres.partition.PostgresPartitionContext; @@ -18,9 +19,11 @@ import io.tapdata.entity.TapConstraintException; import io.tapdata.entity.codec.TapCodecsRegistry; import io.tapdata.entity.error.CoreException; +import io.tapdata.entity.event.TapEvent; import io.tapdata.entity.event.ddl.constraint.TapCreateConstraintEvent; import io.tapdata.entity.event.ddl.index.TapCreateIndexEvent; import io.tapdata.entity.event.ddl.table.*; +import io.tapdata.entity.event.dml.TapInsertRecordEvent; import io.tapdata.entity.event.dml.TapRecordEvent; import io.tapdata.entity.schema.TapConstraint; import io.tapdata.entity.schema.TapField; @@ -241,6 +244,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec connectorFunctions.supportQueryHashByAdvanceFilterFunction(this::queryTableHash); connectorFunctions.supportQueryPartitionTablesByParentName(this::discoverPartitionInfoByParentName); connectorFunctions.supportStreamReadMultiConnectionFunction(this::streamReadMultiConnection); + connectorFunctions.supportExportEventSqlFunction(this::exportEventSql); } @@ -956,4 +960,12 @@ protected void createConstraint(TapConnectorContext connectorContext, TapTable t } } } + + public String exportEventSql(TapConnectorContext connectorContext, TapEvent tapEvent, TapTable table) throws SQLException { + if(tapEvent instanceof TapInsertRecordEvent){ + PostgresWriteRecorder postgresWriter = new PostgresWriteRecorder(null, table, jdbcContext.getConfig().getSchema()); + return postgresWriter.getUpsertSql(((TapInsertRecordEvent)tapEvent).getAfter()); + } + return null; + } }