From 5d607eba0521a53c4f98e5b6e2e4a7c14be58c20 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 27 Nov 2025 18:44:38 +0800 Subject: [PATCH 1/4] add functions for object type and add tests --- .../query/recent/IoTDBObjectTypeQueryIT.java | 302 ++++++++++++++++++ .../udf/api/relational/access/Record.java | 35 +- .../confignode1conf/iotdb-system.properties | 2 +- .../impl/DataNodeInternalRPCServiceImpl.java | 8 + .../process/function/partition/Slice.java | 15 + .../PatternExpressionComputation.java | 5 +- .../aggregation/MaskedRecordIterator.java | 2 - .../aggregation}/RecordIterator.java | 17 +- ...erDefinedAggregateFunctionAccumulator.java | 1 - ...roupedUserDefinedAggregateAccumulator.java | 2 +- .../relational/ColumnTransformerBuilder.java | 12 +- .../plan/analyze/ClusterPartitionFetcher.java | 4 + .../plan/planner/OperatorTreeGenerator.java | 1 + .../plan/planner/TableOperatorGenerator.java | 6 +- .../node/write/RelationalInsertRowsNode.java | 3 + .../metadata/TableMetadataImpl.java | 7 +- .../UserDefineScalarFunctionTransformer.java | 2 +- ...AbstractCastFunctionColumnTransformer.java | 12 + .../AbstractLengthColumnTransformer.java | 60 ++++ .../scalar/CastFunctionColumnTransformer.java | 3 + .../unary/scalar/LengthColumnTransformer.java | 30 +- .../scalar/ObjectLengthColumnTransformer.java | 37 +++ .../scalar/ReadObjectColumnTransformer.java | 44 +-- .../TryCastFunctionColumnTransformer.java | 3 + .../iotdb/db/utils/ObjectTypeUtils.java | 124 +++++++ .../object/ObjectTypeCompactionTest.java | 259 +++++++++++++++ .../src/main/thrift/datanode.thrift | 8 + 27 files changed, 922 insertions(+), 82 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java rename iotdb-core/{node-commons/src/main/java/org/apache/iotdb/commons/udf/access => datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation}/RecordIterator.java (88%) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java new file mode 100644 index 0000000000000..45b32a57b07c3 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.tsfile.utils.Binary; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.Statement; +import java.time.LocalDate; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectTypeQueryIT { + + private static final String DATABASE_NAME = "test"; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE " + DATABASE_NAME); + statement.execute("USE " + DATABASE_NAME); + statement.execute( + "CREATE TABLE table1(device STRING TAG, s4 DATE FIELD, s5 TIMESTAMP FIELD, s6 BLOB FIELD, s7 STRING FIELD, s8 OBJECT FIELD, s9 OBJECT FIELD)"); + for (int i = 1; i <= 10; i++) { + for (int j = 0; j < 10; j++) { + statement.execute( + String.format( + "insert into table1(time, device, s4, s5, s6, s7, s8) " + + "values(%d, '%s', '%s', %d, %s, '%s', %s)", + j, + "d" + i, + LocalDate.of(2024, 5, i % 31 + 1), + j, + "X'cafebabe'", + j, + "to_object(true, 0, X'cafebabe')")); + if (i == 10 && j == 9) { + statement.execute( + String.format( + "insert into table1(time, device, s4, s5, s6, s7, s8) " + + "values(%d, '%s', '%s', %d, %s, '%s', %s)", + j, "d" + i, LocalDate.of(2024, 5, i % 31 + 1), j, "X'cafebabe'", j, "null")); + } + } + } + } + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testObjectLength() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE " + DATABASE_NAME); + SessionDataSet sessionDataSet = + session.executeQueryStatement("select length(s8) from table1 limit 1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + long length = iterator.getLong(1); + Assert.assertEquals(4, length); + } + } + } + + @Test + public void testReadObject() throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE " + DATABASE_NAME); + SessionDataSet sessionDataSet = + session.executeQueryStatement("select read_object(s8) from table1 where device = 'd2'"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + byte[] expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (iterator.next()) { + Binary blob = iterator.getBlob(1); + Assert.assertArrayEquals(expected, blob.getValues()); + } + + sessionDataSet = + session.executeQueryStatement( + "select read_object(s8, 1) from table1 where device = 'd3'"); + iterator = sessionDataSet.iterator(); + expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (iterator.next()) { + Binary blob = iterator.getBlob(1); + Assert.assertArrayEquals(expected, blob.getValues()); + } + + sessionDataSet = + session.executeQueryStatement( + "select read_object(s8, 1, 2) from table1 where device = 'd1'"); + iterator = sessionDataSet.iterator(); + expected = new byte[] {(byte) 0xFE, (byte) 0xBA}; + while (iterator.next()) { + Binary blob = iterator.getBlob(1); + Assert.assertArrayEquals(expected, blob.getValues()); + } + + sessionDataSet = + session.executeQueryStatement( + "select read_object(s8, 1, 1000) from table1 where device = 'd1'"); + iterator = sessionDataSet.iterator(); + expected = new byte[] {(byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (iterator.next()) { + Binary blob = iterator.getBlob(1); + Assert.assertArrayEquals(expected, blob.getValues()); + } + + sessionDataSet = + session.executeQueryStatement( + "select count(*) from table1 where device = 'd1' and s6 = read_object(s8)"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + long count = iterator.getLong(1); + Assert.assertEquals(10, count); + } + + // read_object are not pushed down. Read remote files + sessionDataSet = + session.executeQueryStatement( + "select read_object(t1_s8) from (select t1.s8 as t1_s8, t2.s8 as t2_s8 from table1 as t1 inner join table1 as t2 using(time))"); + iterator = sessionDataSet.iterator(); + expected = new byte[] {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE}; + while (iterator.next()) { + Binary blob = iterator.getBlob(1); + Assert.assertArrayEquals(expected, blob.getValues()); + } + } + } + + @Test + public void testFunctionAndClauses() + throws IoTDBConnectionException, StatementExecutionException { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE " + DATABASE_NAME); + + SessionDataSet sessionDataSet = + session.executeQueryStatement( + "select length(s8) from table1 where device = 'd2' and s8 is not null limit 1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals(4, iterator.getLong(1)); + } + sessionDataSet = + session.executeQueryStatement( + "select count(s8), first(s8), last(s8), first_by(s8, time), last_by(s8, time) from table1 where device = 'd1' and cast(s8 as string) = '(Object) 4 B' and try_cast(s8 as string) = '(Object) 4 B'"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals(10, iterator.getLong(1)); + Assert.assertEquals("(Object) 4 B", iterator.getString(2)); + Assert.assertEquals("(Object) 4 B", iterator.getString(3)); + Assert.assertEquals("(Object) 4 B", iterator.getString(4)); + Assert.assertEquals("(Object) 4 B", iterator.getString(5)); + } + + sessionDataSet = session.executeQueryStatement("select coalesce(s9, s8) from table1"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals("(Object) 4 B", iterator.getString(1)); + } + + // MATCH_RECOGNIZE + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select m.cnt from table1 match_recognize (order by s8 measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 = prev(B.s6)) as m")); + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select m.cnt from table1 match_recognize (partition by s8 measures RPR_LAST(time) as cnt one row per match pattern (B+) define B as B.s6 = prev(B.s6)) as m")); + + sessionDataSet = + session.executeQueryStatement( + "select m.value from table1 match_recognize(partition by s6 measures prev(s8) as value one row per match pattern (B+) define B as B.s6=prev(B.s6)) as m"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals("(Object) 4 B", iterator.getString(1)); + } + + // WHERE + session.executeQueryStatement( + "select time, s8 from table1 where device = 'd10' and s8 is not null"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals("(Object) 4 B", iterator.getString(2)); + } + + // GROUP BY + Assert.assertThrows( + StatementExecutionException.class, + () -> session.executeNonQueryStatement("select count(*) from table1 group by s8")); + + // ORDER BY + Assert.assertThrows( + StatementExecutionException.class, + () -> session.executeNonQueryStatement("select count(*) from table1 group by s8")); + + // FILL + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select time, s8 from table1 where device = 'd10' fill method linear")); + session.executeQueryStatement( + "select time, s8 from table1 where device = 'd10' fill method previous"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + Assert.assertEquals("(Object) 4 B", iterator.getString(2)); + } + + // HAVING + session.executeQueryStatement( + "select device, count(s8) from table1 group by device having count(s8) > 0"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + long count = iterator.getLong(2); + Assert.assertEquals(10, count); + } + + // WINDOW + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select *, nth_value(s8,2) over(partition by s8) from table1")); + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select *, nth_value(s8,2) over(order by s8) from table1")); + session.executeNonQueryStatement( + "select *, nth_value(s8,2) over(partition by device) from table1"); + session.executeNonQueryStatement( + "select *, lead(s8) over(partition by device order by time) from table1"); + session.executeNonQueryStatement( + "select *, first_value(s8) over(partition by device) from table1"); + session.executeNonQueryStatement( + "select *, last_value(s8) over(partition by device) from table1"); + session.executeNonQueryStatement( + "select *, lag(s8) over(partition by device order by time) from table1"); + + // Table-value function + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select * from session(data => table1 partition by s8, timecol => 'time', gap => 1ms)")); + Assert.assertThrows( + StatementExecutionException.class, + () -> + session.executeNonQueryStatement( + "select * from session(data => table1 order by s8, timecol => 'time', gap => 1ms)")); + sessionDataSet = + session.executeQueryStatement( + "select * from hop(data => table1, timecol => 'time', slide => 1ms, size => 2ms)"); + iterator = sessionDataSet.iterator(); + while (iterator.next()) { + String str = iterator.getString("s8"); + Assert.assertEquals("(Object) 4 B", str); + } + } + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java index 8c6e2a7f3578c..c4baa5fc5c8e2 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java @@ -83,7 +83,7 @@ public interface Record { * Returns the Binary value at the specified column in this row. * *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT}, - * {@code TSDataType.STRING} or {@code TSDataType.BLOB}. + * {@code TSDataType.STRING} or {@code TSDataType.BLOB} or {@code TSDataType.OBJECT}. * * @param columnIndex index of the specified column * @return the Binary value at the specified column in this row @@ -94,7 +94,7 @@ public interface Record { * Returns the String value at the specified column in this row. * *

Users need to ensure that the data type of the specified column is {@code TSDataType.TEXT} - * or {@code TSDataType.STRING}. + * or {@code TSDataType.STRING} or {@code TSDataType.OBJECT}. * * @param columnIndex index of the specified column * @return the String value at the specified column in this row @@ -113,6 +113,37 @@ public interface Record { Object getObject(int columnIndex); + /** + * Returns the Binary representation of an object stored at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code + * TSDataType.OBJECT}. + * + *

This method returns the entire binary data of the object and may require considerable memory + * if the stored object is large. + * + * @param columnIndex index of the specified column + * @return the Binary content of the object at the specified column + */ + Binary readObject(int columnIndex); + + /** + * Returns a partial Binary segment of an object stored at the specified column in this row. + * + *

Users need to ensure that the data type of the specified column is {@code + * TSDataType.OBJECT}. + * + *

This method enables reading a subset of the stored object without materializing the entire + * binary data in memory, which is useful for large objects and streaming access patterns. + * + * @param columnIndex index of the specified column + * @param offset byte offset of the subsection read + * @param length number of bytes to read starting from the offset. If length < 0, read the entire + * binary data from offset. + * @return the Binary content of the object segment at the specified column + */ + Binary readObject(int columnIndex, long offset, long length); + /** * Returns the actual data type of the value at the specified column in this row. * diff --git a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties index b396e373f8699..082ee7bf48811 100644 --- a/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties +++ b/iotdb-core/confignode/src/test/resources/confignode1conf/iotdb-system.properties @@ -34,7 +34,7 @@ timestamp_precision=ms data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus schema_replication_factor=3 -data_replication_factor=3 +data_replication_factor=1 udf_lib_dir=target/confignode1/ext/udf trigger_lib_dir=target/confignode1/ext/trigger pipe_lib_dir=target/confignode1/ext/pipe diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index dcc62fcd3c8f1..8f1b97b71964a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -201,6 +201,7 @@ import org.apache.iotdb.db.trigger.executor.TriggerExecutor; import org.apache.iotdb.db.trigger.executor.TriggerFireResult; import org.apache.iotdb.db.trigger.service.TriggerManagementService; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.metrics.type.AutoGauge; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -279,6 +280,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp; import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage; +import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp; import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult; @@ -3051,6 +3053,12 @@ public TSStatus writeAuditLog(TAuditLogReq req) { return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } + @Override + public ByteBuffer readObject(TReadObjectReq req) { + return ObjectTypeUtils.readObjectContent( + req.getRelativePath(), req.getOffset(), req.getSize(), false); + } + public void handleClientExit() { // Do nothing } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java index b7ff6f3e0fdd5..5e09ca5ff26d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.type.Type; @@ -186,6 +187,20 @@ public Object getObject(int columnIndex) { return originalColumns[columnIndex].getObject(offset); } + @Override + public Binary readObject(int columnIndex, long offset, long length) { + if (getDataType(columnIndex) == Type.OBJECT) { + throw new UnsupportedOperationException("current column is not object column"); + } + Binary binary = getBinary(columnIndex); + return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array()); + } + + @Override + public Binary readObject(int columnIndex) { + return readObject(columnIndex, 0L, -1); + } + @Override public Type getDataType(int columnIndex) { return dataTypes.get(columnIndex); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java index 6b5032627b219..cb8cbdccbb695 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/expression/PatternExpressionComputation.java @@ -35,6 +35,7 @@ import org.apache.tsfile.read.common.type.BooleanType; import org.apache.tsfile.read.common.type.DoubleType; import org.apache.tsfile.read.common.type.FloatType; +import org.apache.tsfile.read.common.type.ObjectType; import org.apache.tsfile.read.common.type.Type; import java.util.ArrayList; @@ -161,7 +162,9 @@ private Object getValueFromPartition( return partition.getFloat(channel, position); } else if (type instanceof DoubleType) { return partition.getDouble(channel, position); - } else if (type instanceof AbstractVarcharType || type instanceof BlobType) { + } else if (type instanceof AbstractVarcharType + || type instanceof BlobType + || type instanceof ObjectType) { return partition.getBinary(channel, position); } else { throw new SemanticException("Unsupported type: " + type.getClass().getSimpleName()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java index 5237fcfd12e6c..48095927eda56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; -import org.apache.iotdb.commons.udf.access.RecordIterator; - import org.apache.tsfile.block.column.Column; import org.apache.tsfile.read.common.type.Type; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java similarity index 88% rename from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java index 6f5813955dd87..f2f306f2bc2d8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java @@ -17,9 +17,10 @@ * under the License. */ -package org.apache.iotdb.commons.udf.access; +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.type.Type; @@ -138,6 +139,20 @@ public Object getObject(int columnIndex) { return childrenColumns.get(columnIndex).getObject(index); } + @Override + public Binary readObject(int columnIndex, long offset, long length) { + if (getDataType(columnIndex) == Type.OBJECT) { + throw new UnsupportedOperationException("current column is not object column"); + } + Binary binary = getBinary(columnIndex); + return new Binary(ObjectTypeUtils.readObjectContent(binary, offset, length, true).array()); + } + + @Override + public Binary readObject(int columnIndex) { + return readObject(columnIndex, 0L, -1); + } + @Override public Type getDataType(int columnIndex) { return UDFDataTypeTransformer.transformReadTypeToUDFDataType(dataTypes.get(columnIndex)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java index 64c0896b3a180..70e1c0c5d4432 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/UserDefinedAggregateFunctionAccumulator.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; -import org.apache.iotdb.commons.udf.access.RecordIterator; import org.apache.iotdb.udf.api.State; import org.apache.iotdb.udf.api.customizer.analysis.AggregateFunctionAnalysis; import org.apache.iotdb.udf.api.relational.AggregateFunction; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java index b3d24e923aed2..9ac6b48db7cc4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedUserDefinedAggregateAccumulator.java @@ -19,9 +19,9 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; -import org.apache.iotdb.commons.udf.access.RecordIterator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.MaskedRecordIterator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.ObjectBigArray; import org.apache.iotdb.udf.api.State; import org.apache.iotdb.udf.api.relational.AggregateFunction; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java index 0378c80f042be..19f81b78f6839 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java @@ -163,6 +163,7 @@ import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.Log10ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LongToBytesColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.LowerColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ObjectLengthColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrim2ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RTrimColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.RadiansColumnTransformer; @@ -243,7 +244,6 @@ import static org.apache.tsfile.read.common.type.FloatType.FLOAT; import static org.apache.tsfile.read.common.type.IntType.INT32; import static org.apache.tsfile.read.common.type.LongType.INT64; -import static org.apache.tsfile.read.common.type.ObjectType.OBJECT; import static org.apache.tsfile.read.common.type.StringType.STRING; public class ColumnTransformerBuilder @@ -780,7 +780,9 @@ private ColumnTransformer getFunctionColumnTransformer( } else if (TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName)) { ColumnTransformer first = this.process(children.get(0), context); if (children.size() == 1) { - return new LengthColumnTransformer(INT32, first); + return context.inputDataTypes.get(0) == TSDataType.OBJECT + ? new ObjectLengthColumnTransformer(INT64, first) + : new LengthColumnTransformer(INT64, first); } } else if (TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName)) { ColumnTransformer first = this.process(children.get(0), context); @@ -1456,10 +1458,10 @@ private ColumnTransformer getFunctionColumnTransformer( .equalsIgnoreCase(functionName)) { ColumnTransformer first = this.process(children.get(0), context); if (children.size() == 1) { - return new ReadObjectColumnTransformer(OBJECT, first, context.fragmentInstanceContext); + return new ReadObjectColumnTransformer(BLOB, first, context.fragmentInstanceContext); } else if (children.size() == 2) { return new ReadObjectColumnTransformer( - OBJECT, + BLOB, ((LongLiteral) children.get(1)).getParsedValue(), first, context.fragmentInstanceContext); @@ -1468,7 +1470,7 @@ private ColumnTransformer getFunctionColumnTransformer( long length = ((LongLiteral) children.get(2)).getParsedValue(); checkArgument(offset >= 0 && length >= 0); return new ReadObjectColumnTransformer( - OBJECT, + BLOB, ((LongLiteral) children.get(1)).getParsedValue(), ((LongLiteral) children.get(2)).getParsedValue(), first, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java index 8c54fd640f88c..2274762341b5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java @@ -308,6 +308,10 @@ public boolean updateRegionCache(final TRegionRouteReq req) { return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), req.getRegionRouteMap()); } + public List getRegionReplicaSet(List consensusGroupIds) { + return partitionCache.getRegionReplicaSet(consensusGroupIds); + } + @Override public void invalidAllCache() { partitionCache.invalidAllCache(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 581641591837b..44be3e69afb7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -1435,6 +1435,7 @@ public static IFill[] getPreviousFill( case TEXT: case STRING: case BLOB: + case OBJECT: previousFill[i] = filter == null ? new BinaryPreviousFill() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index d7d4a951cb860..c29ceaff04f82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -268,6 +268,7 @@ import org.apache.tsfile.read.common.type.BinaryType; import org.apache.tsfile.read.common.type.BlobType; import org.apache.tsfile.read.common.type.BooleanType; +import org.apache.tsfile.read.common.type.ObjectType; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.read.common.type.TypeFactory; import org.apache.tsfile.read.filter.basic.Filter; @@ -3825,6 +3826,7 @@ private boolean[] checkStatisticAndScanOrder( case MAX: case MIN: if (BlobType.BLOB.equals(argumentType) + || ObjectType.OBJECT.equals(argumentType) || BinaryType.TEXT.equals(argumentType) || BooleanType.BOOLEAN.equals(argumentType)) { canUseStatistic = false; @@ -3840,8 +3842,8 @@ private boolean[] checkStatisticAndScanOrder( descendingCount++; } - // first/last/first_by/last_by aggregation with BLOB type can not use statistics - if (BlobType.BLOB.equals(argumentType)) { + // first/last/first_by/last_by aggregation with BLOB or OBJECT type can not use statistics + if (BlobType.BLOB.equals(argumentType) || ObjectType.OBJECT.equals(argumentType)) { canUseStatistic = false; break; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java index 83f6bbec63e09..2297ddcebdd01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java @@ -207,6 +207,9 @@ private void handleObjectValue( for (int j = 0; j < insertRowNode.getDataTypes().length; j++) { if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) { Object[] values = insertRowNode.getValues(); + if (values[j] == null) { + continue; + } byte[] binary = ((Binary) values[j]).getValues(); ByteBuffer buffer = ByteBuffer.wrap(binary); boolean isEoF = buffer.get() == 1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index cc9af5c9b7249..b6242ec000fc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -265,13 +265,14 @@ && isIntegerNumber(argumentTypes.get(2)))) { } return STRING; } else if (TableBuiltinScalarFunction.LENGTH.getFunctionName().equalsIgnoreCase(functionName)) { - if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) { + if (!(argumentTypes.size() == 1 && (isCharType(argumentTypes.get(0))) + || isObjectType(argumentTypes.get(0)))) { throw new SemanticException( "Scalar function " + functionName.toLowerCase(Locale.ENGLISH) - + " only accepts one argument and it must be text or string data type."); + + " only accepts one argument and it must be text or string or object data type."); } - return INT32; + return INT64; } else if (TableBuiltinScalarFunction.UPPER.getFunctionName().equalsIgnoreCase(functionName)) { if (!(argumentTypes.size() == 1 && isCharType(argumentTypes.get(0)))) { throw new SemanticException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java index 279e3c06fd49b..47fd40ed73f92 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/udf/UserDefineScalarFunctionTransformer.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.queryengine.transformation.dag.column.udf; -import org.apache.iotdb.commons.udf.access.RecordIterator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.RecordIterator; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer; import org.apache.iotdb.udf.api.relational.ScalarFunction; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java index ee40a3ec6f8ed..4cd2358118f09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractCastFunctionColumnTransformer.java @@ -341,6 +341,7 @@ protected void castString(ColumnBuilder columnBuilder, Binary value) { case TEXT: case STRING: case BLOB: + case OBJECT: returnType.writeBinary(columnBuilder, value); break; default: @@ -393,4 +394,15 @@ protected void castBlob(ColumnBuilder columnBuilder, Binary value) { String.format("Cannot cast %s to %s type", stringValue, returnType.getDisplayName())); } } + + protected void castObject(ColumnBuilder columnBuilder, Binary value) { + String stringValue = BytesUtils.parseObjectByteArrayToString(value.getValues()); + switch (returnType.getTypeEnum()) { + case STRING: + returnType.writeBinary(columnBuilder, BytesUtils.valueOf(String.valueOf(stringValue))); + break; + default: + throw new UnsupportedOperationException(String.format(ERROR_MSG, returnType.getTypeEnum())); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java new file mode 100644 index 0000000000000..08eb47691668f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/AbstractLengthColumnTransformer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; + +import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; +import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.Binary; + +public abstract class AbstractLengthColumnTransformer extends UnaryColumnTransformer { + + public AbstractLengthColumnTransformer( + Type returnType, ColumnTransformer childColumnTransformer) { + super(returnType, childColumnTransformer); + } + + @Override + protected void doTransform(Column column, ColumnBuilder columnBuilder) { + for (int i = 0, n = column.getPositionCount(); i < n; i++) { + if (!column.isNull(i)) { + columnBuilder.writeLong(transformNonNullValue(column.getBinary(i))); + } else { + columnBuilder.appendNull(); + } + } + } + + @Override + protected void doTransform(Column column, ColumnBuilder columnBuilder, boolean[] selection) { + for (int i = 0, n = column.getPositionCount(); i < n; i++) { + if (selection[i] && !column.isNull(i)) { + columnBuilder.writeLong(transformNonNullValue(column.getBinary(i))); + } else { + columnBuilder.appendNull(); + } + } + } + + protected abstract long transformNonNullValue(Binary binary); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java index b9c8e31b1abbc..624eabadc62d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/CastFunctionColumnTransformer.java @@ -67,6 +67,9 @@ protected void transform( case BLOB: castBlob(columnBuilder, childType.getBinary(column, i)); break; + case OBJECT: + castObject(columnBuilder, childType.getBinary(column, i)); + break; default: throw new UnsupportedOperationException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java index 00448c6f575b1..c94530c83d403 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/LengthColumnTransformer.java @@ -20,40 +20,18 @@ package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; -import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer; -import org.apache.tsfile.block.column.Column; -import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.Binary; -public class LengthColumnTransformer extends UnaryColumnTransformer { - +public class LengthColumnTransformer extends AbstractLengthColumnTransformer { public LengthColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) { super(returnType, childColumnTransformer); } @Override - protected void doTransform(Column column, ColumnBuilder columnBuilder) { - for (int i = 0, n = column.getPositionCount(); i < n; i++) { - if (!column.isNull(i)) { - String currentValue = column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET); - columnBuilder.writeInt(currentValue.length()); - } else { - columnBuilder.appendNull(); - } - } - } - - @Override - protected void doTransform(Column column, ColumnBuilder columnBuilder, boolean[] selection) { - for (int i = 0, n = column.getPositionCount(); i < n; i++) { - if (selection[i] && !column.isNull(i)) { - String currentValue = column.getBinary(i).getStringValue(TSFileConfig.STRING_CHARSET); - columnBuilder.writeInt(currentValue.length()); - } else { - columnBuilder.appendNull(); - } - } + protected long transformNonNullValue(Binary binary) { + return binary.getStringValue(TSFileConfig.STRING_CHARSET).length(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java new file mode 100644 index 0000000000000..5d39c6f6af3ca --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ObjectLengthColumnTransformer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; + +import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; +import org.apache.iotdb.db.utils.ObjectTypeUtils; + +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.Binary; + +public class ObjectLengthColumnTransformer extends AbstractLengthColumnTransformer { + public ObjectLengthColumnTransformer(Type returnType, ColumnTransformer childColumnTransformer) { + super(returnType, childColumnTransformer); + } + + @Override + protected long transformNonNullValue(Binary binary) { + return ObjectTypeUtils.getObjectLength(binary); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java index 9504c6c2282df..8c791191b42ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java @@ -19,25 +19,18 @@ package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar; -import org.apache.iotdb.commons.exception.IoTDBRuntimeException; -import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer; import org.apache.iotdb.db.utils.ObjectTypeUtils; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.type.Type; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; import java.util.Optional; public class ReadObjectColumnTransformer extends UnaryColumnTransformer { @@ -107,35 +100,14 @@ private void transform(Column column, ColumnBuilder columnBuilder, int i) { } private Binary readObject(Binary binary) { - File file = ObjectTypeUtils.getObjectPathFromBinary(binary); - long actualReadSize = getActualReadSize(file); + Pair ObjectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + long fileLength = ObjectLengthPathPair.getLeft(); + String relativePath = ObjectLengthPathPair.getRight(); + int actualReadSize = + ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, length); fragmentInstanceContext.ifPresent( context -> context.getMemoryReservationContext().reserveMemoryCumulatively(actualReadSize)); - byte[] bytes = new byte[(int) actualReadSize]; - ByteBuffer buffer = ByteBuffer.wrap(bytes); - try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { - fileChannel.read(buffer, offset); - } catch (IOException e) { - throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); - } - return new Binary(bytes); - } - - private long getActualReadSize(File file) { - long fileSize = file.length(); - if (offset >= fileSize) { - throw new SemanticException( - String.format( - "offset %d is greater than object size %d, file path is %s", - offset, fileSize, file.getAbsolutePath())); - } - long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - offset); - if (actualReadSize > Integer.MAX_VALUE) { - throw new SemanticException( - String.format( - "Read object size %s is too large (size > 2G), file path is %s", - actualReadSize, file.getAbsolutePath())); - } - return actualReadSize; + return new Binary( + ObjectTypeUtils.readObjectContent(relativePath, offset, actualReadSize, true).array()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java index c25fd321c3445..419d1bbabf3be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/TryCastFunctionColumnTransformer.java @@ -69,6 +69,9 @@ protected void transform( case BLOB: castBlob(columnBuilder, childType.getBinary(column, i)); break; + case OBJECT: + castObject(columnBuilder, childType.getBinary(column, i)); + break; default: throw new UnsupportedOperationException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index c153061a90d62..af69458588317 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -19,18 +19,38 @@ package org.apache.iotdb.db.utils; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.ObjectFileNotExist; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.Coordinator; +import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; +import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.thrift.TException; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.List; import java.util.Optional; public class ObjectTypeUtils { @@ -40,6 +60,110 @@ public class ObjectTypeUtils { private ObjectTypeUtils() {} + public static ByteBuffer readObjectContent( + Binary binary, long offset, long length, boolean mayNotInCurrentNode) { + Pair ObjectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); + long fileLength = ObjectLengthPathPair.getLeft(); + length = length < 0 ? fileLength : length; + String relativePath = ObjectLengthPathPair.getRight(); + int actualReadSize = + ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, length); + return ObjectTypeUtils.readObjectContent( + relativePath, offset, actualReadSize, mayNotInCurrentNode); + } + + public static ByteBuffer readObjectContent( + String relativePath, long offset, long readSize, boolean mayNotInCurrentNode) { + Optional objectFile = TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false); + if (objectFile.isPresent()) { + return readObjectContentFromLocalFile(objectFile.get(), offset, readSize); + } + if (mayNotInCurrentNode) { + return readObjectContentFromRemoteFile(relativePath, offset, readSize); + } + throw new ObjectFileNotExist(relativePath); + } + + private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, long readSize) { + byte[] bytes = new byte[(int) readSize]; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) { + fileChannel.read(buffer, offset); + } catch (IOException e) { + throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + buffer.flip(); + return buffer; + } + + private static ByteBuffer readObjectContentFromRemoteFile( + String relativePath, long offset, long readSize) { + byte[] bytes = new byte[(int) readSize]; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + TConsensusGroupId consensusGroupId = + new TConsensusGroupId( + TConsensusGroupType.DataRegion, + Integer.parseInt(Paths.get(relativePath).getName(0).toString())); + List regionReplicaSetList = + ClusterPartitionFetcher.getInstance() + .getRegionReplicaSet(Collections.singletonList(consensusGroupId)); + TRegionReplicaSet regionReplicaSet = regionReplicaSetList.iterator().next(); + final int batchSize = 1024 * 1024 * 1024; + final TReadObjectReq req = new TReadObjectReq(); + req.setRelativePath(relativePath); + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + try (SyncDataNodeInternalServiceClient client = + Coordinator.getInstance() + .getInternalServiceClientManager() + .borrowClient(dataNodeLocation.getInternalEndPoint())) { + while (readSize > 0) { + req.setOffset(offset + buffer.position()); + req.setSize(Math.min(readSize, batchSize)); + readSize -= req.getSize(); + ByteBuffer partial = client.readObject(req); + buffer.put(partial); + } + } catch (ClientManagerException | TException e) { + logger.error(e.getMessage(), e); + throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + } + buffer.flip(); + return buffer; + } + + public static int getActualReadSize(String filePath, long fileSize, long offset, long length) { + if (offset >= fileSize) { + throw new SemanticException( + String.format( + "offset %d is greater than object size %d, file path is %s", + offset, fileSize, filePath)); + } + long actualReadSize = Math.min(length < 0 ? fileSize : length, fileSize - offset); + if (actualReadSize > Integer.MAX_VALUE) { + throw new SemanticException( + String.format( + "Read object size %s is too large (size > 2G), file path is %s", + actualReadSize, filePath)); + } + return (int) actualReadSize; + } + + public static Pair parseObjectBinary(Binary binary) { + byte[] bytes = binary.getValues(); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + long length = buffer.getLong(); + String relativeObjectFilePath = + new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET); + return new Pair<>(length, relativeObjectFilePath); + } + + public static long getObjectLength(Binary binary) { + byte[] bytes = binary.getValues(); + ByteBuffer wrap = ByteBuffer.wrap(bytes); + return wrap.getLong(); + } + public static File getObjectPathFromBinary(Binary binary) { byte[] bytes = binary.getValues(); String relativeObjectFilePath = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java new file mode 100644 index 0000000000000..4487e8b704df3 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/object/ObjectTypeCompactionTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.object; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema; +import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; +import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.ColumnSchema; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; + +public class ObjectTypeCompactionTest extends AbstractCompactionTest { + + private static final TableSchema tableSchema = + new TableSchema( + "t1", + Arrays.asList( + new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + new ColumnSchema("s1", TSDataType.OBJECT, ColumnCategory.FIELD))); + + private String threadName; + private File objectDir; + + @Before + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + this.threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("pool-1-IoTDB-Compaction-Worker-1"); + DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG); + createTable("t1", 1); + super.setUp(); + try { + objectDir = new File(TierManager.getInstance().getNextFolderForObjectFile()); + } catch (DiskSpaceInsufficientException e) { + throw new RuntimeException(e); + } + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + Thread.currentThread().setName(threadName); + DataNodeTableCache.getInstance().invalid(this.COMPACTION_TEST_SG); + File[] files = objectDir.listFiles(); + if (files != null) { + for (File file : files) { + Files.delete(file.toPath()); + } + } + } + + public void createTable(String tableName, long ttl) { + TsTable tsTable = new TsTable(tableName); + tsTable.addColumnSchema(new TagColumnSchema("device", TSDataType.STRING)); + tsTable.addColumnSchema( + new FieldColumnSchema("s1", TSDataType.OBJECT, TSEncoding.PLAIN, CompressionType.LZ4)); + tsTable.addProp(TsTable.TTL_PROPERTY, ttl + ""); + DataNodeTableCache.getInstance().preUpdateTable(this.COMPACTION_TEST_SG, tsTable, null); + DataNodeTableCache.getInstance().commitUpdateTable(this.COMPACTION_TEST_SG, tableName, null); + } + + @Test + public void testSeqCompactionWithTTL() throws IOException, WriteProcessException { + Pair pair1 = + generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + Pair pair2 = + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + tsFileManager.add(pair1.getLeft(), true); + tsFileManager.add(pair2.getLeft(), true); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + tsFileManager.getTsFileList(true), + true, + new ReadChunkCompactionPerformer(), + 0); + Assert.assertTrue(task.start()); + Assert.assertFalse(pair1.getRight().exists()); + Assert.assertTrue(pair2.getRight().exists()); + } + + @Test + public void testUnseqCompactionWithTTL() throws IOException, WriteProcessException { + Pair pair1 = + generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + Pair pair2 = + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + tsFileManager.add(pair1.getLeft(), false); + tsFileManager.add(pair2.getLeft(), false); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + tsFileManager.getTsFileList(false), + false, + new FastCompactionPerformer(false), + 0); + Assert.assertTrue(task.start()); + Assert.assertFalse(pair2.getRight().exists()); + Assert.assertTrue(pair1.getRight().exists()); + } + + @Test + public void testUnseqCompactionWithReadPointWithTTL() throws IOException, WriteProcessException { + Pair pair1 = + generateTsFileAndObject(false, System.currentTimeMillis() + 100000); + Pair pair2 = + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + tsFileManager.add(pair1.getLeft(), false); + tsFileManager.add(pair2.getLeft(), false); + InnerSpaceCompactionTask task = + new InnerSpaceCompactionTask( + 0, + tsFileManager, + tsFileManager.getTsFileList(false), + false, + new ReadPointCompactionPerformer(), + 0); + Assert.assertTrue(task.start()); + Assert.assertTrue(pair1.getRight().exists()); + Assert.assertFalse(pair2.getRight().exists()); + } + + @Test + public void testCrossCompactionWithTTL() throws IOException, WriteProcessException { + Pair pair1 = + generateTsFileAndObject(true, System.currentTimeMillis() + 100000); + Pair pair2 = + generateTsFileAndObject(false, System.currentTimeMillis() - 1000000); + tsFileManager.add(pair1.getLeft(), true); + tsFileManager.add(pair2.getLeft(), false); + CrossSpaceCompactionTask task = + new CrossSpaceCompactionTask( + 0, + tsFileManager, + tsFileManager.getTsFileList(true), + tsFileManager.getTsFileList(false), + new FastCompactionPerformer(true), + 1, + 0); + Assert.assertTrue(task.start()); + Assert.assertFalse(pair2.getRight().exists()); + Assert.assertTrue(pair1.getRight().exists()); + } + + @Test + public void testSettleCompaction() throws IOException, WriteProcessException { + Pair pair1 = + generateTsFileAndObject(true, System.currentTimeMillis() - 10000); + Pair pair2 = + generateTsFileAndObject(true, System.currentTimeMillis() + 1000000); + tsFileManager.add(pair1.getLeft(), true); + tsFileManager.add(pair2.getLeft(), true); + SettleCompactionTask task = + new SettleCompactionTask( + 0, + tsFileManager, + tsFileManager.getTsFileList(true), + Collections.emptyList(), + true, + new FastCompactionPerformer(true), + 0); + Assert.assertTrue(task.start()); + Assert.assertFalse(pair1.getRight().exists()); + Assert.assertTrue(pair2.getRight().exists()); + } + + private Pair generateTsFileAndObject(boolean seq, long timestamp) + throws IOException, WriteProcessException { + TsFileResource resource = createEmptyFileAndResource(seq); + Path testFile1 = Files.createTempFile(objectDir.toPath(), "test_", ".bin"); + byte[] content = new byte[100]; + for (int i = 0; i < 100; i++) { + content[i] = (byte) i; + } + Files.write(testFile1, content); + String relativePath = testFile1.toFile().getName(); + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + relativePath.length()); + buffer.putLong(100L); + buffer.put(BytesUtils.stringToBytes(relativePath)); + buffer.flip(); + IDeviceID deviceID = new StringArrayDeviceID("t1", "d1"); + try (TsFileIOWriter writer = new TsFileIOWriter(resource.getTsFile())) { + writer.getSchema().registerTableSchema(tableSchema); + writer.startChunkGroup(deviceID); + AlignedChunkWriterImpl alignedChunkWriter = + new AlignedChunkWriterImpl(Arrays.asList(new MeasurementSchema("s1", TSDataType.OBJECT))); + alignedChunkWriter.write(timestamp); + alignedChunkWriter.write(timestamp, new Binary(buffer.array()), false); + alignedChunkWriter.sealCurrentPage(); + alignedChunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + writer.endFile(); + } + resource.updateStartTime(deviceID, 1); + resource.updateEndTime(deviceID, 1); + resource.serialize(); + resource.deserialize(); + resource.setStatus(TsFileResourceStatus.NORMAL); + return new Pair<>(resource, testFile1.toFile()); + } +} diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index caaf44c16a770..64de32597c1e5 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -773,6 +773,12 @@ struct TKillQueryInstanceReq { 2: optional string allowedUsername } +struct TReadObjectReq { + 1: string relativePath + 2: optional i64 offset + 3: optional i64 size +} + /** * END: Used for EXPLAIN ANALYZE **/ @@ -1257,6 +1263,8 @@ service IDataNodeRPCService { * Write an audit log entry to the DataNode's AuditEventLogger */ common.TSStatus writeAuditLog(TAuditLogReq req); + + binary readObject(TReadObjectReq req); } service MPPDataExchangeService { From fc8344fe590f7380968a377219b2bf1e1a3bb8b4 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 27 Nov 2025 18:53:12 +0800 Subject: [PATCH 2/4] fix it --- .../relational/it/query/recent/IoTDBObjectTypeQueryIT.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java index 45b32a57b07c3..286bf91a53c4d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBObjectTypeQueryIT.java @@ -69,13 +69,6 @@ public static void setUp() throws Exception { "X'cafebabe'", j, "to_object(true, 0, X'cafebabe')")); - if (i == 10 && j == 9) { - statement.execute( - String.format( - "insert into table1(time, device, s4, s5, s6, s7, s8) " - + "values(%d, '%s', '%s', %d, %s, '%s', %s)", - j, "d" + i, LocalDate.of(2024, 5, i % 31 + 1), j, "X'cafebabe'", j, "null")); - } } } } From 58bd35966a264b669fd9f68b524d94a153b33fb4 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 27 Nov 2025 19:07:26 +0800 Subject: [PATCH 3/4] fix bug --- .../iotdb/db/utils/ObjectTypeUtils.java | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index af69458588317..0a4179994a5a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.ObjectFileNotExist; @@ -35,7 +34,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.thrift.TException; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -97,7 +95,7 @@ private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, } private static ByteBuffer readObjectContentFromRemoteFile( - String relativePath, long offset, long readSize) { + final String relativePath, final long offset, final long readSize) { byte[] bytes = new byte[(int) readSize]; ByteBuffer buffer = ByteBuffer.wrap(bytes); TConsensusGroupId consensusGroupId = @@ -108,25 +106,34 @@ private static ByteBuffer readObjectContentFromRemoteFile( ClusterPartitionFetcher.getInstance() .getRegionReplicaSet(Collections.singletonList(consensusGroupId)); TRegionReplicaSet regionReplicaSet = regionReplicaSetList.iterator().next(); - final int batchSize = 1024 * 1024 * 1024; + final int batchSize = 1024 * 1024; final TReadObjectReq req = new TReadObjectReq(); req.setRelativePath(relativePath); - for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) { + TDataNodeLocation dataNodeLocation = regionReplicaSet.getDataNodeLocations().get(i); + long toReadSizeInCurrentDataNode = readSize; try (SyncDataNodeInternalServiceClient client = Coordinator.getInstance() .getInternalServiceClientManager() .borrowClient(dataNodeLocation.getInternalEndPoint())) { - while (readSize > 0) { + while (toReadSizeInCurrentDataNode > 0) { req.setOffset(offset + buffer.position()); - req.setSize(Math.min(readSize, batchSize)); - readSize -= req.getSize(); + req.setSize(Math.min(toReadSizeInCurrentDataNode, batchSize)); + toReadSizeInCurrentDataNode -= req.getSize(); ByteBuffer partial = client.readObject(req); buffer.put(partial); } - } catch (ClientManagerException | TException e) { - logger.error(e.getMessage(), e); - throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } catch (Exception e) { + logger.error( + "Failed to read object from datanode: {}" + e.getMessage(), dataNodeLocation, e); + if (i == regionReplicaSet.getDataNodeLocations().size() - 1) { + throw new IoTDBRuntimeException(e, TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } + buffer.clear(); + req.setOffset(offset); + continue; } + break; } buffer.flip(); return buffer; From 322d75369a109236315c720ffd92d68042d60676 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Thu, 27 Nov 2025 19:19:37 +0800 Subject: [PATCH 4/4] fix --- .../iotdb/udf/api/relational/access/Record.java | 2 +- .../operator/process/function/partition/Slice.java | 2 +- .../relational/aggregation/RecordIterator.java | 2 +- .../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 14 +++++++------- .../src/main/thrift/datanode.thrift | 4 ++-- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java index c4baa5fc5c8e2..4267f41dc8d8d 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/access/Record.java @@ -142,7 +142,7 @@ public interface Record { * binary data from offset. * @return the Binary content of the object segment at the specified column */ - Binary readObject(int columnIndex, long offset, long length); + Binary readObject(int columnIndex, long offset, int length); /** * Returns the actual data type of the value at the specified column in this row. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java index 5e09ca5ff26d5..8faea51b829a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -188,7 +188,7 @@ public Object getObject(int columnIndex) { } @Override - public Binary readObject(int columnIndex, long offset, long length) { + public Binary readObject(int columnIndex, long offset, int length) { if (getDataType(columnIndex) == Type.OBJECT) { throw new UnsupportedOperationException("current column is not object column"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java index f2f306f2bc2d8..34bc9d98e9c1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/RecordIterator.java @@ -140,7 +140,7 @@ public Object getObject(int columnIndex) { } @Override - public Binary readObject(int columnIndex, long offset, long length) { + public Binary readObject(int columnIndex, long offset, int length) { if (getDataType(columnIndex) == Type.OBJECT) { throw new UnsupportedOperationException("current column is not object column"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index 0a4179994a5a3..058213450363e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -59,19 +59,19 @@ public class ObjectTypeUtils { private ObjectTypeUtils() {} public static ByteBuffer readObjectContent( - Binary binary, long offset, long length, boolean mayNotInCurrentNode) { + Binary binary, long offset, int length, boolean mayNotInCurrentNode) { Pair ObjectLengthPathPair = ObjectTypeUtils.parseObjectBinary(binary); long fileLength = ObjectLengthPathPair.getLeft(); - length = length < 0 ? fileLength : length; String relativePath = ObjectLengthPathPair.getRight(); int actualReadSize = - ObjectTypeUtils.getActualReadSize(relativePath, fileLength, offset, length); + ObjectTypeUtils.getActualReadSize( + relativePath, fileLength, offset, length < 0 ? fileLength : length); return ObjectTypeUtils.readObjectContent( relativePath, offset, actualReadSize, mayNotInCurrentNode); } public static ByteBuffer readObjectContent( - String relativePath, long offset, long readSize, boolean mayNotInCurrentNode) { + String relativePath, long offset, int readSize, boolean mayNotInCurrentNode) { Optional objectFile = TIER_MANAGER.getAbsoluteObjectFilePath(relativePath, false); if (objectFile.isPresent()) { return readObjectContentFromLocalFile(objectFile.get(), offset, readSize); @@ -95,8 +95,8 @@ private static ByteBuffer readObjectContentFromLocalFile(File file, long offset, } private static ByteBuffer readObjectContentFromRemoteFile( - final String relativePath, final long offset, final long readSize) { - byte[] bytes = new byte[(int) readSize]; + final String relativePath, final long offset, final int readSize) { + byte[] bytes = new byte[readSize]; ByteBuffer buffer = ByteBuffer.wrap(bytes); TConsensusGroupId consensusGroupId = new TConsensusGroupId( @@ -111,7 +111,7 @@ private static ByteBuffer readObjectContentFromRemoteFile( req.setRelativePath(relativePath); for (int i = 0; i < regionReplicaSet.getDataNodeLocations().size(); i++) { TDataNodeLocation dataNodeLocation = regionReplicaSet.getDataNodeLocations().get(i); - long toReadSizeInCurrentDataNode = readSize; + int toReadSizeInCurrentDataNode = readSize; try (SyncDataNodeInternalServiceClient client = Coordinator.getInstance() .getInternalServiceClientManager() diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 64de32597c1e5..469d2bb006aef 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -775,8 +775,8 @@ struct TKillQueryInstanceReq { struct TReadObjectReq { 1: string relativePath - 2: optional i64 offset - 3: optional i64 size + 2: i64 offset + 3: i32 size } /**