diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java index 3f96fdf1372f..0523a3848289 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java @@ -78,6 +78,13 @@ private void updateConfig(Statement statement, int timeout) throws SQLException statement.setQueryTimeout(timeout); } + /** + * Executes a SQL query on all read statements in parallel. + * + *

Note: For PreparedStatement EXECUTE queries, use the write connection directly instead, + * because PreparedStatements are session-scoped and this method may route queries to different + * nodes where the PreparedStatement doesn't exist. + */ @Override public ResultSet executeQuery(String sql) throws SQLException { return new ClusterTestResultSet(readStatements, readEndpoints, sql, queryTimeout); diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java new file mode 100644 index 000000000000..1852cd0fe3dc --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.runtime.ClusterTestConnection; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBPreparedStatementIT { + private static final String DATABASE_NAME = "test"; + private static final String[] sqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE test_table(id INT64 FIELD, name STRING FIELD, value DOUBLE FIELD)", + "INSERT INTO test_table VALUES (2025-01-01T00:00:00, 1, 'Alice', 100.5)", + "INSERT INTO test_table VALUES (2025-01-01T00:01:00, 2, 'Bob', 200.3)", + "INSERT INTO test_table VALUES (2025-01-01T00:02:00, 3, 'Charlie', 300.7)", + "INSERT INTO test_table VALUES (2025-01-01T00:03:00, 4, 'David', 400.2)", + "INSERT INTO test_table VALUES (2025-01-01T00:04:00, 5, 'Eve', 500.9)", + }; + + protected static void insertData() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + for (String sql : sqls) { + statement.execute(sql); + } + } catch (Exception e) { + fail("insertData failed: " + e.getMessage()); + } + } + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().initClusterEnvironment(); + insertData(); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + /** + * Execute a prepared statement query and verify the result. For PreparedStatement EXECUTE + * queries, use the write connection directly instead of tableResultSetEqualTest, because + * PreparedStatements are session-scoped and tableResultSetEqualTest may route queries to + * different nodes where the PreparedStatement doesn't exist. + */ + private static void executePreparedStatementAndVerify( + Connection connection, + Statement statement, + String executeSql, + String[] expectedHeader, + String[] expectedRetArray) + throws SQLException { + // Execute with parameters using write connection directly + // In cluster test, we need to use write connection to ensure same session + Statement writeStatement; + if (connection instanceof ClusterTestConnection) { + // Use write connection directly for PreparedStatement queries + writeStatement = + ((ClusterTestConnection) connection) + .writeConnection + .getUnderlyingConnection() + .createStatement(); + } else { + writeStatement = statement; + } + + try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) { + ResultSetMetaData metaData = resultSet.getMetaData(); + + // Verify header + assertEquals(expectedHeader.length, metaData.getColumnCount()); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + assertEquals(expectedHeader[i - 1], metaData.getColumnName(i)); + } + + // Verify data + int cnt = 0; + while (resultSet.next()) { + StringBuilder builder = new StringBuilder(); + for (int i = 1; i <= expectedHeader.length; i++) { + builder.append(resultSet.getString(i)).append(","); + } + assertEquals(expectedRetArray[cnt], builder.toString()); + cnt++; + } + assertEquals(expectedRetArray.length, cnt); + } + } + + @Test + public void testPrepareAndExecute() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:01:00.000Z,2,Bob,200.3,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt1 FROM SELECT * FROM test_table WHERE id = ?"); + // Execute with parameter using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt1 USING 2", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt1"); + } catch (SQLException e) { + fail("testPrepareAndExecute failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteMultipleTimes() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray1 = new String[] {"2025-01-01T00:00:00.000Z,1,Alice,100.5,"}; + String[] retArray2 = new String[] {"2025-01-01T00:02:00.000Z,3,Charlie,300.7,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt2 FROM SELECT * FROM test_table WHERE id = ?"); + // Execute multiple times with different parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt2 USING 1", expectedHeader, retArray1); + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt2 USING 3", expectedHeader, retArray2); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt2"); + } catch (SQLException e) { + fail("testPrepareAndExecuteMultipleTimes failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareWithMultipleParameters() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:01:00.000Z,2,Bob,200.3,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with multiple parameters + statement.execute("PREPARE stmt3 FROM SELECT * FROM test_table WHERE id = ? AND value > ?"); + // Execute with multiple parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt3 USING 2, 150.0", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt3"); + } catch (SQLException e) { + fail("testPrepareWithMultipleParameters failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediate() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:03:00.000Z,4,David,400.2,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate with SQL string and parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT * FROM test_table WHERE id = ?' USING 4", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediate failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediateWithoutParameters() { + String[] expectedHeader = new String[] {"_col0"}; + String[] retArray = new String[] {"5,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate without parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT COUNT(*) FROM test_table'", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediateWithoutParameters failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteImmediateWithMultipleParameters() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:04:00.000Z,5,Eve,500.9,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Execute immediate with multiple parameters + tableResultSetEqualTest( + "EXECUTE IMMEDIATE 'SELECT * FROM test_table WHERE id = ? AND value > ?' USING 5, 450.0", + expectedHeader, + retArray, + DATABASE_NAME); + } catch (SQLException e) { + fail("testExecuteImmediateWithMultipleParameters failed: " + e.getMessage()); + } + } + + @Test + public void testDeallocateNonExistentStatement() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Try to deallocate a non-existent statement + SQLException exception = + assertThrows( + SQLException.class, () -> statement.execute("DEALLOCATE PREPARE non_existent_stmt")); + assertTrue( + exception.getMessage().contains("does not exist") + || exception.getMessage().contains("Prepared statement")); + } catch (SQLException e) { + fail("testDeallocateNonExistentStatement failed: " + e.getMessage()); + } + } + + @Test + public void testExecuteNonExistentStatement() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Try to execute a non-existent statement + SQLException exception = + assertThrows( + SQLException.class, () -> statement.execute("EXECUTE non_existent_stmt USING 1")); + assertTrue( + exception.getMessage().contains("does not exist") + || exception.getMessage().contains("Prepared statement")); + } catch (SQLException e) { + fail("testExecuteNonExistentStatement failed: " + e.getMessage()); + } + } + + @Test + public void testMultiplePreparedStatements() { + String[] expectedHeader1 = new String[] {"time", "id", "name", "value"}; + String[] retArray1 = new String[] {"2025-01-01T00:00:00.000Z,1,Alice,100.5,"}; + String[] expectedHeader2 = new String[] {"_col0"}; + String[] retArray2 = new String[] {"4,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare multiple statements + statement.execute("PREPARE stmt4 FROM SELECT * FROM test_table WHERE id = ?"); + statement.execute("PREPARE stmt5 FROM SELECT COUNT(*) FROM test_table WHERE value > ?"); + // Execute both statements using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt4 USING 1", expectedHeader1, retArray1); + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt5 USING 200.0", expectedHeader2, retArray2); + // Deallocate both + statement.execute("DEALLOCATE PREPARE stmt4"); + statement.execute("DEALLOCATE PREPARE stmt5"); + } catch (SQLException e) { + fail("testMultiplePreparedStatements failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareDuplicateName() { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement + statement.execute("PREPARE stmt6 FROM SELECT * FROM test_table WHERE id = ?"); + // Try to prepare another statement with the same name + SQLException exception = + assertThrows( + SQLException.class, + () -> statement.execute("PREPARE stmt6 FROM SELECT * FROM test_table WHERE id = ?")); + assertTrue( + exception.getMessage().contains("already exists") + || exception.getMessage().contains("Prepared statement")); + // Cleanup + statement.execute("DEALLOCATE PREPARE stmt6"); + } catch (SQLException e) { + fail("testPrepareDuplicateName failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteWithAggregation() { + String[] expectedHeader = new String[] {"_col0"}; + String[] retArray = new String[] {"300.40000000000003,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with aggregation + statement.execute( + "PREPARE stmt7 FROM SELECT AVG(value) FROM test_table WHERE id >= ? AND id <= ?"); + // Execute with parameters using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt7 USING 2, 4", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt7"); + } catch (SQLException e) { + fail("testPrepareAndExecuteWithAggregation failed: " + e.getMessage()); + } + } + + @Test + public void testPrepareAndExecuteWithStringParameter() { + String[] expectedHeader = new String[] {"time", "id", "name", "value"}; + String[] retArray = new String[] {"2025-01-01T00:02:00.000Z,3,Charlie,300.7,"}; + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("USE " + DATABASE_NAME); + // Prepare a statement with string parameter + statement.execute("PREPARE stmt8 FROM SELECT * FROM test_table WHERE name = ?"); + // Execute with string parameter using write connection directly + executePreparedStatementAndVerify( + connection, statement, "EXECUTE stmt8 USING 'Charlie'", expectedHeader, retArray); + // Deallocate + statement.execute("DEALLOCATE PREPARE stmt8"); + } catch (SQLException e) { + fail("testPrepareAndExecuteWithStringParameter failed: " + e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java index 6aa862b9242f..ea90fbafeccd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java @@ -33,6 +33,9 @@ public class ClientSession extends IClientSession { private final Map> statementIdToQueryId = new ConcurrentHashMap<>(); + // Map from statement name to PreparedStatementInfo + private final Map preparedStatements = new ConcurrentHashMap<>(); + public ClientSession(Socket clientSocket) { this.clientSocket = clientSocket; } @@ -103,4 +106,24 @@ public static void removeQueryId( } } } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index 21d4121ffd3a..6ee522342b7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@ -180,6 +180,39 @@ public void setDatabaseName(@Nullable String databaseName) { this.databaseName = databaseName; } + /** + * Add a prepared statement to this session. + * + * @param statementName the name of the prepared statement + * @param info the prepared statement information + */ + public abstract void addPreparedStatement(String statementName, PreparedStatementInfo info); + + /** + * Remove a prepared statement from this session. + * + * @param statementName the name of the prepared statement + * @return the removed prepared statement info, or null if not found + */ + @Nullable + public abstract PreparedStatementInfo removePreparedStatement(String statementName); + + /** + * Get a prepared statement from this session. + * + * @param statementName the name of the prepared statement + * @return the prepared statement info, or null if not found + */ + @Nullable + public abstract PreparedStatementInfo getPreparedStatement(String statementName); + + /** + * Get all prepared statement names in this session. + * + * @return set of prepared statement names + */ + public abstract Set getPreparedStatementNames(); + public enum SqlDialect { TREE((byte) 0), TABLE((byte) 1); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java index 3c72d083a8c7..654499e504e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java @@ -35,6 +35,9 @@ public class InternalClientSession extends IClientSession { private final Map> statementIdToQueryId = new ConcurrentHashMap<>(); + // Map from statement name to PreparedStatementInfo + private final Map preparedStatements = new ConcurrentHashMap<>(); + public InternalClientSession(String clientID) { this.clientID = clientID; } @@ -88,4 +91,24 @@ public void addQueryId(Long statementId, long queryId) { public void removeQueryId(Long statementId, Long queryId) { ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java index ae9e2cd03616..ca13dbb04985 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java @@ -22,12 +22,17 @@ import org.apache.iotdb.service.rpc.thrift.TSConnectionType; import java.util.Collections; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class MqttClientSession extends IClientSession { private final String clientID; + // Map from statement name to PreparedStatementInfo + private final Map preparedStatements = new ConcurrentHashMap<>(); + public MqttClientSession(String clientID) { this.clientID = clientID; } @@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) { public void removeQueryId(Long statementId, Long queryId) { throw new UnsupportedOperationException(); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java new file mode 100644 index 000000000000..398e5ca6c8d7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.session; + +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * Information about a prepared statement stored in a session. The AST is cached here to avoid + * re-parsing on EXECUTE. + */ +public class PreparedStatementInfo { + + private final String statementName; + private final Statement sql; // Cached AST (contains Parameter nodes) + private final long createTime; + private final IMemoryBlock memoryBlock; // Memory block allocated for this PreparedStatement + + public PreparedStatementInfo(String statementName, Statement sql, IMemoryBlock memoryBlock) { + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + this.createTime = System.currentTimeMillis(); + this.memoryBlock = memoryBlock; + } + + public PreparedStatementInfo( + String statementName, Statement sql, long createTime, IMemoryBlock memoryBlock) { + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + this.createTime = createTime; + this.memoryBlock = memoryBlock; + } + + public String getStatementName() { + return statementName; + } + + public Statement getSql() { + return sql; + } + + public long getCreateTime() { + return createTime; + } + + public IMemoryBlock getMemoryBlock() { + return memoryBlock; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PreparedStatementInfo that = (PreparedStatementInfo) o; + return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, sql); + } + + @Override + public String toString() { + return "PreparedStatementInfo{" + + "statementName='" + + statementName + + '\'' + + ", sql=" + + sql + + ", createTime=" + + createTime + + '}'; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java index fa830ace3fbc..d122c3c7dc5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java @@ -22,12 +22,17 @@ import org.apache.iotdb.service.rpc.thrift.TSConnectionType; import java.util.Collections; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public class RestClientSession extends IClientSession { private final String clientID; + // Map from statement name to PreparedStatementInfo + private final Map preparedStatements = new ConcurrentHashMap<>(); + public RestClientSession(String clientID) { this.clientID = clientID; } @@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) { public void removeQueryId(Long statementId, Long queryId) { throw new UnsupportedOperationException(); } + + @Override + public void addPreparedStatement(String statementName, PreparedStatementInfo info) { + preparedStatements.put(statementName, info); + } + + @Override + public PreparedStatementInfo removePreparedStatement(String statementName) { + return preparedStatements.remove(statementName); + } + + @Override + public PreparedStatementInfo getPreparedStatement(String statementName) { + return preparedStatements.get(statementName); + } + + @Override + public Set getPreparedStatementNames() { + return preparedStatements.keySet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index d098cf70223e..1c2018f1c3c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp; import org.apache.iotdb.db.protocol.thrift.OperationType; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.PreparedStatementMemoryManager; import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager; import org.apache.iotdb.db.utils.DataNodeAuthUtils; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -274,6 +275,7 @@ public boolean closeSession(IClientSession session, LongConsumer releaseByQueryI } private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) { + // Release query resources Iterable statementIds = session.getStatementIds(); if (statementIds != null) { for (Long statementId : statementIds) { @@ -285,6 +287,17 @@ private void releaseSessionResource(IClientSession session, LongConsumer release } } } + + // Release PreparedStatement memory resources + try { + PreparedStatementMemoryManager.getInstance().releaseAllForSession(session); + } catch (Exception e) { + LOGGER.warn( + "Failed to release PreparedStatement resources for session {}: {}", + session, + e.getMessage(), + e); + } } public TSStatus closeOperation( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java index e633caa45f6a..352d8915d890 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java @@ -20,7 +20,9 @@ package org.apache.iotdb.db.protocol.thrift.handler; import org.apache.iotdb.db.protocol.session.ClientSession; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.external.api.thrift.JudgableServerContext; import org.apache.iotdb.external.api.thrift.ServerContextFactory; import org.apache.iotdb.rpc.TElasticFramedTransport; @@ -70,7 +72,22 @@ public ServerContext createContext(TProtocol in, TProtocol out) { } public void deleteContext(ServerContext context, TProtocol in, TProtocol out) { + IClientSession session = getSessionManager().getCurrSession(); + + // Release session resources (including PreparedStatement memory) + // This handles TCP connection loss scenarios + if (session != null) { + try { + getSessionManager().closeSession(session, Coordinator.getInstance()::cleanupQueryExecution); + } catch (Exception e) { + logger.warn( + "Failed to close session during TCP connection disconnect: {}", e.getMessage(), e); + } + } + + // Remove the session from the current thread getSessionManager().removeCurrSession(); + if (context != null && factory != null) { ((JudgableServerContext) context).whenDisconnect(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index c2f9639de8e2..2f5776dab2ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -32,7 +32,9 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -49,6 +51,7 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.TreeConfigTaskVisitor; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.TableModelPlanner; @@ -56,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DistributedOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.LogicalOptimizeFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ParameterExtractor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AddColumn; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AlterDB; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ClearCache; @@ -64,6 +68,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateTraining; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; @@ -71,13 +76,19 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropFunction; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropTable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Execute; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExecuteImmediate; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExtendRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Flush; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.KillQuery; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadConfiguration; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadModel; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RemoveAINode; @@ -130,7 +141,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -398,7 +411,9 @@ private IQueryExecution createQueryExecutionForTableModel( logicalPlanOptimizers, distributionPlanOptimizers, AuthorityChecker.getAccessControl(), - dataNodeLocationSupplier); + dataNodeLocationSupplier, + Collections.emptyList(), + Collections.emptyMap()); return new QueryExecution(tableModelPlanner, queryContext, executor); } @@ -471,7 +486,9 @@ private IQueryExecution createQueryExecutionForTableModel( || statement instanceof LoadModel || statement instanceof UnloadModel || statement instanceof ShowLoadedModels - || statement instanceof RemoveRegion) { + || statement instanceof RemoveRegion + || statement instanceof Prepare + || statement instanceof Deallocate) { return new ConfigExecution( queryContext, null, @@ -481,12 +498,54 @@ private IQueryExecution createQueryExecutionForTableModel( clientSession, metadata, AuthorityChecker.getAccessControl()), queryContext)); } + // Initialize variables for TableModelPlanner + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statementToUse = statement; + List parameters = Collections.emptyList(); + Map, Expression> parameterLookup = Collections.emptyMap(); + + if (statement instanceof Execute) { + Execute executeStatement = (Execute) statement; + String statementName = executeStatement.getStatementName().getValue(); + + // Get prepared statement from session (contains cached AST) + PreparedStatementInfo preparedInfo = clientSession.getPreparedStatement(statementName); + if (preparedInfo == null) { + throw new SemanticException( + String.format("Prepared statement '%s' does not exist", statementName)); + } + + // Use cached AST + statementToUse = preparedInfo.getSql(); + + // Bind parameters: create parameterLookup map + // Note: bindParameters() internally validates parameter count + parameterLookup = + ParameterExtractor.bindParameters(statementToUse, executeStatement.getParameters()); + parameters = new ArrayList<>(executeStatement.getParameters()); + + } else if (statement instanceof ExecuteImmediate) { + ExecuteImmediate executeImmediateStatement = (ExecuteImmediate) statement; + + // EXECUTE IMMEDIATE needs to parse SQL first + String sql = executeImmediateStatement.getSqlString(); + List literalParameters = executeImmediateStatement.getParameters(); + + statementToUse = sqlParser.createStatement(sql, clientSession.getZoneId(), clientSession); + + if (!literalParameters.isEmpty()) { + parameterLookup = ParameterExtractor.bindParameters(statementToUse, literalParameters); + parameters = new ArrayList<>(literalParameters); + } + } + if (statement instanceof WrappedInsertStatement) { ((WrappedInsertStatement) statement).setContext(queryContext); } - final TableModelPlanner tableModelPlanner = + + // Create QueryExecution with TableModelPlanner + TableModelPlanner tableModelPlanner = new TableModelPlanner( - statement, + statementToUse, sqlParser, metadata, scheduledExecutor, @@ -496,7 +555,9 @@ private IQueryExecution createQueryExecutionForTableModel( logicalPlanOptimizers, distributionPlanOptimizers, AuthorityChecker.getAccessControl(), - dataNodeLocationSupplier); + dataNodeLocationSupplier, + parameters, + parameterLookup); return new QueryExecution(tableModelPlanner, queryContext, executor); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java index 0475a27c26da..978949fbb860 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java @@ -98,6 +98,8 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesDetailsTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowTablesTask; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.UseDBTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.DeallocateTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.session.PrepareTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.SetSqlDialectTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentDatabaseTask; import org.apache.iotdb.db.queryengine.plan.execution.config.session.ShowCurrentSqlDialectTask; @@ -146,6 +148,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CreateView; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DatabaseStatement; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DescribeTable; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DropColumn; @@ -167,6 +170,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ReconstructRegion; @@ -1348,6 +1352,18 @@ protected IConfigTask visitSetSqlDialect(SetSqlDialect node, MPPQueryContext con return new SetSqlDialectTask(node.getSqlDialect()); } + @Override + protected IConfigTask visitPrepare(Prepare node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new PrepareTask(node.getStatementName().getValue(), node.getSql()); + } + + @Override + protected IConfigTask visitDeallocate(Deallocate node, MPPQueryContext context) { + context.setQueryType(QueryType.WRITE); + return new DeallocateTask(node.getStatementName().getValue()); + } + @Override protected IConfigTask visitShowCurrentDatabase( ShowCurrentDatabase node, MPPQueryContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java new file mode 100644 index 000000000000..e43077992591 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Task for executing DEALLOCATE PREPARE statement. Removes the prepared statement from the session + * and releases its allocated memory. + */ +public class DeallocateTask implements IConfigTask { + + private final String statementName; + + public DeallocateTask(String statementName) { + this.statementName = statementName; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + SettableFuture future = SettableFuture.create(); + try { + IClientSession session = SessionManager.getInstance().getCurrSession(); + if (session == null) { + future.setException( + new IllegalStateException("No current session available for DEALLOCATE statement")); + return future; + } + + // Remove the prepared statement + PreparedStatementInfo removedInfo = session.removePreparedStatement(statementName); + if (removedInfo == null) { + throw new SemanticException( + String.format("Prepared statement '%s' does not exist", statementName)); + } else { + // Release the memory allocated for this PreparedStatement + PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemoryBlock()); + } + + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } catch (Exception e) { + future.setException(e); + } + return future; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java new file mode 100644 index 000000000000..470a663277b9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult; +import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask; +import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.AstMemoryEstimator; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.rpc.TSStatusCode; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * Task for executing PREPARE statement. Stores the prepared statement AST in the session. The AST + * is cached to avoid re-parsing on EXECUTE (skipping Parser phase). Memory is allocated from + * CoordinatorMemoryManager and shared across all sessions. + */ +public class PrepareTask implements IConfigTask { + + private final String statementName; + private final Statement sql; // AST containing Parameter nodes + + public PrepareTask(String statementName, Statement sql) { + this.statementName = statementName; + this.sql = sql; + } + + @Override + public ListenableFuture execute(IConfigTaskExecutor configTaskExecutor) + throws InterruptedException { + SettableFuture future = SettableFuture.create(); + IMemoryBlock memoryBlock = null; + try { + IClientSession session = SessionManager.getInstance().getCurrSession(); + if (session == null) { + future.setException( + new IllegalStateException("No current session available for PREPARE statement")); + return future; + } + + // Check if prepared statement with the same name already exists + PreparedStatementInfo existingInfo = session.getPreparedStatement(statementName); + if (existingInfo != null) { + throw new SemanticException( + String.format("Prepared statement '%s' already exists.", statementName)); + } + + // Estimate memory size of the AST + long memorySizeInBytes = AstMemoryEstimator.estimateMemorySize(sql); + + // Allocate memory from CoordinatorMemoryManager + // This memory is shared across all sessions + memoryBlock = + PreparedStatementMemoryManager.getInstance().allocate(statementName, memorySizeInBytes); + + // Create and store the prepared statement info (AST is cached) + PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memoryBlock); + session.addPreparedStatement(statementName, info); + + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } catch (Exception e) { + // If memory allocation succeeded but something else failed, release the memory + if (memoryBlock != null) { + PreparedStatementMemoryManager.getInstance().release(memoryBlock); + } + future.setException(e); + } + return future; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java new file mode 100644 index 000000000000..443fb744bf3f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PreparedStatementMemoryManager.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.execution.config.session; + +import org.apache.iotdb.commons.memory.IMemoryBlock; +import org.apache.iotdb.commons.memory.MemoryBlockType; +import org.apache.iotdb.commons.memory.MemoryException; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.PreparedStatementInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; + +/** + * Memory manager for PreparedStatement. All PreparedStatements from all sessions share the memory + * pool allocated from CoordinatorMemoryManager. When memory is full, new PREPARE statements will + * fail until some PreparedStatements are deallocated. + */ +public class PreparedStatementMemoryManager { + private static final Logger LOGGER = + LoggerFactory.getLogger(PreparedStatementMemoryManager.class); + + private static final PreparedStatementMemoryManager INSTANCE = + new PreparedStatementMemoryManager(); + + private PreparedStatementMemoryManager() { + // singleton + } + + public static PreparedStatementMemoryManager getInstance() { + return INSTANCE; + } + + /** + * Allocate memory for a PreparedStatement. + * + * @param statementName the name of the prepared statement (used as memory block name) + * @param memorySizeInBytes the memory size in bytes to allocate + * @return the allocated memory block + * @throws SemanticException if memory allocation fails + */ + public IMemoryBlock allocate(String statementName, long memorySizeInBytes) { + try { + IMemoryBlock memoryBlock = + IoTDBDescriptor.getInstance() + .getMemoryConfig() + .getCoordinatorMemoryManager() + .exactAllocate( + "PreparedStatement-" + statementName, memorySizeInBytes, MemoryBlockType.DYNAMIC); + + LOGGER.debug( + "Allocated {} bytes for PreparedStatement '{}'", memorySizeInBytes, statementName); + return memoryBlock; + } catch (MemoryException e) { + LOGGER.warn( + "Failed to allocate {} bytes for PreparedStatement '{}': {}", + memorySizeInBytes, + statementName, + e.getMessage()); + throw new SemanticException( + String.format( + "Insufficient memory for PreparedStatement '%s'. " + + "Please deallocate some PreparedStatements and try again.", + statementName)); + } + } + + /** + * Release memory for a PreparedStatement. + * + * @param memoryBlock the memory block to release + */ + public void release(IMemoryBlock memoryBlock) { + if (memoryBlock != null) { + try { + memoryBlock.close(); + LOGGER.debug( + "Released memory block '{}' ({} bytes) for PreparedStatement", + memoryBlock.getName(), + memoryBlock.getTotalMemorySizeInBytes()); + } catch (Exception e) { + LOGGER.error( + "Failed to release memory block '{}' for PreparedStatement: {}", + memoryBlock.getName(), + e.getMessage(), + e); + } + } + } + + /** + * Release all PreparedStatements for a session. This method should be called when a session is + * closed or connection is lost. + * + * @param session the session whose PreparedStatements should be released + */ + public void releaseAllForSession(IClientSession session) { + if (session == null) { + return; + } + + try { + Set preparedStatementNames = session.getPreparedStatementNames(); + if (preparedStatementNames == null || preparedStatementNames.isEmpty()) { + return; + } + + int releasedCount = 0; + long totalReleasedBytes = 0; + + // Create a copy of the set to avoid concurrent modification + Set statementNamesCopy = new HashSet<>(preparedStatementNames); + + for (String statementName : statementNamesCopy) { + try { + PreparedStatementInfo info = session.getPreparedStatement(statementName); + if (info != null && info.getMemoryBlock() != null) { + IMemoryBlock memoryBlock = info.getMemoryBlock(); + if (!memoryBlock.isReleased()) { + long memorySize = memoryBlock.getTotalMemorySizeInBytes(); + release(memoryBlock); + releasedCount++; + totalReleasedBytes += memorySize; + } + } + } catch (Exception e) { + LOGGER.warn( + "Failed to release PreparedStatement '{}' during session cleanup: {}", + statementName, + e.getMessage(), + e); + } + } + + if (releasedCount > 0) { + LOGGER.debug( + "Released {} PreparedStatement(s) ({} bytes total) for session {}", + releasedCount, + totalReleasedBytes, + session); + } + } catch (Exception e) { + LOGGER.warn( + "Failed to release PreparedStatement resources for session {}: {}", + session, + e.getMessage(), + e); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 6a2b7eb34c16..70da304aef4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -45,7 +45,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; -import org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlannerContext; import org.apache.iotdb.db.queryengine.plan.relational.planner.ScopeAware; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -132,6 +131,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Offset; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.OrderBy; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; @@ -211,6 +211,7 @@ import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import com.google.common.base.Joiner; +import com.google.common.base.VerifyException; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -280,6 +281,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.function.tvf.ForecastTableFunction.TIMECOL_PARAMETER_NAME; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.MetadataUtil.createQualifiedObjectName; import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isTimestampType; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.IrExpressionInterpreter.evaluateConstantExpression; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression.getQualifiedName; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.FULL; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.INNER; @@ -3965,15 +3967,12 @@ private void analyzeOffset(Offset node, Scope scope) { if (node.getRowCount() instanceof LongLiteral) { rowCount = ((LongLiteral) node.getRowCount()).getParsedValue(); } else { - // checkState( - // node.getRowCount() instanceof Parameter, - // "unexpected OFFSET rowCount: " + - // node.getRowCount().getClass().getSimpleName()); - throw new SemanticException( + checkState( + node.getRowCount() instanceof Parameter, "unexpected OFFSET rowCount: " + node.getRowCount().getClass().getSimpleName()); - // OptionalLong providedValue = - // analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "OFFSET"); - // rowCount = providedValue.orElse(0); + OptionalLong providedValue = + analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "OFFSET"); + rowCount = providedValue.orElse(0); } if (rowCount < 0) { throw new SemanticException( @@ -4034,14 +4033,10 @@ private boolean analyzeLimit(Limit node, Scope scope) { } else if (node.getRowCount() instanceof LongLiteral) { rowCount = OptionalLong.of(((LongLiteral) node.getRowCount()).getParsedValue()); } else { - // checkState( - // node.getRowCount() instanceof Parameter, - // "unexpected LIMIT rowCount: " + - // node.getRowCount().getClass().getSimpleName()); - throw new SemanticException( + checkState( + node.getRowCount() instanceof Parameter, "unexpected LIMIT rowCount: " + node.getRowCount().getClass().getSimpleName()); - // rowCount = analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, - // "LIMIT"); + rowCount = analyzeParameterAsRowCount((Parameter) node.getRowCount(), scope, "LIMIT"); } rowCount.ifPresent( count -> { @@ -4057,32 +4052,27 @@ private boolean analyzeLimit(Limit node, Scope scope) { return false; } - // private OptionalLong analyzeParameterAsRowCount( - // Parameter parameter, Scope scope, String context) { - // // validate parameter index - // analyzeExpression(parameter, scope); - // Expression providedValue = analysis.getParameters().get(NodeRef.of(parameter)); - // Object value; - // try { - // value = - // evaluateConstantExpression( - // providedValue, - // BIGINT, - // plannerContext, - // session, - // accessControl, - // analysis.getParameters()); - // } catch (VerifyException e) { - // throw new SemanticException( - // String.format("Non constant parameter value for %s: %s", context, providedValue)); - // } - // if (value == null) { - // throw new SemanticException( - // String.format("Parameter value provided for %s is NULL: %s", context, - // providedValue)); - // } - // return OptionalLong.of((long) value); - // } + private OptionalLong analyzeParameterAsRowCount( + Parameter parameter, Scope scope, String context) { + // validate parameter index + analyzeExpression(parameter, scope); + Expression providedValue = analysis.getParameters().get(NodeRef.of(parameter)); + Object value; + try { + value = + evaluateConstantExpression( + providedValue, new PlannerContext(metadata, typeManager), sessionContext); + + } catch (VerifyException e) { + throw new SemanticException( + String.format("Non constant parameter value for %s: %s", context, providedValue)); + } + if (value == null) { + throw new SemanticException( + String.format("Parameter value provided for %s is NULL: %s", context, providedValue)); + } + return OptionalLong.of((long) value); + } private void analyzeAggregations( QuerySpecification node, @@ -5146,7 +5136,7 @@ private ArgumentAnalysis analyzeScalarArgument( Expression expression, ScalarParameterSpecification argumentSpecification) { // currently, only constant arguments are supported Object constantValue = - IrExpressionInterpreter.evaluateConstantExpression( + evaluateConstantExpression( expression, new PlannerContext(metadata, typeManager), sessionContext); if (!argumentSpecification.getType().checkObjectType(constantValue)) { if ((argumentSpecification.getType().equals(org.apache.iotdb.udf.api.type.Type.STRING) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java index 8d7ce3cebbba..ba7f6e42c24b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java @@ -35,13 +35,16 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analyzer; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.StatementAnalyzerFactory; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributedPlanner; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.DataNodeLocationSupplierFactory; import org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PlanOptimizer; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; @@ -56,8 +59,8 @@ import org.apache.iotdb.rpc.TSStatusCode; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER; @@ -87,6 +90,10 @@ public class TableModelPlanner implements IPlanner { private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier; + // Parameters for prepared statements (optional) + private final List parameters; + private final Map, Expression> parameterLookup; + public TableModelPlanner( final Statement statement, final SqlParser sqlParser, @@ -100,7 +107,9 @@ public TableModelPlanner( final List logicalPlanOptimizers, final List distributionPlanOptimizers, final AccessControl accessControl, - final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier) { + final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier, + final List parameters, + final Map, Expression> parameterLookup) { this.statement = statement; this.sqlParser = sqlParser; this.metadata = metadata; @@ -112,6 +121,8 @@ public TableModelPlanner( this.distributionPlanOptimizers = distributionPlanOptimizers; this.accessControl = accessControl; this.dataNodeLocationSupplier = dataNodeLocationSupplier; + this.parameters = parameters; + this.parameterLookup = parameterLookup; } @Override @@ -120,8 +131,8 @@ public IAnalysis analyze(final MPPQueryContext context) { context, context.getSession(), new StatementAnalyzerFactory(metadata, sqlParser, accessControl), - Collections.emptyList(), - Collections.emptyMap(), + parameters, + parameterLookup, statementRewrite, warningCollector) .analyze(statement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java new file mode 100644 index 000000000000..d45f6546e0f6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/AstMemoryEstimator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql; + +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import org.apache.tsfile.utils.RamUsageEstimator; + +/** + * Utility class for estimating memory usage of AST nodes. Uses RamUsageEstimator to calculate + * approximate memory size. + */ +public final class AstMemoryEstimator { + private AstMemoryEstimator() {} + + /** + * Estimate the memory size of a Statement AST node in bytes. + * + * @param statement the statement AST to estimate + * @return estimated memory size in bytes + */ + public static long estimateMemorySize(Statement statement) { + if (statement == null) { + return 0L; + } + MemoryEstimatingVisitor visitor = new MemoryEstimatingVisitor(); + visitor.process(statement, null); + return visitor.getTotalMemorySize(); + } + + private static class MemoryEstimatingVisitor extends DefaultTraversalVisitor { + private long totalMemorySize = 0L; + + public long getTotalMemorySize() { + return totalMemorySize; + } + + @Override + protected Void visitNode(Node node, Void context) { + // Estimate shallow size of the node object + long nodeSize = RamUsageEstimator.shallowSizeOfInstance(node.getClass()); + totalMemorySize += nodeSize; + + // Traverse children (DefaultTraversalVisitor handles this) + return super.visitNode(node, context); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java new file mode 100644 index 000000000000..d727acdc35e2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ParameterExtractor.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.sql; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DefaultTraversalVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Parameter; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; + +import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableList.toImmutableList; + +/** Utility class for extracting and binding parameters in prepared statements. */ +public final class ParameterExtractor { + private ParameterExtractor() {} + + /** + * Get the number of parameters in a statement. + * + * @param statement the statement to analyze + * @return the number of parameters + */ + public static int getParameterCount(Statement statement) { + return extractParameters(statement).size(); + } + + /** + * Extract all Parameter nodes from a statement in order of appearance. + * + * @param statement the statement to analyze + * @return list of Parameter nodes in order of appearance + */ + public static List extractParameters(Statement statement) { + ParameterExtractingVisitor visitor = new ParameterExtractingVisitor(); + visitor.process(statement, null); + return visitor.getParameters().stream() + .sorted( + Comparator.comparing( + parameter -> + parameter + .getLocation() + .orElseThrow( + () -> new SemanticException("Parameter node must have a location")), + Comparator.comparing( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NodeLocation + ::getLineNumber) + .thenComparing( + org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NodeLocation + ::getColumnNumber))) + .collect(toImmutableList()); + } + + /** + * Bind parameter values to Parameter nodes in a statement. Creates a map from Parameter node + * references to their corresponding Expression values. + * + * @param statement the statement containing Parameter nodes + * @param values the parameter values (in order) + * @return map from Parameter node references to Expression values + * @throws SemanticException if the number of parameters doesn't match + */ + public static Map, Expression> bindParameters( + Statement statement, List values) { + List parametersList = extractParameters(statement); + + // Validate parameter count + if (parametersList.size() != values.size()) { + throw new SemanticException( + String.format( + "Invalid number of parameters: expected %d, got %d", + parametersList.size(), values.size())); + } + + ImmutableMap.Builder, Expression> builder = ImmutableMap.builder(); + Iterator iterator = values.iterator(); + for (Parameter parameter : parametersList) { + builder.put(NodeRef.of(parameter), iterator.next()); + } + return builder.buildOrThrow(); + } + + private static class ParameterExtractingVisitor extends DefaultTraversalVisitor { + private final List parameters = new ArrayList<>(); + + public List getParameters() { + return parameters; + } + + @Override + protected Void visitParameter(Parameter node, Void context) { + parameters.add(node); + return null; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index 5b43b467a362..fb04469ac9bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -93,6 +93,22 @@ protected R visitUse(Use node, C context) { return visitStatement(node, context); } + protected R visitPrepare(Prepare node, C context) { + return visitStatement(node, context); + } + + protected R visitExecute(Execute node, C context) { + return visitStatement(node, context); + } + + protected R visitExecuteImmediate(ExecuteImmediate node, C context) { + return visitStatement(node, context); + } + + protected R visitDeallocate(Deallocate node, C context) { + return visitStatement(node, context); + } + protected R visitGenericLiteral(GenericLiteral node, C context) { return visitLiteral(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java new file mode 100644 index 000000000000..28bcc0c1e3f3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Deallocate.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** DEALLOCATE PREPARE statement AST node. Example: DEALLOCATE PREPARE stmt1 */ +public final class Deallocate extends Statement { + + private final Identifier statementName; + + public Deallocate(@Nonnull Identifier statementName) { + this(null, statementName); + } + + public Deallocate(@Nullable NodeLocation location, @Nonnull Identifier statementName) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + } + + public Identifier getStatementName() { + return statementName; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitDeallocate(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(statementName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Deallocate that = (Deallocate) o; + return Objects.equals(statementName, that.statementName); + } + + @Override + public int hashCode() { + return Objects.hash(statementName); + } + + @Override + public String toString() { + return toStringHelper(this).add("statementName", statementName).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java new file mode 100644 index 000000000000..310fce40d3bf --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Execute.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** EXECUTE statement AST node. Example: EXECUTE stmt1 USING 100, 'test' */ +public final class Execute extends Statement { + + private final Identifier statementName; + private final List parameters; + + public Execute(@Nonnull Identifier statementName) { + this(null, statementName, ImmutableList.of()); + } + + public Execute(@Nonnull Identifier statementName, @Nonnull List parameters) { + this(null, statementName, parameters); + } + + public Execute( + @Nullable NodeLocation location, + @Nonnull Identifier statementName, + @Nonnull List parameters) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null")); + } + + public Identifier getStatementName() { + return statementName; + } + + public List getParameters() { + return parameters; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitExecute(this, context); + } + + @Override + public List getChildren() { + ImmutableList.Builder children = ImmutableList.builder(); + children.add(statementName); + children.addAll(parameters); + return children.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Execute that = (Execute) o; + return Objects.equals(statementName, that.statementName) + && Objects.equals(parameters, that.parameters); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, parameters); + } + + @Override + public String toString() { + return toStringHelper(this) + .add("statementName", statementName) + .add("parameters", parameters) + .toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java new file mode 100644 index 000000000000..3e962205e04c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/ExecuteImmediate.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** + * EXECUTE IMMEDIATE statement AST node. Example: EXECUTE IMMEDIATE 'SELECT * FROM table WHERE id = + * 100' + */ +public final class ExecuteImmediate extends Statement { + + private final StringLiteral sql; + private final List parameters; + + public ExecuteImmediate(@Nonnull StringLiteral sql) { + this(null, sql, ImmutableList.of()); + } + + public ExecuteImmediate(@Nonnull StringLiteral sql, @Nonnull List parameters) { + this(null, sql, parameters); + } + + public ExecuteImmediate( + @Nullable NodeLocation location, + @Nonnull StringLiteral sql, + @Nonnull List parameters) { + super(location); + this.sql = requireNonNull(sql, "sql is null"); + this.parameters = ImmutableList.copyOf(requireNonNull(parameters, "parameters is null")); + } + + public StringLiteral getSql() { + return sql; + } + + public String getSqlString() { + return sql.getValue(); + } + + public List getParameters() { + return parameters; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitExecuteImmediate(this, context); + } + + @Override + public List getChildren() { + ImmutableList.Builder children = ImmutableList.builder(); + children.add(sql); + children.addAll(parameters); + return children.build(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExecuteImmediate that = (ExecuteImmediate) o; + return Objects.equals(sql, that.sql) && Objects.equals(parameters, that.parameters); + } + + @Override + public int hashCode() { + return Objects.hash(sql, parameters); + } + + @Override + public String toString() { + return toStringHelper(this).add("sql", sql).add("parameters", parameters).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java new file mode 100644 index 000000000000..15691679d613 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/Prepare.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +/** PREPARE statement AST node. Example: PREPARE stmt1 FROM SELECT * FROM table WHERE id = ? */ +public final class Prepare extends Statement { + + private final Identifier statementName; + private final Statement sql; + + public Prepare(@Nonnull Identifier statementName, @Nonnull Statement sql) { + super(null); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + } + + public Prepare( + @Nullable NodeLocation location, @Nonnull Identifier statementName, @Nonnull Statement sql) { + super(location); + this.statementName = requireNonNull(statementName, "statementName is null"); + this.sql = requireNonNull(sql, "sql is null"); + } + + public Identifier getStatementName() { + return statementName; + } + + public Statement getSql() { + return sql; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitPrepare(this, context); + } + + @Override + public List getChildren() { + return ImmutableList.of(statementName, sql); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Prepare that = (Prepare) o; + return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql); + } + + @Override + public int hashCode() { + return Objects.hash(statementName, sql); + } + + @Override + public String toString() { + return toStringHelper(this).add("statementName", statementName).add("sql", sql).toString(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index 76107bfa30e7..47da2dfc6203 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -71,6 +71,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CurrentUser; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataTypeParameter; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Deallocate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DeleteDevice; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DereferenceExpression; @@ -89,6 +90,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.EmptyPattern; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExcludedPattern; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Execute; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExecuteImmediate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExistsPredicate; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Explain; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainAnalyze; @@ -145,6 +148,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternRecognitionRelation.RowsPerMatch; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PatternVariable; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Prepare; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ProcessingMode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Property; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; @@ -3779,6 +3783,40 @@ public Node visitDropModelStatement(RelationalSqlParser.DropModelStatementContex return new DropModel(modelId); } + @Override + public Node visitPrepareStatement(RelationalSqlParser.PrepareStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + Statement sql = (Statement) visit(ctx.sql); + return new Prepare(getLocation(ctx), statementName, sql); + } + + @Override + public Node visitExecuteStatement(RelationalSqlParser.ExecuteStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + List parameters = + ctx.literalExpression() != null && !ctx.literalExpression().isEmpty() + ? visit(ctx.literalExpression(), Literal.class) + : ImmutableList.of(); + return new Execute(getLocation(ctx), statementName, parameters); + } + + @Override + public Node visitExecuteImmediateStatement( + RelationalSqlParser.ExecuteImmediateStatementContext ctx) { + StringLiteral sql = (StringLiteral) visit(ctx.sql); + List parameters = + ctx.literalExpression() != null && !ctx.literalExpression().isEmpty() + ? visit(ctx.literalExpression(), Literal.class) + : ImmutableList.of(); + return new ExecuteImmediate(getLocation(ctx), sql, parameters); + } + + @Override + public Node visitDeallocateStatement(RelationalSqlParser.DeallocateStatementContext ctx) { + Identifier statementName = lowerIdentifier((Identifier) visit(ctx.statementName)); + return new Deallocate(getLocation(ctx), statementName); + } + // ***************** arguments ***************** @Override public Node visitGenericType(RelationalSqlParser.GenericTypeContext ctx) { diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 59231b6de2bc..08625b46d3eb 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -173,6 +173,12 @@ statement | loadModelStatement | unloadModelStatement + // Prepared Statement + | prepareStatement + | executeStatement + | executeImmediateStatement + | deallocateStatement + // View, Trigger, CQ, Quota are not supported yet ; @@ -852,6 +858,23 @@ unloadModelStatement : UNLOAD MODEL existingModelId=identifier FROM DEVICES deviceIdList=string ; +// ------------------------------------------- Prepared Statement --------------------------------------------------------- +prepareStatement + : PREPARE statementName=identifier FROM sql=statement + ; + +executeStatement + : EXECUTE statementName=identifier (USING literalExpression (',' literalExpression)*)? + ; + +executeImmediateStatement + : EXECUTE IMMEDIATE sql=string (USING literalExpression (',' literalExpression)*)? + ; + +deallocateStatement + : DEALLOCATE PREPARE statementName=identifier + ; + // ------------------------------------------- Query Statement --------------------------------------------------------- queryStatement : query #statementDefault