From 632d7b9f176a4d12deefc22a80d385f54114df53 Mon Sep 17 00:00:00 2001 From: WenjinXie Date: Wed, 18 Mar 2026 17:30:32 +0800 Subject: [PATCH] [runtime] Support async execution for cross language resources. --- .../api/chat/model/BaseChatModelSetup.java | 12 ++++ .../model/BaseEmbeddingModelSetup.java | 12 ++++ .../flink/agents/api/resource/Resource.java | 3 + .../api/vectorstores/BaseVectorStore.java | 12 ++++ plan/pom.xml | 6 ++ .../agents/plan/actions/ChatModelAction.java | 8 ++- .../plan/actions/ContextRetrievalAction.java | 8 ++- .../flink/agents/plan/actions/Utils.java | 56 +++++++++++++++++++ python/flink_agents/api/resource.py | 37 +++++++----- .../plan/actions/chat_model_action.py | 23 ++++++-- .../plan/actions/context_retrieval_action.py | 9 ++- python/flink_agents/plan/actions/utils.py | 50 +++++++++++++++++ python/flink_agents/plan/agent_plan.py | 1 + .../runtime/java/java_chat_model.py | 11 ++++ .../runtime/java/java_embedding_model.py | 13 +++++ .../runtime/java/java_vector_store.py | 11 ++++ 16 files changed, 243 insertions(+), 29 deletions(-) create mode 100644 plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java create mode 100644 python/flink_agents/plan/actions/utils.py diff --git a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java index 15aa953e3..05f156821 100644 --- a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java +++ b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java @@ -49,6 +49,18 @@ public BaseChatModelSetup( this.tools = descriptor.getArgument("tools"); } + /** + * Trigger construction for resource objects. + * + *

Currently, in cross-language invocation scenarios, constructing resource object within an + * async thread may encounter issues. We resolved this issue by moving the construction of the + * resources object out of the method to be async executed and invoking it in the main thread. + */ + @Override + public void open() throws Exception { + this.getResource.apply(this.connection, ResourceType.CHAT_MODEL_CONNECTION); + } + public abstract Map getParameters(); public ChatMessage chat(List messages) { diff --git a/api/src/main/java/org/apache/flink/agents/api/embedding/model/BaseEmbeddingModelSetup.java b/api/src/main/java/org/apache/flink/agents/api/embedding/model/BaseEmbeddingModelSetup.java index 205cfed38..6856be59c 100644 --- a/api/src/main/java/org/apache/flink/agents/api/embedding/model/BaseEmbeddingModelSetup.java +++ b/api/src/main/java/org/apache/flink/agents/api/embedding/model/BaseEmbeddingModelSetup.java @@ -44,6 +44,18 @@ public BaseEmbeddingModelSetup( this.model = descriptor.getArgument("model"); } + /** + * Trigger construction for resource objects. + * + *

Currently, in cross-language invocation scenarios, constructing resource object within an + * async thread may encounter issues. We resolved this issue by moving the construction of the + * resources object out of the method to be async executed and invoking it in the main thread. + */ + @Override + public void open() { + this.getConnection(); + } + public abstract Map getParameters(); @Override diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java index d386ca6dd..de2e588be 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java @@ -65,6 +65,9 @@ protected FlinkAgentsMetricGroup getMetricGroup() { return metricGroup; } + /** Open the resource. */ + public void open() throws Exception {} + /** Close the resource. */ public void close() throws Exception {} } diff --git a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java index 64cc74556..035b8850c 100644 --- a/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java +++ b/api/src/main/java/org/apache/flink/agents/api/vectorstores/BaseVectorStore.java @@ -46,6 +46,18 @@ public BaseVectorStore( this.embeddingModel = descriptor.getArgument("embedding_model"); } + /** + * Trigger construction for resource objects. + * + *

Currently, in cross-language invocation scenarios, constructing resource object within an + * async thread may encounter issues. We resolved this issue by moving the construction of the + * resources object out of the method to be async executed and invoking it in the main thread. + */ + @Override + public void open() { + this.getResource.apply(this.embeddingModel, ResourceType.EMBEDDING_MODEL); + } + @Override public ResourceType getResourceType() { return ResourceType.VECTOR_STORE; diff --git a/plan/pom.xml b/plan/pom.xml index ede26e085..eb437e711 100644 --- a/plan/pom.xml +++ b/plan/pom.xml @@ -35,6 +35,12 @@ under the License. ${flink.version} provided + + org.apache.flink + flink-runtime + ${flink.version} + provided + org.apache.flink flink-agents-api diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java index cc73f2c62..aed8726f1 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java @@ -47,6 +47,7 @@ import java.util.*; import static org.apache.flink.agents.api.agents.Agent.STRUCTURED_OUTPUT; +import static org.apache.flink.agents.plan.actions.Utils.supportAsync; /** Built-in action for processing chat request and tool call result. */ public class ChatModelAction { @@ -199,9 +200,10 @@ public static void chat( (BaseChatModelSetup) ctx.getResource(model, ResourceType.CHAT_MODEL); boolean chatAsync = ctx.getConfig().get(AgentExecutionOptions.CHAT_ASYNC); - // TODO: python chat model doesn't support async execution yet, see - // https://github.com/apache/flink-agents/issues/448 for details. - chatAsync = chatAsync && !(chatModel instanceof PythonChatModelSetup); + + if ((chatModel instanceof PythonChatModelSetup) && !supportAsync()) { + chatAsync = false; + } Agent.ErrorHandlingStrategy strategy = ctx.getConfig().get(AgentExecutionOptions.ERROR_HANDLING_STRATEGY); diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java index 420ceafa4..c66cda312 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java @@ -33,6 +33,8 @@ import java.util.List; +import static org.apache.flink.agents.plan.actions.Utils.supportAsync; + /** Built-in action for processing context retrieval requests. */ public class ContextRetrievalAction { @@ -60,9 +62,9 @@ public static void processContextRetrievalRequest(Event event, RunnerContext ctx contextRetrievalRequestEvent.getVectorStore(), ResourceType.VECTOR_STORE); - // TODO: python vector store doesn't support async execution yet, see - // https://github.com/apache/flink-agents/issues/448 for details. - ragAsync = ragAsync && !(vectorStore instanceof PythonVectorStore); + if ((vectorStore instanceof PythonVectorStore) && !supportAsync()) { + ragAsync = false; + } final VectorStoreQuery vectorStoreQuery = new VectorStoreQuery( diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java new file mode 100644 index 000000000..8465e447b --- /dev/null +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Utils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.plan.actions; + +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.StringTokenizer; + +public class Utils { + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + /** + * Check whether the current Flink version supports the async execution for cross-language + * resource. + * + *

The async execution for java resource is supported only on flink with the pemja 0.6.2 + * dependency. See flink-agents + * for details. + */ + public static boolean supportAsync() { + String version = EnvironmentInformation.getVersion(); + StringTokenizer st = new StringTokenizer(version, "."); + int major = Integer.parseInt(st.nextToken()); + int minor = Integer.parseInt(st.nextToken()); + int micro = Integer.parseInt(st.nextToken()); + + if ((major == 1 && (minor < 20 || micro <= 3)) + || (major == 2 && minor == 0 && micro <= 1) + || (major == 2 && minor == 1 && micro <= 1) + || (major == 2 && minor == 2 && micro <= 0)) { + LOG.debug( + "Flink {} doesn't support async execution for java resource, will fallback to sync execution.", + version); + return false; + } + + return true; + } +} diff --git a/python/flink_agents/api/resource.py b/python/flink_agents/api/resource.py index a5b1dd864..57d506677 100644 --- a/python/flink_agents/api/resource.py +++ b/python/flink_agents/api/resource.py @@ -90,6 +90,9 @@ def metric_group(self) -> "MetricGroup | None": """ return self._metric_group + def open(self) -> None: + """Open the resource.""" + def close(self) -> None: """Close the resource.""" @@ -120,14 +123,14 @@ class ResourceDescriptor(BaseModel): arguments: Dict[str, Any] def __init__( - self, - /, - *, - clazz: str | None = None, - target_module: str | None = None, - target_clazz: str | None = None, - arguments: Dict[str, Any] | None = None, - **kwargs: Any, + self, + /, + *, + clazz: str | None = None, + target_module: str | None = None, + target_clazz: str | None = None, + arguments: Dict[str, Any] | None = None, + **kwargs: Any, ) -> None: """Initialize ResourceDescriptor. @@ -182,9 +185,9 @@ def __eq__(self, other: object) -> bool: if not isinstance(other, ResourceDescriptor): return False return ( - self.target_module == other.target_module - and self.target_clazz == other.target_clazz - and self.arguments == other.arguments + self.target_module == other.target_module + and self.target_clazz == other.target_clazz + and self.arguments == other.arguments ) def __hash__(self) -> int: @@ -253,8 +256,12 @@ class ChatModel: TONGYI_SETUP = "flink_agents.integrations.chat_models.tongyi_chat_model.TongyiChatModelSetup" # Java Wrapper - JAVA_WRAPPER_CONNECTION = "flink_agents.api.chat_models.java_chat_model.JavaChatModelConnection" - JAVA_WRAPPER_SETUP = "flink_agents.api.chat_models.java_chat_model.JavaChatModelSetup" + JAVA_WRAPPER_CONNECTION = ( + "flink_agents.api.chat_models.java_chat_model.JavaChatModelConnection" + ) + JAVA_WRAPPER_SETUP = ( + "flink_agents.api.chat_models.java_chat_model.JavaChatModelSetup" + ) class Java: """Java implementations of ChatModel.""" @@ -307,7 +314,9 @@ class VectorStore: CHROMA_VECTOR_STORE = "flink_agents.integrations.vector_stores.chroma.chroma_vector_store.ChromaVectorStore" # Java Wrapper - JAVA_WRAPPER_VECTOR_STORE = "flink_agents.api.vector_stores.java_vector_store.JavaVectorStore" + JAVA_WRAPPER_VECTOR_STORE = ( + "flink_agents.api.vector_stores.java_vector_store.JavaVectorStore" + ) JAVA_WRAPPER_COLLECTION_MANAGEABLE_VECTOR_STORE = "flink_agents.api.vector_stores.java_vector_store.JavaCollectionManageableVectorStore" class Java: diff --git a/python/flink_agents/plan/actions/chat_model_action.py b/python/flink_agents/plan/actions/chat_model_action.py index 02e92ed8e..81f8eeee7 100644 --- a/python/flink_agents/plan/actions/chat_model_action.py +++ b/python/flink_agents/plan/actions/chat_model_action.py @@ -40,6 +40,7 @@ from flink_agents.api.resource import ResourceType from flink_agents.api.runner_context import RunnerContext from flink_agents.plan.actions.action import Action +from flink_agents.plan.actions.utils import support_async from flink_agents.plan.function import PythonFunction if TYPE_CHECKING: @@ -179,11 +180,13 @@ async def chat( ) chat_async = ctx.config.get(AgentExecutionOptions.CHAT_ASYNC) - # java chat model doesn't support async execution, - # see https://github.com/apache/flink-agents/issues/448 for details. - chat_async = chat_async and not isinstance(chat_model, JavaChatModelSetup) - error_handling_strategy = ctx.config.get(AgentExecutionOptions.ERROR_HANDLING_STRATEGY) + if isinstance(chat_model, JavaChatModelSetup) and not support_async(): + chat_async = False + + error_handling_strategy = ctx.config.get( + AgentExecutionOptions.ERROR_HANDLING_STRATEGY + ) num_retries = 0 if error_handling_strategy == ErrorHandlingStrategy.RETRY: num_retries = max(0, ctx.config.get(AgentExecutionOptions.MAX_RETRIES)) @@ -196,8 +199,16 @@ async def chat( else: response = ctx.durable_execute(chat_model.chat, messages) - if response.extra_args.get("model_name") and response.extra_args.get("promptTokens") and response.extra_args.get("completionTokens"): - chat_model._record_token_metrics(response.extra_args["model_name"], response.extra_args["promptTokens"], response.extra_args["completionTokens"]) + if ( + response.extra_args.get("model_name") + and response.extra_args.get("promptTokens") + and response.extra_args.get("completionTokens") + ): + chat_model._record_token_metrics( + response.extra_args["model_name"], + response.extra_args["promptTokens"], + response.extra_args["completionTokens"], + ) if output_schema is not None and len(response.tool_calls) == 0: response = _generate_structured_output(response, output_schema) break diff --git a/python/flink_agents/plan/actions/context_retrieval_action.py b/python/flink_agents/plan/actions/context_retrieval_action.py index d484e7822..e56134dc1 100644 --- a/python/flink_agents/plan/actions/context_retrieval_action.py +++ b/python/flink_agents/plan/actions/context_retrieval_action.py @@ -28,10 +28,12 @@ from flink_agents.api.vector_stores.java_vector_store import JavaVectorStore from flink_agents.api.vector_stores.vector_store import VectorStoreQuery from flink_agents.plan.actions.action import Action +from flink_agents.plan.actions.utils import support_async from flink_agents.plan.function import PythonFunction _logger = logging.getLogger(__name__) + async def process_context_retrieval_request(event: Event, ctx: RunnerContext) -> None: """Built-in action for processing context retrieval requests.""" if isinstance(event, ContextRetrievalRequestEvent): @@ -40,9 +42,10 @@ async def process_context_retrieval_request(event: Event, ctx: RunnerContext) -> query = VectorStoreQuery(query_text=event.query, limit=event.max_results) rag_async = ctx.config.get(AgentExecutionOptions.RAG_ASYNC) - # java vector store doesn't support async execution - # see https://github.com/apache/flink-agents/issues/448 for details. - rag_async = rag_async and not isinstance(vector_store, JavaVectorStore) + + if isinstance(vector_store, JavaVectorStore) and not support_async(): + rag_async = False + if rag_async: # To avoid https://github.com/alibaba/pemja/issues/88, # we log a message here. diff --git a/python/flink_agents/plan/actions/utils.py b/python/flink_agents/plan/actions/utils.py new file mode 100644 index 000000000..6730546c4 --- /dev/null +++ b/python/flink_agents/plan/actions/utils.py @@ -0,0 +1,50 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import functools +import logging +from importlib.metadata import version +from typing import List, Tuple + +from packaging import version as pkg_version + +UNSUPPORTED_RANGES: List[Tuple[str, str]] = [ + ("1.0.0", "1.20.3"), + ("2.0.0", "2.0.1"), + ("2.1.0", "2.1.1"), + ("2.2.0", "2.2.0"), +] + + +@functools.lru_cache(maxsize=1) +def support_async() -> bool: + """Check whether the current Flink version supports the async execution for + cross-language resource. + + The async execution for java resource is supported only on flink with + the pemja 0.6.2 dependency. See https://github.com/apache/flink-agents/pull/571 + for details. + """ + current = pkg_version.parse(version("apache-flink")) + + for min_ver, max_ver in UNSUPPORTED_RANGES: + if pkg_version.parse(min_ver) <= current <= pkg_version.parse(max_ver): + logging.debug( + f"Flink {current} doesn't support async execution for java resource, will fallback to sync execution." + ) + return False + return True diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py index daf661286..8599e5737 100644 --- a/python/flink_agents/plan/agent_plan.py +++ b/python/flink_agents/plan/agent_plan.py @@ -218,6 +218,7 @@ def get_resource(self, name: str, type: ResourceType) -> Resource: resource = resource_provider.provide( get_resource=self.get_resource, config=self.config ) + resource.open() self.__resources[type][name] = resource return self.__resources[type][name] diff --git a/python/flink_agents/runtime/java/java_chat_model.py b/python/flink_agents/runtime/java/java_chat_model.py index 2263b68e8..a6e8b82a4 100644 --- a/python/flink_agents/runtime/java/java_chat_model.py +++ b/python/flink_agents/runtime/java/java_chat_model.py @@ -123,6 +123,17 @@ def model_kwargs(self) -> Dict[str, Any]: """ return {} + @override + def open(self) -> None: + """Trigger construction for resource objects. + + Currently, in cross-language invocation scenarios, constructing resource + object within an async thread may encounter issues. We resolved this issue + by moving the construction of the resources object out of the method to be + async executed and invoking it in the main thread. + """ + self._j_resource.open() + @override def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatMessage: """Execute chat conversation by delegating to Java implementation. diff --git a/python/flink_agents/runtime/java/java_embedding_model.py b/python/flink_agents/runtime/java/java_embedding_model.py index c53a2a851..d561ce7db 100644 --- a/python/flink_agents/runtime/java/java_embedding_model.py +++ b/python/flink_agents/runtime/java/java_embedding_model.py @@ -17,6 +17,8 @@ ################################################################################# from typing import Any, Dict, Sequence +from typing_extensions import override + from flink_agents.api.embedding_models.java_embedding_model import ( JavaEmbeddingModelConnection, JavaEmbeddingModelSetup, @@ -96,6 +98,17 @@ def model_kwargs(self) -> Dict[str, Any]: """ return {} + @override + def open(self) -> None: + """Trigger construction for java resource objects. + + Currently, in cross-language invocation scenarios, constructing resource + object within an async thread may encounter issues. We resolved this issue + by moving the construction of the resources object out of the method to be + async executed and invoking it in the main thread. + """ + self._j_resource.open() + def embed(self, text: str | Sequence[str], **kwargs: Any) -> list[float] | list[list[float]]: """Generate embedding vector for a single text query. Converts the input text into a high-dimensional vector representation diff --git a/python/flink_agents/runtime/java/java_vector_store.py b/python/flink_agents/runtime/java/java_vector_store.py index 77f3167fd..b11154d1d 100644 --- a/python/flink_agents/runtime/java/java_vector_store.py +++ b/python/flink_agents/runtime/java/java_vector_store.py @@ -67,6 +67,17 @@ def __init__(self, j_resource: Any, j_resource_adapter: Any, **kwargs: Any) -> N def store_kwargs(self) -> Dict[str, Any]: return {} + @override + def open(self) -> None: + """Trigger construction for java resource objects. + + Currently, in cross-language invocation scenarios, constructing resource + object within an async thread may encounter issues. We resolved this issue + by moving the construction of the resources object out of the method to be + async executed and invoking it in the main thread. + """ + self._j_resource.open() + @override def add( self,