Skip to content

Commit 601f5d4

Browse files
authored
[AI-72] LangGraph plugin samples (#289)
* Add LangGraph plugin samples Seven samples demonstrating the Temporal LangGraph plugin across both the Graph API and Functional API: - Human-in-the-loop: interrupt() + Temporal signals for chatbot approval - Continue-as-new: task result caching across workflow boundaries - ReAct agent: tool-calling loop with conditional edges / while loop - Control flow (Functional API only): parallel, for-loop, if/else Related SDK PR: temporalio/sdk-python#1448 * Remove explicit langchain deps from langgraph group to avoid conflict * Declare langchain and langgraph as conflicting dependency groups * Fix import sorting and formatting * Update dependencies and lint config (temporarily - waiting for SDK PR merge) so that CI otherwise passes * Add hello_world samples, per-sample READMEs, tests, __init__ docstrings, and configurable client * Address review: consistent v2 API, unique thread_id, SDK prereq note * Remove SDK prereq note - PR won't merge until plugin is released * Add LangSmith tracing sample (Graph API + Functional API) Combines LangGraphPlugin (durable execution) with LangSmithPlugin (observability) for full tracing of LLM calls through Temporal workflows. * Update all samples to match SDK PR #1448 API - graphs parameter is now a list, not a dict - No graph()/entrypoint() lookup functions — reference objects directly - No entrypoints parameter on LangGraphPlugin - Use set_cache()/get_cache() for continue-as-new caching - Update tests and READMEs accordingly * Update LangGraph samples to current sdk-python main API The plugin API has evolved past PR #1448: - LangGraphPlugin(graphs=...) now takes dict[str, StateGraph], not list - Same for entrypoints (dict[str, Pregel]) - Each node/task must declare execute_in: "activity" | "workflow" in metadata - Workflows look up registered graphs/entrypoints via graph(name) / entrypoint(name) instead of importing them directly - get_cache/set_cache replaced by cache() and graph(name, cache=...) / entrypoint(name, cache=...) Other changes: - State schemas converted from primitives (str, int) to TypedDict, since langgraph's StateT is bound to TypedDict / dataclass / pydantic models - Module-level graph instances replaced with make_<name>_graph() factories because LangGraphPlugin mutates node metadata in-place at construction time, which prevented reuse across tests - langgraph dep group now pulls langchain, langchain-anthropic, and temporalio[langgraph,langsmith] so the langsmith_tracing samples and human_in_the_loop's RunnableConfig usage actually resolve * CI: install protoc for building temporalio from sdk-python git source * Skip LangGraph functional API + interrupt tests on Python < 3.11 These features rely on contextvars propagation through asyncio.create_task(), which is only available in Python 3.11+. On 3.10 they hang rather than fail cleanly, so CI sat for an hour before today's fix landed. Mirrors the pytestmark skipif used in sdk-python's own contrib/langgraph tests. * Address review feedback on LangGraph samples - Drop cross-references between Functional API and Graph API READMEs so each stands alone (review comment 1) - Combine run_worker.py + run_workflow.py into a single main.py for the langsmith_tracing samples to reduce setup steps (review comment 2) - Fix workflow -> Workflow casing (review comment 3) - Alias temporalio's entrypoint and graph helpers as temporal_entrypoint / temporal_graph at import sites to avoid the langgraph.func.entrypoint collision (review comment 4) - Demonstrate @Traceable in three places in both langsmith_tracing samples: on the Activity/task itself, on a helper called from the Activity, and on a helper called from the Workflow (review comment 5) - Drop unnecessary single-task list comprehension for activity_options in hello_world and langsmith_tracing functional samples (review comment 6) - Spell out 'temporal server start-dev' in every sample README's prerequisites (review comment 7) - Fix stale README references to get_cache() now that the API is cache() (review comment 8) - Rename arbitrarily-named ETL pipeline functions in continue_as_new to honest math names (double / add_50 / triple) per review comment 9 * Update LangGraph samples for temporalio 1.27.0 release - Drop the [tool.uv.sources] git pin; consume temporalio from PyPI now that 1.27.0 ships the LangGraph plugin. - Keep the protoc CI step around as a permanent fixture so we don't have to add and remove it each time we iterate on a new plugin sample. - Replace dict-comprehension activity_options with explicit per-task literals in the Functional API samples. - Emphasize workflow.wait_condition() in the human-in-the-loop READMEs as the durable wait primitive that pairs with interrupt() and signals. - Clarify that the task result cache is provided by the LangGraph plugin (Temporal), not by upstream LangGraph. - Drop "Same pattern as the Graph API version" cross-references from Functional API workflow docstrings; readers pick one API style. - Tighten the Functional API LangSmith tracing README copy. - Point the parent README to the released contrib package docs.
1 parent 6b8e441 commit 601f5d4

71 files changed

Lines changed: 2873 additions & 92 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/ci.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ jobs:
3333
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6
3434
with:
3535
python-version: ${{ matrix.python }}
36+
# protoc is needed whenever uv builds temporalio from a git source
37+
# (e.g. when [tool.uv.sources] pins a branch while a feature is in
38+
# development). Kept on by default so we don't have to add and remove
39+
# this step every time we iterate on a new plugin.
40+
- uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3
41+
with:
42+
version: "23.x"
43+
repo-token: ${{ secrets.GITHUB_TOKEN }}
3644
- run: uv tool install poethepoet
3745
- run: uv sync --group=dsl --group=encryption --group=trio-async
3846
- run: poe lint

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7272
* [gevent_async](gevent_async) - Combine gevent and Temporal.
7373
* [hello_standalone_activity](hello_standalone_activity) - Use activities without using a workflow.
7474
* [langchain](langchain) - Orchestrate workflows for LangChain.
75+
* [langgraph_plugin](langgraph_plugin) - Run LangGraph workflows as durable Temporal workflows (Graph API and Functional API).
7576
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
7677
* [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals.
7778
* [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist.

langgraph_plugin/README.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# LangGraph Plugin Samples
2+
3+
These samples demonstrate the [Temporal LangGraph plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/langgraph), which runs LangGraph workflows as durable Temporal workflows. Each LangGraph graph node (Graph API) or `@task` (Functional API) executes as a Temporal activity with automatic retries, timeouts, and crash recovery.
4+
5+
Samples are organized by API style:
6+
7+
- **Graph API** (`graph_api/`) -- Define workflows as `StateGraph` with nodes and edges.
8+
- **Functional API** (`functional_api/`) -- Define workflows with `@task` and `@entrypoint` decorators for an imperative programming style.
9+
10+
## Samples
11+
12+
| Sample | Graph API | Functional API | Description |
13+
|--------|:---------:|:--------------:|-------------|
14+
| **Hello World** | [graph_api/hello_world](graph_api/hello_world) | [functional_api/hello_world](functional_api/hello_world) | Minimal sample -- single node/task that processes a query string. Start here. |
15+
| **Human-in-the-loop** | [graph_api/human_in_the_loop](graph_api/human_in_the_loop) | [functional_api/human_in_the_loop](functional_api/human_in_the_loop) | Chatbot that uses `interrupt()` to pause for human approval, Temporal signals to receive feedback, and queries to expose the pending draft. |
16+
| **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. |
17+
| **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. |
18+
| **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. |
19+
| **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. |
20+
21+
## Prerequisites
22+
23+
1. Install dependencies:
24+
25+
```bash
26+
uv sync --group langgraph
27+
```
28+
29+
2. Start a [Temporal dev server](https://docs.temporal.io/cli#start-dev-server):
30+
31+
```bash
32+
temporal server start-dev
33+
```
34+
35+
## Running a Sample
36+
37+
Most samples have two scripts -- start the Worker first, then the Workflow starter in a separate terminal.
38+
39+
```bash
40+
# Terminal 1: start the Worker
41+
uv run langgraph_plugin/<api>/<sample>/run_worker.py
42+
43+
# Terminal 2: start the Workflow
44+
uv run langgraph_plugin/<api>/<sample>/run_workflow.py
45+
```
46+
47+
For example, to run the Graph API human-in-the-loop chatbot:
48+
49+
```bash
50+
# Terminal 1
51+
uv run langgraph_plugin/graph_api/human_in_the_loop/run_worker.py
52+
53+
# Terminal 2
54+
uv run langgraph_plugin/graph_api/human_in_the_loop/run_workflow.py
55+
```
56+
57+
The LangSmith Tracing samples bundle the Worker and Workflow execution into a single `main.py`:
58+
59+
```bash
60+
uv run langgraph_plugin/<api>/langsmith_tracing/main.py
61+
```
62+
63+
## Key Features Demonstrated
64+
65+
- **Durable execution** -- Every graph node / `@task` runs as a Temporal activity with configurable timeouts and retry policies.
66+
- **Human-in-the-loop** -- LangGraph's `interrupt()` pauses the graph; Temporal signals deliver human input; queries expose pending state to UIs.
67+
- **Continue-as-new with caching** -- `cache()` captures completed task results; passing the cache to the next execution avoids re-running them.
68+
- **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops.
69+
- **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them.
70+
71+
## Related
72+
73+
- [Temporal LangGraph plugin docs](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/langgraph)

langgraph_plugin/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Temporal LangGraph plugin samples."""
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""LangGraph Functional API samples using @task and @entrypoint."""
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Continue-as-New with Caching (Functional API)
2+
3+
Demonstrates combining Temporal's continue-as-new with the LangGraph plugin's task result cache to avoid re-executing completed `@task` functions across workflow boundaries.
4+
5+
## What This Sample Demonstrates
6+
7+
- Task result caching across continue-as-new boundaries with `cache()`
8+
- Restoring cached results with `entrypoint(name, cache=...)`
9+
- Each `@task` executes exactly once despite multiple workflow invocations
10+
11+
## How It Works
12+
13+
1. Three tasks run sequentially: `double` (x2) -> `add_50` (+50) -> `triple` (x3).
14+
2. After the first invocation, the workflow continues-as-new with the cache.
15+
3. On subsequent invocations, all tasks return cached results instantly.
16+
4. Input 10 -> 20 -> 70 -> 210.
17+
18+
## Running the Sample
19+
20+
Prerequisites: `uv sync --group langgraph` and a running Temporal dev server (`temporal server start-dev`).
21+
22+
```bash
23+
# Terminal 1
24+
uv run langgraph_plugin/functional_api/continue_as_new/run_worker.py
25+
26+
# Terminal 2
27+
uv run langgraph_plugin/functional_api/continue_as_new/run_workflow.py
28+
```
29+
30+
## Files
31+
32+
| File | Description |
33+
|------|-------------|
34+
| `workflow.py` | `@task` functions, `@entrypoint`, `PipelineInput`, and `PipelineFunctionalWorkflow` |
35+
| `run_worker.py` | Registers tasks and entrypoint with `LangGraphPlugin`, starts Worker |
36+
| `run_workflow.py` | Executes the pipeline Workflow and prints the result |
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Continue-as-new pipeline with task result caching."""
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Worker for the continue-as-new pipeline (Functional API)."""
2+
3+
import asyncio
4+
import os
5+
6+
from temporalio.client import Client
7+
from temporalio.contrib.langgraph import LangGraphPlugin
8+
from temporalio.worker import Worker
9+
10+
from langgraph_plugin.functional_api.continue_as_new.workflow import (
11+
PipelineFunctionalWorkflow,
12+
activity_options,
13+
all_tasks,
14+
pipeline_entrypoint,
15+
)
16+
17+
18+
async def main() -> None:
19+
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
20+
plugin = LangGraphPlugin(
21+
entrypoints={"pipeline": pipeline_entrypoint},
22+
tasks=all_tasks,
23+
activity_options=activity_options,
24+
)
25+
26+
worker = Worker(
27+
client,
28+
task_queue="langgraph-pipeline-functional",
29+
workflows=[PipelineFunctionalWorkflow],
30+
plugins=[plugin],
31+
)
32+
print("Worker started. Ctrl+C to exit.")
33+
await worker.run()
34+
35+
36+
if __name__ == "__main__":
37+
asyncio.run(main())
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""Start the continue-as-new pipeline workflow (Functional API)."""
2+
3+
import asyncio
4+
import os
5+
from datetime import timedelta
6+
7+
from temporalio.client import Client
8+
9+
from langgraph_plugin.functional_api.continue_as_new.workflow import (
10+
PipelineFunctionalWorkflow,
11+
PipelineInput,
12+
)
13+
14+
15+
async def main() -> None:
16+
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
17+
18+
result = await client.execute_workflow(
19+
PipelineFunctionalWorkflow.run,
20+
PipelineInput(data=10),
21+
id="pipeline-functional-workflow",
22+
task_queue="langgraph-pipeline-functional",
23+
execution_timeout=timedelta(seconds=60),
24+
)
25+
26+
# 10*2=20 -> 20+50=70 -> 70*3=210
27+
print(f"Pipeline result: {result}")
28+
29+
30+
if __name__ == "__main__":
31+
asyncio.run(main())
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""Continue-as-new with caching using the LangGraph Functional API with Temporal.
2+
3+
Demonstrates Temporal's continue-as-new with the LangGraph plugin's task
4+
result cache to avoid re-executing completed @task functions across
5+
workflow boundaries.
6+
"""
7+
8+
from dataclasses import dataclass
9+
from datetime import timedelta
10+
from typing import Any
11+
12+
from langgraph.func import entrypoint, task
13+
from temporalio import workflow
14+
from temporalio.contrib.langgraph import cache
15+
from temporalio.contrib.langgraph import entrypoint as temporal_entrypoint
16+
17+
18+
@task
19+
def double(data: int) -> int:
20+
"""Stage 1: double the input."""
21+
return data * 2
22+
23+
24+
@task
25+
def add_50(data: int) -> int:
26+
"""Stage 2: add 50."""
27+
return data + 50
28+
29+
30+
@task
31+
def triple(data: int) -> int:
32+
"""Stage 3: triple the result."""
33+
return data * 3
34+
35+
36+
@entrypoint()
37+
async def pipeline_entrypoint(data: int) -> dict:
38+
"""Run the 3-stage pipeline: double -> add_50 -> triple."""
39+
doubled = await double(data)
40+
plus_50 = await add_50(doubled)
41+
tripled = await triple(plus_50)
42+
return {"result": tripled}
43+
44+
45+
all_tasks = [double, add_50, triple]
46+
47+
activity_options = {
48+
"double": {
49+
"execute_in": "activity",
50+
"start_to_close_timeout": timedelta(seconds=30),
51+
},
52+
"add_50": {
53+
"execute_in": "activity",
54+
"start_to_close_timeout": timedelta(seconds=30),
55+
},
56+
"triple": {
57+
"execute_in": "activity",
58+
"start_to_close_timeout": timedelta(seconds=30),
59+
},
60+
}
61+
62+
63+
@dataclass
64+
class PipelineInput:
65+
data: int
66+
cache: dict[str, Any] | None = None
67+
phase: int = 1
68+
69+
70+
@workflow.defn
71+
class PipelineFunctionalWorkflow:
72+
"""Runs the pipeline, continuing-as-new after each phase.
73+
74+
Input 10: 10*2=20 -> 20+50=70 -> 70*3=210
75+
Each task executes once; phases 2 and 3 use cached results.
76+
"""
77+
78+
@workflow.run
79+
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
80+
app = temporal_entrypoint("pipeline", cache=input_data.cache)
81+
result = await app.ainvoke(input_data.data)
82+
83+
if input_data.phase < 3:
84+
workflow.continue_as_new(
85+
PipelineInput(
86+
data=input_data.data,
87+
cache=cache(),
88+
phase=input_data.phase + 1,
89+
)
90+
)
91+
92+
return result

0 commit comments

Comments
 (0)