diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 3e34882b28f..043084182b8 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -144,14 +144,15 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates:
ECHO: simply replies a welcome message.
GPT: a.k.a ChatGPT, powered by OpenAI.
ERNIE: ErnieBot, powered by Baidu.
| string | 1.8.0 |
| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 |
| kyuubi.engine.data.agent.approval.mode | NORMAL | Default approval mode for tool execution in the Data Agent engine. Candidates:
AUTO_APPROVE: all tools are auto-approved without user interaction.
NORMAL: only destructive tools require explicit approval.
STRICT: all tools require explicit user approval.
| string | 1.12.0 |
+| kyuubi.engine.data.agent.compaction.trigger.tokens | 128000 | The prompt-token threshold above which the Data Agent's compaction middleware summarizes older conversation history into a compact message. The check is made each turn as real_prompt_tokens_of_previous_LLM_call + estimate_of_newly_appended_tail; when this predicted prompt size reaches the configured value, older messages are replaced by a single summary message while the most recent exchanges are kept verbatim. Set to a very large value (e.g., 9223372036854775807) to effectively disable compaction. | long | 1.12.0 |
| kyuubi.engine.data.agent.extra.classpath | <undefined> | The extra classpath for the Data Agent engine, for configuring the location of the LLM SDK and etc. | string | 1.12.0 |
| kyuubi.engine.data.agent.java.options | <undefined> | The extra Java options for the Data Agent engine | string | 1.12.0 |
| kyuubi.engine.data.agent.jdbc.url | <undefined> | The JDBC URL for the Data Agent engine to connect to the target database. If not set, the Data Agent will connect back to Kyuubi server via ZooKeeper service discovery. | string | 1.12.0 |
-| kyuubi.engine.data.agent.llm.api.key | <undefined> | The API key to access the LLM service for the Data Agent engine. | string | 1.12.0 |
-| kyuubi.engine.data.agent.llm.api.url | <undefined> | The API base URL for the LLM service used by the Data Agent engine. | string | 1.12.0 |
-| kyuubi.engine.data.agent.llm.model | <undefined> | The model ID used by the Data Agent engine LLM provider. | string | 1.12.0 |
| kyuubi.engine.data.agent.max.iterations | 100 | The maximum number of ReAct loop iterations for the Data Agent engine. | int | 1.12.0 |
| kyuubi.engine.data.agent.memory | 1g | The heap memory for the Data Agent engine | string | 1.12.0 |
+| kyuubi.engine.data.agent.model | <undefined> | The model ID used by the Data Agent engine. | string | 1.12.0 |
+| kyuubi.engine.data.agent.openai.api.key | <undefined> | The API key for the OpenAI-compatible chat-completion endpoint used by the Data Agent engine. | string | 1.12.0 |
+| kyuubi.engine.data.agent.openai.endpoint | <undefined> | The base URL of the OpenAI-compatible chat-completion endpoint used by the Data Agent engine. | string | 1.12.0 |
| kyuubi.engine.data.agent.provider | ECHO | The provider for the Data Agent engine. Candidates:
ECHO: simply echoes the input, for testing purpose.
| string | 1.12.0 |
| kyuubi.engine.data.agent.query.timeout | PT3M | The JDBC query execution timeout for the Data Agent SQL tools. Passed to Statement.setQueryTimeout so the server (Spark/Trino/...) can cooperatively cancel long-running queries and release cluster resources. Should be set lower than kyuubi.engine.data.agent.tool.call.timeout so server-side cancellation has time to react before the outer wall-clock cap fires. | duration | 1.12.0 |
| kyuubi.engine.data.agent.tool.call.timeout | PT5M | The maximum wall-clock execution time for any tool call in the Data Agent engine. Acts as the outer safety net enforced by the agent runtime via Future.cancel(), applied uniformly to every tool. For SQL tools the inner JDBC-level timeout is controlled separately by kyuubi.engine.data.agent.query.timeout, which should be set lower so server-side cancellation has time to react before this hard cap fires. | duration | 1.12.0 |
diff --git a/externals/kyuubi-data-agent-engine/pom.xml b/externals/kyuubi-data-agent-engine/pom.xml
index c34d049360c..be29d409208 100644
--- a/externals/kyuubi-data-agent-engine/pom.xml
+++ b/externals/kyuubi-data-agent-engine/pom.xml
@@ -30,6 +30,15 @@
Kyuubi Project Engine Data Agenthttps://kyuubi.apache.org/
+
+
+ 1.8.0
+ 2.0.21
+ 4.12.0
+ 3.6.0
+
+
@@ -50,45 +59,111 @@
${project.version}
+
com.openaiopenai-java
+ ${openai.sdk.version}
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ ${kotlin.stdlib.version}
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-common
+ ${kotlin.stdlib.version}
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-jdk7
+ ${kotlin.stdlib.version}
+
+
+ org.jetbrains.kotlin
+ kotlin-stdlib-jdk8
+ ${kotlin.stdlib.version}
+
+
+ org.jetbrains.kotlin
+ kotlin-reflect
+ ${kotlin.reflect.version}
+
+
+ com.squareup.okhttp3
+ okhttp
+ ${okhttp.version}
+
+
+ com.squareup.okhttp3
+ logging-interceptor
+ ${okhttp.version}
+
+
+ com.squareup.okio
+ okio
+ ${okio.version}
+
+
+ com.squareup.okio
+ okio-jvm
+ ${okio.version}
+
+
+
com.github.victoolsjsonschema-generator
+ ${victools.jsonschema.version}
-
com.github.victoolsjsonschema-module-jackson
+ ${victools.jsonschema.version}
-
+
- org.apache.kyuubi
- kyuubi-common_${scala.binary.version}
- ${project.version}
- test-jar
+ org.xerial
+ sqlite-jdbc
+ ${sqlite.version}test
- org.xerial
- sqlite-jdbc
+ com.mysql
+ mysql-connector-jtest
+
- org.testcontainers
- testcontainers-mysql
+ io.trino
+ trino-jdbc
+
+
+
+
+ com.zaxxer
+ HikariCP
+
+
+
+
+ org.apache.kyuubi
+ kyuubi-common_${scala.binary.version}
+ ${project.version}
+ test-jartest
- com.mysql
- mysql-connector-j
+ org.testcontainers
+ testcontainers-mysqltest
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/JdbcDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/JdbcDialect.java
index c3be1dad61a..1b149e81d73 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/JdbcDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/JdbcDialect.java
@@ -17,6 +17,12 @@
package org.apache.kyuubi.engine.dataagent.datasource;
+import org.apache.kyuubi.engine.dataagent.datasource.dialect.GenericDialect;
+import org.apache.kyuubi.engine.dataagent.datasource.dialect.MySQLDialect;
+import org.apache.kyuubi.engine.dataagent.datasource.dialect.SQLiteDialect;
+import org.apache.kyuubi.engine.dataagent.datasource.dialect.SparkDialect;
+import org.apache.kyuubi.engine.dataagent.datasource.dialect.TrinoDialect;
+
/**
* SQL dialect abstraction for datasource-specific SQL generation.
*
@@ -83,9 +89,9 @@ static JdbcDialect fromUrl(String jdbcUrl) {
case "trino":
return TrinoDialect.INSTANCE;
case "mysql":
- return MysqlDialect.INSTANCE;
+ return MySQLDialect.INSTANCE;
case "sqlite":
- return SqliteDialect.INSTANCE;
+ return SQLiteDialect.INSTANCE;
default:
return new GenericDialect(name);
}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/GenericDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/GenericDialect.java
similarity index 92%
rename from externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/GenericDialect.java
rename to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/GenericDialect.java
index 3ea22ed54e3..d8c4512de03 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/GenericDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/GenericDialect.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.dataagent.datasource;
+package org.apache.kyuubi.engine.dataagent.datasource.dialect;
+
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
/**
* Fallback dialect for JDBC subprotocols that have no dedicated implementation. Carries the
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/MysqlDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/MySQLDialect.java
similarity index 79%
rename from externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/MysqlDialect.java
rename to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/MySQLDialect.java
index 98747ffa30c..e4dfb27ca2e 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/MysqlDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/MySQLDialect.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.dataagent.datasource;
+package org.apache.kyuubi.engine.dataagent.datasource.dialect;
+
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
/** MySQL dialect. Uses backtick quoting for identifiers. */
-public final class MysqlDialect implements JdbcDialect {
+public final class MySQLDialect implements JdbcDialect {
- static final MysqlDialect INSTANCE = new MysqlDialect();
+ public static final MySQLDialect INSTANCE = new MySQLDialect();
- private MysqlDialect() {}
+ private MySQLDialect() {}
@Override
public String datasourceName() {
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SqliteDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SQLiteDialect.java
similarity index 79%
rename from externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SqliteDialect.java
rename to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SQLiteDialect.java
index a53255a9c67..aa1d7db8d23 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SqliteDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SQLiteDialect.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.dataagent.datasource;
+package org.apache.kyuubi.engine.dataagent.datasource.dialect;
+
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
/** SQLite dialect. Uses double-quote quoting for identifiers. */
-public final class SqliteDialect implements JdbcDialect {
+public final class SQLiteDialect implements JdbcDialect {
- static final SqliteDialect INSTANCE = new SqliteDialect();
+ public static final SQLiteDialect INSTANCE = new SQLiteDialect();
- private SqliteDialect() {}
+ private SQLiteDialect() {}
@Override
public String datasourceName() {
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SparkDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SparkDialect.java
similarity index 85%
rename from externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SparkDialect.java
rename to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SparkDialect.java
index 3adb43fa398..34e20034bfb 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SparkDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SparkDialect.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.dataagent.datasource;
+package org.apache.kyuubi.engine.dataagent.datasource.dialect;
+
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
/** Spark SQL dialect. Uses backtick quoting for identifiers. */
public final class SparkDialect implements JdbcDialect {
- static final SparkDialect INSTANCE = new SparkDialect();
+ public static final SparkDialect INSTANCE = new SparkDialect();
private SparkDialect() {}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/TrinoDialect.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/TrinoDialect.java
similarity index 85%
rename from externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/TrinoDialect.java
rename to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/TrinoDialect.java
index edacf2f87e2..75fbd4bb242 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/TrinoDialect.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/TrinoDialect.java
@@ -15,12 +15,14 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.dataagent.datasource;
+package org.apache.kyuubi.engine.dataagent.datasource.dialect;
+
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
/** Trino SQL dialect. Uses double-quote quoting for identifiers. */
public final class TrinoDialect implements JdbcDialect {
- static final TrinoDialect INSTANCE = new TrinoDialect();
+ public static final TrinoDialect INSTANCE = new TrinoDialect();
private TrinoDialect() {}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java
index f4e40b2fae8..26ad8be77fb 100644
--- a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java
@@ -17,13 +17,23 @@
package org.apache.kyuubi.engine.dataagent.provider;
+import org.apache.kyuubi.engine.dataagent.runtime.ApprovalMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* User-facing request parameters for a provider-level agent invocation. Only contains fields from
* the caller (question, model override, etc.). Adding new per-request options does not require
* changing the {@link DataAgentProvider} interface.
+ *
+ *
The approval mode is accepted as a raw string (natural for config-driven callers) and parsed
+ * into {@link ApprovalMode} by {@link #getApprovalMode()}. Unrecognised values fall back to {@link
+ * ApprovalMode#NORMAL} with a warning.
*/
public class ProviderRunRequest {
+ private static final Logger LOG = LoggerFactory.getLogger(ProviderRunRequest.class);
+
private final String question;
private String modelName;
private String approvalMode;
@@ -45,8 +55,20 @@ public ProviderRunRequest modelName(String modelName) {
return this;
}
- public String getApprovalMode() {
- return approvalMode;
+ /**
+ * Resolved approval mode. Returns {@link ApprovalMode#NORMAL} when the caller did not set one or
+ * supplied an unknown value.
+ */
+ public ApprovalMode getApprovalMode() {
+ if (approvalMode == null || approvalMode.isEmpty()) {
+ return ApprovalMode.NORMAL;
+ }
+ try {
+ return ApprovalMode.valueOf(approvalMode.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Unknown approval mode '{}', using default NORMAL", approvalMode);
+ return ApprovalMode.NORMAL;
+ }
}
public ProviderRunRequest approvalMode(String approvalMode) {
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java
new file mode 100644
index 00000000000..b9bb30edb7e
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/chatcompletion/ChatCompletionProvider.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.provider.chatcompletion;
+
+import com.openai.client.OpenAIClient;
+import com.openai.client.okhttp.OpenAIOkHttpClient;
+import com.zaxxer.hikari.HikariDataSource;
+import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import javax.sql.DataSource;
+import org.apache.kyuubi.config.KyuubiConf;
+import org.apache.kyuubi.config.KyuubiReservedKeys;
+import org.apache.kyuubi.engine.dataagent.datasource.DataSourceFactory;
+import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
+import org.apache.kyuubi.engine.dataagent.prompt.SystemPromptBuilder;
+import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider;
+import org.apache.kyuubi.engine.dataagent.provider.ProviderRunRequest;
+import org.apache.kyuubi.engine.dataagent.runtime.AgentInvocation;
+import org.apache.kyuubi.engine.dataagent.runtime.ConversationMemory;
+import org.apache.kyuubi.engine.dataagent.runtime.ReactAgent;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.ApprovalMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.CompactionMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.LoggingMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.ToolResultOffloadMiddleware;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry;
+import org.apache.kyuubi.engine.dataagent.tool.sql.RunMutationQueryTool;
+import org.apache.kyuubi.engine.dataagent.tool.sql.RunSelectQueryTool;
+import org.apache.kyuubi.engine.dataagent.util.ConfUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An OpenAI-compatible provider that wires up the full ReactAgent with streaming LLM, tools, and
+ * middleware pipeline. Uses the official OpenAI Java SDK.
+ *
+ *
The ReactAgent, DataSource, and ToolRegistry are shared across all sessions within this engine
+ * instance. Each session only maintains its own {@link ConversationMemory}. This works because each
+ * engine is bound to one user + one datasource, so all sessions within the engine naturally share
+ * the same data connection.
+ */
+public class ChatCompletionProvider implements DataAgentProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ChatCompletionProvider.class);
+
+ private final ReactAgent agent;
+ private final ToolRegistry toolRegistry;
+ private final DataSource dataSource;
+ private final OpenAIClient client;
+ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
+
+ public ChatCompletionProvider(KyuubiConf conf) {
+ String apiKey = ConfUtils.requireString(conf, KyuubiConf.ENGINE_DATA_AGENT_OPENAI_API_KEY());
+ String baseUrl = ConfUtils.requireString(conf, KyuubiConf.ENGINE_DATA_AGENT_OPENAI_ENDPOINT());
+ String modelName = ConfUtils.requireString(conf, KyuubiConf.ENGINE_DATA_AGENT_MODEL());
+
+ int maxIterations = ConfUtils.intConf(conf, KyuubiConf.ENGINE_DATA_AGENT_MAX_ITERATIONS());
+ long compactionTriggerTokens =
+ ConfUtils.longConf(conf, KyuubiConf.ENGINE_DATA_AGENT_COMPACTION_TRIGGER_TOKENS());
+ int queryTimeoutSeconds =
+ (int) ConfUtils.millisAsSeconds(conf, KyuubiConf.ENGINE_DATA_AGENT_QUERY_TIMEOUT());
+ long toolCallTimeoutSeconds =
+ ConfUtils.millisAsSeconds(conf, KyuubiConf.ENGINE_DATA_AGENT_TOOL_CALL_TIMEOUT());
+
+ this.client =
+ OpenAIOkHttpClient.builder()
+ .apiKey(apiKey)
+ .baseUrl(baseUrl)
+ .maxRetries(3)
+ .timeout(Duration.ofSeconds(180))
+ .build();
+
+ this.toolRegistry = new ToolRegistry(toolCallTimeoutSeconds);
+
+ SystemPromptBuilder promptBuilder = SystemPromptBuilder.create();
+ this.dataSource = attachJdbcDataSource(conf, toolRegistry, promptBuilder, queryTimeoutSeconds);
+
+ this.agent =
+ ReactAgent.builder()
+ .client(client)
+ .modelName(modelName)
+ .toolRegistry(toolRegistry)
+ .addMiddleware(new ToolResultOffloadMiddleware())
+ .addMiddleware(new LoggingMiddleware())
+ .addMiddleware(new CompactionMiddleware(client, modelName, compactionTriggerTokens))
+ .addMiddleware(new ApprovalMiddleware())
+ .maxIterations(maxIterations)
+ .systemPrompt(promptBuilder.build())
+ .build();
+ }
+
+ /**
+ * Register JDBC-backed SQL tools if a JDBC URL is configured. Returns the created {@link
+ * DataSource} so the provider can close it on shutdown, or {@code null} when no JDBC is wired.
+ */
+ private static DataSource attachJdbcDataSource(
+ KyuubiConf conf,
+ ToolRegistry registry,
+ SystemPromptBuilder promptBuilder,
+ int queryTimeoutSeconds) {
+ String jdbcUrl = ConfUtils.optionalString(conf, KyuubiConf.ENGINE_DATA_AGENT_JDBC_URL());
+ if (jdbcUrl == null) {
+ return null;
+ }
+ LOG.info("Data Agent JDBC URL configured ({})", jdbcUrl.replaceAll("//.*@", "//@"));
+
+ String sessionUser =
+ ConfUtils.optionalString(conf, KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY());
+
+ DataSource ds = DataSourceFactory.create(jdbcUrl, sessionUser);
+ registry.register(new RunSelectQueryTool(ds, queryTimeoutSeconds));
+ registry.register(new RunMutationQueryTool(ds, queryTimeoutSeconds));
+ promptBuilder.datasource(JdbcDialect.fromUrl(jdbcUrl).datasourceName());
+ return ds;
+ }
+
+ @Override
+ public void open(String sessionId, String user) {
+ sessions.put(sessionId, new ConversationMemory());
+ LOG.info("Opened Data Agent session {} for user {}", sessionId, user);
+ }
+
+ @Override
+ public void run(String sessionId, ProviderRunRequest request, Consumer onEvent) {
+ ConversationMemory memory = sessions.get(sessionId);
+ if (memory == null) {
+ throw new IllegalStateException("No open Data Agent session for id=" + sessionId);
+ }
+
+ AgentInvocation invocation =
+ new AgentInvocation(request.getQuestion())
+ .modelName(request.getModelName())
+ .approvalMode(request.getApprovalMode())
+ .sessionId(sessionId);
+ agent.run(invocation, memory, onEvent);
+ }
+
+ @Override
+ public boolean resolveApproval(String requestId, boolean approved) {
+ return agent.resolveApproval(requestId, approved);
+ }
+
+ @Override
+ public void close(String sessionId) {
+ sessions.remove(sessionId);
+ agent.closeSession(sessionId);
+ LOG.info("Closed Data Agent session {}", sessionId);
+ }
+
+ @Override
+ public void stop() {
+ agent.stop();
+ toolRegistry.close();
+ if (dataSource instanceof HikariDataSource) {
+ ((HikariDataSource) dataSource).close();
+ LOG.info("Closed Data Agent connection pool");
+ }
+ client.close();
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentInvocation.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentInvocation.java
new file mode 100644
index 00000000000..0695d556058
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentInvocation.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import java.util.Objects;
+
+/**
+ * User-facing request parameters for a single agent invocation. Only contains fields that come from
+ * the caller (question, model override, etc.). Framework-level concerns like memory and event
+ * consumer are separate method parameters.
+ *
+ *
Adding new per-request options (e.g. temperature, maxTokens) does not require changing the
+ * {@code ReactAgent.run()} signature.
+ */
+public class AgentInvocation {
+
+ private final String userInput;
+ private String modelName;
+ private ApprovalMode approvalMode = ApprovalMode.NORMAL;
+ private String sessionId;
+
+ public AgentInvocation(String userInput) {
+ this.userInput = Objects.requireNonNull(userInput, "userInput must not be null");
+ }
+
+ public String getUserInput() {
+ return userInput;
+ }
+
+ public String getModelName() {
+ return modelName;
+ }
+
+ public AgentInvocation modelName(String modelName) {
+ this.modelName = modelName;
+ return this;
+ }
+
+ public ApprovalMode getApprovalMode() {
+ return approvalMode;
+ }
+
+ public AgentInvocation approvalMode(ApprovalMode approvalMode) {
+ this.approvalMode = approvalMode;
+ return this;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ /** Upstream session id, propagated into {@link AgentRunContext#getSessionId()}. */
+ public AgentInvocation sessionId(String sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentRunContext.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentRunContext.java
new file mode 100644
index 00000000000..e7c92df8033
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/AgentRunContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import java.util.function.Consumer;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
+
+/**
+ * Mutable context passed through the middleware pipeline and agent loop. Tracks the current state
+ * of agent execution including iteration count, token usage, and custom middleware state.
+ */
+public class AgentRunContext {
+
+ private final ConversationMemory memory;
+ private final String sessionId;
+ private Consumer eventEmitter;
+ private int iteration;
+ private long promptTokens;
+ private long completionTokens;
+ private long totalTokens;
+ private ApprovalMode approvalMode;
+
+ public AgentRunContext(ConversationMemory memory, ApprovalMode approvalMode) {
+ this(memory, approvalMode, null);
+ }
+
+ public AgentRunContext(ConversationMemory memory, ApprovalMode approvalMode, String sessionId) {
+ this.memory = memory;
+ this.iteration = 0;
+ this.approvalMode = approvalMode;
+ this.sessionId = sessionId;
+ }
+
+ public ConversationMemory getMemory() {
+ return memory;
+ }
+
+ /**
+ * The upstream session identifier this run belongs to. Threaded down from {@code
+ * DataAgentProvider.run(sessionId, ...)}. May be {@code null} in unit tests that do not exercise
+ * session-scoped middleware.
+ */
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public int getIteration() {
+ return iteration;
+ }
+
+ public void setIteration(int iteration) {
+ this.iteration = iteration;
+ }
+
+ public long getPromptTokens() {
+ return promptTokens;
+ }
+
+ public long getCompletionTokens() {
+ return completionTokens;
+ }
+
+ public long getTotalTokens() {
+ return totalTokens;
+ }
+
+ /**
+ * Record one LLM call's usage. Updates both the per-run counters on this context and the
+ * session-level cumulative on the underlying {@link ConversationMemory}, so middlewares that need
+ * a session-wide picture can read it directly from memory without keeping their own bookkeeping.
+ */
+ public void addTokenUsage(long prompt, long completion, long total) {
+ this.promptTokens += prompt;
+ this.completionTokens += completion;
+ this.totalTokens += total;
+ memory.addCumulativeTokens(prompt, completion, total);
+ }
+
+ public ApprovalMode getApprovalMode() {
+ return approvalMode;
+ }
+
+ public void setApprovalMode(ApprovalMode approvalMode) {
+ this.approvalMode = approvalMode;
+ }
+
+ public void setEventEmitter(Consumer eventEmitter) {
+ this.eventEmitter = eventEmitter;
+ }
+
+ /** Emit an event through the agent's event pipeline. Available for use by middlewares. */
+ public void emit(AgentEvent event) {
+ if (eventEmitter != null) {
+ eventEmitter.accept(event);
+ }
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ApprovalMode.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ApprovalMode.java
new file mode 100644
index 00000000000..57bc20bc2bb
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ApprovalMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+/** Approval modes for tool execution in the Data Agent engine. */
+public enum ApprovalMode {
+ /** All tools require explicit user approval. */
+ STRICT,
+ /** Only non-readonly tools require approval. */
+ NORMAL,
+ /** All tools are auto-approved. */
+ AUTO_APPROVE
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ConversationMemory.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ConversationMemory.java
new file mode 100644
index 00000000000..0bfae26ec27
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ConversationMemory.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
+import com.openai.models.chat.completions.ChatCompletionMessageParam;
+import com.openai.models.chat.completions.ChatCompletionSystemMessageParam;
+import com.openai.models.chat.completions.ChatCompletionToolMessageParam;
+import com.openai.models.chat.completions.ChatCompletionUserMessageParam;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manages conversation history for a Data Agent session. Ensures tool result messages are never
+ * orphaned from their corresponding AI messages.
+ *
+ *
Each instance is session-scoped and accessed sequentially within a single ReAct loop — no
+ * synchronization is needed. Cross-session concurrency is handled by the provider's session map.
+ */
+public class ConversationMemory {
+
+ /**
+ * System prompt prepended to every LLM call built from this memory. Rebuilt by the provider on
+ * each invocation (datasource/tool metadata can change between turns), so it lives outside the
+ * {@link #messages} list rather than being inserted as the first entry. May be {@code null} until
+ * the first {@link #setSystemPrompt} call — {@link #buildLlmMessages()} will simply omit the
+ * system slot in that case.
+ */
+ private String systemPrompt;
+
+ /**
+ * The raw content of the most recent user message added via {@link #addUserMessage}. Cached
+ * separately from {@link #messages} so middleware and callers can recover the current turn's
+ * question after compaction rewrites history. Not used by the LLM call path itself.
+ */
+ private String lastUserInput;
+
+ /**
+ * The ordered conversation history: user / assistant / tool messages, in the order the LLM will
+ * see them. The system prompt is intentionally NOT stored here (see {@link #systemPrompt}).
+ * Mutated in place by {@link #replaceHistory} during compaction; otherwise append-only.
+ */
+ private final List messages = new ArrayList<>();
+
+ /**
+ * Session-level running total of {@code prompt_tokens} reported by every LLM call on this
+ * conversation (across ReAct turns). Intended for billing, quota, and observability — not used by
+ * any runtime decision. Updated via {@link #addCumulativeTokens}.
+ */
+ private long cumulativePromptTokens;
+
+ /**
+ * Session-level running total of {@code completion_tokens}. See {@link #cumulativePromptTokens}.
+ */
+ private long cumulativeCompletionTokens;
+
+ /**
+ * Session-level running total of {@code total_tokens} (prompt + completion as reported by the
+ * provider — not necessarily the sum of the two counters above, since providers may count
+ * cached/reasoning tokens differently). See {@link #cumulativePromptTokens}.
+ */
+ private long cumulativeTotalTokens;
+
+ /**
+ * The {@code total_tokens} reported by the single most recent LLM call, or {@code 0} if no call
+ * has completed yet. Distinct from the cumulative counters: this is a snapshot, overwritten every
+ * call. Used by {@link
+ * org.apache.kyuubi.engine.dataagent.runtime.middleware.CompactionMiddleware} to estimate the
+ * next prompt size (the last response becomes part of the next prompt, so the next call's prompt
+ * is at least {@code lastTotalTokens}). Persists across ReAct turns until the next call
+ * overwrites it.
+ */
+ private long lastTotalTokens;
+
+ public ConversationMemory() {}
+
+ public String getSystemPrompt() {
+ return systemPrompt;
+ }
+
+ public void setSystemPrompt(String prompt) {
+ this.systemPrompt = prompt;
+ }
+
+ public void addUserMessage(String content) {
+ this.lastUserInput = content;
+ messages.add(
+ ChatCompletionMessageParam.ofUser(
+ ChatCompletionUserMessageParam.builder().content(content).build()));
+ }
+
+ public String getLastUserInput() {
+ return lastUserInput;
+ }
+
+ public void addAssistantMessage(ChatCompletionAssistantMessageParam message) {
+ messages.add(ChatCompletionMessageParam.ofAssistant(message));
+ }
+
+ public void addToolResult(String toolCallId, String content) {
+ messages.add(
+ ChatCompletionMessageParam.ofTool(
+ ChatCompletionToolMessageParam.builder()
+ .toolCallId(toolCallId)
+ .content(content)
+ .build()));
+ }
+
+ /**
+ * Build the full message list for LLM API invocation: [system prompt] + conversation history.
+ *
+ *
No windowing is applied — callers are responsible for managing context length (e.g. via a
+ * token-based truncation strategy).
+ *
+ * @see #getHistory() for history-only access without system prompt
+ */
+ public List buildLlmMessages() {
+ List result = new ArrayList<>(messages.size() + 1);
+ if (systemPrompt != null) {
+ result.add(
+ ChatCompletionMessageParam.ofSystem(
+ ChatCompletionSystemMessageParam.builder().content(systemPrompt).build()));
+ }
+ result.addAll(messages);
+ return Collections.unmodifiableList(result);
+ }
+
+ /**
+ * Returns the conversation history (user, assistant, tool messages) without the system prompt.
+ * Useful for middleware that needs to inspect or compact history.
+ */
+ public List getHistory() {
+ return Collections.unmodifiableList(new ArrayList<>(messages));
+ }
+
+ /**
+ * Replace the conversation history with a compacted version. Useful for context-length management
+ * strategies (e.g., summarizing older messages).
+ *
+ *
Also clears {@link #lastTotalTokens}: the prior snapshot referred to a prompt whose bulk we
+ * just discarded, so it no longer describes anything in memory. Leaving it stale would keep the
+ * compaction trigger armed until the next successful LLM call overwrites it — fine on the happy
+ * path, but if that call fails the next turn would re-enter compaction against already-compacted
+ * history. Zeroing means "unknown, wait for the next real usage report". Cumulative totals are
+ * intentionally preserved (session-level accounting, must not regress on internal compaction).
+ */
+ public void replaceHistory(List compacted) {
+ messages.clear();
+ messages.addAll(compacted);
+ this.lastTotalTokens = 0;
+ }
+
+ public void clear() {
+ messages.clear();
+ }
+
+ public int size() {
+ return messages.size();
+ }
+
+ public long getCumulativePromptTokens() {
+ return cumulativePromptTokens;
+ }
+
+ public long getCumulativeCompletionTokens() {
+ return cumulativeCompletionTokens;
+ }
+
+ public long getCumulativeTotalTokens() {
+ return cumulativeTotalTokens;
+ }
+
+ public long getLastTotalTokens() {
+ return lastTotalTokens;
+ }
+
+ /** Add one LLM call's usage to the session cumulative. Intended for {@link AgentRunContext}. */
+ public void addCumulativeTokens(long prompt, long completion, long total) {
+ this.cumulativePromptTokens += prompt;
+ this.cumulativeCompletionTokens += completion;
+ this.cumulativeTotalTokens += total;
+ this.lastTotalTokens = total;
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java
new file mode 100644
index 00000000000..8d8d6494aaf
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/LlmStreamClient.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import com.openai.client.OpenAIClient;
+import com.openai.core.http.StreamResponse;
+import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
+import com.openai.models.chat.completions.ChatCompletionChunk;
+import com.openai.models.chat.completions.ChatCompletionCreateParams;
+import com.openai.models.chat.completions.ChatCompletionMessageFunctionToolCall;
+import com.openai.models.chat.completions.ChatCompletionMessageParam;
+import com.openai.models.chat.completions.ChatCompletionMessageToolCall;
+import com.openai.models.chat.completions.ChatCompletionStreamOptions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Streams one chat completion call and assembles assistant content plus streamed tool calls. */
+final class LlmStreamClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlmStreamClient.class);
+
+ private final OpenAIClient client;
+ private final ToolRegistry toolRegistry;
+
+ LlmStreamClient(OpenAIClient client, ToolRegistry toolRegistry) {
+ this.client = client;
+ this.toolRegistry = toolRegistry;
+ }
+
+ /**
+ * Stream LLM response, emitting ContentDelta through {@code ctx} for each text chunk. Assembles
+ * tool calls directly from streamed chunks with no non-streaming fallback.
+ */
+ StreamResult stream(
+ AgentRunContext ctx, List messages, String effectiveModel) {
+ ChatCompletionCreateParams.Builder paramsBuilder =
+ ChatCompletionCreateParams.builder()
+ .model(effectiveModel)
+ .streamOptions(ChatCompletionStreamOptions.builder().includeUsage(true).build());
+ for (ChatCompletionMessageParam msg : messages) {
+ paramsBuilder.addMessage(msg);
+ }
+ toolRegistry.addToolsTo(paramsBuilder);
+
+ LOG.info("LLM request: model={}", effectiveModel);
+ StreamAccumulator acc = new StreamAccumulator();
+ try (StreamResponse stream =
+ client.chat().completions().createStreaming(paramsBuilder.build())) {
+ stream.stream().forEach(chunk -> consumeChunk(ctx, chunk, acc));
+ }
+ return new StreamResult(acc.content.toString(), acc.buildToolCalls());
+ }
+
+ /** Fold one streaming chunk into {@code acc}, emitting per-token {@link ContentDelta}s. */
+ private void consumeChunk(AgentRunContext ctx, ChatCompletionChunk chunk, StreamAccumulator acc) {
+ if (!acc.serverModelLogged) {
+ LOG.info("LLM response: server-echoed model={}", chunk.model());
+ acc.serverModelLogged = true;
+ }
+ chunk
+ .usage()
+ .ifPresent(u -> ctx.addTokenUsage(u.promptTokens(), u.completionTokens(), u.totalTokens()));
+
+ for (ChatCompletionChunk.Choice c : chunk.choices()) {
+ c.delta()
+ .content()
+ .ifPresent(
+ text -> {
+ acc.content.append(text);
+ ctx.emit(new ContentDelta(text));
+ });
+ c.delta().toolCalls().ifPresent(acc::mergeToolCallDeltas);
+ }
+ }
+
+ /**
+ * Mutable accumulator for a single streaming LLM turn. Tool call fields are keyed by the chunk's
+ * {@code index} because provider SDKs may deliver a single logical call across multiple chunks
+ * and only surface the {@code id}/{@code name} on the first one.
+ */
+ private static final class StreamAccumulator {
+ final StringBuilder content = new StringBuilder();
+ final Map toolCallIds = new HashMap<>();
+ final Map toolCallNames = new HashMap<>();
+ final Map toolCallArgs = new HashMap<>();
+ boolean serverModelLogged = false;
+
+ void mergeToolCallDeltas(List deltas) {
+ for (ChatCompletionChunk.Choice.Delta.ToolCall tc : deltas) {
+ int idx = (int) tc.index();
+ tc.id().ifPresent(id -> toolCallIds.put(idx, id));
+ tc.function()
+ .ifPresent(
+ fn -> {
+ fn.name().ifPresent(name -> toolCallNames.put(idx, name));
+ fn.arguments()
+ .ifPresent(
+ args ->
+ toolCallArgs
+ .computeIfAbsent(idx, k -> new StringBuilder())
+ .append(args));
+ });
+ }
+ }
+
+ /**
+ * Materialize accumulated deltas into SDK tool-call objects. Returns {@code null} (not an empty
+ * list) if no tool calls were seen, matching the existing {@link StreamResult} contract.
+ */
+ List buildToolCalls() {
+ if (toolCallIds.isEmpty()) return null;
+ List out = new ArrayList<>(toolCallIds.size());
+ for (Map.Entry e : toolCallIds.entrySet()) {
+ int idx = e.getKey();
+ String id = (e.getValue() == null || e.getValue().isEmpty()) ? synthId() : e.getValue();
+ String args = toolCallArgs.containsKey(idx) ? toolCallArgs.get(idx).toString() : "{}";
+ out.add(
+ ChatCompletionMessageToolCall.ofFunction(
+ ChatCompletionMessageFunctionToolCall.builder()
+ .id(id)
+ .function(
+ ChatCompletionMessageFunctionToolCall.Function.builder()
+ .name(toolCallNames.getOrDefault(idx, ""))
+ .arguments(args)
+ .build())
+ .build()));
+ }
+ return out;
+ }
+
+ /**
+ * Synthesize an id for tool calls whose id never arrived on the stream (some OpenAI-compatible
+ * providers omit it). The id has to be stable within a turn and unique across turns so the
+ * assistant/tool_result pairing downstream holds.
+ */
+ private static String synthId() {
+ return "local_" + java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 24);
+ }
+ }
+
+ /** Result of a streaming LLM call, assembled from chunks. */
+ static final class StreamResult {
+ final String content;
+ final List toolCalls;
+
+ StreamResult(String content, List toolCalls) {
+ this.content = content;
+ this.toolCalls = toolCalls;
+ }
+
+ boolean isEmpty() {
+ return content.isEmpty() && (toolCalls == null || toolCalls.isEmpty());
+ }
+
+ /** Build the SDK assistant message corresponding to this streamed result. */
+ ChatCompletionAssistantMessageParam toAssistantMessage() {
+ ChatCompletionAssistantMessageParam.Builder b = ChatCompletionAssistantMessageParam.builder();
+ if (!content.isEmpty()) {
+ b.content(content);
+ }
+ if (toolCalls != null && !toolCalls.isEmpty()) {
+ b.toolCalls(toolCalls);
+ }
+ return b.build();
+ }
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/MiddlewareDispatcher.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/MiddlewareDispatcher.java
new file mode 100644
index 00000000000..9e406e40e3c
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/MiddlewareDispatcher.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
+import com.openai.models.chat.completions.ChatCompletionMessageParam;
+import java.util.List;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.AgentMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.ApprovalMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.Decision;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.ToolInvocation;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Composite {@link AgentMiddleware} — folds a list of middlewares into one. Hook ordering follows
+ * the onion model: {@code before*} / {@code on*Start} run first-to-last, {@code after*} / {@code
+ * on*Finish} run last-to-first.
+ *
+ *
Component middlewares are internal framework code. If one throws during ordinary hook
+ * dispatch, the agent run fails via {@link ReactAgent#run}; lifecycle cleanup hooks ({@link
+ * #onAgentFinish}, {@link #onSessionClose}, {@link #onStop}) swallow exceptions so later
+ * middlewares still get a chance to release state.
+ */
+final class MiddlewareDispatcher implements AgentMiddleware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MiddlewareDispatcher.class);
+
+ private final List middlewares;
+ private final ApprovalMiddleware approvalMiddleware;
+
+ MiddlewareDispatcher(List middlewares) {
+ this.middlewares = middlewares;
+ this.approvalMiddleware = findApprovalMiddleware(middlewares);
+ }
+
+ /**
+ * Resolve a pending approval request. Not part of {@link AgentMiddleware} — special accessor for
+ * the approval flow.
+ */
+ boolean resolveApproval(String requestId, boolean approved) {
+ if (approvalMiddleware == null) return false;
+ return approvalMiddleware.resolve(requestId, approved);
+ }
+
+ @Override
+ public void onRegister(ToolRegistry registry) {
+ for (AgentMiddleware mw : middlewares) {
+ mw.onRegister(registry);
+ }
+ }
+
+ @Override
+ public void onAgentStart(AgentRunContext ctx) {
+ for (AgentMiddleware mw : middlewares) {
+ mw.onAgentStart(ctx);
+ }
+ }
+
+ @Override
+ public void onAgentFinish(AgentRunContext ctx) {
+ // Runs even when the agent body threw, so swallow here to ensure every middleware's cleanup
+ // gets a chance to run; otherwise we'd leak session state in later middlewares.
+ for (int i = middlewares.size() - 1; i >= 0; i--) {
+ try {
+ middlewares.get(i).onAgentFinish(ctx);
+ } catch (Exception e) {
+ LOG.warn("Middleware onAgentFinish error", e);
+ }
+ }
+ }
+
+ @Override
+ public void onSessionClose(String sessionId) {
+ for (AgentMiddleware mw : middlewares) {
+ try {
+ mw.onSessionClose(sessionId);
+ } catch (Exception e) {
+ LOG.warn("Middleware onSessionClose error", e);
+ }
+ }
+ }
+
+ @Override
+ public void onStop() {
+ for (AgentMiddleware mw : middlewares) {
+ try {
+ mw.onStop();
+ } catch (Exception e) {
+ LOG.warn("Middleware onStop error", e);
+ }
+ }
+ }
+
+ /**
+ * Fold {@code onEvent} in onion order. Returns PROCEED if untouched, REPLACE with the final event
+ * if any middleware rewrote it, or ABORT if any short-circuited.
+ */
+ @Override
+ public Decision onEvent(AgentRunContext ctx, AgentEvent event) {
+ AgentEvent current = event;
+ for (AgentMiddleware mw : middlewares) {
+ Decision d = mw.onEvent(ctx, current);
+ if (d.kind() == Decision.Kind.ABORT) return d;
+ if (d.kind() == Decision.Kind.REPLACE) current = d.replacement();
+ }
+ return Decision.of(event, current);
+ }
+
+ /**
+ * Fold {@code beforeLlmCall} in onion order so later middlewares see rewritten messages. Returns
+ * PROCEED if untouched, REPLACE with the final value if any did, or ABORT if any short-circuited.
+ */
+ @Override
+ public Decision> beforeLlmCall(
+ AgentRunContext ctx, List messages) {
+ List current = messages;
+ for (AgentMiddleware mw : middlewares) {
+ Decision> d = mw.beforeLlmCall(ctx, current);
+ if (d.kind() == Decision.Kind.ABORT) return d;
+ if (d.kind() == Decision.Kind.REPLACE) current = d.replacement();
+ }
+ return Decision.of(messages, current);
+ }
+
+ /**
+ * Fold {@code afterLlmCall} in reverse onion order so earlier middlewares see rewritten
+ * responses. Returns the final response, or ABORT if any middleware short-circuits.
+ */
+ @Override
+ public Decision afterLlmCall(
+ AgentRunContext ctx, ChatCompletionAssistantMessageParam response) {
+ ChatCompletionAssistantMessageParam current = response;
+ for (int i = middlewares.size() - 1; i >= 0; i--) {
+ Decision d =
+ middlewares.get(i).afterLlmCall(ctx, current);
+ if (d.kind() == Decision.Kind.ABORT) return d;
+ if (d.kind() == Decision.Kind.REPLACE) current = d.replacement();
+ }
+ return Decision.of(response, current);
+ }
+
+ /**
+ * Fold {@code beforeToolCall} in onion order so later middlewares can further rewrite. Returns
+ * PROCEED if untouched, REPLACE with the final invocation otherwise, or ABORT if any middleware
+ * denies the call.
+ */
+ @Override
+ public Decision beforeToolCall(AgentRunContext ctx, ToolInvocation call) {
+ ToolInvocation current = call;
+ for (AgentMiddleware mw : middlewares) {
+ Decision d = mw.beforeToolCall(ctx, current);
+ if (d.kind() == Decision.Kind.ABORT) return d;
+ if (d.kind() == Decision.Kind.REPLACE) current = d.replacement();
+ }
+ return Decision.of(call, current);
+ }
+
+ /**
+ * Fold {@code afterToolCall} in reverse onion order so earlier middlewares see rewritten results.
+ * Returns the final result, or ABORT if any middleware short-circuits — caller decides how to
+ * surface the abort (typically: use {@code reason()} as the result text the LLM sees).
+ */
+ @Override
+ public Decision afterToolCall(AgentRunContext ctx, ToolInvocation call, String result) {
+ String current = result;
+ for (int i = middlewares.size() - 1; i >= 0; i--) {
+ Decision d = middlewares.get(i).afterToolCall(ctx, call, current);
+ if (d.kind() == Decision.Kind.ABORT) return d;
+ if (d.kind() == Decision.Kind.REPLACE) current = d.replacement();
+ }
+ return Decision.of(result, current);
+ }
+
+ private static ApprovalMiddleware findApprovalMiddleware(List middlewares) {
+ for (AgentMiddleware mw : middlewares) {
+ if (mw instanceof ApprovalMiddleware) return (ApprovalMiddleware) mw;
+ }
+ return null;
+ }
+}
diff --git a/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java
new file mode 100644
index 00000000000..b5138e9c661
--- /dev/null
+++ b/externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/runtime/ReactAgent.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.dataagent.runtime;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.openai.client.OpenAIClient;
+import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
+import com.openai.models.chat.completions.ChatCompletionMessageFunctionToolCall;
+import com.openai.models.chat.completions.ChatCompletionMessageParam;
+import com.openai.models.chat.completions.ChatCompletionMessageToolCall;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentError;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentFinish;
+import org.apache.kyuubi.engine.dataagent.runtime.event.AgentStart;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ContentComplete;
+import org.apache.kyuubi.engine.dataagent.runtime.event.StepEnd;
+import org.apache.kyuubi.engine.dataagent.runtime.event.StepStart;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ToolCall;
+import org.apache.kyuubi.engine.dataagent.runtime.event.ToolResult;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.AgentMiddleware;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.Decision;
+import org.apache.kyuubi.engine.dataagent.runtime.middleware.ToolInvocation;
+import org.apache.kyuubi.engine.dataagent.tool.ToolContext;
+import org.apache.kyuubi.engine.dataagent.tool.ToolRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReAct (Reasoning + Acting) agent loop using the OpenAI official Java SDK. Iterates through LLM
+ * reasoning, tool execution, and result verification until the agent produces a final answer or
+ * hits the iteration limit.
+ *
+ *
Emits {@link AgentEvent}s via the provided consumer for real-time token-level streaming.
+ */
+public class ReactAgent {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReactAgent.class);
+ private static final ObjectMapper JSON = new ObjectMapper();
+
+ private final String defaultModelName;
+ private final ToolRegistry toolRegistry;
+ private final MiddlewareDispatcher dispatcher;
+ private final LlmStreamClient llmStreamClient;
+ private final int maxIterations;
+ private final String systemPrompt;
+
+ public ReactAgent(
+ OpenAIClient client,
+ String modelName,
+ ToolRegistry toolRegistry,
+ List middlewares,
+ int maxIterations,
+ String systemPrompt) {
+ this.defaultModelName = modelName;
+ this.toolRegistry = toolRegistry;
+ this.dispatcher = new MiddlewareDispatcher(middlewares);
+ this.llmStreamClient = new LlmStreamClient(client, toolRegistry);
+ this.maxIterations = maxIterations;
+ this.systemPrompt = systemPrompt;
+ }
+
+ /** Resolve a pending approval request. Returns false if no pending request matches. */
+ public boolean resolveApproval(String requestId, boolean approved) {
+ return dispatcher.resolveApproval(requestId, approved);
+ }
+
+ /** Fan out session-close to every middleware. Errors in one middleware don't block others. */
+ public void closeSession(String sessionId) {
+ dispatcher.onSessionClose(sessionId);
+ }
+
+ /** Fan out engine-stop to every middleware. Errors in one middleware don't block others. */
+ public void stop() {
+ dispatcher.onStop();
+ }
+
+ /**
+ * Run the ReAct loop for the given request.
+ *
+ * @param request user-facing parameters (question, model override, etc.)
+ * @param memory the conversation memory (may contain prior context)
+ * @param eventConsumer callback for each agent event (token-level streaming)
+ */
+ public void run(
+ AgentInvocation request, ConversationMemory memory, Consumer eventConsumer) {
+ String userInput = request.getUserInput();
+ ApprovalMode approvalMode = request.getApprovalMode();
+ String modelNameOverride = request.getModelName();
+
+ String effectiveModel =
+ (modelNameOverride != null && !modelNameOverride.isEmpty())
+ ? modelNameOverride
+ : defaultModelName;
+
+ // System prompt is immutable for the lifetime of this agent — only set it on first run
+ // to avoid redundant overwrites on multi-turn conversations.
+ if (memory.getSystemPrompt() == null) {
+ memory.setSystemPrompt(systemPrompt);
+ }
+ memory.addUserMessage(userInput);
+
+ AgentRunContext ctx = new AgentRunContext(memory, approvalMode, request.getSessionId());
+ // Wire the event pipeline: ctx.emit -> middleware.onEvent -> raw consumer.
+ // Splitting filter and forward keeps the middleware composite ignorant of the consumer.
+ ctx.setEventEmitter(
+ event -> {
+ Decision d = dispatcher.onEvent(ctx, event);
+ if (d.kind() == Decision.Kind.ABORT) return;
+ eventConsumer.accept(d.kind() == Decision.Kind.REPLACE ? d.replacement() : event);
+ });
+ dispatcher.onAgentStart(ctx);
+ ctx.emit(new AgentStart());
+
+ try {
+ for (int step = 1; step <= maxIterations; step++) {
+ ctx.setIteration(step);
+ ctx.emit(new StepStart(step));
+
+ List messages =
+ resolveMessagesForCall(ctx, memory.buildLlmMessages());
+ if (messages == null) {
+ // Middleware asked us to skip — AgentError + AgentFinish have already been emitted.
+ return;
+ }
+
+ LlmStreamClient.StreamResult result = llmStreamClient.stream(ctx, messages, effectiveModel);
+ if (result.isEmpty()) {
+ ctx.emit(new AgentError("LLM returned empty response"));
+ emitFinish(ctx);
+ return;
+ }
+
+ if (!result.content.isEmpty()) {
+ ctx.emit(new ContentComplete(result.content));
+ }
+ ChatCompletionAssistantMessageParam assistantMsg = result.toAssistantMessage();
+ Decision after =
+ dispatcher.afterLlmCall(ctx, assistantMsg);
+ if (after.kind() == Decision.Kind.ABORT) {
+ ctx.emit(new AgentError("LLM response rejected by middleware: " + after.reason()));
+ emitFinish(ctx);
+ return;
+ }
+ if (after.kind() == Decision.Kind.REPLACE) assistantMsg = after.replacement();
+ memory.addAssistantMessage(assistantMsg);
+
+ List toolCalls = assistantMsg.toolCalls().orElse(null);
+ if (toolCalls == null || toolCalls.isEmpty()) {
+ // No tool calls — agent is done.
+ ctx.emit(new StepEnd(step));
+ emitFinish(ctx);
+ return;
+ }
+
+ executeToolCalls(ctx, memory, toolCalls);
+ ctx.emit(new StepEnd(step));
+ }
+
+ ctx.emit(new AgentError("Reached maximum iterations (" + maxIterations + ")"));
+ emitFinish(ctx);
+
+ } catch (Exception e) {
+ LOG.error("Agent run failed", e);
+ ctx.emit(new AgentError(e.getClass().getSimpleName() + ": " + e.getMessage()));
+ emitFinish(ctx);
+ } finally {
+ dispatcher.onAgentFinish(ctx);
+ }
+ }
+
+ private static void emitFinish(AgentRunContext ctx) {
+ ctx.emit(
+ new AgentFinish(
+ ctx.getIteration(),
+ ctx.getPromptTokens(),
+ ctx.getCompletionTokens(),
+ ctx.getTotalTokens()));
+ }
+
+ /**
+ * Run {@code beforeLlmCall} middleware against {@code messages}. Returns the messages to send,
+ * possibly rewritten by middleware, or {@code null} if middleware aborted the call (in which case
+ * this method has already emitted the terminal events).
+ */
+ private List resolveMessagesForCall(
+ AgentRunContext ctx, List messages) {
+ Decision> decision = dispatcher.beforeLlmCall(ctx, messages);
+ if (decision.kind() == Decision.Kind.ABORT) {
+ ctx.emit(new AgentError("LLM call skipped by middleware: " + decision.reason()));
+ emitFinish(ctx);
+ return null;
+ }
+ return decision.kind() == Decision.Kind.REPLACE ? decision.replacement() : messages;
+ }
+
+ /**
+ * Execute the assistant's tool calls in 3 phases:
+ *
+ *
+ *
Serial: run {@code beforeToolCall} middleware, emit {@link ToolCall} events, and collect
+ * the calls that survived approval.
+ *
Concurrent: fan out to {@link ToolRegistry#submitTool}, which always returns a future
+ * that completes normally — timeouts and execution errors surface as error strings.
+ *
Serial: join futures in order, run {@code afterToolCall}, and record results to memory.
+ *
+ */
+ private void executeToolCalls(
+ AgentRunContext ctx,
+ ConversationMemory memory,
+ List toolCalls) {
+ List approved = new ArrayList<>();
+ for (ChatCompletionMessageToolCall toolCall : toolCalls) {
+ ChatCompletionMessageFunctionToolCall fnCall = toolCall.asFunction();
+ String toolName = fnCall.function().name();
+ Map toolArgs;
+ try {
+ toolArgs = parseToolArgs(fnCall.function().arguments());
+ } catch (IllegalArgumentException e) {
+ // Malformed JSON from the LLM: record an error tool_result (preserves the
+ // assistant/tool_result pairing the next API call needs) and let the loop self-correct.
+ String err = "Tool call failed: " + e.getMessage();
+ memory.addToolResult(fnCall.id(), err);
+ ctx.emit(new ToolResult(fnCall.id(), toolName, err, true));
+ continue;
+ }
+
+ ToolInvocation invocation = new ToolInvocation(fnCall.id(), toolName, toolArgs);
+ Decision decision = dispatcher.beforeToolCall(ctx, invocation);
+ if (decision.kind() == Decision.Kind.ABORT) {
+ String denied = "Tool call denied: " + decision.reason();
+ memory.addToolResult(fnCall.id(), denied);
+ ctx.emit(new ToolResult(fnCall.id(), toolName, denied, true));
+ continue;
+ }
+ boolean rewritten = decision.kind() == Decision.Kind.REPLACE;
+ ToolInvocation effective = rewritten ? decision.replacement() : invocation;
+
+ ctx.emit(new ToolCall(effective.id(), effective.name(), effective.args()));
+ approved.add(new ToolCallEntry(fnCall, effective, rewritten));
+ }
+
+ ToolContext toolCtx = new ToolContext(ctx.getSessionId());
+ List> futures = new ArrayList<>(approved.size());
+ for (ToolCallEntry entry : approved) {
+ futures.add(toolRegistry.submitTool(entry.invocation.name(), entry.argsJson(), toolCtx));
+ }
+
+ for (int i = 0; i < approved.size(); i++) {
+ ToolCallEntry entry = approved.get(i);
+ String raw = futures.get(i).join();
+ Decision after = dispatcher.afterToolCall(ctx, entry.invocation, raw);
+ // ABORT.afterToolCall: discard the result; surface reason() to the LLM and mark the event
+ // as an error so listeners can distinguish it from a successful tool result.
+ boolean isError = after.kind() == Decision.Kind.ABORT;
+ String output =
+ after.kind() == Decision.Kind.ABORT
+ ? after.reason()
+ : (after.kind() == Decision.Kind.REPLACE ? after.replacement() : raw);
+ memory.addToolResult(entry.fnCall.id(), output);
+ ctx.emit(new ToolResult(entry.fnCall.id(), entry.invocation.name(), output, isError));
+ }
+ }
+
+ /**
+ * Holds an approved tool call's parsed metadata for the 3-phase execution pipeline. {@code
+ * rewritten} is {@code true} when middleware replaced the {@link ToolInvocation}; in that case
+ * args must be re-serialized for {@link ToolRegistry#submitTool}, otherwise the LLM's original
+ * JSON is reused verbatim.
+ */
+ private static class ToolCallEntry {
+ final ChatCompletionMessageFunctionToolCall fnCall;
+ final ToolInvocation invocation;
+ final boolean rewritten;
+
+ ToolCallEntry(
+ ChatCompletionMessageFunctionToolCall fnCall,
+ ToolInvocation invocation,
+ boolean rewritten) {
+ this.fnCall = fnCall;
+ this.invocation = invocation;
+ this.rewritten = rewritten;
+ }
+
+ String argsJson() {
+ if (!rewritten) return fnCall.function().arguments();
+ try {
+ return JSON.writeValueAsString(invocation.args());
+ } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
+ throw new IllegalStateException("Failed to serialize rewritten tool args", e);
+ }
+ }
+ }
+
+ private static Map parseToolArgs(String json) {
+ if (json == null || json.isEmpty()) {
+ return new HashMap<>();
+ }
+ try {
+ return JSON.readValue(json, new TypeReference