diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java
new file mode 100644
index 0000000000000..286bf91a53c4d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.utils.Binary;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.LocalDate;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBObjectTypeQueryIT {
+
+ private static final String DATABASE_NAME = "test";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE " + DATABASE_NAME);
+ statement.execute("USE " + DATABASE_NAME);
+ statement.execute(
+ "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD, s9 OBJECT FIELD)");
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ statement.execute(
+ String.format(
+ "insert into table1(time, device, s4, s5, s6, s7, s8) "
+ + "values(%d, '%s', '%s', %d, %s, '%s', %s)",
+ j,
+ "d" + i,
+ LocalDate.of(2024, 5, i % 31 + 1),
+ j,
+ "X'cafebabe'",
+ j,
+ "to_object(true, 0, X'cafebabe')"));
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testObjectLength() throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select length(s8) from table1 limit 1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long length = iterator.getLong(1);
+ Assert.assertEquals(4, length);
+ }
+ }
+ }
+
+ @Test
+ public void testReadObject() throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select read_object(s8) from table1 where device = 'd2'");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ byte[] expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1) from table1 where device = 'd3'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1, 2) from table1 where device = 'd1'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(s8, 1, 1000) from table1 where device = 'd1'");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select count(*) from table1 where device = 'd1' and s6 = read_object(s8)");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long count = iterator.getLong(1);
+ Assert.assertEquals(10, count);
+ }
+
+ // read_object are not pushed down. Read remote files
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select read_object(t1_s8) from (select t1.s8 as t1_s8, t2.s8 as t2_s8 from table1 as t1 inner join table1 as t2 using(time))");
+ iterator = sessionDataSet.iterator();
+ expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};
+ while (iterator.next()) {
+ Binary blob = iterator.getBlob(1);
+ Assert.assertArrayEquals(expected, blob.getValues());
+ }
+ }
+ }
+
+ @Test
+ public void testFunctionAndClauses()
+ throws IoTDBConnectionException, StatementExecutionException {
+ try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("USE " + DATABASE_NAME);
+
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement(
+ "select length(s8) from table1 where device = 'd2' and s8 is not null limit 1");
+ SessionDataSet.DataIterator iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(4, iterator.getLong(1));
+ }
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select count(s8), first(s8), last(s8), first_by(s8, time), last_by(s8, time) from table1 where device = 'd1' and cast(s8 as string) = '(Object) 4 B' and try_cast(s8 as string) = '(Object) 4 B'");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals(10, iterator.getLong(1));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(3));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(4));
+ Assert.assertEquals("(Object) 4 B", iterator.getString(5));
+ }
+
+ sessionDataSet = session.executeQueryStatement("select coalesce(s9, s8) from table1");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+ }
+
+ // MATCH_RECOGNIZE
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select m.cnt from table1 match_recognize (order by s8 measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 = prev(B.s6)) as m"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select m.cnt from table1 match_recognize (partition by s8 measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 = prev(B.s6)) as m"));
+
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select m.value from table1 match_recognize(partition by s6 measures prev(s8) as value one row per match pattern (B+) define B as B.s6=prev(B.s6)) as m");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(1));
+ }
+
+ // WHERE
+ session.executeQueryStatement(
+ "select time, s8 from table1 where device = 'd10' and s8 is not null");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ }
+
+ // GROUP BY
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () -> session.executeNonQueryStatement("select count(*) from table1 group by s8"));
+
+ // ORDER BY
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () -> session.executeNonQueryStatement("select count(*) from table1 group by s8"));
+
+ // FILL
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select time, s8 from table1 where device = 'd10' fill method linear"));
+ session.executeQueryStatement(
+ "select time, s8 from table1 where device = 'd10' fill method previous");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ Assert.assertEquals("(Object) 4 B", iterator.getString(2));
+ }
+
+ // HAVING
+ session.executeQueryStatement(
+ "select device, count(s8) from table1 group by device having count(s8) > 0");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ long count = iterator.getLong(2);
+ Assert.assertEquals(10, count);
+ }
+
+ // WINDOW
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(partition by s8) from table1"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(order by s8) from table1"));
+ session.executeNonQueryStatement(
+ "select *, nth_value(s8,2) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, lead(s8) over(partition by device order by time) from table1");
+ session.executeNonQueryStatement(
+ "select *, first_value(s8) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, last_value(s8) over(partition by device) from table1");
+ session.executeNonQueryStatement(
+ "select *, lag(s8) over(partition by device order by time) from table1");
+
+ // Table-value function
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select * from session(data => table1 partition by s8, timecol => 'time', gap => 1ms)"));
+ Assert.assertThrows(
+ StatementExecutionException.class,
+ () ->
+ session.executeNonQueryStatement(
+ "select * from session(data => table1 order by s8, timecol => 'time', gap => 1ms)"));
+ sessionDataSet =
+ session.executeQueryStatement(
+ "select * from hop(data => table1, timecol => 'time', slide => 1ms, size => 2ms)");
+ iterator = sessionDataSet.iterator();
+ while (iterator.next()) {
+ String str = iterator.getString("s8");
+ Assert.assertEquals("(Object) 4 B", str);
+ }
+ }
+ }
+}
diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
index 8c6e2a7f3578c..4267f41dc8d8d 100644
--- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
+++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java
@@ -83,7 +83,7 @@ public interface Record {
* Returns the Binary value at the specified column in this row.
*
*
Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT},
- * {@code TSDataType.STRING} or {@code TSDataType.BLOB}.
+ * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code TSDataType.OBJECT}.
*
* @param columnIndex index of the specified column
* @return the Binary value at the specified column in this row
@@ -94,7 +94,7 @@ public interface Record {
* Returns the String value at the specified column in this row.
*
*
Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}
- * or {@code TSDataType.STRING}.
+ * or {@code TSDataType.STRING} or {@code TSDataType.OBJECT}.
*
* @param columnIndex index of the specified column
* @return the String value at the specified column in this row
@@ -113,6 +113,37 @@ public interface Record {
Object getObject(int columnIndex);
+ /**
+ * Returns the Binary representation of an object stored at the specified column in this row.
+ *
+ *
Users need to ensure that the data type of the specified column is {@code
+ * TSDataType.OBJECT}.
+ *
+ *
This method returns the entire binary data of the object and may require considerable memory
+ * if the stored object is large.
+ *
+ * @param columnIndex index of the specified column
+ * @return the Binary content of the object at the specified column
+ */
+ Binary readObject(int columnIndex);
+
+ /**
+ * Returns a partial Binary segment of an object stored at the specified column in this row.
+ *
+ *
Users need to ensure that the data type of the specified column is {@code
+ * TSDataType.OBJECT}.
+ *
+ *
This method enables reading a subset of the stored object without materializing the entire
+ * binary data in memory, which is useful for large objects and streaming access patterns.
+ *
+ * @param columnIndex index of the specified column
+ * @param offset byte offset of the subsection read
+ * @param length number of bytes to read starting from the offset. If length < 0, read the entire
+ * binary data from offset.
+ * @return the Binary content of the object segment at the specified column
+ */
+ Binary readObject(int columnIndex, long offset, int length);
+
/**
* Returns the actual data type of the value at the specified column in this row.
*
diff --git a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
index b396e373f8699..082ee7bf48811 100644
--- a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
+++ b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties
@@ -34,7 +34,7 @@ timestamp_precision=ms
data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
schema_replication_factor=3
-data_replication_factor=3
+data_replication_factor=1
udf_lib_dir=target/confignode1/ext/udf
trigger_lib_dir=target/confignode1/ext/trigger
pipe_lib_dir=target/confignode1/ext/pipe
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dcc62fcd3c8f1..8f1b97b71964a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -201,6 +201,7 @@
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.type.AutoGauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -279,6 +280,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
@@ -3051,6 +3053,12 @@ public TSStatus writeAuditLog(TAuditLogReq req) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ @Override
+ public ByteBuffer readObject(TReadObjectReq req) {
+ return ObjectTypeUtils.readObjectContent(
+ req.getRelativePath(), req.getOffset(), req.getSize(), false);
+ }
+
public void handleClientExit() {
// Do nothing
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index b7ff6f3e0fdd5..8faea51b829a4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
@@ -186,6 +187,20 @@ public Object getObject(int columnIndex) {
return originalColumns[columnIndex].getObject(offset);
}
+ @Override
+ public Binary readObject(int columnIndex, long offset, int length) {
+ if (getDataType(columnIndex) == Type.OBJECT) {
+ throw new UnsupportedOperationException("current column is not object column");
+ }
+ Binary binary = getBinary(columnIndex);
+ return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array());
+ }
+
+ @Override
+ public Binary readObject(int columnIndex) {
+ return readObject(columnIndex, 0L, -1);
+ }
+
@Override
public Type getDataType(int columnIndex) {
return dataTypes.get(columnIndex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
index 6b5032627b219..cb8cbdccbb695 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java
@@ -35,6 +35,7 @@
import org.apache.tsfile.read.common.type.BooleanType;
import org.apache.tsfile.read.common.type.DoubleType;
import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.ObjectType;
import org.apache.tsfile.read.common.type.Type;
import java.util.ArrayList;
@@ -161,7 +162,9 @@ private Object getValueFromPartition(
return partition.getFloat(channel, position);
} else if (type instanceof DoubleType) {
return partition.getDouble(channel, position);
- } else if (type instanceof AbstractVarcharType || type instanceof BlobType) {
+ } else if (type instanceof AbstractVarcharType
+ || type instanceof BlobType
+ || type instanceof ObjectType) {
return partition.getBinary(channel, position);
} else {
throw new SemanticException("Unsupported type: " + type.getClass().getSimpleName());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
index 5237fcfd12e6c..48095927eda56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
-
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.type.Type;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
similarity index 88%
rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
index 6f5813955dd87..34bc9d98e9c1b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.commons.udf.access;
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.udf.api.relational.access.Record;
import org.apache.iotdb.udf.api.type.Type;
@@ -138,6 +139,20 @@ public Object getObject(int columnIndex) {
return childrenColumns.get(columnIndex).getObject(index);
}
+ @Override
+ public Binary readObject(int columnIndex, long offset, int length) {
+ if (getDataType(columnIndex) == Type.OBJECT) {
+ throw new UnsupportedOperationException("current column is not object column");
+ }
+ Binary binary = getBinary(columnIndex);
+ return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array());
+ }
+
+ @Override
+ public Binary readObject(int columnIndex) {
+ return readObject(columnIndex, 0L, -1);
+ }
+
@Override
public Type getDataType(int columnIndex) {
return UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
index 64c0896b3a180..70e1c0c5d4432 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
index b3d24e923aed2..9ac6b48db7cc4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.MaskedRecordIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray;
import org.apache.iotdb.udf.api.State;
import org.apache.iotdb.udf.api.relational.AggregateFunction;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
index 0378c80f042be..19f81b78f6839 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -163,6 +163,7 @@
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Log10ColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LongToBytesColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LowerColumnTransformer;
+import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ObjectLengthColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrim2ColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrimColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RadiansColumnTransformer;
@@ -243,7 +244,6 @@
import static org.apache.tsfile.read.common.type.FloatType.FLOAT;
import static org.apache.tsfile.read.common.type.IntType.INT32;
import static org.apache.tsfile.read.common.type.LongType.INT64;
-import static org.apache.tsfile.read.common.type.ObjectType.OBJECT;
import static org.apache.tsfile.read.common.type.StringType.STRING;
public class ColumnTransformerBuilder
@@ -780,7 +780,9 @@ private ColumnTransformer getFunctionColumnTransformer(
} else if (TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName)) {
ColumnTransformer first = this.process(children.get(0), context);
if (children.size() == 1) {
- return new LengthColumnTransformer(INT32, first);
+ return context.inputDataTypes.get(0) == TSDataType.OBJECT
+ ? new ObjectLengthColumnTransformer(INT64, first)
+ : new LengthColumnTransformer(INT64, first);
}
} else if (TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName)) {
ColumnTransformer first = this.process(children.get(0), context);
@@ -1456,10 +1458,10 @@ private ColumnTransformer getFunctionColumnTransformer(
.equalsIgnoreCase(functionName)) {
ColumnTransformer first = this.process(children.get(0), context);
if (children.size() == 1) {
- return new ReadObjectColumnTransformer(OBJECT, first, context.fragmentInstanceContext);
+ return new ReadObjectColumnTransformer(BLOB, first, context.fragmentInstanceContext);
} else if (children.size() == 2) {
return new ReadObjectColumnTransformer(
- OBJECT,
+ BLOB,
((LongLiteral) children.get(1)).getParsedValue(),
first,
context.fragmentInstanceContext);
@@ -1468,7 +1470,7 @@ private ColumnTransformer getFunctionColumnTransformer(
long length = ((LongLiteral) children.get(2)).getParsedValue();
checkArgument(offset >= 0 && length >= 0);
return new ReadObjectColumnTransformer(
- OBJECT,
+ BLOB,
((LongLiteral) children.get(1)).getParsedValue(),
((LongLiteral) children.get(2)).getParsedValue(),
first,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index 8c54fd640f88c..2274762341b5d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -308,6 +308,10 @@ public boolean updateRegionCache(final TRegionRouteReq req) {
return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap());
}
+ public List getRegionReplicaSet(List consensusGroupIds) {
+ return partitionCache.getRegionReplicaSet(consensusGroupIds);
+ }
+
@Override
public void invalidAllCache() {
partitionCache.invalidAllCache();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 581641591837b..44be3e69afb7c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -1435,6 +1435,7 @@ public static IFill[] getPreviousFill(
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
previousFill[i] =
filter == null
? new BinaryPreviousFill()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index d7d4a951cb860..c29ceaff04f82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -268,6 +268,7 @@
import org.apache.tsfile.read.common.type.BinaryType;
import org.apache.tsfile.read.common.type.BlobType;
import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.ObjectType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -3825,6 +3826,7 @@ private boolean[] checkStatisticAndScanOrder(
case MAX:
case MIN:
if (BlobType.BLOB.equals(argumentType)
+ || ObjectType.OBJECT.equals(argumentType)
|| BinaryType.TEXT.equals(argumentType)
|| BooleanType.BOOLEAN.equals(argumentType)) {
canUseStatistic = false;
@@ -3840,8 +3842,8 @@ private boolean[] checkStatisticAndScanOrder(
descendingCount++;
}
- // first/last/first_by/last_by aggregation with BLOB type can not use statistics
- if (BlobType.BLOB.equals(argumentType)) {
+ // first/last/first_by/last_by aggregation with BLOB or OBJECT type can not use statistics
+ if (BlobType.BLOB.equals(argumentType) || ObjectType.OBJECT.equals(argumentType)) {
canUseStatistic = false;
break;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83f6bbec63e09..2297ddcebdd01 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -207,6 +207,9 @@ private void handleObjectValue(
for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
Object[] values = insertRowNode.getValues();
+ if (values[j] == null) {
+ continue;
+ }
byte[] binary = ((Binary) values[j]).getValues();
ByteBuffer buffer = ByteBuffer.wrap(binary);
boolean isEoF = buffer.get() == 1;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index cc9af5c9b7249..b6242ec000fc6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -265,13 +265,14 @@ && isIntegerNumber(argumentTypes.get(2)))) {
}
return STRING;
} else if (TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName)) {
- if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
+ if (!(argumentTypes.size() == 1 && (isCharType(argumentTypes.get(0)))
+ || isObjectType(argumentTypes.get(0)))) {
throw new SemanticException(
"Scalar function "
+ functionName.toLowerCase(Locale.ENGLISH)
- + " only accepts one argument and it must be text or string data type.");
+ + " only accepts one argument and it must be text or string or object data type.");
}
- return INT32;
+ return INT64;
} else if (TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName)) {
if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) {
throw new SemanticException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
index 279e3c06fd49b..47fd40ed73f92 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.udf;
-import org.apache.iotdb.commons.udf.access.RecordIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator;
import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer;
import org.apache.iotdb.udf.api.relational.ScalarFunction;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
index ee40a3ec6f8ed..4cd2358118f09 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java
@@ -341,6 +341,7 @@ protected void castString(ColumnBuilder columnBuilder, Binary value) {
case TEXT:
case STRING:
case BLOB:
+ case OBJECT:
returnType.writeBinary(columnBuilder, value);
break;
default:
@@ -393,4 +394,15 @@ protected void castBlob(ColumnBuilder columnBuilder, Binary value) {
String.format("Cannot cast %s to %s type", stringValue, returnType.getDisplayName()));
}
}
+
+ protected void castObject(ColumnBuilder columnBuilder, Binary value) {
+ String stringValue = BytesUtils.parseObjectByteArrayToString(value.getValues());
+ switch (returnType.getTypeEnum()) {
+ case STRING:
+ returnType.writeBinary(columnBuilder, BytesUtils.valueOf(String.valueOf(stringValue)));
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format(ERROR_MSG, returnType.getTypeEnum()));
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
new file mode 100644
index 0000000000000..08eb47691668f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
+
+import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+
+public abstract class AbstractLengthColumnTransformer extends UnaryColumnTransformer {
+
+ public AbstractLengthColumnTransformer(
+ Type returnType, ColumnTransformer childColumnTransformer) {
+ super(returnType, childColumnTransformer);
+ }
+
+ @Override
+ protected void doTransform(Column column, ColumnBuilder columnBuilder) {
+ for (int i = 0, n = column.getPositionCount(); i < n; i++) {
+ if (!column.isNull(i)) {
+ columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ }
+ }
+
+ @Override
+ protected void doTransform(Column column, ColumnBuilder columnBuilder, boolean[] selection) {
+ for (int i = 0, n = column.getPositionCount(); i < n; i++) {
+ if (selection[i] && !column.isNull(i)) {
+ columnBuilder.writeLong(transformNonNullValue(column.getBinary(i)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ }
+ }
+
+ protected abstract long transformNonNullValue(Binary binary);
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
index b9c8e31b1abbc..624eabadc62d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java
@@ -67,6 +67,9 @@ protected void transform(
case BLOB:
castBlob(columnBuilder, childType.getBinary(column, i));
break;
+ case OBJECT:
+ castObject(columnBuilder, childType.getBinary(column, i));
+ break;
default:
throw new UnsupportedOperationException(
String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
index 00448c6f575b1..c94530c83d403 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java
@@ -20,40 +20,18 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
-import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
-public class LengthColumnTransformer extends UnaryColumnTransformer {
-
+public class LengthColumnTransformer extends AbstractLengthColumnTransformer {
public LengthColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) {
super(returnType, childColumnTransformer);
}
@Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder) {
- for (int i = 0, n = column.getPositionCount(); i < n; i++) {
- if (!column.isNull(i)) {
- String currentValue = column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
- } else {
- columnBuilder.appendNull();
- }
- }
- }
-
- @Override
- protected void doTransform(Column column, ColumnBuilder columnBuilder, boolean[] selection) {
- for (int i = 0, n = column.getPositionCount(); i < n; i++) {
- if (selection[i] && !column.isNull(i)) {
- String currentValue = column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET);
- columnBuilder.writeInt(currentValue.length());
- } else {
- columnBuilder.appendNull();
- }
- }
+ protected long transformNonNullValue(Binary binary) {
+ return binary.getStringValue(TSFileConfig.STRING_CHARSET).length();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
new file mode 100644
index 0000000000000..5d39c6f6af3ca
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
+
+import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
+
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+
+public class ObjectLengthColumnTransformer extends AbstractLengthColumnTransformer {
+ public ObjectLengthColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) {
+ super(returnType, childColumnTransformer);
+ }
+
+ @Override
+ protected long transformNonNullValue(Binary binary) {
+ return ObjectTypeUtils.getObjectLength(binary);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 9504c6c2282df..8c791191b42ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -19,25 +19,18 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
import org.apache.iotdb.db.utils.ObjectTypeUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
import java.util.Optional;
public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
@@ -107,35 +100,14 @@ private void transform(Column column, ColumnBuilder columnBuilder, int i) {
}
private Binary readObject(Binary binary) {
- File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
- long actualReadSize = getActualReadSize(file);
+ Pair ObjectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = ObjectLengthPathPair.getLeft();
+ String relativePath = ObjectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, length);
fragmentInstanceContext.ifPresent(
context -> context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize));
- byte[] bytes = new byte[(int) actualReadSize];
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
- fileChannel.read(buffer, offset);
- } catch (IOException e) {
- throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- }
- return new Binary(bytes);
- }
-
- private long getActualReadSize(File file) {
- long fileSize = file.length();
- if (offset >= fileSize) {
- throw new SemanticException(
- String.format(
- "offset %d is greater than object size %d, file path is %s",
- offset, fileSize, file.getAbsolutePath()));
- }
- long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - offset);
- if (actualReadSize > Integer.MAX_VALUE) {
- throw new SemanticException(
- String.format(
- "Read object size %s is too large (size > 2G), file path is %s",
- actualReadSize, file.getAbsolutePath()));
- }
- return actualReadSize;
+ return new Binary(
+ ObjectTypeUtils.readObjectContent(relativePath, offset, actualReadSize, true).array());
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
index c25fd321c3445..419d1bbabf3be 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java
@@ -69,6 +69,9 @@ protected void transform(
case BLOB:
castBlob(columnBuilder, childType.getBinary(column, i));
break;
+ case OBJECT:
+ castObject(columnBuilder, childType.getBinary(column, i));
+ break;
default:
throw new UnsupportedOperationException(
String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
index c153061a90d62..058213450363e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -19,18 +19,36 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
public class ObjectTypeUtils {
@@ -40,6 +58,119 @@ public class ObjectTypeUtils {
private ObjectTypeUtils() {}
+ public static ByteBuffer readObjectContent(
+ Binary binary, long offset, int length, boolean mayNotInCurrentNode) {
+ Pair ObjectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary);
+ long fileLength = ObjectLengthPathPair.getLeft();
+ String relativePath = ObjectLengthPathPair.getRight();
+ int actualReadSize =
+ ObjectTypeUtils.getActualReadSize(
+ relativePath, fileLength, offset, length < 0 ? fileLength : length);
+ return ObjectTypeUtils.readObjectContent(
+ relativePath, offset, actualReadSize, mayNotInCurrentNode);
+ }
+
+ public static ByteBuffer readObjectContent(
+ String relativePath, long offset, int readSize, boolean mayNotInCurrentNode) {
+ Optional objectFile = TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false);
+ if (objectFile.isPresent()) {
+ return readObjectContentFromLocalFile(objectFile.get(), offset, readSize);
+ }
+ if (mayNotInCurrentNode) {
+ return readObjectContentFromRemoteFile(relativePath, offset, readSize);
+ }
+ throw new ObjectFileNotExist(relativePath);
+ }
+
+ private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, long readSize) {
+ byte[] bytes = new byte[(int) readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+ fileChannel.read(buffer, offset);
+ } catch (IOException e) {
+ throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ private static ByteBuffer readObjectContentFromRemoteFile(
+ final String relativePath, final long offset, final int readSize) {
+ byte[] bytes = new byte[readSize];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ TConsensusGroupId consensusGroupId =
+ new TConsensusGroupId(
+ TConsensusGroupType.DataRegion,
+ Integer.parseInt(Paths.get(relativePath).getName(0).toString()));
+ List regionReplicaSetList =
+ ClusterPartitionFetcher.getInstance()
+ .getRegionReplicaSet(Collections.singletonList(consensusGroupId));
+ TRegionReplicaSet regionReplicaSet = regionReplicaSetList.iterator().next();
+ final int batchSize = 1024 * 1024;
+ final TReadObjectReq req = new TReadObjectReq();
+ req.setRelativePath(relativePath);
+ for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) {
+ TDataNodeLocation dataNodeLocation = regionReplicaSet.getDataNodeLocations().get(i);
+ int toReadSizeInCurrentDataNode = readSize;
+ try (SyncDataNodeInternalServiceClient client =
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(dataNodeLocation.getInternalEndPoint())) {
+ while (toReadSizeInCurrentDataNode > 0) {
+ req.setOffset(offset + buffer.position());
+ req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize));
+ toReadSizeInCurrentDataNode -= req.getSize();
+ ByteBuffer partial = client.readObject(req);
+ buffer.put(partial);
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Failed to read object from datanode: {}" + e.getMessage(), dataNodeLocation, e);
+ if (i == regionReplicaSet.getDataNodeLocations().size() - 1) {
+ throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ }
+ buffer.clear();
+ req.setOffset(offset);
+ continue;
+ }
+ break;
+ }
+ buffer.flip();
+ return buffer;
+ }
+
+ public static int getActualReadSize(String filePath, long fileSize, long offset, long length) {
+ if (offset >= fileSize) {
+ throw new SemanticException(
+ String.format(
+ "offset %d is greater than object size %d, file path is %s",
+ offset, fileSize, filePath));
+ }
+ long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - offset);
+ if (actualReadSize > Integer.MAX_VALUE) {
+ throw new SemanticException(
+ String.format(
+ "Read object size %s is too large (size > 2G), file path is %s",
+ actualReadSize, filePath));
+ }
+ return (int) actualReadSize;
+ }
+
+ public static Pair parseObjectBinary(Binary binary) {
+ byte[] bytes = binary.getValues();
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ long length = buffer.getLong();
+ String relativeObjectFilePath =
+ new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+ return new Pair<>(length, relativeObjectFilePath);
+ }
+
+ public static long getObjectLength(Binary binary) {
+ byte[] bytes = binary.getValues();
+ ByteBuffer wrap = ByteBuffer.wrap(bytes);
+ return wrap.getLong();
+ }
+
public static File getObjectPathFromBinary(Binary binary) {
byte[] bytes = binary.getValues();
String relativeObjectFilePath =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
new file mode 100644
index 0000000000000..4487e8b704df3
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.object;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+import org.apache.iotdb.commons.schema.table.column.TagColumnSchema;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ColumnSchema;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+
+public class ObjectTypeCompactionTest extends AbstractCompactionTest {
+
+ private static final TableSchema tableSchema =
+ new TableSchema(
+ "t1",
+ Arrays.asList(
+ new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+ new ColumnSchema("s1", TSDataType.OBJECT, ColumnCategory.FIELD)));
+
+ private String threadName;
+ private File objectDir;
+
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException, InterruptedException {
+ this.threadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1");
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ createTable("t1", 1);
+ super.setUp();
+ try {
+ objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile());
+ } catch (DiskSpaceInsufficientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ Thread.currentThread().setName(threadName);
+ DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG);
+ File[] files = objectDir.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ Files.delete(file.toPath());
+ }
+ }
+ }
+
+ public void createTable(String tableName, long ttl) {
+ TsTable tsTable = new TsTable(tableName);
+ tsTable.addColumnSchema(new TagColumnSchema("device", TSDataType.STRING));
+ tsTable.addColumnSchema(
+ new FieldColumnSchema("s1", TSDataType.OBJECT, TSEncoding.PLAIN, CompressionType.LZ4));
+ tsTable.addProp(TsTable.TTL_PROPERTY, ttl + "");
+ DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG, tsTable, null);
+ DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG, tableName, null);
+ }
+
+ @Test
+ public void testSeqCompactionWithTTL() throws IOException, WriteProcessException {
+ Pair pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ Pair pair2 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), true);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ true,
+ new ReadChunkCompactionPerformer(),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ }
+
+ @Test
+ public void testUnseqCompactionWithTTL() throws IOException, WriteProcessException {
+ Pair pair1 =
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ Pair pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), false);
+ tsFileManager.add(pair2.getLeft(), false);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(false),
+ false,
+ new FastCompactionPerformer(false),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair2.getRight().exists());
+ Assert.assertTrue(pair1.getRight().exists());
+ }
+
+ @Test
+ public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteProcessException {
+ Pair pair1 =
+ generateTsFileAndObject(false, System.currentTimeMillis() + 100000);
+ Pair pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), false);
+ tsFileManager.add(pair2.getLeft(), false);
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(false),
+ false,
+ new ReadPointCompactionPerformer(),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertTrue(pair1.getRight().exists());
+ Assert.assertFalse(pair2.getRight().exists());
+ }
+
+ @Test
+ public void testCrossCompactionWithTTL() throws IOException, WriteProcessException {
+ Pair pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 100000);
+ Pair pair2 =
+ generateTsFileAndObject(false, System.currentTimeMillis() - 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), false);
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ tsFileManager.getTsFileList(false),
+ new FastCompactionPerformer(true),
+ 1,
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair2.getRight().exists());
+ Assert.assertTrue(pair1.getRight().exists());
+ }
+
+ @Test
+ public void testSettleCompaction() throws IOException, WriteProcessException {
+ Pair pair1 =
+ generateTsFileAndObject(true, System.currentTimeMillis() - 10000);
+ Pair pair2 =
+ generateTsFileAndObject(true, System.currentTimeMillis() + 1000000);
+ tsFileManager.add(pair1.getLeft(), true);
+ tsFileManager.add(pair2.getLeft(), true);
+ SettleCompactionTask task =
+ new SettleCompactionTask(
+ 0,
+ tsFileManager,
+ tsFileManager.getTsFileList(true),
+ Collections.emptyList(),
+ true,
+ new FastCompactionPerformer(true),
+ 0);
+ Assert.assertTrue(task.start());
+ Assert.assertFalse(pair1.getRight().exists());
+ Assert.assertTrue(pair2.getRight().exists());
+ }
+
+ private Pair generateTsFileAndObject(boolean seq, long timestamp)
+ throws IOException, WriteProcessException {
+ TsFileResource resource = createEmptyFileAndResource(seq);
+ Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin");
+ byte[] content = new byte[100];
+ for (int i = 0; i < 100; i++) {
+ content[i] = (byte) i;
+ }
+ Files.write(testFile1, content);
+ String relativePath = testFile1.toFile().getName();
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePath.length());
+ buffer.putLong(100L);
+ buffer.put(BytesUtils.stringToBytes(relativePath));
+ buffer.flip();
+ IDeviceID deviceID = new StringArrayDeviceID("t1", "d1");
+ try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) {
+ writer.getSchema().registerTableSchema(tableSchema);
+ writer.startChunkGroup(deviceID);
+ AlignedChunkWriterImpl alignedChunkWriter =
+ new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1", TSDataType.OBJECT)));
+ alignedChunkWriter.write(timestamp);
+ alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false);
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.writeToFileWriter(writer);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource.updateStartTime(deviceID, 1);
+ resource.updateEndTime(deviceID, 1);
+ resource.serialize();
+ resource.deserialize();
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+ return new Pair<>(resource, testFile1.toFile());
+ }
+}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index caaf44c16a770..469d2bb006aef 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -773,6 +773,12 @@ struct TKillQueryInstanceReq {
2: optional string allowedUsername
}
+struct TReadObjectReq {
+ 1: string relativePath
+ 2: i64 offset
+ 3: i32 size
+}
+
/**
* END: Used for EXPLAIN ANALYZE
**/
@@ -1257,6 +1263,8 @@ service IDataNodeRPCService {
* Write an audit log entry to the DataNode's AuditEventLogger
*/
common.TSStatus writeAuditLog(TAuditLogReq req);
+
+ binary readObject(TReadObjectReq req);
}
service MPPDataExchangeService {