Skip to content

Commit 34d6a29

Browse files
wenjin272claude
andcommitted
[api][plan][runtime] Cross-language Function descriptors and FunctionTool
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5182f25 commit 34d6a29

20 files changed

Lines changed: 1000 additions & 61 deletions

File tree

api/src/main/java/org/apache/flink/agents/api/agents/Agent.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.flink.agents.api.agents;
2020

21+
import org.apache.flink.agents.api.function.Function;
22+
import org.apache.flink.agents.api.function.JavaFunction;
2123
import org.apache.flink.agents.api.resource.ResourceDescriptor;
2224
import org.apache.flink.agents.api.resource.ResourceType;
2325
import org.apache.flink.agents.api.resource.SerializableResource;
@@ -31,7 +33,7 @@
3133

3234
/** Base class for defining agent logic. */
3335
public class Agent {
34-
private final Map<String, Tuple3<String[], Method, Map<String, Object>>> actions;
36+
private final Map<String, Tuple3<String[], Function, Map<String, Object>>> actions;
3537

3638
private final Map<ResourceType, Map<String, Object>> resources;
3739

@@ -43,7 +45,7 @@ public Agent() {
4345
this.actions = new HashMap<>();
4446
}
4547

46-
public Map<String, Tuple3<String[], Method, Map<String, Object>>> getActions() {
48+
public Map<String, Tuple3<String[], Function, Map<String, Object>>> getActions() {
4749
return actions;
4850
}
4951

@@ -60,12 +62,7 @@ public Map<ResourceType, Map<String, Object>> getResources() {
6062
*/
6163
public Agent addAction(
6264
String[] eventTypes, Method method, @Nullable Map<String, Object> config) {
63-
String name = method.getName();
64-
if (actions.containsKey(name)) {
65-
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
66-
}
67-
actions.put(name, new Tuple3<>(eventTypes, method, config));
68-
return this;
65+
return addAction(method.getName(), eventTypes, JavaFunction.fromMethod(method), config);
6966
}
7067

7168
/**
@@ -78,6 +75,27 @@ public Agent addAction(String[] eventTypes, Method method) {
7875
return addAction(eventTypes, method, null);
7976
}
8077

78+
/**
79+
* Add action to agent.
80+
*
81+
* @param name The action name. Must be unique within this agent.
82+
* @param eventTypes The event type strings this action listens to.
83+
* @param function The api-layer function descriptor; will be promoted to a plan-layer
84+
* executable at {@code AgentPlan} construction.
85+
* @param config Optional config for this action.
86+
*/
87+
public Agent addAction(
88+
String name,
89+
String[] eventTypes,
90+
Function function,
91+
@Nullable Map<String, Object> config) {
92+
if (actions.containsKey(name)) {
93+
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
94+
}
95+
actions.put(name, new Tuple3<>(eventTypes, function, config));
96+
return this;
97+
}
98+
8199
public void addResourcesIfAbsent(Map<ResourceType, Map<String, Object>> resources) {
82100
for (ResourceType type : resources.keySet()) {
83101
Map<String, Object> typedResources = resources.get(type);
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.function;
19+
20+
/**
21+
* Pure-data marker for user-defined function descriptors carried on the api layer.
22+
*
23+
* <p>Implementations describe <em>which</em> function ({@link PythonFunction}, {@link
24+
* JavaFunction}) but do not execute it. The plan-layer twins ({@code
25+
* org.apache.flink.agents.plan.Function} and friends) own execution; the conversion from api → plan
26+
* happens during {@code AgentPlan} construction.
27+
*/
28+
public interface Function {}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.function;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
23+
import java.io.Serializable;
24+
import java.lang.reflect.Method;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Objects;
29+
30+
/**
31+
* Pure-data descriptor for a Java method, identified by declaring class FQN, method name, and
32+
* parameter types as strings.
33+
*
34+
* <p>Parameter types are strings — JVM primitive names ({@code int}, {@code long}, {@code boolean},
35+
* …) or fully-qualified reference type names ({@code java.lang.String}, {@code java.util.List}). No
36+
* generic parameters. The wire form keeps the descriptor pure data; class resolution is deferred to
37+
* the plan-layer twin.
38+
*/
39+
public final class JavaFunction implements Function, Serializable {
40+
41+
private static final String FIELD_QUAL_NAME = "qualName";
42+
private static final String FIELD_METHOD_NAME = "methodName";
43+
private static final String FIELD_PARAMETER_TYPES = "parameterTypes";
44+
45+
@JsonProperty(FIELD_QUAL_NAME)
46+
private final String qualName;
47+
48+
@JsonProperty(FIELD_METHOD_NAME)
49+
private final String methodName;
50+
51+
@JsonProperty(FIELD_PARAMETER_TYPES)
52+
private final List<String> parameterTypes;
53+
54+
@JsonCreator
55+
public JavaFunction(
56+
@JsonProperty(FIELD_QUAL_NAME) String qualName,
57+
@JsonProperty(FIELD_METHOD_NAME) String methodName,
58+
@JsonProperty(FIELD_PARAMETER_TYPES) List<String> parameterTypes) {
59+
this.qualName = Objects.requireNonNull(qualName, "qualName");
60+
this.methodName = Objects.requireNonNull(methodName, "methodName");
61+
this.parameterTypes =
62+
parameterTypes == null
63+
? Collections.emptyList()
64+
: Collections.unmodifiableList(new ArrayList<>(parameterTypes));
65+
}
66+
67+
/**
68+
* Build a descriptor from a reflected {@link Method}. Each parameter type is captured via
69+
* {@link Class#getName()} — the same form {@link Class#forName(String)} accepts when the api
70+
* descriptor is later promoted to its plan-layer twin. For primitives this is the keyword
71+
* ({@code int}, {@code long}); for reference types the fully-qualified name; for array types
72+
* the JVM-internal descriptor ({@code [I}, {@code [Ljava.lang.String;}).
73+
*/
74+
public static JavaFunction fromMethod(Method method) {
75+
List<String> params = new ArrayList<>(method.getParameterCount());
76+
for (Class<?> p : method.getParameterTypes()) {
77+
params.add(p.getName());
78+
}
79+
return new JavaFunction(method.getDeclaringClass().getName(), method.getName(), params);
80+
}
81+
82+
public String getQualName() {
83+
return qualName;
84+
}
85+
86+
public String getMethodName() {
87+
return methodName;
88+
}
89+
90+
public List<String> getParameterTypes() {
91+
return parameterTypes;
92+
}
93+
94+
@Override
95+
public boolean equals(Object o) {
96+
if (this == o) return true;
97+
if (!(o instanceof JavaFunction)) return false;
98+
JavaFunction that = (JavaFunction) o;
99+
return qualName.equals(that.qualName)
100+
&& methodName.equals(that.methodName)
101+
&& parameterTypes.equals(that.parameterTypes);
102+
}
103+
104+
@Override
105+
public int hashCode() {
106+
return Objects.hash(qualName, methodName, parameterTypes);
107+
}
108+
109+
@Override
110+
public String toString() {
111+
return "JavaFunction{" + qualName + "#" + methodName + parameterTypes + "}";
112+
}
113+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.agents.api.function;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
23+
import java.io.Serializable;
24+
import java.util.Objects;
25+
26+
/**
27+
* Pure-data descriptor for a Python callable, identified by its module and qualified name.
28+
*
29+
* <p>Carries no execution behavior — the plan-layer {@code
30+
* org.apache.flink.agents.plan.PythonFunction} owns invocation via the Pemja interpreter.
31+
*/
32+
public final class PythonFunction implements Function, Serializable {
33+
34+
private static final String FIELD_MODULE = "module";
35+
private static final String FIELD_QUAL_NAME = "qualName";
36+
37+
@JsonProperty(FIELD_MODULE)
38+
private final String module;
39+
40+
@JsonProperty(FIELD_QUAL_NAME)
41+
private final String qualName;
42+
43+
@JsonCreator
44+
public PythonFunction(
45+
@JsonProperty(FIELD_MODULE) String module,
46+
@JsonProperty(FIELD_QUAL_NAME) String qualName) {
47+
this.module = Objects.requireNonNull(module, "module");
48+
this.qualName = Objects.requireNonNull(qualName, "qualName");
49+
}
50+
51+
public String getModule() {
52+
return module;
53+
}
54+
55+
public String getQualName() {
56+
return qualName;
57+
}
58+
59+
@Override
60+
public boolean equals(Object o) {
61+
if (this == o) return true;
62+
if (!(o instanceof PythonFunction)) return false;
63+
PythonFunction that = (PythonFunction) o;
64+
return module.equals(that.module) && qualName.equals(that.qualName);
65+
}
66+
67+
@Override
68+
public int hashCode() {
69+
return Objects.hash(module, qualName);
70+
}
71+
72+
@Override
73+
public String toString() {
74+
return "PythonFunction{" + module + ":" + qualName + "}";
75+
}
76+
}

api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,36 @@ public interface PythonResourceAdapter {
128128
* @return the result of the method invocation
129129
*/
130130
Object invoke(String name, Object... args);
131+
132+
/**
133+
* Look up tool metadata for a Python function across the JVM&rarr;Python bridge.
134+
*
135+
* <p>The Java side asks the Python side to introspect a callable identified by {@code module} +
136+
* {@code qualName}, and returns a flat {@code Map<String, String>} with keys {@code "name"},
137+
* {@code "description"}, and {@code "inputSchema"} (a JSON schema string compatible with {@code
138+
* ToolMetadata.inputSchema}).
139+
*
140+
* <p>The return shape is intentionally flat — pemja can SIGSEGV when returning arbitrary Python
141+
* objects to Java on non-main-interpreter threads.
142+
*
143+
* @param module the Python module containing the callable
144+
* @param qualName the qualified name of the callable inside the module (e.g. {@code "fn"} or
145+
* {@code "MyClass.method"})
146+
* @return flat map with keys "name", "description", "inputSchema"
147+
*/
148+
Map<String, String> getPythonToolMetadata(String module, String qualName);
149+
150+
/**
151+
* Invoke a Python callable as a tool, passing keyword arguments. Used when a Java chat model's
152+
* tool list contains a {@code plan.FunctionTool} whose function descriptor is a {@code
153+
* PythonFunction}: instead of routing the invocation through Java reflection, dispatch it
154+
* across the bridge so the underlying Python function runs in the Pemja interpreter.
155+
*
156+
* @param module the Python module containing the callable
157+
* @param qualName the qualified name of the callable inside the module
158+
* @param kwargs keyword arguments to pass to the callable; LLM tool calls always arrive as
159+
* keyword arguments
160+
* @return the raw return value from the Python callable
161+
*/
162+
Object invokePythonTool(String module, String qualName, Map<String, Object> kwargs);
131163
}

api/src/main/java/org/apache/flink/agents/api/tools/FunctionTool.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,38 @@
1818

1919
package org.apache.flink.agents.api.tools;
2020

21+
import org.apache.flink.agents.api.function.Function;
22+
import org.apache.flink.agents.api.function.JavaFunction;
2123
import org.apache.flink.agents.api.resource.ResourceType;
2224
import org.apache.flink.agents.api.resource.SerializableResource;
2325

2426
import java.lang.reflect.Method;
27+
import java.util.Objects;
2528

26-
/** Tool keeps a method, will be converted to tool after compile. */
29+
/**
30+
* Pure-data tool descriptor: carries an {@link Function} reference. Used at agent-construction
31+
* time; compiled to the plan-layer executable {@code plan.tools.FunctionTool} when the agent
32+
* becomes an {@code AgentPlan}.
33+
*/
2734
public class FunctionTool extends SerializableResource {
28-
private final Method method;
2935

30-
public FunctionTool(Method method) {
31-
this.method = method;
36+
private final Function func;
37+
38+
public FunctionTool(Function func) {
39+
this.func = Objects.requireNonNull(func, "func");
40+
}
41+
42+
/** Convenience factory: derive a {@link JavaFunction} from a reflected method. */
43+
public static FunctionTool fromMethod(Method method) {
44+
return new FunctionTool(JavaFunction.fromMethod(method));
45+
}
46+
47+
public Function getFunc() {
48+
return func;
3249
}
3350

3451
@Override
3552
public ResourceType getResourceType() {
3653
return ResourceType.TOOL;
3754
}
38-
39-
public Method getMethod() {
40-
return method;
41-
}
4255
}

0 commit comments

Comments
 (0)