From 39d94f2f90573fe46d09c16cc7e3bb839e4818a5 Mon Sep 17 00:00:00 2001 From: hardy <2545433047@qq.com> Date: Wed, 4 Jun 2025 11:21:42 +0800 Subject: [PATCH 1/3] feat:TAP-5816 Support export repair sql --- connectors-common/pom.xml | 6 +++--- connectors-common/postgres-core/pom.xml | 2 +- .../postgres/dml/PostgresWriteRecorder.java | 8 ++++++++ .../tapdata/common/dml/NormalWriteRecorder.java | 15 +++++++++++++++ connectors/postgres-connector/pom.xml | 2 +- .../connector/postgres/PostgresConnector.java | 12 ++++++++++++ 6 files changed, 40 insertions(+), 5 deletions(-) diff --git a/connectors-common/pom.xml b/connectors-common/pom.xml index 7bc358079..03c451ac2 100644 --- a/connectors-common/pom.xml +++ b/connectors-common/pom.xml @@ -27,9 +27,9 @@ ${project.artifactId}-v${project.version} 8 - 2.0-SNAPSHOT - 2.0.0-SNAPSHOT - 2.0.0-SNAPSHOT + 2.1-SNAPSHOT + 2.0.1-SNAPSHOT + 2.0.1-SNAPSHOT 1.0-SNAPSHOT 5.8.1 1.8.1 diff --git a/connectors-common/postgres-core/pom.xml b/connectors-common/postgres-core/pom.xml index 1df0eba47..408e8b4aa 100644 --- a/connectors-common/postgres-core/pom.xml +++ b/connectors-common/postgres-core/pom.xml @@ -21,7 +21,7 @@ 1.0-SNAPSHOT 1.5.4.Final 1.0-SNAPSHOT - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java index 81415689e..91a576c98 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java @@ -255,6 +255,14 @@ protected void upsert(Map after, WriteListResult } } + @Override + public String getUpsertSql(Map after) throws SQLException { + upsert(after,null); + String sql = preparedStatement.toString().split("wrapping")[1]; + preparedStatement.clearParameters(); + return sql; + } + protected String getUpsertSql() { return "INSERT INTO " + getSchemaAndTable() + " (" + allColumn.stream().map(this::quoteAndEscape).collect(Collectors.joining(", ")) + ") " + diff --git a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java index 78e86da41..d8bdad837 100644 --- a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java +++ b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalWriteRecorder.java @@ -297,6 +297,10 @@ protected void upsert(Map after, WriteListResult throw new UnsupportedOperationException("upsert is not supported"); } + public String getUpsertSql(Map after) throws SQLException { + throw new UnsupportedOperationException("upsert is not supported"); + } + //插入唯一键冲突时忽略 protected void insertIgnore(Map after, WriteListResult listResult) throws SQLException { throw new UnsupportedOperationException("insertIgnore is not supported"); @@ -512,4 +516,15 @@ protected String object2String(Object obj) { } return result; } + + protected String byteArrayToHexString(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (byte b : bytes) { + sb.append(String.format("%02X", b)); + } + return "0x"+ sb; + } } diff --git a/connectors/postgres-connector/pom.xml b/connectors/postgres-connector/pom.xml index 05a1a97f1..0deb5bae1 100644 --- a/connectors/postgres-connector/pom.xml +++ b/connectors/postgres-connector/pom.xml @@ -16,7 +16,7 @@ 8 42.7.5 - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index 03a9ff2ab..368115c5d 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -10,6 +10,7 @@ import io.tapdata.connector.postgres.config.PostgresConfig; import io.tapdata.connector.postgres.ddl.PostgresDDLSqlGenerator; import io.tapdata.connector.postgres.dml.PostgresRecordWriter; +import io.tapdata.connector.postgres.dml.PostgresWriteRecorder; import io.tapdata.connector.postgres.error.PostgresErrorCode; import io.tapdata.connector.postgres.exception.PostgresExceptionCollector; import io.tapdata.connector.postgres.partition.PostgresPartitionContext; @@ -17,9 +18,11 @@ import io.tapdata.entity.TapConstraintException; import io.tapdata.entity.codec.TapCodecsRegistry; import io.tapdata.entity.error.CoreException; +import io.tapdata.entity.event.TapEvent; import io.tapdata.entity.event.ddl.constraint.TapCreateConstraintEvent; import io.tapdata.entity.event.ddl.index.TapCreateIndexEvent; import io.tapdata.entity.event.ddl.table.*; +import io.tapdata.entity.event.dml.TapInsertRecordEvent; import io.tapdata.entity.event.dml.TapRecordEvent; import io.tapdata.entity.schema.TapConstraint; import io.tapdata.entity.schema.TapField; @@ -240,6 +243,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec connectorFunctions.supportQueryHashByAdvanceFilterFunction(this::queryTableHash); connectorFunctions.supportQueryPartitionTablesByParentName(this::discoverPartitionInfoByParentName); connectorFunctions.supportStreamReadMultiConnectionFunction(this::streamReadMultiConnection); + connectorFunctions.supportExportEventSqlFunction(this::exportEventSql); } @@ -944,4 +948,12 @@ protected void createConstraint(TapConnectorContext connectorContext, TapTable t } } } + + public String exportEventSql(TapConnectorContext connectorContext, TapEvent tapEvent, TapTable table) throws SQLException { + if(tapEvent instanceof TapInsertRecordEvent){ + PostgresWriteRecorder postgresWriter = new PostgresWriteRecorder(postgresJdbcContext.getConnection(), table, jdbcContext.getConfig().getSchema()); + return postgresWriter.getUpsertSql(((TapInsertRecordEvent)tapEvent).getAfter()); + } + return null; + } } From 3de21328644b5b6faaf75a2628acf43581ad053b Mon Sep 17 00:00:00 2001 From: hardy <2545433047@qq.com> Date: Wed, 4 Jun 2025 18:02:09 +0800 Subject: [PATCH 2/3] feat:TAP-5816 Support export repair sql --- .../postgres/dml/PostgresWriteRecorder.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java index 91a576c98..91f7331a7 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java @@ -257,10 +257,15 @@ protected void upsert(Map after, WriteListResult @Override public String getUpsertSql(Map after) throws SQLException { - upsert(after,null); - String sql = preparedStatement.toString().split("wrapping")[1]; - preparedStatement.clearParameters(); - return sql; + try{ + upsert(after,null); + String sql = preparedStatement.toString().split("wrapping")[1]; + preparedStatement.clearParameters(); + return sql; + }finally { + connection.close(); + } + } protected String getUpsertSql() { From 9b6fdb88e43193b1b4d5ea35e8e37ccbcf6cdc41 Mon Sep 17 00:00:00 2001 From: hardy <2545433047@qq.com> Date: Mon, 9 Jun 2025 11:27:40 +0800 Subject: [PATCH 3/3] fix:pg exportEventSql --- .../postgres/dml/PostgresWriteRecorder.java | 40 +++++++++++++++---- .../connector/postgres/PostgresConnector.java | 2 +- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java index 91f7331a7..c52c7fd00 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java @@ -257,17 +257,43 @@ protected void upsert(Map after, WriteListResult @Override public String getUpsertSql(Map after) throws SQLException { - try{ - upsert(after,null); - String sql = preparedStatement.toString().split("wrapping")[1]; - preparedStatement.clearParameters(); - return sql; - }finally { - connection.close(); + String sql = getUpsertSql(); + for (String key : allColumn) { + sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key))); + } + for (String key : updatedColumn) { + sql = sql.replaceFirst("\\?", formatValueForSql(after.get(key), columnTypeMap.get(key))); + } + return sql; + } + + private String formatValueForSql(Object value, String dataType) { + if (dataType.startsWith("bit")) { + if (value instanceof Boolean) { + value = ((Boolean) value) ? "1" : "0"; + }else{ + value = String.valueOf(value); + } + } + if(value instanceof byte[]){ + return "'\\\\x" + StringKit.convertToHexString((byte[]) value) + "'"; } + if (dataType.endsWith("with time zone") && value instanceof LocalDateTime) { + Timestamp timestamp = Timestamp.valueOf(((LocalDateTime) value)); + timestamp.setTime(timestamp.getTime() + TimeZone.getDefault().getRawOffset()); + value = timestamp; + } + if (value instanceof Boolean && dataType.contains("int")) { + value = Boolean.TRUE.equals(value) ? 1 : 0; + } + if (value instanceof Boolean && dataType.contains("boolean")) { + value = Boolean.TRUE.equals(value) ? "true" : "false"; + } + return object2String(value); } + protected String getUpsertSql() { return "INSERT INTO " + getSchemaAndTable() + " (" + allColumn.stream().map(this::quoteAndEscape).collect(Collectors.joining(", ")) + ") " + diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index bede1d4a9..3bc17944c 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -963,7 +963,7 @@ protected void createConstraint(TapConnectorContext connectorContext, TapTable t public String exportEventSql(TapConnectorContext connectorContext, TapEvent tapEvent, TapTable table) throws SQLException { if(tapEvent instanceof TapInsertRecordEvent){ - PostgresWriteRecorder postgresWriter = new PostgresWriteRecorder(postgresJdbcContext.getConnection(), table, jdbcContext.getConfig().getSchema()); + PostgresWriteRecorder postgresWriter = new PostgresWriteRecorder(null, table, jdbcContext.getConfig().getSchema()); return postgresWriter.getUpsertSql(((TapInsertRecordEvent)tapEvent).getAfter()); } return null;