Skip to content

Commit 3c028ae

Browse files
[KYUUBI #7379][PR 1/4] Data Agent Engine: module skeleton, configuration, and engine core
1 parent c237522 commit 3c028ae

46 files changed

Lines changed: 3343 additions & 2 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build/dist

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ mkdir -p "$DISTDIR/externals/engines/trino"
288288
mkdir -p "$DISTDIR/externals/engines/hive"
289289
mkdir -p "$DISTDIR/externals/engines/jdbc"
290290
mkdir -p "$DISTDIR/externals/engines/chat"
291+
mkdir -p "$DISTDIR/externals/engines/data-agent"
291292
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
292293
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
293294
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@@ -363,6 +364,18 @@ for jar in $(ls "$DISTDIR/jars/"); do
363364
fi
364365
done
365366

367+
# Copy the data-agent engine jar and its bundled dependency jars into the distribution.
cp "$KYUUBI_HOME/externals/kyuubi-data-agent-engine/target/kyuubi-data-agent-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/data-agent/"
cp -r "$KYUUBI_HOME/externals/kyuubi-data-agent-engine/target/scala-$SCALA_VERSION/jars/"*.jar "$DISTDIR/externals/engines/data-agent/"

# Share the jars w/ server to reduce binary size: every engine jar that also
# exists in "$DISTDIR/jars" is replaced by a relative symlink to that copy.
# shellcheck disable=SC2045
for jar in $(ls "$DISTDIR/jars/"); do
  if [[ -f "$DISTDIR/externals/engines/data-agent/$jar" ]]; then
    # The directory is quoted so a DISTDIR containing whitespace is not word-split,
    # and the link is only created when the cd succeeded.
    (cd "$DISTDIR/externals/engines/data-agent" && ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/data-agent/$jar")
  fi
done
378+
366379
# Copy Kyuubi Spark extension
367380
# shellcheck disable=SC2068
368381
for SPARK_EXTENSION_VERSION in ${SPARK_EXTENSION_VERSIONS[@]}; do

docs/configuration/settings.md

Lines changed: 12 additions & 0 deletions
Large diffs are not rendered by default.

externals/kyuubi-data-agent-engine/pom.xml

Whitespace-only changes.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider;
19+
20+
import java.util.function.Consumer;
21+
import org.apache.kyuubi.config.KyuubiConf;
22+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
23+
import org.apache.kyuubi.util.reflect.DynConstructors;
24+
25+
/**
26+
* A pluggable provider interface for the Data Agent engine. Implementations wire up the ReactAgent
27+
* with an LLM model, tools, and middlewares.
28+
*/
29+
public interface DataAgentProvider {
30+
31+
/** Initialize a session for the given user. */
32+
void open(String sessionId, String user);
33+
34+
/**
35+
* Run the agent for the given request, emitting events via the consumer. Events include
36+
* token-level ContentDelta for streaming, ToolCall/ToolResult for tool invocations, and
37+
* AgentFinish when complete.
38+
*
39+
* @param sessionId the session identifier (maps to conversation memory)
40+
* @param request user-facing parameters (question, model override, etc.)
41+
* @param onEvent event consumer for streaming events
42+
*/
43+
void run(String sessionId, ProviderRunRequest request, Consumer<AgentEvent> onEvent);
44+
45+
/**
46+
* Close and clean up a single session, releasing session-scoped resources such as conversation
47+
* history and session state. Called when one user session ends.
48+
*/
49+
void close(String sessionId);
50+
51+
/**
52+
* Resolve a pending tool approval request. Called when the client sends an approval or denial
53+
* response for a tool call that requires human-in-the-loop confirmation.
54+
*
55+
* @param requestId the request ID from the ApprovalRequest event
56+
* @param approved true to approve, false to deny
57+
* @return true if the request was found and resolved, false if not found (timed out or invalid)
58+
*/
59+
default boolean resolveApproval(String requestId, boolean approved) {
60+
return false;
61+
}
62+
63+
/**
64+
* Stop the provider itself, releasing engine-level resources shared across all sessions (e.g.
65+
* HTTP connection pools, thread pools). Called once when the entire engine shuts down.
66+
*/
67+
default void stop() {}
68+
69+
static DataAgentProvider load(KyuubiConf conf) {
70+
String providerClass = conf.get(KyuubiConf.ENGINE_DATA_AGENT_PROVIDER());
71+
try {
72+
return (DataAgentProvider)
73+
DynConstructors.builder(DataAgentProvider.class)
74+
.impl(providerClass, KyuubiConf.class)
75+
.impl(providerClass)
76+
.buildChecked()
77+
.newInstanceChecked(conf);
78+
} catch (ClassCastException e) {
79+
throw new IllegalArgumentException(
80+
"Class "
81+
+ providerClass
82+
+ " is not a child of '"
83+
+ DataAgentProvider.class.getName()
84+
+ "'.",
85+
e);
86+
} catch (Exception e) {
87+
throw new IllegalArgumentException("Error while instantiating '" + providerClass + "': ", e);
88+
}
89+
}
90+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider;
19+
20+
/**
 * User-facing request parameters for a provider-level agent invocation. Only contains fields from
 * the caller (question, model override, etc.). Adding new per-request options does not require
 * changing the {@link DataAgentProvider} interface.
 *
 * <p>The question is fixed at construction. The optional fields start out as {@code null} and are
 * set through fluent setters that return this instance, so calls can be chained:
 *
 * <pre>{@code
 * new ProviderRunRequest(question).modelName(model).approvalMode(mode);
 * }</pre>
 */
public class ProviderRunRequest {

  private final String question;
  private String modelName;
  private String approvalMode;

  /**
   * Create a request carrying only the question; all optional fields are left unset.
   *
   * @param question the question text, stored as given
   */
  public ProviderRunRequest(String question) {
    this.question = question;
  }

  /** Returns the question passed to the constructor. */
  public String getQuestion() {
    return question;
  }

  /** Returns the model name, or {@code null} if {@link #modelName(String)} was never called. */
  public String getModelName() {
    return modelName;
  }

  /**
   * Set the model name for this request.
   *
   * @param modelName the model name to store
   * @return this request, for chaining
   */
  public ProviderRunRequest modelName(String modelName) {
    this.modelName = modelName;
    return this;
  }

  /**
   * Returns the approval mode, or {@code null} if {@link #approvalMode(String)} was never called.
   */
  public String getApprovalMode() {
    return approvalMode;
  }

  /**
   * Set the approval mode for this request.
   *
   * @param approvalMode the approval mode to store
   * @return this request, for chaining
   */
  public ProviderRunRequest approvalMode(String approvalMode) {
    this.approvalMode = approvalMode;
    return this;
  }

  /** Returns a debug representation listing every field, in the style of the event classes. */
  @Override
  public String toString() {
    return "ProviderRunRequest{question='"
        + question
        + "', modelName='"
        + modelName
        + "', approvalMode='"
        + approvalMode
        + "'}";
  }
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.provider.echo;
19+
20+
import java.util.function.Consumer;
21+
import org.apache.kyuubi.engine.dataagent.provider.DataAgentProvider;
22+
import org.apache.kyuubi.engine.dataagent.provider.ProviderRunRequest;
23+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentEvent;
24+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentFinish;
25+
import org.apache.kyuubi.engine.dataagent.runtime.event.AgentStart;
26+
import org.apache.kyuubi.engine.dataagent.runtime.event.ContentComplete;
27+
import org.apache.kyuubi.engine.dataagent.runtime.event.ContentDelta;
28+
import org.apache.kyuubi.engine.dataagent.runtime.event.StepEnd;
29+
import org.apache.kyuubi.engine.dataagent.runtime.event.StepStart;
30+
31+
/** A simple echo provider for testing purposes. Simulates the agent event stream. */
32+
public class EchoProvider implements DataAgentProvider {
33+
34+
@Override
35+
public void open(String sessionId, String user) {}
36+
37+
@Override
38+
public void run(String sessionId, ProviderRunRequest request, Consumer<AgentEvent> onEvent) {
39+
String question = request.getQuestion();
40+
41+
onEvent.accept(new AgentStart());
42+
onEvent.accept(new StepStart(1));
43+
44+
// Simulate token-level streaming
45+
String reply =
46+
"[DataAgent Echo] I received your question: "
47+
+ question
48+
+ "\n"
49+
+ "This is the Data Agent engine in echo mode. "
50+
+ "Please configure an LLM provider (e.g., OPENAI_COMPATIBLE) for actual data analysis.";
51+
for (String token : reply.split("(?<=\\s)")) {
52+
onEvent.accept(new ContentDelta(token));
53+
}
54+
55+
onEvent.accept(new ContentComplete(reply));
56+
onEvent.accept(new StepEnd(1));
57+
onEvent.accept(new AgentFinish(1, 0, 0, 0));
58+
}
59+
60+
@Override
61+
public void close(String sessionId) {}
62+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.runtime.event;
19+
20+
/** An error occurred during agent execution. */
21+
public final class AgentError extends AgentEvent {
22+
private final String message;
23+
24+
public AgentError(String message) {
25+
super(EventType.ERROR);
26+
this.message = message;
27+
}
28+
29+
public String message() {
30+
return message;
31+
}
32+
33+
@Override
34+
public String toString() {
35+
return "AgentError{message='" + message + "'}";
36+
}
37+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.runtime.event;
19+
20+
/**
21+
* Base class for events emitted by the ReAct agent loop. Each event represents a discrete step in
22+
* the agent's reasoning and execution process, enabling real-time token-level streaming to clients.
23+
*
24+
* <p>Every subclass must declare its {@link EventType}, which also determines the SSE event name
25+
* used on the wire. This allows consumers to {@code switch} on the type rather than using {@code
26+
* instanceof} chains.
27+
*
28+
* <p>Package-private constructor restricts subclassing to this package.
29+
*/
30+
public abstract class AgentEvent {
31+
32+
private final EventType eventType;
33+
34+
AgentEvent(EventType eventType) {
35+
this.eventType = eventType;
36+
}
37+
38+
/** Returns the type of this event. */
39+
public EventType eventType() {
40+
return eventType;
41+
}
42+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.engine.dataagent.runtime.event;
19+
20+
/** The agent has finished its analysis. */
21+
public final class AgentFinish extends AgentEvent {
22+
private final int totalSteps;
23+
private final long promptTokens;
24+
private final long completionTokens;
25+
private final long totalTokens;
26+
27+
public AgentFinish(int totalSteps, long promptTokens, long completionTokens, long totalTokens) {
28+
super(EventType.AGENT_FINISH);
29+
this.totalSteps = totalSteps;
30+
this.promptTokens = promptTokens;
31+
this.completionTokens = completionTokens;
32+
this.totalTokens = totalTokens;
33+
}
34+
35+
public int totalSteps() {
36+
return totalSteps;
37+
}
38+
39+
public long promptTokens() {
40+
return promptTokens;
41+
}
42+
43+
public long completionTokens() {
44+
return completionTokens;
45+
}
46+
47+
public long totalTokens() {
48+
return totalTokens;
49+
}
50+
51+
@Override
52+
public String toString() {
53+
return "AgentFinish{totalSteps="
54+
+ totalSteps
55+
+ ", promptTokens="
56+
+ promptTokens
57+
+ ", completionTokens="
58+
+ completionTokens
59+
+ ", totalTokens="
60+
+ totalTokens
61+
+ "}";
62+
}
63+
}

0 commit comments

Comments
 (0)