From 35ef66ee8081512443ab8c877dc6998cc6c046e1 Mon Sep 17 00:00:00 2001 From: Haocong Wang <75206183+Ryan-Nightwish@users.noreply.github.com> Date: Tue, 9 Jun 2026 18:12:43 +0800 Subject: [PATCH 01/10] [Feature] Add parallel LLM quickstart doc and examples for multi-action fan-out Add a new quickstart page and runnable examples (Java + Python) demonstrating parallel LLM invocations via multi-action fan-out. The example analyzes a restaurant review by fanning out sentiment judgments across three dimensions in parallel and aggregating the results with a final LLM call. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../get-started/quickstart/parallel_llm.md | 347 ++++++++++++++++++ .../examples/ParallelChatRequestExample.java | 83 +++++ .../examples/agents/ParallelChatAgent.java | 251 +++++++++++++ .../quickstart/agents/parallel_chat_agent.py | 196 ++++++++++ .../parallel_chat_request_example.py | 136 +++++++ 5 files changed, 1013 insertions(+) create mode 100644 docs/content/docs/get-started/quickstart/parallel_llm.md create mode 100644 examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java create mode 100644 examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java create mode 100644 python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py create mode 100644 python/flink_agents/examples/quickstart/parallel_chat_request_example.py diff --git a/docs/content/docs/get-started/quickstart/parallel_llm.md b/docs/content/docs/get-started/quickstart/parallel_llm.md new file mode 100644 index 000000000..fb9a2e3c0 --- /dev/null +++ b/docs/content/docs/get-started/quickstart/parallel_llm.md @@ -0,0 +1,347 @@ +--- +title: 'Parallel LLM Calls' +weight: 3 +type: docs +--- + + +## Overview + +Flink Agents supports parallel LLM invocations via multi-action fan-out. By emitting multiple `ChatRequestEvent` events from a single action, the framework's built-in chat action executes the corresponding LLM calls concurrently — no external orchestration is required. + +This quickstart introduces an example that demonstrates how to build a parallel LLM workflow with Flink Agents: + +The **Parallel Sentiment Analysis** agent processes a restaurant review and judges sentiment along three dimensions (taste / service / price) in parallel, then aggregates the results into a one-line summary with a final LLM call. The end-to-end wall clock time is roughly "slowest single branch + aggregation call", rather than the sum of all four calls. + +{{< hint info >}} +**JDK version note (Java only):** On JDK 21+, the framework uses the Continuation API to execute concurrent chat actions in parallel. On JDK < 21, the framework silently falls back to sequential execution — the result is identical, but the LLM calls run one after another. Python uses native coroutines and always executes in parallel regardless of the JDK version. +{{< /hint >}} + +## Code Walkthrough + +### Prepare Agents Execution Environment + +Create the agents execution environment, and register the available chat model connection to the environment. + +{{< tabs "Prepare Agents Execution Environment" >}} + +{{< tab "Python" >}} +```python +# Set up the Flink streaming environment and the Agents execution environment. +stream_env = StreamExecutionEnvironment.get_execution_environment() +stream_env.set_parallelism(1) +t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) +agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=stream_env, t_env=t_env +) + +# Add Ollama chat model connection to be used by the ParallelChatAgent. +agents_env.add_resource( + "ollama_server", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, + request_timeout=240.0, + ), +) +``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Add Ollama chat model connection to be used by the ParallelChatAgent. +agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build()); +``` +{{< /tab >}} + +{{< /tabs >}} + +### Create the Agent + +Below is the example code for the `ParallelChatAgent`. The agent defines a chat model for sentiment analysis and two actions: `request_aspect_judgments` fans out one `ChatRequestEvent` per dimension, and `handle_response` collects the results, triggers the aggregation call, and emits the final output. For more details, please refer to the [Workflow Agent]({{< ref "docs/development/workflow_agent" >}}) documentation. + +{{< tabs "Create the Agent" >}} + +{{< tab "Python" >}} +```python +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and send chat requests for each aspect.""" + row = _init_row(event) + _save_row(ctx, row) + for aspect in ASPECTS: + ctx.send_event(_build_aspect_request(row["text"], aspect)) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + parsed = _parse_response(event) + row = _load_row(ctx) + if _is_final(parsed): + ctx.send_event(_build_output_event(row, parsed)) + return + row["sentiments"][parsed.aspect] = parsed.result + _save_row(ctx, row) + if _all_aspects_received(row): + ctx.send_event(_build_summarize_request(row)) +``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +/** + * An agent that demonstrates parallel LLM invocations via fan-out of multiple + * ChatRequestEvent events. + */ +public class ParallelChatAgent extends Agent { + + @ChatModelSetup + public static ResourceDescriptor sentimentModel() { + return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("extract_reasoning", true) + .build(); + } + + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void requestAspectJudgments(Event event, RunnerContext ctx) + throws Exception { + Map row = initRow(event); + saveRow(ctx, row); + for (String aspect : ASPECTS) { + ctx.sendEvent(buildAspectRequest((String) row.get("text"), aspect)); + } + } + + @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE}) + public static void handleResponse(Event event, RunnerContext ctx) throws Exception { + Object parsed = parseResponse(event); + Map row = loadRow(ctx); + + if (parsed instanceof SummaryResponse) { + SummaryResponse summary = (SummaryResponse) parsed; + ctx.sendEvent(buildOutputEvent(row, summary)); + return; + } + + AspectResponse aspect = (AspectResponse) parsed; + Map sentiments = (Map) row.get("sentiments"); + sentiments.put(aspect.aspect, aspect.result); + saveRow(ctx, row); + if (allAspectsReceived(row)) { + ctx.sendEvent(buildSummarizeRequest(row)); + } + } +} +``` +{{< /tab >}} + +{{< /tabs >}} + +### Integrate the Agent with Flink + +Create the input data, use the `ParallelChatAgent` to analyze the review with parallel LLM calls, and print the results. + +{{< tabs "Integrate the Agent with Flink" >}} + +{{< tab "Python" >}} +```python +# Create input table with a single restaurant review. +input_table = t_env.from_elements( + elements=[(1, INPUT_TEXT)], + schema=DataTypes.ROW( + [ + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("text", DataTypes.STRING()), + ] + ), +) + +# Use the ParallelChatAgent to analyze the review with parallel LLM calls. +output_table = ( + agents_env.from_table(input=input_table, key_selector=ParallelChatKeySelector()) + .apply(ParallelChatAgent()) + .to_table(schema=output_schema, output_type=output_type) +) + +# Execute the Flink pipeline. +output_table.execute_insert("sink").wait() +``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Create input stream with a single restaurant review. +DataStream inputStream = + env.fromElements(new SentimentRequest(1, ParallelChatAgent.INPUT_TEXT)); + +// Use the ParallelChatAgent to analyze the review with parallel LLM calls. +DataStream outputStream = + agentsEnv + .fromDataStream(inputStream, new SentimentKeySelector()) + .apply(new ParallelChatAgent()) + .toDataStream(); + +// Print the analysis results to stdout. +outputStream.print(); + +// Execute the Flink pipeline. +agentsEnv.execute(); +``` +{{< /tab >}} + +{{< /tabs >}} + +## Run the Example + +### Prerequisites + +* Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL) +* Git +* Java 11+ (Java 21+ recommended for parallel execution on the Java side) +* Python 3.10, 3.11 or 3.12 + +### Preparation + +#### Prepare Flink and Flink Agents + +Follow the [installation]({{< ref "docs/get-started/installation" >}}) instructions to setup Flink and the Flink Agents. + +#### Clone the Flink Agents Repository (if not done already) + +```bash +git clone https://github.com/apache/flink-agents.git +cd flink-agents +``` +{{< hint info >}} +For python examples, you can skip this step and submit the python file in installed flink-agents wheel. +{{< /hint >}} + +#### Deploy a Standalone Flink Cluster + +You can deploy a standalone Flink cluster in your local environment with the following command. + +{{< tabs "Deploy a Standalone Flink Cluster" >}} + +{{< tab "Python" >}} +```bash +export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') +$FLINK_HOME/bin/start-cluster.sh +``` +{{< /tab >}} + +{{< tab "Java" >}} +1. Build Flink Agents from source to generate example jar. See [installation]({{< ref "docs/get-started/installation" >}}) for more details. +2. Start the Flink cluster + ```bash + $FLINK_HOME/bin/start-cluster.sh + ``` + +{{< hint info >}} +To run example on JDK 21+, append jvm option `--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED` to [env.java.opts.all](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#env-java-opts-all) in `$FLINK_HOME/conf/config.yaml` before start the flink cluster. +{{< /hint >}} +{{< /tab >}} + +{{< /tabs >}} +You can refer to the [local cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster) instructions for more detailed step. + +{{< hint info >}} +If you can't navigate to the web UI at [localhost:8081](localhost:8081), you can find the reason in `$FLINK_HOME/log`. If the reason is port conflict, you can change the port in `$FLINK_HOME/conf/config.yaml`. +{{< /hint >}} + +#### Prepare Ollama + +Download and install Ollama from the official [website](https://ollama.com/download). + +{{< hint info >}} +Ollama server **0.9.0** or higher is required. +{{< /hint >}} + +Then pull the qwen3:1.7b model, which is required by the parallel LLM example + +```bash +ollama pull qwen3:1.7b +``` + +### Submit Flink Agents Job to Standalone Flink Cluster + +#### Submit to Flink Cluster + +{{< tabs "Submit to Flink Cluster" >}} + +{{< tab "Python" >}} +```bash +export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') + +# Run parallel chat request example +$FLINK_HOME/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/parallel_chat_request_example.py +# or submit the example python file in installed flink-agents wheel +$FLINK_HOME/bin/flink run -py $PYTHONPATH/flink_agents/examples/quickstart/parallel_chat_request_example.py +``` +{{< /tab >}} + +{{< tab "Java" >}} +```bash +$FLINK_HOME/bin/flink run -c org.apache.flink.agents.examples.ParallelChatRequestExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar +``` +{{< /tab >}} + +{{< /tabs >}} + +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081]( +localhost:8081) + +After a few minutes, you can check for the output in the TaskManager output log. diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java new file mode 100644 index 000000000..7a6c63be8 --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.examples; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceName; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.examples.agents.ParallelChatAgent; +import org.apache.flink.agents.examples.agents.ParallelChatAgent.SentimentKeySelector; +import org.apache.flink.agents.examples.agents.ParallelChatAgent.SentimentRequest; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Java example demonstrating parallel LLM invocations via multi-action fan-out. + * + *

This example demonstrates how to use the Flink Agents to analyze a restaurant review by + * fanning out multiple parallel LLM calls — one per sentiment dimension — and aggregating the + * results with a final LLM call. This serves as a minimal, end-to-end example of integrating + * parallel LLM-powered agents with Flink streaming jobs. + */ +public class ParallelChatRequestExample { + + /** Runs the example pipeline. */ + public static void main(String[] args) throws Exception { + System.out.println("=== Parallel ChatRequest Example ==="); + System.out.println("Model: " + ParallelChatAgent.OLLAMA_MODEL); + System.out.println("Input: " + ParallelChatAgent.INPUT_TEXT); + + // Set up the Flink streaming environment and the Agents execution environment. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Add Ollama chat model connection to be used by the ParallelChatAgent. + agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build()); + + // Create input stream with a single restaurant review. + DataStream inputStream = + env.fromElements(new SentimentRequest(1, ParallelChatAgent.INPUT_TEXT)); + + // Use the ParallelChatAgent to analyze the review with parallel LLM calls. + DataStream outputStream = + agentsEnv + .fromDataStream(inputStream, new SentimentKeySelector()) + .apply(new ParallelChatAgent()) + .toDataStream(); + + // Print the analysis results to stdout. + outputStream.print(); + + // Execute the Flink pipeline. + long wallStart = System.currentTimeMillis(); + agentsEnv.execute(); + long wallElapsed = System.currentTimeMillis() - wallStart; + + System.out.println("End-to-end wall time: " + wallElapsed + "ms"); + System.out.println("=== Done ==="); + } +} diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java new file mode 100644 index 000000000..6155a2952 --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.examples.agents; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.annotation.ChatModelSetup; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceName; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; + +import static org.apache.flink.agents.api.agents.Agent.STRUCTURED_OUTPUT; + +/** + * An agent that demonstrates parallel LLM invocations via fan-out of multiple {@link + * ChatRequestEvent} events. + * + *

This agent receives a restaurant review and uses an LLM to judge sentiment along multiple + * dimensions (taste / service / price) in parallel, then aggregates the results into a one-line + * summary with a final LLM call. It handles prompt construction, parallel chat dispatch, response + * accumulation, and output assembly. + * + *

JDK version note: On JDK 21+, the framework uses the Continuation API to execute + * concurrent chat actions in parallel, so the wall clock time is roughly "slowest single branch + + * aggregation call". On JDK < 21, the framework silently falls back to sequential execution — + * the result is identical, but the LLM calls run one after another. + */ +public class ParallelChatAgent extends Agent { + + /** Ollama model name, configurable via environment variable. */ + public static final String OLLAMA_MODEL = + System.getenv().getOrDefault("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b"); + + /** Input text for the demo. */ + public static final String INPUT_TEXT = "The food here is great, but the service is too slow"; + + private static final String[] ASPECTS = {"taste", "service", "price"}; + private static final int N_ASPECTS = ASPECTS.length; + + private static final String PARALLEL_SYSTEM_PROMPT = + "You are a sentiment analysis assistant. Return JSON: " + + "{\"aspect\":\"\", \"result\":\"\"}" + + " — no explanation, no extra fields."; + private static final String AGGREGATE_SYSTEM_PROMPT = + "You are a summary assistant. Based on the sentiment judgments for three " + + "dimensions, compose a brief one-line evaluation. Return JSON: " + + "{\"summary\":\"taste:, " + + "service:, " + + "price:\"} — return only this JSON."; + + /** LLM response for a single aspect judgment. */ + public static class AspectResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String aspect; + public String result; + + public AspectResponse() {} + + @Override + public String toString() { + return String.format("AspectResponse{aspect='%s', result='%s'}", aspect, result); + } + } + + /** LLM response for the aggregation phase. */ + public static class SummaryResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String summary; + + public SummaryResponse() {} + + @Override + public String toString() { + return String.format("SummaryResponse{summary='%s'}", summary); + } + } + + /** Single input record carrying an id and text. */ + public static class SentimentRequest implements Serializable { + private static final long serialVersionUID = 1L; + public final int id; + public final String text; + + public SentimentRequest(int id, String text) { + this.id = id; + this.text = text; + } + + @Override + public String toString() { + return String.format("SentimentRequest{id=%d, text='%s'}", id, text); + } + } + + /** Key selector that extracts the id field from a {@link SentimentRequest}. */ + public static class SentimentKeySelector implements KeySelector { + @Override + public Integer getKey(SentimentRequest request) { + return request.id; + } + } + + @ChatModelSetup + public static ResourceDescriptor sentimentModel() { + return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("extract_reasoning", true) + .build(); + } + + private static Map initRow(Event event) { + InputEvent inputEvent = InputEvent.fromEvent(event); + SentimentRequest request = (SentimentRequest) inputEvent.getInput(); + Map row = new HashMap<>(); + row.put("id", request.id); + row.put("text", request.text); + row.put("sentiments", new HashMap()); + return row; + } + + private static void saveRow(RunnerContext ctx, Map row) throws Exception { + ctx.getSensoryMemory().set("res", row); + } + + @SuppressWarnings("unchecked") + private static Map loadRow(RunnerContext ctx) throws Exception { + return (Map) ctx.getSensoryMemory().get("res").getValue(); + } + + private static ChatRequestEvent buildAspectRequest(String text, String aspect) { + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, PARALLEL_SYSTEM_PROMPT), + new ChatMessage( + MessageRole.USER, + "Judge the \"" + aspect + "\" dimension: " + text)); + return new ChatRequestEvent("sentimentModel", messages, AspectResponse.class); + } + + @SuppressWarnings("unchecked") + private static ChatRequestEvent buildSummarizeRequest(Map row) { + Map sentiments = (Map) row.get("sentiments"); + StringJoiner sj = new StringJoiner(" "); + for (String aspect : ASPECTS) { + sj.add(aspect + ":" + sentiments.get(aspect)); + } + String body = "Original: " + row.get("text") + "\nJudgments: " + sj; + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, AGGREGATE_SYSTEM_PROMPT), + new ChatMessage(MessageRole.USER, body)); + return new ChatRequestEvent("sentimentModel", messages, SummaryResponse.class); + } + + private static OutputEvent buildOutputEvent(Map row, SummaryResponse parsed) { + Row output = Row.withNames(); + output.setField("id", row.get("id")); + output.setField("text", row.get("text")); + output.setField("summary", parsed.summary); + return new OutputEvent(output); + } + + private static Object parseResponse(Event event) { + ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event); + ChatMessage response = chatResponse.getResponse(); + return response.getExtraArgs().get(STRUCTURED_OUTPUT); + } + + @SuppressWarnings("unchecked") + private static boolean allAspectsReceived(Map row) { + Map sentiments = (Map) row.get("sentiments"); + return sentiments.size() == N_ASPECTS; + } + + /** Process input event and fan-out chat requests for each aspect. */ + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void requestAspectJudgments(Event event, RunnerContext ctx) throws Exception { + Map row = initRow(event); + saveRow(ctx, row); + for (String aspect : ASPECTS) { + ctx.sendEvent(buildAspectRequest((String) row.get("text"), aspect)); + } + } + + /** Process chat response event. */ + @SuppressWarnings("unchecked") + @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE}) + public static void handleResponse(Event event, RunnerContext ctx) throws Exception { + Object parsed = parseResponse(event); + Map row = loadRow(ctx); + + if (parsed instanceof SummaryResponse) { + SummaryResponse summary = (SummaryResponse) parsed; + System.out.println( + "FINAL summary=" + + summary.summary + + " (t=" + + System.nanoTime() / 1_000_000_000.0 + + ")"); + ctx.sendEvent(buildOutputEvent(row, summary)); + return; + } + + AspectResponse aspect = (AspectResponse) parsed; + System.out.println( + "ASPECT " + + aspect.aspect + + "=" + + aspect.result + + " (t=" + + System.nanoTime() / 1_000_000_000.0 + + ")"); + Map sentiments = (Map) row.get("sentiments"); + sentiments.put(aspect.aspect, aspect.result); + saveRow(ctx, row); + if (allAspectsReceived(row)) { + ctx.sendEvent(buildSummarizeRequest(row)); + } + } +} diff --git a/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py new file mode 100644 index 000000000..18e69b8ec --- /dev/null +++ b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py @@ -0,0 +1,196 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +import time +from typing import Any, Dict, Tuple + +from pydantic import BaseModel +from pyflink.common import Row +from pyflink.datastream import KeySelector + +from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent +from flink_agents.api.agents.types import OutputSchema +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.decorators import action, chat_model_setup +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.resource import ResourceDescriptor, ResourceName +from flink_agents.api.runner_context import RunnerContext + +OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") + +INPUT_TEXT = "The food here is great, but the service is too slow" +ASPECTS: Tuple[str, ...] = ("taste", "service", "price") +N_ASPECTS = len(ASPECTS) + +PARALLEL_SYSTEM_PROMPT = ( + "You are a sentiment analysis assistant. Return JSON: " + '{"aspect":"", "result":""}' + " — no explanation, no extra fields." +) +AGGREGATE_SYSTEM_PROMPT = ( + "You are a summary assistant. Based on the sentiment judgments for three " + "dimensions, compose a brief one-line evaluation. Return JSON: " + '{"summary":"taste: service: price:"} — return only this JSON.' +) + + +class AspectResponse(BaseModel): + """LLM response for a single aspect judgment.""" + + aspect: str + result: str + + +class SummaryResponse(BaseModel): + """LLM response for the aggregation phase.""" + + summary: str + + +class ParallelChatKeySelector(KeySelector): + """Key selector that extracts the id field from the input row.""" + + def get_key(self, value: Row) -> int: + """Extract key from row.""" + return value[0] + + +def _init_row(event: Event) -> Dict[str, Any]: + """Build a row skeleton from the InputEvent.""" + payload = InputEvent.from_event(event).input + return {"id": payload["id"], "text": payload["text"], "sentiments": {}} + + +def _save_row(ctx: RunnerContext, row: Dict[str, Any]) -> None: + """Write the row to sensory memory.""" + ctx.sensory_memory.set("res", json.dumps(row, ensure_ascii=False)) + + +def _load_row(ctx: RunnerContext) -> Dict[str, Any]: + """Read the row from sensory memory.""" + return json.loads(ctx.sensory_memory.get("res")) + + +def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent: + """Build a ChatRequestEvent for a single aspect dimension.""" + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=PARALLEL_SYSTEM_PROMPT), + ChatMessage( + role=MessageRole.USER, + content=f'Judge the "{aspect}" dimension: {text}', + ), + ], + output_schema=OutputSchema(output_schema=AspectResponse), + ) + + +def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent: + """Build a ChatRequestEvent for the aggregation phase.""" + sentiments = row["sentiments"] + body = ( + f"Original: {row['text']}\n" + + "Judgments: " + + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS) + ) + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=AGGREGATE_SYSTEM_PROMPT), + ChatMessage(role=MessageRole.USER, content=body), + ], + output_schema=OutputSchema(output_schema=SummaryResponse), + ) + + +def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> OutputEvent: + """Pack row fields and summary into the final OutputEvent.""" + return OutputEvent( + output=Row(id=row["id"], text=row["text"], summary=parsed.summary) + ) + + +def _parse_response(event: Event) -> AspectResponse | SummaryResponse: + """Parse a ChatResponseEvent into a structured response object.""" + response = ChatResponseEvent.from_event(event).response + raw = response.extra_args[STRUCTURED_OUTPUT] + if isinstance(raw, BaseModel): + return raw + if "summary" in raw: + return SummaryResponse.model_validate(raw) + return AspectResponse.model_validate(raw) + + +def _is_final(parsed: AspectResponse | SummaryResponse) -> bool: + """Return True if the parsed response is from the aggregation phase.""" + return isinstance(parsed, SummaryResponse) + + +def _all_aspects_received(row: Dict[str, Any]) -> bool: + """Return True if all aspect judgments have been collected.""" + return len(row["sentiments"]) == N_ASPECTS + + +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and send chat requests for each aspect.""" + row = _init_row(event) + _save_row(ctx, row) + for aspect in ASPECTS: + ctx.send_event(_build_aspect_request(row["text"], aspect)) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + parsed = _parse_response(event) + row = _load_row(ctx) + if _is_final(parsed): + print(f"FINAL summary={parsed.summary} (t={time.monotonic():.3f})") + ctx.send_event(_build_output_event(row, parsed)) + return + print(f"ASPECT {parsed.aspect}={parsed.result} (t={time.monotonic():.3f})") + row["sentiments"][parsed.aspect] = parsed.result + _save_row(ctx, row) + if _all_aspects_received(row): + ctx.send_event(_build_summarize_request(row)) diff --git a/python/flink_agents/examples/quickstart/parallel_chat_request_example.py b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py new file mode 100644 index 000000000..e2f025e5a --- /dev/null +++ b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py @@ -0,0 +1,136 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +import sysconfig +import time +from pathlib import Path + +from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.resource import ResourceDescriptor, ResourceName, ResourceType +from flink_agents.examples.quickstart.agents.parallel_chat_agent import ( + INPUT_TEXT, + N_ASPECTS, + OLLAMA_MODEL, + ParallelChatAgent, + ParallelChatKeySelector, +) + + +def main() -> None: + """Main function for the parallel chat request quickstart example. + + This example demonstrates how to use Flink Agents to analyze a restaurant + review by fanning out multiple parallel LLM calls — one per sentiment + dimension — and aggregating the results with a final LLM call. This serves + as a minimal, end-to-end example of integrating parallel LLM-powered agents + with Flink streaming jobs. + """ + # Set up the Flink streaming environment and the Agents execution environment. + stream_env = StreamExecutionEnvironment.get_execution_environment() + stream_env.set_parallelism(1) + t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) + agents_env = AgentsExecutionEnvironment.get_execution_environment( + env=stream_env, t_env=t_env + ) + + # Add Ollama chat model connection to be used by the ParallelChatAgent. + agents_env.add_resource( + "ollama_server", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, + request_timeout=240.0, + ), + ) + + # Create input table with a single restaurant review. + input_table = t_env.from_elements( + elements=[(1, INPUT_TEXT)], + schema=DataTypes.ROW( + [ + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("text", DataTypes.STRING()), + ] + ), + ) + + # Define output schema and type info. + output_type_info = RowTypeInfo( + [ + BasicTypeInfo.INT_TYPE_INFO(), + BasicTypeInfo.STRING_TYPE_INFO(), + BasicTypeInfo.STRING_TYPE_INFO(), + ], + ["id", "text", "summary"], + ) + output_type = ExternalTypeInfo(output_type_info) + output_schema = ( + Schema.new_builder() + .column("id", DataTypes.INT()) + .column("text", DataTypes.STRING()) + .column("summary", DataTypes.STRING()) + .build() + ) + + # Register a filesystem sink for collecting results. + result_dir = Path("/tmp/parallel_chat_results") + result_dir.mkdir(parents=True, exist_ok=True) + + t_env.create_temporary_table( + "sink", + TableDescriptor.for_connector("filesystem") + .option("path", str(result_dir.absolute())) + .format("json") + .schema(output_schema) + .build(), + ) + + # Use the ParallelChatAgent to analyze the review with parallel LLM calls. + output_table = ( + agents_env.from_table(input=input_table, key_selector=ParallelChatKeySelector()) + .apply(ParallelChatAgent()) + .to_table(schema=output_schema, output_type=output_type) + ) + + # Execute the Flink pipeline. + wall_start = time.monotonic() + output_table.execute_insert("sink").wait() + wall_elapsed = time.monotonic() - wall_start + + # Print the analysis results to stdout. + rows = [] + for file in result_dir.iterdir(): + if file.is_file(): + with file.open() as f: + for line in f: + line = line.strip() + if line: + rows.append(json.loads(line)) + + print(f"OUTPUT rows: {rows}") + print(f"End-to-end wall time: {wall_elapsed:.2f}s") + print("=== Done ===") + + +if __name__ == "__main__": + main() From 3212d385fc2363e4e7aa97fcadda9af46a20e5d7 Mon Sep 17 00:00:00 2001 From: Haocong Wang <75206183+Ryan-Nightwish@users.noreply.github.com> Date: Tue, 9 Jun 2026 18:16:29 +0800 Subject: [PATCH 02/10] [Feature] Add constraints note for multi-action fan-out in parallel LLM doc Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/content/docs/get-started/quickstart/parallel_llm.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/content/docs/get-started/quickstart/parallel_llm.md b/docs/content/docs/get-started/quickstart/parallel_llm.md index fb9a2e3c0..12535a8b4 100644 --- a/docs/content/docs/get-started/quickstart/parallel_llm.md +++ b/docs/content/docs/get-started/quickstart/parallel_llm.md @@ -191,6 +191,12 @@ public class ParallelChatAgent extends Agent { {{< /tabs >}} +{{< hint warning >}} +**Constraints when using multi-action fan-out:** +- **Fan-out action must exit immediately after sending events.** The action that emits multiple `ChatRequestEvent` events (e.g. `request_aspect_judgments`) should return right after calling `ctx.send_event(...)`. Do not `await`, block, or perform further business logic in the same action — the framework needs control back to schedule the parallel chat actions. +- **Response handlers on the same key execute sequentially.** Multiple `ChatResponseEvent` triggers for the same key are processed one at a time, not concurrently. This means the read-modify-write pattern on sensory memory (e.g. `load_row → update → save_row`) is safe and will not suffer from concurrent overwrites. + {{< /hint >}} + ### Integrate the Agent with Flink Create the input data, use the `ParallelChatAgent` to analyze the review with parallel LLM calls, and print the results. From 91b41643f338b28e81100a9a86c3ab4116ae5cc0 Mon Sep 17 00:00:00 2001 From: Haocong Wang <75206183+Ryan-Nightwish@users.noreply.github.com> Date: Wed, 10 Jun 2026 11:23:40 +0800 Subject: [PATCH 03/10] [Feature] Refactor parallel LLM examples: simplify code and align with project conventions - Move custom types (AspectResponse, SummaryResponse, SentimentRequest, SentimentKeySelector) from agent classes into shared CustomTypesAndResources (Java) and custom_types_and_resources.py (Python) to follow project convention - Simplify OutputEvent construction: use Map/dict instead of Row - Remove debug System.out.println / print statements from agent handlers - Java: use private fields + getters for SentimentRequest (matching POJO style) - Java: remove wall-time measurement and banner prints from Example main() - Python: rewrite example from from_table to from_datastream with env.from_collection(dict), matching workflow_single_agent_example style - Python: remove ParallelChatKeySelector (replaced by inline lambda) - Update parallel_llm.md doc code snippets to reflect the refactored code Co-Authored-By: Claude Opus 4.6 (1M context) --- .../get-started/quickstart/parallel_llm.md | 34 +++---- .../examples/ParallelChatRequestExample.java | 14 +-- .../agents/CustomTypesAndResources.java | 62 ++++++++++++ .../examples/agents/ParallelChatAgent.java | 95 +++---------------- .../agents/custom_types_and_resources.py | 15 +++ .../quickstart/agents/parallel_chat_agent.py | 32 +------ .../parallel_chat_request_example.py | 90 +++--------------- 7 files changed, 127 insertions(+), 215 deletions(-) diff --git a/docs/content/docs/get-started/quickstart/parallel_llm.md b/docs/content/docs/get-started/quickstart/parallel_llm.md index 12535a8b4..a0c77ca4d 100644 --- a/docs/content/docs/get-started/quickstart/parallel_llm.md +++ b/docs/content/docs/get-started/quickstart/parallel_llm.md @@ -45,12 +45,9 @@ Create the agents execution environment, and register the available chat model c {{< tab "Python" >}} ```python # Set up the Flink streaming environment and the Agents execution environment. -stream_env = StreamExecutionEnvironment.get_execution_environment() -stream_env.set_parallelism(1) -t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) -agents_env = AgentsExecutionEnvironment.get_execution_environment( - env=stream_env, t_env=t_env -) +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) # Add Ollama chat model connection to be used by the ParallelChatAgent. agents_env.add_resource( @@ -205,26 +202,25 @@ Create the input data, use the `ParallelChatAgent` to analyze the review with pa {{< tab "Python" >}} ```python -# Create input table with a single restaurant review. -input_table = t_env.from_elements( - elements=[(1, INPUT_TEXT)], - schema=DataTypes.ROW( - [ - DataTypes.FIELD("id", DataTypes.INT()), - DataTypes.FIELD("text", DataTypes.STRING()), - ] - ), +# Create input stream with a single restaurant review. +input_stream = env.from_collection( + collection=[{"id": 1, "text": INPUT_TEXT}], ) # Use the ParallelChatAgent to analyze the review with parallel LLM calls. -output_table = ( - agents_env.from_table(input=input_table, key_selector=ParallelChatKeySelector()) +output_stream = ( + agents_env.from_datastream( + input=input_stream, key_selector=lambda x: x["id"] + ) .apply(ParallelChatAgent()) - .to_table(schema=output_schema, output_type=output_type) + .to_datastream() ) +# Print the analysis results to stdout. +output_stream.print() + # Execute the Flink pipeline. -output_table.execute_insert("sink").wait() +agents_env.execute() ``` {{< /tab >}} diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java index 7a6c63be8..c84486131 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java @@ -21,9 +21,10 @@ import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceName; import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentKeySelector; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentRequest; import org.apache.flink.agents.examples.agents.ParallelChatAgent; -import org.apache.flink.agents.examples.agents.ParallelChatAgent.SentimentKeySelector; -import org.apache.flink.agents.examples.agents.ParallelChatAgent.SentimentRequest; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -39,10 +40,6 @@ public class ParallelChatRequestExample { /** Runs the example pipeline. */ public static void main(String[] args) throws Exception { - System.out.println("=== Parallel ChatRequest Example ==="); - System.out.println("Model: " + ParallelChatAgent.OLLAMA_MODEL); - System.out.println("Input: " + ParallelChatAgent.INPUT_TEXT); - // Set up the Flink streaming environment and the Agents execution environment. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -73,11 +70,6 @@ public static void main(String[] args) throws Exception { outputStream.print(); // Execute the Flink pipeline. - long wallStart = System.currentTimeMillis(); agentsEnv.execute(); - long wallElapsed = System.currentTimeMillis() - wallStart; - - System.out.println("End-to-end wall time: " + wallElapsed + "ms"); - System.out.println("=== Done ==="); } } diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java index 2fb1da8c7..2447fa185 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java @@ -26,7 +26,9 @@ import org.apache.flink.agents.api.prompt.Prompt; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceName; +import org.apache.flink.api.java.functions.KeySelector; +import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -224,6 +226,66 @@ public String toString() { } } + /** Single input record carrying an id and text for parallel sentiment analysis. */ + public static class SentimentRequest implements Serializable { + private static final long serialVersionUID = 1L; + private final int id; + private final String text; + + public SentimentRequest(int id, String text) { + this.id = id; + this.text = text; + } + + public int getId() { + return id; + } + + public String getText() { + return text; + } + + @Override + public String toString() { + return String.format("SentimentRequest{id=%d, text='%s'}", id, text); + } + } + + /** Key selector that extracts the id field from a {@link SentimentRequest}. */ + public static class SentimentKeySelector implements KeySelector { + @Override + public Integer getKey(SentimentRequest request) { + return request.getId(); + } + } + + /** LLM response for a single aspect judgment. */ + public static class AspectResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String aspect; + public String result; + + public AspectResponse() {} + + @Override + public String toString() { + return String.format("AspectResponse{aspect='%s', result='%s'}", aspect, result); + } + } + + /** LLM response for the aggregation phase. */ + public static class SummaryResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String summary; + + public SummaryResponse() {} + + @Override + public String toString() { + return String.format("SummaryResponse{summary='%s'}", summary); + } + } + /** Provides a summary of review data including suggestions for improvement. */ @JsonSerialize @JsonDeserialize diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java index 6155a2952..b070c9a3d 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java @@ -30,10 +30,7 @@ import org.apache.flink.agents.api.event.ChatResponseEvent; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceName; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.types.Row; -import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,58 +75,6 @@ public class ParallelChatAgent extends Agent { + "service:, " + "price:\"} — return only this JSON."; - /** LLM response for a single aspect judgment. */ - public static class AspectResponse implements Serializable { - private static final long serialVersionUID = 1L; - public String aspect; - public String result; - - public AspectResponse() {} - - @Override - public String toString() { - return String.format("AspectResponse{aspect='%s', result='%s'}", aspect, result); - } - } - - /** LLM response for the aggregation phase. */ - public static class SummaryResponse implements Serializable { - private static final long serialVersionUID = 1L; - public String summary; - - public SummaryResponse() {} - - @Override - public String toString() { - return String.format("SummaryResponse{summary='%s'}", summary); - } - } - - /** Single input record carrying an id and text. */ - public static class SentimentRequest implements Serializable { - private static final long serialVersionUID = 1L; - public final int id; - public final String text; - - public SentimentRequest(int id, String text) { - this.id = id; - this.text = text; - } - - @Override - public String toString() { - return String.format("SentimentRequest{id=%d, text='%s'}", id, text); - } - } - - /** Key selector that extracts the id field from a {@link SentimentRequest}. */ - public static class SentimentKeySelector implements KeySelector { - @Override - public Integer getKey(SentimentRequest request) { - return request.id; - } - } - @ChatModelSetup public static ResourceDescriptor sentimentModel() { return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) @@ -141,10 +86,10 @@ public static ResourceDescriptor sentimentModel() { private static Map initRow(Event event) { InputEvent inputEvent = InputEvent.fromEvent(event); - SentimentRequest request = (SentimentRequest) inputEvent.getInput(); + CustomTypesAndResources.SentimentRequest request = (CustomTypesAndResources.SentimentRequest) inputEvent.getInput(); Map row = new HashMap<>(); - row.put("id", request.id); - row.put("text", request.text); + row.put("id", request.getId()); + row.put("text", request.getText()); row.put("sentiments", new HashMap()); return row; } @@ -165,7 +110,7 @@ private static ChatRequestEvent buildAspectRequest(String text, String aspect) { new ChatMessage( MessageRole.USER, "Judge the \"" + aspect + "\" dimension: " + text)); - return new ChatRequestEvent("sentimentModel", messages, AspectResponse.class); + return new ChatRequestEvent("sentimentModel", messages, CustomTypesAndResources.AspectResponse.class); } @SuppressWarnings("unchecked") @@ -180,14 +125,14 @@ private static ChatRequestEvent buildSummarizeRequest(Map row) { List.of( new ChatMessage(MessageRole.SYSTEM, AGGREGATE_SYSTEM_PROMPT), new ChatMessage(MessageRole.USER, body)); - return new ChatRequestEvent("sentimentModel", messages, SummaryResponse.class); + return new ChatRequestEvent("sentimentModel", messages, CustomTypesAndResources.SummaryResponse.class); } - private static OutputEvent buildOutputEvent(Map row, SummaryResponse parsed) { - Row output = Row.withNames(); - output.setField("id", row.get("id")); - output.setField("text", row.get("text")); - output.setField("summary", parsed.summary); + private static OutputEvent buildOutputEvent(Map row, CustomTypesAndResources.SummaryResponse parsed) { + Map output = new HashMap<>(); + output.put("id", row.get("id")); + output.put("text", row.get("text")); + output.put("summary", parsed.summary); return new OutputEvent(output); } @@ -220,27 +165,13 @@ public static void handleResponse(Event event, RunnerContext ctx) throws Excepti Object parsed = parseResponse(event); Map row = loadRow(ctx); - if (parsed instanceof SummaryResponse) { - SummaryResponse summary = (SummaryResponse) parsed; - System.out.println( - "FINAL summary=" - + summary.summary - + " (t=" - + System.nanoTime() / 1_000_000_000.0 - + ")"); + if (parsed instanceof CustomTypesAndResources.SummaryResponse) { + CustomTypesAndResources.SummaryResponse summary = (CustomTypesAndResources.SummaryResponse) parsed; ctx.sendEvent(buildOutputEvent(row, summary)); return; } - AspectResponse aspect = (AspectResponse) parsed; - System.out.println( - "ASPECT " - + aspect.aspect - + "=" - + aspect.result - + " (t=" - + System.nanoTime() / 1_000_000_000.0 - + ")"); + CustomTypesAndResources.AspectResponse aspect = (CustomTypesAndResources.AspectResponse) parsed; Map sentiments = (Map) row.get("sentiments"); sentiments.put(aspect.aspect, aspect.result); saveRow(ctx, row); diff --git a/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py index 3bd463f97..d759f1a48 100644 --- a/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py +++ b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py @@ -196,6 +196,21 @@ class ProductReviewAnalysisRes(BaseModel): reasons: list[str] +# Custom types for parallel chat agent. +class AspectResponse(BaseModel): + """LLM response for a single aspect judgment.""" + + aspect: str + result: str + + +class SummaryResponse(BaseModel): + """LLM response for the aggregation phase.""" + + summary: str + + + # ollama chat model connection descriptor ollama_server_descriptor = ResourceDescriptor( clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, request_timeout=120 diff --git a/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py index 18e69b8ec..64e4f4637 100644 --- a/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py +++ b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py @@ -17,12 +17,9 @@ ################################################################################# import json import os -import time from typing import Any, Dict, Tuple from pydantic import BaseModel -from pyflink.common import Row -from pyflink.datastream import KeySelector from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent from flink_agents.api.agents.types import OutputSchema @@ -32,6 +29,10 @@ from flink_agents.api.events.event import Event, InputEvent, OutputEvent from flink_agents.api.resource import ResourceDescriptor, ResourceName from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + AspectResponse, + SummaryResponse, +) OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") @@ -51,27 +52,6 @@ ) -class AspectResponse(BaseModel): - """LLM response for a single aspect judgment.""" - - aspect: str - result: str - - -class SummaryResponse(BaseModel): - """LLM response for the aggregation phase.""" - - summary: str - - -class ParallelChatKeySelector(KeySelector): - """Key selector that extracts the id field from the input row.""" - - def get_key(self, value: Row) -> int: - """Extract key from row.""" - return value[0] - - def _init_row(event: Event) -> Dict[str, Any]: """Build a row skeleton from the InputEvent.""" payload = InputEvent.from_event(event).input @@ -124,7 +104,7 @@ def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent: def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> OutputEvent: """Pack row fields and summary into the final OutputEvent.""" return OutputEvent( - output=Row(id=row["id"], text=row["text"], summary=parsed.summary) + output={"id": row["id"], "text": row["text"], "summary": parsed.summary} ) @@ -186,10 +166,8 @@ def handle_response(event: Event, ctx: RunnerContext) -> None: parsed = _parse_response(event) row = _load_row(ctx) if _is_final(parsed): - print(f"FINAL summary={parsed.summary} (t={time.monotonic():.3f})") ctx.send_event(_build_output_event(row, parsed)) return - print(f"ASPECT {parsed.aspect}={parsed.result} (t={time.monotonic():.3f})") row["sentiments"][parsed.aspect] = parsed.result _save_row(ctx, row) if _all_aspects_received(row): diff --git a/python/flink_agents/examples/quickstart/parallel_chat_request_example.py b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py index e2f025e5a..083709aa1 100644 --- a/python/flink_agents/examples/quickstart/parallel_chat_request_example.py +++ b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py @@ -15,24 +15,13 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# -import json -import os -import sysconfig -import time -from pathlib import Path - -from pyflink.common.typeinfo import BasicTypeInfo, ExternalTypeInfo, RowTypeInfo from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import DataTypes, Schema, StreamTableEnvironment, TableDescriptor from flink_agents.api.execution_environment import AgentsExecutionEnvironment from flink_agents.api.resource import ResourceDescriptor, ResourceName, ResourceType from flink_agents.examples.quickstart.agents.parallel_chat_agent import ( INPUT_TEXT, - N_ASPECTS, - OLLAMA_MODEL, ParallelChatAgent, - ParallelChatKeySelector, ) @@ -46,12 +35,9 @@ def main() -> None: with Flink streaming jobs. """ # Set up the Flink streaming environment and the Agents execution environment. - stream_env = StreamExecutionEnvironment.get_execution_environment() - stream_env.set_parallelism(1) - t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) - agents_env = AgentsExecutionEnvironment.get_execution_environment( - env=stream_env, t_env=t_env - ) + env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(1) + agents_env = AgentsExecutionEnvironment.get_execution_environment(env) # Add Ollama chat model connection to be used by the ParallelChatAgent. agents_env.add_resource( @@ -63,73 +49,25 @@ def main() -> None: ), ) - # Create input table with a single restaurant review. - input_table = t_env.from_elements( - elements=[(1, INPUT_TEXT)], - schema=DataTypes.ROW( - [ - DataTypes.FIELD("id", DataTypes.INT()), - DataTypes.FIELD("text", DataTypes.STRING()), - ] - ), - ) - - # Define output schema and type info. - output_type_info = RowTypeInfo( - [ - BasicTypeInfo.INT_TYPE_INFO(), - BasicTypeInfo.STRING_TYPE_INFO(), - BasicTypeInfo.STRING_TYPE_INFO(), - ], - ["id", "text", "summary"], - ) - output_type = ExternalTypeInfo(output_type_info) - output_schema = ( - Schema.new_builder() - .column("id", DataTypes.INT()) - .column("text", DataTypes.STRING()) - .column("summary", DataTypes.STRING()) - .build() - ) - - # Register a filesystem sink for collecting results. - result_dir = Path("/tmp/parallel_chat_results") - result_dir.mkdir(parents=True, exist_ok=True) - - t_env.create_temporary_table( - "sink", - TableDescriptor.for_connector("filesystem") - .option("path", str(result_dir.absolute())) - .format("json") - .schema(output_schema) - .build(), + # Create input stream with a single restaurant review. + input_stream = env.from_collection( + collection=[{"id": 1, "text": INPUT_TEXT}], ) # Use the ParallelChatAgent to analyze the review with parallel LLM calls. - output_table = ( - agents_env.from_table(input=input_table, key_selector=ParallelChatKeySelector()) + output_stream = ( + agents_env.from_datastream( + input=input_stream, key_selector=lambda x: x["id"] + ) .apply(ParallelChatAgent()) - .to_table(schema=output_schema, output_type=output_type) + .to_datastream() ) - # Execute the Flink pipeline. - wall_start = time.monotonic() - output_table.execute_insert("sink").wait() - wall_elapsed = time.monotonic() - wall_start - # Print the analysis results to stdout. - rows = [] - for file in result_dir.iterdir(): - if file.is_file(): - with file.open() as f: - for line in f: - line = line.strip() - if line: - rows.append(json.loads(line)) + output_stream.print() - print(f"OUTPUT rows: {rows}") - print(f"End-to-end wall time: {wall_elapsed:.2f}s") - print("=== Done ===") + # Execute the Flink pipeline. + agents_env.execute() if __name__ == "__main__": From d07cc6c97f214a45a10f3f83b86c37f5bda42571 Mon Sep 17 00:00:00 2001 From: Haocong Wang <75206183+Ryan-Nightwish@users.noreply.github.com> Date: Wed, 10 Jun 2026 15:22:15 +0800 Subject: [PATCH 04/10] [Feature] Fix spotless code style violations in parallel LLM examples Remove redundant same-package imports in ParallelChatAgent.java and unused import in ParallelChatRequestExample.java. Use fully-qualified CustomTypesAndResources.XxxType references with proper line breaks to satisfy the project line-width constraint. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../examples/ParallelChatRequestExample.java | 1 - .../examples/agents/ParallelChatAgent.java | 18 ++++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java index c84486131..edc540508 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java @@ -21,7 +21,6 @@ import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceName; import org.apache.flink.agents.api.resource.ResourceType; -import org.apache.flink.agents.examples.agents.CustomTypesAndResources; import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentKeySelector; import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentRequest; import org.apache.flink.agents.examples.agents.ParallelChatAgent; diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java index b070c9a3d..e63e0e854 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java @@ -86,7 +86,8 @@ public static ResourceDescriptor sentimentModel() { private static Map initRow(Event event) { InputEvent inputEvent = InputEvent.fromEvent(event); - CustomTypesAndResources.SentimentRequest request = (CustomTypesAndResources.SentimentRequest) inputEvent.getInput(); + CustomTypesAndResources.SentimentRequest request = + (CustomTypesAndResources.SentimentRequest) inputEvent.getInput(); Map row = new HashMap<>(); row.put("id", request.getId()); row.put("text", request.getText()); @@ -110,7 +111,8 @@ private static ChatRequestEvent buildAspectRequest(String text, String aspect) { new ChatMessage( MessageRole.USER, "Judge the \"" + aspect + "\" dimension: " + text)); - return new ChatRequestEvent("sentimentModel", messages, CustomTypesAndResources.AspectResponse.class); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.AspectResponse.class); } @SuppressWarnings("unchecked") @@ -125,10 +127,12 @@ private static ChatRequestEvent buildSummarizeRequest(Map row) { List.of( new ChatMessage(MessageRole.SYSTEM, AGGREGATE_SYSTEM_PROMPT), new ChatMessage(MessageRole.USER, body)); - return new ChatRequestEvent("sentimentModel", messages, CustomTypesAndResources.SummaryResponse.class); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.SummaryResponse.class); } - private static OutputEvent buildOutputEvent(Map row, CustomTypesAndResources.SummaryResponse parsed) { + private static OutputEvent buildOutputEvent( + Map row, CustomTypesAndResources.SummaryResponse parsed) { Map output = new HashMap<>(); output.put("id", row.get("id")); output.put("text", row.get("text")); @@ -166,12 +170,14 @@ public static void handleResponse(Event event, RunnerContext ctx) throws Excepti Map row = loadRow(ctx); if (parsed instanceof CustomTypesAndResources.SummaryResponse) { - CustomTypesAndResources.SummaryResponse summary = (CustomTypesAndResources.SummaryResponse) parsed; + CustomTypesAndResources.SummaryResponse summary = + (CustomTypesAndResources.SummaryResponse) parsed; ctx.sendEvent(buildOutputEvent(row, summary)); return; } - CustomTypesAndResources.AspectResponse aspect = (CustomTypesAndResources.AspectResponse) parsed; + CustomTypesAndResources.AspectResponse aspect = + (CustomTypesAndResources.AspectResponse) parsed; Map sentiments = (Map) row.get("sentiments"); sentiments.put(aspect.aspect, aspect.result); saveRow(ctx, row); From bba0ac240e4e3d3867be1cca666d2e0d7fd4c942 Mon Sep 17 00:00:00 2001 From: Haocong Wang <75206183+Ryan-Nightwish@users.noreply.github.com> Date: Fri, 12 Jun 2026 12:19:51 +0800 Subject: [PATCH 05/10] =?UTF-8?q?[Feature]=20Address=20reviewer=20feedback?= =?UTF-8?q?=20for=20parallel=20LLM=20quickstart=20example=20-=20Fix=20aspe?= =?UTF-8?q?ct=20correlation:=20pre-build=20all=20ChatRequestEvents=20and?= =?UTF-8?q?=20record=20a=20=20=20{request=5Fid=20=E2=86=92=20aspect}=20map?= =?UTF-8?q?=20at=20dispatch=20time;=20look=20up=20the=20dispatched=20=20?= =?UTF-8?q?=20aspect=20via=20ChatResponseEvent.request=5Fid=20instead=20of?= =?UTF-8?q?=20relying=20on=20the=20=20=20model=20to=20echo=20back=20the=20?= =?UTF-8?q?correct=20dimension=20label=20-=20Inline=20short=20helpers=20(?= =?UTF-8?q?=5Fsave=5Frow/=5Fload=5Frow,=20parseResponse/saveRow=20in=20=20?= =?UTF-8?q?=20Java)=20directly=20into=20action=20methods=20with=20brief=20?= =?UTF-8?q?explanatory=20comments=20-=20Clean=20up=20code=20style:=20inden?= =?UTF-8?q?tation,=20blank=20lines,=20zip=20strict=3DTrue,=20=20=20remove?= =?UTF-8?q?=20dead=20code,=20align=20AGGREGATE=5FSYSTEM=5FPROMPT=20formatt?= =?UTF-8?q?ing=20-=20Update=20"Create=20the=20Agent"=20doc=20section=20to?= =?UTF-8?q?=20match=20current=20code:=20expose=20=20=20prompts=20and=20key?= =?UTF-8?q?=20helpers=20inline,=20reference=20full=20source=20for=20omitte?= =?UTF-8?q?d=20=20=20supporting=20functions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- .../get-started/quickstart/parallel_llm.md | 166 +++++++++++++++--- .../examples/agents/ParallelChatAgent.java | 45 ++--- .../agents/custom_types_and_resources.py | 1 - .../quickstart/agents/parallel_chat_agent.py | 74 ++++---- 4 files changed, 193 insertions(+), 93 deletions(-) diff --git a/docs/content/docs/get-started/quickstart/parallel_llm.md b/docs/content/docs/get-started/quickstart/parallel_llm.md index a0c77ca4d..3860c114d 100644 --- a/docs/content/docs/get-started/quickstart/parallel_llm.md +++ b/docs/content/docs/get-started/quickstart/parallel_llm.md @@ -1,6 +1,6 @@ --- title: 'Parallel LLM Calls' -weight: 3 +weight: 4 type: docs ---