From b76ae04f3f1cd0cfcd4faf392116067b43a265ea Mon Sep 17 00:00:00 2001 From: nanxiang xia <162968176+XNX02@users.noreply.github.com> Date: Mon, 26 May 2025 16:53:50 +0800 Subject: [PATCH] Pipe: add mark-as-general-write-request parameter in pipe to force forwarding event (#15572) --- .../constant/PipeConnectorConstant.java | 6 +++++ .../connector/protocol/IoTDBConnector.java | 26 ++++++++++++++++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java index f570319a08e7..e13a40a2c740 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java @@ -250,6 +250,12 @@ public class PipeConnectorConstant { public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = "sink.mark-as-pipe-request"; public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = true; + public static final String CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY = + "connector.mark-as-general-write-request"; + public static final String SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY = + "sink.mark-as-general-write-request"; + public static final boolean CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE = false; + public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif"; public static final String SINK_SKIP_IF_KEY = "sink.skipif"; public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java index ea450095a841..561b61b2391c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java @@ -101,6 +101,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE; @@ -130,6 +132,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY; @@ -225,6 +228,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY), false); + // Check coexistence of mark-as-pipe-request and mark-as-general-write-request + validator.validateSynonymAttributes( + Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), + Arrays.asList( + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY), + false); + username = parameters.getStringOrDefault( Arrays.asList( @@ -379,10 +389,20 @@ public void customize( .equals(CONNECTOR_FORMAT_TS_FILE_VALUE); LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled); - shouldMarkAsPipeRequest = + final boolean shouldMarkAsGeneralWriteRequest = parameters.getBooleanOrDefault( - Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), - CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE); + Arrays.asList( + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, + SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY), + CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE); + if (shouldMarkAsGeneralWriteRequest) { + shouldMarkAsPipeRequest = false; + } else { + shouldMarkAsPipeRequest = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY), + CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE); + } LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest); final String connectorSkipIfValue =