diff --git a/docs/content/docs/get-started/quickstart/parallel_llm.md b/docs/content/docs/get-started/quickstart/parallel_llm.md new file mode 100644 index 000000000..5a4b75c09 --- /dev/null +++ b/docs/content/docs/get-started/quickstart/parallel_llm.md @@ -0,0 +1,480 @@ +--- +title: 'Parallel LLM Calls' +weight: 4 +type: docs +--- + + +## Overview + +Flink Agents supports parallel LLM invocations via a broadcast event model. By emitting a single intermediate event from one action and registering multiple independent action handlers that each listen on that event type, the framework's built-in chat action executes the corresponding LLM calls concurrently — no external orchestration is required. + +This quickstart introduces an example that demonstrates how to build a parallel LLM workflow with Flink Agents: + +The **Parallel Sentiment Analysis** agent processes a restaurant review and judges sentiment along two dimensions (taste / service) in parallel, then aggregates the results into a one-line summary with a final LLM call. The end-to-end wall clock time is roughly "slowest single branch + aggregation call", rather than the sum of all three calls. + +{{< hint info >}} +**JDK version note (Java only):** On JDK 21+, the framework uses the Continuation API to execute concurrent chat actions in parallel. On JDK < 21, the framework silently falls back to sequential execution — the result is identical, but the LLM calls run one after another. Python uses native coroutines and always executes in parallel regardless of the JDK version. +{{< /hint >}} + +## Code Walkthrough + +### Prepare Agents Execution Environment + +Create the agents execution environment, and register the available chat model connection to the environment. + +{{< tabs "Prepare Agents Execution Environment" >}} + +{{< tab "Python" >}} +```python +# Set up the Flink streaming environment and the Agents execution environment. 
+env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + +# Add Ollama chat model connection to be used by the ParallelChatAgent. +agents_env.add_resource( + "ollama_server", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, + request_timeout=240.0, + ), +) +``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Add Ollama chat model connection to be used by the ParallelChatAgent. +agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build()); +``` +{{< /tab >}} + +{{< /tabs >}} + +### Create the Agent + +Below is the example code for the `ParallelChatAgent`. Two system prompts — `PARALLEL_SYSTEM_PROMPT` for the parallel aspect judgments and `AGGREGATE_SYSTEM_PROMPT` for the aggregation call — are packaged into `ChatRequestEvent`s by `_build_aspect_request` / `buildAspectRequest` and `_build_summarize_request` / `buildSummarizeRequest`. 
The agent defines a chat model and four actions: `request_aspect_judgments` stores the review `id` and `text` in sensory memory and emits a single generic `Event` of type `SENTIMENT_INPUT_EVENT_TYPE`; `handle_taste_input` and `handle_service_input` each listen on that event type and independently dispatch one `ChatRequestEvent`, recording a `{request_id → aspect}` entry using path-based memory access (`aspect_map.<request_id>`); and `handle_response` looks up the dispatched aspect, stores the result under `sentiments.<aspect>`, and triggers aggregation once all aspects are collected.
+
+{{< tabs "Create the Agent" >}}
+
+{{< tab "Python" >}}
+```python
+ASPECTS: tuple = ("taste", "service")
+N_ASPECTS = len(ASPECTS)
+SENTIMENT_INPUT_EVENT_TYPE = "SentimentInputEvent"
+
+PARALLEL_SYSTEM_PROMPT = (
+    "You are a sentiment analysis assistant. Return JSON: "
+    '{"aspect":"", "result":""}'
+    " — no explanation, no extra fields."
+)
+AGGREGATE_SYSTEM_PROMPT = (
+    "You are a summary assistant. Based on the sentiment judgments for two "
+    "dimensions, compose a brief one-line evaluation. Return JSON: "
+    '{"summary":"taste:, '
+    'service:"} — return only this JSON.'
+) + + +def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent: + """Build a ChatRequestEvent for a single aspect dimension.""" + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=PARALLEL_SYSTEM_PROMPT), + ChatMessage( + role=MessageRole.USER, + content=f'Judge the "{aspect}" dimension: {text}', + ), + ], + output_schema=OutputSchema(output_schema=AspectResponse), + ) + + +def _build_summarize_request(text: str, sentiments: Dict[str, str]) -> ChatRequestEvent: + """Build a ChatRequestEvent for the aggregation phase.""" + body = ( + f"Original: {text}\n" + + "Judgments: " + + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS) + ) + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=AGGREGATE_SYSTEM_PROMPT), + ChatMessage(role=MessageRole.USER, content=body), + ], + output_schema=OutputSchema(output_schema=SummaryResponse), + ) + + +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + + Event flow: + 1. InputEvent → request_aspect_judgments → emits SentimentInputEvent + 2. SentimentInputEvent triggers handlers in parallel: + - handle_taste_input → ChatRequestEvent (taste LLM call) + - handle_service_input → ChatRequestEvent (service LLM call) + 3. Each ChatResponseEvent → handle_response (accumulates aspect results) + 4. 
Once all aspects received → aggregation LLM call → OutputEvent + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and dispatch SentimentInputEvent to aspect handlers.""" + payload = InputEvent.from_event(event).input + # Primitive types (int, str) cross the Pemja JVM boundary without serialization. + ctx.sensory_memory.set("id", payload["id"]) + ctx.sensory_memory.set("text", payload["text"]) + ctx.send_event( + Event( + type=SENTIMENT_INPUT_EVENT_TYPE, + attributes={"input_id": payload["id"], "text": payload["text"]}, + ) + ) + + @action(SENTIMENT_INPUT_EVENT_TYPE) + @staticmethod + def handle_taste_input(event: Event, ctx: RunnerContext) -> None: + """Handle taste aspect: build and send ChatRequestEvent for taste judgment.""" + req = _build_aspect_request(event.get_attr("text"), "taste") + ctx.sensory_memory.set(f"aspect_map.{req.id}", "taste") + ctx.send_event(req) + + @action(SENTIMENT_INPUT_EVENT_TYPE) + @staticmethod + def handle_service_input(event: Event, ctx: RunnerContext) -> None: + """Handle service aspect: build and send ChatRequestEvent for service.""" + req = _build_aspect_request(event.get_attr("text"), "service") + ctx.sensory_memory.set(f"aspect_map.{req.id}", "service") + ctx.send_event(req) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + ... 
+``` + +The complete source including `_build_output_event` and other supporting functions can be found in [`parallel_chat_agent.py`](https://github.com/apache/flink-agents/blob/main/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py). +{{< /tab >}} + +{{< tab "Java" >}} +```Java +/** + * An agent that demonstrates parallel LLM invocations via a broadcast event model. + * + *

<p>Event flow:
+ * <ol>
+ *   <li>InputEvent → requestAspectJudgments → emits SentimentInputEvent
+ *   <li>SentimentInputEvent triggers handlers in parallel:
+ *       <ul>
+ *         <li>handleTasteInput → ChatRequestEvent (taste LLM call)
+ *         <li>handleServiceInput → ChatRequestEvent (service LLM call)
+ *       </ul>
+ *   <li>Each ChatResponseEvent → handleResponse (accumulates aspect results)
+ *   <li>Once all aspects received → aggregation LLM call → OutputEvent
+ * </ol>
+ */ +public class ParallelChatAgent extends Agent { + + private static final String[] ASPECTS = {"taste", "service"}; + + private static final String SENTIMENT_INPUT_EVENT_TYPE = "SentimentInputEvent"; + + private static final String PARALLEL_SYSTEM_PROMPT = + "You are a sentiment analysis assistant. Return JSON: " + + "{\"aspect\":\"\", \"result\":\"\"}" + + " — no explanation, no extra fields."; + private static final String AGGREGATE_SYSTEM_PROMPT = + "You are a summary assistant. Based on the sentiment judgments for two " + + "dimensions, compose a brief one-line evaluation. Return JSON: " + + "{\"summary\":\"taste:, " + + "service:\"} — return only this JSON."; + + @ChatModelSetup + public static ResourceDescriptor sentimentModel() { + return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("extract_reasoning", true) + .build(); + } + + private static ChatRequestEvent buildAspectRequest(String text, String aspect) { + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, PARALLEL_SYSTEM_PROMPT), + new ChatMessage( + MessageRole.USER, + "Judge the \"" + aspect + "\" dimension: " + text)); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.AspectResponse.class); + } + + private static ChatRequestEvent buildSummarizeRequest( + String text, Map sentiments) { + StringJoiner sj = new StringJoiner(" "); + for (String aspect : ASPECTS) { + sj.add(aspect + ":" + sentiments.get(aspect)); + } + String body = "Original: " + text + "\nJudgments: " + sj; + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, AGGREGATE_SYSTEM_PROMPT), + new ChatMessage(MessageRole.USER, body)); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.SummaryResponse.class); + } + + /** Process input event and dispatch a SentimentInputEvent for each 
aspect handler. */ + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void requestAspectJudgments(Event event, RunnerContext ctx) throws Exception { + CustomTypesAndResources.SentimentRequest request = + (CustomTypesAndResources.SentimentRequest) InputEvent.fromEvent(event).getInput(); + ctx.getSensoryMemory().set("id", request.getId()); + ctx.getSensoryMemory().set("text", request.getText()); + ctx.sendEvent( + new Event( + SENTIMENT_INPUT_EVENT_TYPE, + Map.of("input_id", request.getId(), "text", request.getText()))); + } + + /** Handle taste aspect: build and send ChatRequestEvent for taste judgment. */ + @Action(listenEventTypes = {SENTIMENT_INPUT_EVENT_TYPE}) + public static void handleTasteInput(Event event, RunnerContext ctx) throws Exception { + ChatRequestEvent req = buildAspectRequest((String) event.getAttr("text"), "taste"); + ctx.getSensoryMemory().set("aspect_map." + req.getId(), "taste"); + ctx.sendEvent(req); + } + + /** Handle service aspect: build and send ChatRequestEvent for service judgment. */ + @Action(listenEventTypes = {SENTIMENT_INPUT_EVENT_TYPE}) + public static void handleServiceInput(Event event, RunnerContext ctx) throws Exception { + ChatRequestEvent req = buildAspectRequest((String) event.getAttr("text"), "service"); + ctx.getSensoryMemory().set("aspect_map." + req.getId(), "service"); + ctx.sendEvent(req); + } + + @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE}) + public static void handleResponse(Event event, RunnerContext ctx) throws Exception { + ... + } +} +``` + +Other private helpers (`buildOutputEvent`) are omitted above; the full source is at [`ParallelChatAgent.java`](https://github.com/apache/flink-agents/blob/main/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java). 
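The body of `handleResponse` (Python: `handle_response`) is elided above. Its fan-in step can be modeled without the framework, which may help when reasoning about response ordering. The following is a minimal Python sketch under stated assumptions, not the example's actual implementation: a plain `dict` stands in for sensory memory, and `record_dispatch`, `handle_aspect_response`, and the request ids `req-a` / `req-b` are hypothetical names introduced only for illustration.

```python
from typing import Dict, Optional

ASPECTS = ("taste", "service")


def record_dispatch(memory: Dict[str, object], request_id: str, aspect: str) -> None:
    """Remember which aspect a dispatched chat request belongs to."""
    # Assumption: a flat dict with dotted keys stands in for path-based memory.
    memory[f"aspect_map.{request_id}"] = aspect


def handle_aspect_response(
    memory: Dict[str, object], request_id: str, result: str
) -> Optional[str]:
    """Store one aspect result; return the aggregation prompt body once all arrived."""
    aspect = memory[f"aspect_map.{request_id}"]
    memory[f"sentiments.{aspect}"] = result

    # Fan-in check: keep waiting until every aspect has a stored result.
    if any(f"sentiments.{a}" not in memory for a in ASPECTS):
        return None

    # Iterate over ASPECTS (not arrival order) so the body is deterministic.
    judgments = " ".join(
        "{}:{}".format(a, memory[f"sentiments.{a}"]) for a in ASPECTS
    )
    return f"Original: {memory['text']}\nJudgments: {judgments}"


if __name__ == "__main__":
    memory = {"id": 1, "text": "The food here is great, but the service is too slow"}
    record_dispatch(memory, "req-a", "taste")
    record_dispatch(memory, "req-b", "service")
    # The service response arrives first; the taste result is still missing.
    print(handle_aspect_response(memory, "req-b", "negative"))
    # The second response completes the set and yields the aggregation body.
    print(handle_aspect_response(memory, "req-a", "positive"))
```

Because the aggregation body is assembled by iterating over `ASPECTS`, the judgments appear in the same order regardless of which response arrives first.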
+{{< /tab >}}
+
+{{< /tabs >}}
+
+{{< hint warning >}}
+**Constraints when using parallel actions:**
+- **The dispatch action must return immediately after sending events.** The action that emits `SentimentInputEvent` (i.e. `request_aspect_judgments`) should return right after calling `ctx.send_event(...)`. Do not `await`, block, or perform further business logic in the same action, because the framework needs control back to schedule the parallel chat actions.
+- **Action handlers on the same key execute sequentially.** Multiple action invocations triggered for the same key are processed one at a time, not concurrently, so `handle_taste_input` and `handle_service_input` are called in sequence. Each handler writes a single path-based entry (`aspect_map.<request_id>`) to sensory memory without reading it back, so the two do not conflict. The LLM calls dispatched by these handlers still execute in parallel.
+- **Response handlers on the same key execute sequentially.** Multiple `ChatResponseEvent` triggers for the same key are processed one at a time, not concurrently, so the path-based writes to `sentiments.<aspect>` in `handle_response` are safe from concurrent overwrites.
+{{< /hint >}}
+
+### Integrate the Agent with Flink
+
+Create the input data, use the `ParallelChatAgent` to analyze the review with parallel LLM calls, and print the results.
+
+{{< tabs "Integrate the Agent with Flink" >}}
+
+{{< tab "Python" >}}
+```python
+# Create input stream with a single restaurant review.
+input_stream = env.from_collection(
+    collection=[{"id": 1, "text": INPUT_TEXT}],
+)
+
+# Use the ParallelChatAgent to analyze the review with parallel LLM calls.
+output_stream = (
+    agents_env.from_datastream(
+        input=input_stream, key_selector=lambda x: x["id"]
+    )
+    .apply(ParallelChatAgent())
+    .to_datastream()
+)
+
+# Print the analysis results to stdout.
+output_stream.print()
+
+# Execute the Flink pipeline.
+agents_env.execute()
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```Java
+// Create input stream with a single restaurant review.
+DataStream<SentimentRequest> inputStream =
+        env.fromElements(new SentimentRequest(1, ParallelChatAgent.INPUT_TEXT));
+
+// Use the ParallelChatAgent to analyze the review with parallel LLM calls.
+DataStream<Object> outputStream =
+        agentsEnv
+                .fromDataStream(inputStream, new SentimentKeySelector())
+                .apply(new ParallelChatAgent())
+                .toDataStream();
+
+// Print the analysis results to stdout.
+outputStream.print();
+
+// Execute the Flink pipeline.
+agentsEnv.execute();
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Run the Example
+
+### Prerequisites
+
+* Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL)
+* Git
+* Java 11+ (Java 21+ recommended for parallel execution on the Java side)
+* Python 3.10, 3.11 or 3.12
+
+### Preparation
+
+#### Prepare Flink and Flink Agents
+
+Follow the [installation]({{< ref "docs/get-started/installation" >}}) instructions to set up Flink and Flink Agents.
+
+#### Clone the Flink Agents Repository (if not done already)
+
+```bash
+git clone https://github.com/apache/flink-agents.git
+cd flink-agents
+```
+{{< hint info >}}
+For Python examples, you can skip this step and submit the Python file from the installed flink-agents wheel.
+{{< /hint >}}
+
+#### Deploy a Standalone Flink Cluster
+
+You can deploy a standalone Flink cluster in your local environment with the following command.
+
+{{< tabs "Deploy a Standalone Flink Cluster" >}}
+
+{{< tab "Python" >}}
+```bash
+export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')
+$FLINK_HOME/bin/start-cluster.sh
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+1. Build Flink Agents from source to generate the example jar. See [installation]({{< ref "docs/get-started/installation" >}}) for more details.
+2.
Start the Flink cluster:
+   ```bash
+   $FLINK_HOME/bin/start-cluster.sh
+   ```
+
+{{< hint info >}}
+To run the example on JDK 21+, append the JVM option `--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED` to [env.java.opts.all](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#env-java-opts-all) in `$FLINK_HOME/conf/config.yaml` before starting the Flink cluster.
+{{< /hint >}}
+{{< /tab >}}
+
+{{< /tabs >}}
+For more detailed steps, refer to the [local cluster](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/#starting-and-stopping-a-local-cluster) instructions.
+
+{{< hint info >}}
+If you can't reach the web UI at [localhost:8081](http://localhost:8081), check the logs in `$FLINK_HOME/log` for the cause. If the cause is a port conflict, change the port in `$FLINK_HOME/conf/config.yaml`.
+{{< /hint >}}
+
+#### Prepare Ollama
+
+Download and install Ollama from the official [website](https://ollama.com/download).
+
+{{< hint info >}}
+Ollama server **0.9.0** or higher is required.
+{{< /hint >}}
+
+Then pull the qwen3:1.7b model, which the parallel LLM example requires:
+
+```bash
+ollama pull qwen3:1.7b
+```
+
+### Submit Flink Agents Job to Standalone Flink Cluster
+
+#### Submit to Flink Cluster
+
+{{< tabs "Submit to Flink Cluster" >}}
+
+{{< tab "Python" >}}
+```bash
+export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')
+
+# Run parallel chat request example
+$FLINK_HOME/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/parallel_chat_request_example.py
+# or submit the example Python file from the installed flink-agents wheel
+$FLINK_HOME/bin/flink run -py $PYTHONPATH/flink_agents/examples/quickstart/parallel_chat_request_example.py
+```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+```bash
+$FLINK_HOME/bin/flink run -c org.apache.flink.agents.examples.ParallelChatRequestExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+Now you should see a Flink job submitted to the Flink cluster in the Flink web UI at [localhost:8081](http://localhost:8081).
+
+After a few minutes, you can check the output in the TaskManager output log.
diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java
new file mode 100644
index 000000000..edc540508
--- /dev/null
+++ b/examples/src/main/java/org/apache/flink/agents/examples/ParallelChatRequestExample.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.examples; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceName; +import org.apache.flink.agents.api.resource.ResourceType; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentKeySelector; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources.SentimentRequest; +import org.apache.flink.agents.examples.agents.ParallelChatAgent; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Java example demonstrating parallel LLM invocations via multi-action fan-out. + * + *

This example demonstrates how to use the Flink Agents to analyze a restaurant review by + * fanning out multiple parallel LLM calls — one per sentiment dimension — and aggregating the + * results with a final LLM call. This serves as a minimal, end-to-end example of integrating + * parallel LLM-powered agents with Flink streaming jobs. + */ +public class ParallelChatRequestExample { + + /** Runs the example pipeline. */ + public static void main(String[] args) throws Exception { + // Set up the Flink streaming environment and the Agents execution environment. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Add Ollama chat model connection to be used by the ParallelChatAgent. + agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build()); + + // Create input stream with a single restaurant review. + DataStream inputStream = + env.fromElements(new SentimentRequest(1, ParallelChatAgent.INPUT_TEXT)); + + // Use the ParallelChatAgent to analyze the review with parallel LLM calls. + DataStream outputStream = + agentsEnv + .fromDataStream(inputStream, new SentimentKeySelector()) + .apply(new ParallelChatAgent()) + .toDataStream(); + + // Print the analysis results to stdout. + outputStream.print(); + + // Execute the Flink pipeline. 
+ agentsEnv.execute(); + } +} diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java index 2fb1da8c7..2447fa185 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java @@ -26,7 +26,9 @@ import org.apache.flink.agents.api.prompt.Prompt; import org.apache.flink.agents.api.resource.ResourceDescriptor; import org.apache.flink.agents.api.resource.ResourceName; +import org.apache.flink.api.java.functions.KeySelector; +import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -224,6 +226,66 @@ public String toString() { } } + /** Single input record carrying an id and text for parallel sentiment analysis. */ + public static class SentimentRequest implements Serializable { + private static final long serialVersionUID = 1L; + private final int id; + private final String text; + + public SentimentRequest(int id, String text) { + this.id = id; + this.text = text; + } + + public int getId() { + return id; + } + + public String getText() { + return text; + } + + @Override + public String toString() { + return String.format("SentimentRequest{id=%d, text='%s'}", id, text); + } + } + + /** Key selector that extracts the id field from a {@link SentimentRequest}. */ + public static class SentimentKeySelector implements KeySelector { + @Override + public Integer getKey(SentimentRequest request) { + return request.getId(); + } + } + + /** LLM response for a single aspect judgment. 
*/ + public static class AspectResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String aspect; + public String result; + + public AspectResponse() {} + + @Override + public String toString() { + return String.format("AspectResponse{aspect='%s', result='%s'}", aspect, result); + } + } + + /** LLM response for the aggregation phase. */ + public static class SummaryResponse implements Serializable { + private static final long serialVersionUID = 1L; + public String summary; + + public SummaryResponse() {} + + @Override + public String toString() { + return String.format("SummaryResponse{summary='%s'}", summary); + } + } + /** Provides a summary of review data including suggestions for improvement. */ @JsonSerialize @JsonDeserialize diff --git a/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java new file mode 100644 index 000000000..24c85214d --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.agents.examples.agents; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.annotation.ChatModelSetup; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceName; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; + +import static org.apache.flink.agents.api.agents.Agent.STRUCTURED_OUTPUT; + +/** + * An agent that demonstrates parallel LLM invocations via fan-out of multiple {@link + * ChatRequestEvent} events. + * + *

<p>This agent receives a restaurant review and uses an LLM to judge sentiment along multiple
+ * dimensions (taste / service) in parallel, then aggregates the results into a one-line summary
+ * with a final LLM call. It handles prompt construction, parallel chat dispatch, response
+ * accumulation, and output assembly.
+ *
+ * <p>Event flow:
+ *
+ * <ol>
+ *   <li>InputEvent → requestAspectJudgments → emits SentimentInputEvent
+ *   <li>SentimentInputEvent triggers handlers in parallel:
+ *       <ul>
+ *         <li>handleTasteInput → ChatRequestEvent (taste LLM call)
+ *         <li>handleServiceInput → ChatRequestEvent (service LLM call)
+ *       </ul>
+ *   <li>Each ChatResponseEvent → handleResponse (accumulates aspect results)
+ *   <li>Once all aspects received → aggregation LLM call → OutputEvent
+ * </ol>
+ *
+ * <p>
JDK version note: On JDK 21+, the framework uses the Continuation API to execute + * concurrent chat actions in parallel, so the wall clock time is roughly "slowest single branch + + * aggregation call". On JDK < 21, the framework silently falls back to sequential execution — + * the result is identical, but the LLM calls run one after another. + */ +public class ParallelChatAgent extends Agent { + + /** Ollama model name, configurable via environment variable. */ + public static final String OLLAMA_MODEL = + System.getenv().getOrDefault("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b"); + + /** Input text for the demo. */ + public static final String INPUT_TEXT = "The food here is great, but the service is too slow"; + + private static final String[] ASPECTS = {"taste", "service"}; + + private static final String SENTIMENT_INPUT_EVENT_TYPE = "SentimentInputEvent"; + + private static final String PARALLEL_SYSTEM_PROMPT = + "You are a sentiment analysis assistant. Return JSON: " + + "{\"aspect\":\"\", \"result\":\"\"}" + + " — no explanation, no extra fields."; + private static final String AGGREGATE_SYSTEM_PROMPT = + "You are a summary assistant. Based on the sentiment judgments for two " + + "dimensions, compose a brief one-line evaluation. 
Return JSON: " + + "{\"summary\":\"taste:, " + + "service:\"} — return only this JSON."; + + @ChatModelSetup + public static ResourceDescriptor sentimentModel() { + return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("extract_reasoning", true) + .build(); + } + + private static ChatRequestEvent buildAspectRequest(String text, String aspect) { + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, PARALLEL_SYSTEM_PROMPT), + new ChatMessage( + MessageRole.USER, + "Judge the \"" + aspect + "\" dimension: " + text)); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.AspectResponse.class); + } + + private static ChatRequestEvent buildSummarizeRequest( + String text, Map sentiments) { + StringJoiner sj = new StringJoiner(" "); + for (String aspect : ASPECTS) { + sj.add(aspect + ":" + sentiments.get(aspect)); + } + String body = "Original: " + text + "\nJudgments: " + sj; + List messages = + List.of( + new ChatMessage(MessageRole.SYSTEM, AGGREGATE_SYSTEM_PROMPT), + new ChatMessage(MessageRole.USER, body)); + return new ChatRequestEvent( + "sentimentModel", messages, CustomTypesAndResources.SummaryResponse.class); + } + + private static OutputEvent buildOutputEvent( + int id, String text, CustomTypesAndResources.SummaryResponse parsed) { + Map output = new HashMap<>(); + output.put("id", id); + output.put("text", text); + output.put("summary", parsed.summary); + return new OutputEvent(output); + } + + /** Process input event and dispatch a SentimentInputEvent for each aspect handler. 
*/ + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void requestAspectJudgments(Event event, RunnerContext ctx) throws Exception { + InputEvent inputEvent = InputEvent.fromEvent(event); + CustomTypesAndResources.SentimentRequest request = + (CustomTypesAndResources.SentimentRequest) inputEvent.getInput(); + ctx.getSensoryMemory().set("id", request.getId()); + ctx.getSensoryMemory().set("text", request.getText()); + ctx.sendEvent( + new Event( + SENTIMENT_INPUT_EVENT_TYPE, + Map.of("input_id", request.getId(), "text", request.getText()))); + } + + /** Handle taste aspect: build and send ChatRequestEvent for taste judgment. */ + @Action(listenEventTypes = {SENTIMENT_INPUT_EVENT_TYPE}) + public static void handleTasteInput(Event event, RunnerContext ctx) throws Exception { + ChatRequestEvent req = buildAspectRequest((String) event.getAttr("text"), "taste"); + ctx.getSensoryMemory().set("aspect_map." + req.getId(), "taste"); + ctx.sendEvent(req); + } + + /** Handle service aspect: build and send ChatRequestEvent for service judgment. */ + @Action(listenEventTypes = {SENTIMENT_INPUT_EVENT_TYPE}) + public static void handleServiceInput(Event event, RunnerContext ctx) throws Exception { + ChatRequestEvent req = buildAspectRequest((String) event.getAttr("text"), "service"); + ctx.getSensoryMemory().set("aspect_map." + req.getId(), "service"); + ctx.sendEvent(req); + } + + /** Process chat response event. 
*/ + @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE}) + public static void handleResponse(Event event, RunnerContext ctx) throws Exception { + ChatResponseEvent chatResponse = ChatResponseEvent.fromEvent(event); + Object parsed = chatResponse.getResponse().getExtraArgs().get(STRUCTURED_OUTPUT); + + if (parsed instanceof CustomTypesAndResources.SummaryResponse) { + CustomTypesAndResources.SummaryResponse summary = + (CustomTypesAndResources.SummaryResponse) parsed; + int id = (int) ctx.getSensoryMemory().get("id").getValue(); + String text = (String) ctx.getSensoryMemory().get("text").getValue(); + ctx.sendEvent(buildOutputEvent(id, text, summary)); + return; + } + + CustomTypesAndResources.AspectResponse aspectResponse = + (CustomTypesAndResources.AspectResponse) parsed; + String aspect = + (String) + ctx.getSensoryMemory() + .get("aspect_map." + chatResponse.getRequestId()) + .getValue(); + ctx.getSensoryMemory().set("sentiments." + aspect, aspectResponse.result); + boolean allReceived = true; + for (String a : ASPECTS) { + if (!ctx.getSensoryMemory().isExist("sentiments." + a)) { + allReceived = false; + break; + } + } + if (allReceived) { + String text = (String) ctx.getSensoryMemory().get("text").getValue(); + Map<String, String> sentiments = new HashMap<>(); + for (String a : ASPECTS) { + sentiments.put( + a, (String) ctx.getSensoryMemory().get("sentiments." + a).getValue()); + } + ctx.sendEvent(buildSummarizeRequest(text, sentiments)); + } + } +} diff --git a/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py index 3bd463f97..ba51e78fa 100644 --- a/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py +++ b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py @@ -196,6 +196,20 @@ class ProductReviewAnalysisRes(BaseModel): reasons: list[str] +# Custom types for parallel chat agent.
+class AspectResponse(BaseModel): + """LLM response for a single aspect judgment.""" + + aspect: str + result: str + + +class SummaryResponse(BaseModel): + """LLM response for the aggregation phase.""" + + summary: str + + # ollama chat model connection descriptor ollama_server_descriptor = ResourceDescriptor( clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, request_timeout=120 diff --git a/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py new file mode 100644 index 000000000..6d98cb2a1 --- /dev/null +++ b/python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py @@ -0,0 +1,173 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +import os +from typing import Dict + +from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent +from flink_agents.api.agents.types import OutputSchema +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.decorators import action, chat_model_setup +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.resource import ResourceDescriptor, ResourceName +from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + AspectResponse, + SummaryResponse, +) + +OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") + +INPUT_TEXT = "The food here is great, but the service is too slow" +SENTIMENT_INPUT_EVENT_TYPE = "SentimentInputEvent" +ASPECTS: tuple = ("taste", "service") +N_ASPECTS = len(ASPECTS) + +PARALLEL_SYSTEM_PROMPT = ( + "You are a sentiment analysis assistant. Return JSON: " + '{"aspect":"", "result":""}' + " — no explanation, no extra fields." +) +AGGREGATE_SYSTEM_PROMPT = ( + "You are a summary assistant. Based on the sentiment judgments for two " + "dimensions, compose a brief one-line evaluation. Return JSON: " + '{"summary":"taste:, ' + 'service:"} — return only this JSON.' 
+) + + +def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent: + """Build a ChatRequestEvent for a single aspect dimension.""" + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=PARALLEL_SYSTEM_PROMPT), + ChatMessage( + role=MessageRole.USER, + content=f'Judge the "{aspect}" dimension: {text}', + ), + ], + output_schema=OutputSchema(output_schema=AspectResponse), + ) + + +def _build_summarize_request(text: str, sentiments: Dict[str, str]) -> ChatRequestEvent: + """Build a ChatRequestEvent for the aggregation phase.""" + body = ( + f"Original: {text}\n" + + "Judgments: " + + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS) + ) + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=AGGREGATE_SYSTEM_PROMPT), + ChatMessage(role=MessageRole.USER, content=body), + ], + output_schema=OutputSchema(output_schema=SummaryResponse), + ) + + +def _build_output_event(row_id: int, text: str, parsed: SummaryResponse) -> OutputEvent: + """Build the final OutputEvent from the aggregated row.""" + return OutputEvent(output={"id": row_id, "text": text, "summary": parsed.summary}) + + +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + + Event flow: + 1. InputEvent → request_aspect_judgments → emits SentimentInputEvent + 2. SentimentInputEvent triggers handlers in parallel: + - handle_taste_input → ChatRequestEvent (taste LLM call) + - handle_service_input → ChatRequestEvent (service LLM call) + 3. 
Each ChatResponseEvent → handle_response (accumulates aspect results) + 4. Once all aspects received → aggregation LLM call → OutputEvent + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and dispatch SentimentInputEvent to aspect handlers.""" + payload = InputEvent.from_event(event).input + # Primitive types (int, str) cross the Pemja JVM boundary without serialization. + ctx.sensory_memory.set("id", payload["id"]) + ctx.sensory_memory.set("text", payload["text"]) + ctx.send_event( + Event( + type=SENTIMENT_INPUT_EVENT_TYPE, + attributes={"input_id": payload["id"], "text": payload["text"]}, + ) + ) + + @action(SENTIMENT_INPUT_EVENT_TYPE) + @staticmethod + def handle_taste_input(event: Event, ctx: RunnerContext) -> None: + """Handle taste aspect: build and send ChatRequestEvent for taste judgment.""" + req = _build_aspect_request(event.get_attr("text"), "taste") + ctx.sensory_memory.set(f"aspect_map.{req.id}", "taste") + ctx.send_event(req) + + @action(SENTIMENT_INPUT_EVENT_TYPE) + @staticmethod + def handle_service_input(event: Event, ctx: RunnerContext) -> None: + """Handle service aspect: build and send ChatRequestEvent for service.""" + req = _build_aspect_request(event.get_attr("text"), "service") + ctx.sensory_memory.set(f"aspect_map.{req.id}", "service") + ctx.send_event(req) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + response_event = ChatResponseEvent.from_event(event) + parsed = response_event.response.extra_args[STRUCTURED_OUTPUT] + if 
isinstance(parsed, dict): + parsed = SummaryResponse(**parsed) if "summary" in parsed else AspectResponse(**parsed) + if isinstance(parsed, SummaryResponse): + ctx.send_event( + _build_output_event( + ctx.sensory_memory.get("id"), + ctx.sensory_memory.get("text"), + parsed, + ) + ) + return + aspect = ctx.sensory_memory.get(f"aspect_map.{response_event.request_id}") + ctx.sensory_memory.set(f"sentiments.{aspect}", parsed.result) + if all(ctx.sensory_memory.is_exist(f"sentiments.{a}") for a in ASPECTS): + text = ctx.sensory_memory.get("text") + sentiments = {a: ctx.sensory_memory.get(f"sentiments.{a}") for a in ASPECTS} + ctx.send_event(_build_summarize_request(text, sentiments)) diff --git a/python/flink_agents/examples/quickstart/parallel_chat_request_example.py b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py new file mode 100644 index 000000000..083709aa1 --- /dev/null +++ b/python/flink_agents/examples/quickstart/parallel_chat_request_example.py @@ -0,0 +1,74 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################# +from pyflink.datastream import StreamExecutionEnvironment + +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.resource import ResourceDescriptor, ResourceName, ResourceType +from flink_agents.examples.quickstart.agents.parallel_chat_agent import ( + INPUT_TEXT, + ParallelChatAgent, +) + + +def main() -> None: + """Main function for the parallel chat request quickstart example. + + This example demonstrates how to use Flink Agents to analyze a restaurant + review by fanning out multiple parallel LLM calls — one per sentiment + dimension — and aggregating the results with a final LLM call. This serves + as a minimal, end-to-end example of integrating parallel LLM-powered agents + with Flink streaming jobs. + """ + # Set up the Flink streaming environment and the Agents execution environment. + env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(1) + agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + + # Add Ollama chat model connection to be used by the ParallelChatAgent. + agents_env.add_resource( + "ollama_server", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, + request_timeout=240.0, + ), + ) + + # Create input stream with a single restaurant review. + input_stream = env.from_collection( + collection=[{"id": 1, "text": INPUT_TEXT}], + ) + + # Use the ParallelChatAgent to analyze the review with parallel LLM calls. + output_stream = ( + agents_env.from_datastream( + input=input_stream, key_selector=lambda x: x["id"] + ) + .apply(ParallelChatAgent()) + .to_datastream() + ) + + # Print the analysis results to stdout. + output_stream.print() + + # Execute the Flink pipeline. + agents_env.execute() + + +if __name__ == "__main__": + main()
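
The fan-in step in `handle_response` can be reasoned about without a Flink cluster or an LLM. The sketch below is not part of the patch: it replaces sensory memory with a plain `dict`, and the helper name `record_aspect_result` and the request ids `req-1` / `req-2` are hypothetical stand-ins (the real ids come from `ChatRequestEvent.id`). It only illustrates the key layout used above (`aspect_map.<request id>` and `sentiments.<aspect>`) and why aggregation fires exactly once, whichever branch finishes last.

```python
from typing import Dict, Optional, Tuple

ASPECTS: Tuple[str, ...] = ("taste", "service")


def record_aspect_result(
    memory: Dict[str, str], request_id: str, result: str
) -> Optional[Dict[str, str]]:
    """Store one aspect result; return all results once every aspect has reported."""
    # Resolve which aspect this response belongs to, mirroring the
    # "aspect_map.<request id>" lookup in handle_response.
    aspect = memory[f"aspect_map.{request_id}"]
    memory[f"sentiments.{aspect}"] = result
    # Aggregation is only possible when every aspect has a stored result.
    if all(f"sentiments.{a}" in memory for a in ASPECTS):
        return {a: memory[f"sentiments.{a}"] for a in ASPECTS}
    return None  # still waiting for at least one branch


# Hypothetical request ids, registered when the two requests were sent.
memory: Dict[str, str] = {
    "aspect_map.req-1": "taste",
    "aspect_map.req-2": "service",
}

# Responses may arrive in either order; only the last one completes the set.
first = record_aspect_result(memory, "req-2", "negative")
second = record_aspect_result(memory, "req-1", "positive")
print(first)   # None
print(second)  # {'taste': 'positive', 'service': 'negative'}
```

Keying the lookup by request id rather than by arrival order is what keeps the accumulation correct when the two branches complete out of order.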