Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ private void updateConfig(Statement statement, int timeout) throws SQLException
statement.setQueryTimeout(timeout);
}

/**
* Executes a SQL query on all read statements in parallel.
*
* <p>Note: For PreparedStatement EXECUTE queries, use the write connection directly instead,
* because PreparedStatements are session-scoped and this method may route queries to different
* nodes where the PreparedStatement doesn't exist.
*/
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return new ClusterTestResultSet(readStatements, readEndpoints, sql, queryTimeout);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class ClientSession extends IClientSession {

private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public ClientSession(Socket clientSocket) {
this.clientSocket = clientSocket;
}
Expand Down Expand Up @@ -103,4 +106,24 @@ public static void removeQueryId(
}
}
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,39 @@ public void setDatabaseName(@Nullable String databaseName) {
this.databaseName = databaseName;
}

/**
* Add a prepared statement to this session.
*
* @param statementName the name of the prepared statement
* @param info the prepared statement information
*/
public abstract void addPreparedStatement(String statementName, PreparedStatementInfo info);

/**
* Remove a prepared statement from this session.
*
* @param statementName the name of the prepared statement
* @return the removed prepared statement info, or null if not found
*/
@Nullable
public abstract PreparedStatementInfo removePreparedStatement(String statementName);

/**
* Get a prepared statement from this session.
*
* @param statementName the name of the prepared statement
* @return the prepared statement info, or null if not found
*/
@Nullable
public abstract PreparedStatementInfo getPreparedStatement(String statementName);

/**
* Get all prepared statement names in this session.
*
* @return set of prepared statement names
*/
public abstract Set<String> getPreparedStatementNames();

public enum SqlDialect {
TREE((byte) 0),
TABLE((byte) 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class InternalClientSession extends IClientSession {

private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public InternalClientSession(String clientID) {
this.clientID = clientID;
}
Expand Down Expand Up @@ -88,4 +91,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class MqttClientSession extends IClientSession {

private final String clientID;

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public MqttClientSession(String clientID) {
this.clientID = clientID;
}
Expand Down Expand Up @@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
throw new UnsupportedOperationException();
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.protocol.session;

import org.apache.iotdb.commons.memory.IMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
* Information about a prepared statement stored in a session. The AST is cached here to avoid
* re-parsing on EXECUTE.
*/
public class PreparedStatementInfo {

private final String statementName;
private final Statement sql; // Cached AST (contains Parameter nodes)
private final long createTime;
private final IMemoryBlock memoryBlock; // Memory block allocated for this PreparedStatement

public PreparedStatementInfo(String statementName, Statement sql, IMemoryBlock memoryBlock) {
this.statementName = requireNonNull(statementName, "statementName is null");
this.sql = requireNonNull(sql, "sql is null");
this.createTime = System.currentTimeMillis();
this.memoryBlock = memoryBlock;
}

public PreparedStatementInfo(
String statementName, Statement sql, long createTime, IMemoryBlock memoryBlock) {
this.statementName = requireNonNull(statementName, "statementName is null");
this.sql = requireNonNull(sql, "sql is null");
this.createTime = createTime;
this.memoryBlock = memoryBlock;
}

public String getStatementName() {
return statementName;
}

public Statement getSql() {
return sql;
}

public long getCreateTime() {
return createTime;
}

public IMemoryBlock getMemoryBlock() {
return memoryBlock;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PreparedStatementInfo that = (PreparedStatementInfo) o;
return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql);
}

@Override
public int hashCode() {
return Objects.hash(statementName, sql);
}

@Override
public String toString() {
return "PreparedStatementInfo{"
+ "statementName='"
+ statementName
+ '\''
+ ", sql="
+ sql
+ ", createTime="
+ createTime
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RestClientSession extends IClientSession {

private final String clientID;

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public RestClientSession(String clientID) {
this.clientID = clientID;
}
Expand Down Expand Up @@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
throw new UnsupportedOperationException();
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.execution.config.session.PreparedStatementMemoryManager;
import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
import org.apache.iotdb.db.utils.DataNodeAuthUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
Expand Down Expand Up @@ -274,6 +275,7 @@ public boolean closeSession(IClientSession session, LongConsumer releaseByQueryI
}

private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) {
// Release query resources
Iterable<Long> statementIds = session.getStatementIds();
if (statementIds != null) {
for (Long statementId : statementIds) {
Expand All @@ -285,6 +287,17 @@ private void releaseSessionResource(IClientSession session, LongConsumer release
}
}
}

// Release PreparedStatement memory resources
try {
PreparedStatementMemoryManager.getInstance().releaseAllForSession(session);
} catch (Exception e) {
LOGGER.warn(
"Failed to release PreparedStatement resources for session {}: {}",
session,
e.getMessage(),
e);
}
}

public TSStatus closeOperation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.iotdb.db.protocol.thrift.handler;

import org.apache.iotdb.db.protocol.session.ClientSession;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.external.api.thrift.JudgableServerContext;
import org.apache.iotdb.external.api.thrift.ServerContextFactory;
import org.apache.iotdb.rpc.TElasticFramedTransport;
Expand Down Expand Up @@ -70,7 +72,22 @@ public ServerContext createContext(TProtocol in, TProtocol out) {
}

public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
IClientSession session = getSessionManager().getCurrSession();

// Release session resources (including PreparedStatement memory)
// This handles TCP connection loss scenarios
if (session != null) {
try {
getSessionManager().closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
} catch (Exception e) {
logger.warn(
"Failed to close session during TCP connection disconnect: {}", e.getMessage(), e);
}
}

// Remove the session from the current thread
getSessionManager().removeCurrSession();

if (context != null && factory != null) {
((JudgableServerContext) context).whenDisconnect();
}
Expand Down
Loading
Loading