diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java index efd504947..81415689e 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/dml/PostgresWriteRecorder.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBufInputStream; import io.tapdata.common.dml.NormalWriteRecorder; +import io.tapdata.entity.event.dml.TapDeleteRecordEvent; import io.tapdata.entity.event.dml.TapRecordEvent; import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.schema.TapTable; @@ -196,7 +197,7 @@ protected Object filterValue(Object value, String dataType) throws SQLException public void addAndCheckCommit(TapRecordEvent recordEvent, WriteListResult listResult) { batchCacheSize++; - if (updatePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapUpdateRecordEvent) { + if (updatePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapUpdateRecordEvent || deletePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapDeleteRecordEvent) { batchCache.add(recordEvent); } } diff --git a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java index a703f1224..527ede9b5 100644 --- a/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java +++ b/connectors-common/sql-core/src/main/java/io/tapdata/common/dml/NormalRecordWriter.java @@ -32,6 +32,7 @@ public class NormalRecordWriter { protected NormalWriteRecorder updateRecorder; protected String updatePolicy = ConnectionOptions.DML_UPDATE_POLICY_IGNORE_ON_NON_EXISTS; protected NormalWriteRecorder deleteRecorder; + protected String deletePolicy = ConnectionOptions.DML_DELETE_POLICY_IGNORE_ON_NON_EXISTS; protected String version; protected Connection connection; protected final TapTable tapTable; @@ -79,6 +80,7 @@ public void write(List tapRecordEvents, Consumer listResult) throws SQLE while (iterator.hasNext()) { TapRecordEvent event = iterator.next(); if (0 >= writeResults[index++]) { - tapLogger.info("update record ignored: {}", event); + tapLogger.warn("update record ignored: {}", event); + } + } + } + if (LOG_ON_NONEXISTS == deletePolicy) { + Iterator iterator = batchCache.iterator(); + int index = 0; + while (iterator.hasNext()) { + TapRecordEvent event = iterator.next(); + if (0 >= writeResults[index++]) { + tapLogger.warn("delete record ignored: {}", event); } } } @@ -155,7 +167,7 @@ public void executeBatch(WriteListResult listResult) throws SQLE //commit when cacheSize >= 1000 public void addAndCheckCommit(TapRecordEvent recordEvent, WriteListResult listResult) throws SQLException { batchCacheSize++; - if (updatePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapUpdateRecordEvent) { + if (updatePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapUpdateRecordEvent || deletePolicy == LOG_ON_NONEXISTS && recordEvent instanceof TapDeleteRecordEvent) { batchCache.add(recordEvent); } if (batchCacheSize >= 1000) { @@ -189,6 +201,10 @@ public void setUpdatePolicy(String updatePolicy) { this.updatePolicy = WritePolicyEnum.valueOf(updatePolicy.toUpperCase()); } + public void setDeletePolicy(String deletePolicy) { + this.deletePolicy = WritePolicyEnum.valueOf(deletePolicy.toUpperCase()); + } + public void setTapLogger(Log tapLogger) { this.tapLogger = tapLogger; } diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index 6198e6c1d..4b05c2b58 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -444,6 +444,10 @@ protected void writeRecord(TapConnectorContext connectorContext, List