Commit 0522490

brianstrauch and DABH authored
AI-36: Add LangGraph plugin (#1448)
* add langgraph plugin
* add experimental package warnings
* fix ruff lint
* fix pyright lint errors
* fixed some mypy lints
* fix docstring lints
* copilot code review
* fix mypy lint
* separate graphs and entrypoints by task queue to avoid concurrent write bug
* use graph.node or task_id for activity names to avoid collisions
* rm local conftest in favor of global and fix lint
* allow langgraph 1.1
* uv lock
* add default_activity_options
* add replay test
* fix gaps in missing tests
* introduce an interceptor to patch is_running only in the workflow, then use init functions for graph compilation
* add interceptor
* remove graph and entrypoint functions in favor of direct graph usage
* rename cache() to get_cache()
* remove interceptor
* allow metadata to be accessed from node func and test
* Fix import sorting in test_node_metadata.py
* Fix formatting in langgraph_plugin.py
* Fix mypy errors: add type params to StateGraph and use State() constructor
* Fix langsmith sandbox crash when langchain_core is installed

  The LangSmithPlugin passes langsmith through the workflow sandbox, but langsmith conditionally imports langchain_core when it's available. With the langgraph extra adding langchain_core as a transitive dep, this conditional import now fires inside the sandbox, where langchain_core's lazy module loading triggers a restricted access on concurrent.futures.ThreadPoolExecutor.

  Two fixes:
  - Pre-import langchain_core.runnables.config at plugin load time so it's in sys.modules before the sandbox starts
  - Add langchain_core to the sandbox passthrough list
  - Add a timeout to _poll_query in langsmith tests to prevent infinite hangs if a workflow never reaches the expected state
* Suppress basedpyright unused import warning for langchain_core preload
* Skip langgraph async tests on Python < 3.11 and warn plugin users

  LangGraph's Functional API (@task/@entrypoint) and interrupt() require Python >= 3.11 for async context variable propagation via asyncio.create_task(context=...). On older versions, get_config() raises "Called get_config outside of a runnable context". This is a documented LangGraph limitation: https://reference.langchain.com/python/langgraph/config/get_store/

  - Skip test_e2e_functional.py, test_interrupt.py, and test_replay_interrupt on Python < 3.11
  - Add a runtime warning in LangGraphPlugin.__init__ on Python < 3.11
* Remove duplicate pytest import in test_interrupt.py
* Fix basedpyright reportUnreachable warning on version check
* Increase execution_timeout for OpenAI tests that call the real API

  test_hello_world_agent[False] had a 5s execution timeout and test_input_guardrail[False] had a 10s timeout, but both use a 30s activity start_to_close_timeout. The workflow times out before the OpenAI API call can complete on slower CI runners. Bump both to 60s.
* Revert "allow metadata to be accessed from node func and test"

  This reverts commit c84c22f.
* Revert "rename cache() to get_cache()"

  This reverts commit ec8244c.
* Revert "remove graph and entrypoint functions in favor of direct graph usage"

  This reverts commit 8ef609f.
* reimplement node metadata fixes
* scope graphs and entrypoints to workflow, rename files
* test sync nodes and tasks, send
* support command goto/update
* add test for command
* raise error if node or task has a retry policy
* support runtime context
* fix lint
* Revert changes to langsmith test_integration.py
* code review
* Remove langchain_core from LangSmith plugin sandbox passthroughs (CI experiment)
* Restore langchain_core to LangSmith plugin sandbox passthroughs
* underscore py files in langgraph plugin dir
* include all serializable data for langgraph config
* mention langsmith tracing in readme
* require execute_in
* fix lint
* fix flaky test
* fix docs

---------

Co-authored-by: DABH <DABH@users.noreply.github.com>
1 parent 776bae2 commit 0522490

30 files changed

Lines changed: 2850 additions & 8 deletions

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,7 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
 pydantic = ["pydantic>=2.0.0,<3"]
 openai-agents = ["openai-agents>=0.14.0", "mcp>=1.9.4, <2"]
 google-adk = ["google-adk>=1.27.0,<2"]
+langgraph = ["langgraph>=1.1.0"]
 langsmith = ["langsmith>=0.7.0,<0.8"]
 lambda-worker-otel = [
     "opentelemetry-api>=1.11.1,<2",
@@ -79,6 +80,7 @@ dev = [
     "pytest-rerunfailures>=16.1",
     "pytest-xdist>=3.6,<4",
     "moto[s3,server]>=5",
+    "langgraph>=1.1.0",
     "langsmith>=0.7.0,<0.8",
     "setuptools<82",
     "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
Lines changed: 170 additions & 0 deletions
@@ -0,0 +1,170 @@
# LangGraph Plugin for Temporal Python SDK

⚠️ **This package is currently at an experimental release stage.** ⚠️

This Temporal [Plugin](https://docs.temporal.io/develop/plugins-guide) allows you to run [LangGraph](https://www.langchain.com/langgraph) nodes and tasks as Temporal Activities, giving your AI workflows durable execution, automatic retries, and timeouts. It supports both the LangGraph Graph API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).

## Installation

```sh
uv add temporalio[langgraph]
```

## Plugin Initialization

### Graph API

```python
from langgraph.graph import StateGraph
from temporalio.contrib.langgraph import LangGraphPlugin

g = StateGraph(State)
g.add_node("my_node", my_node, metadata={"execute_in": "activity"})

plugin = LangGraphPlugin(graphs={"my-graph": g})
```

### Functional API

```python
from temporalio.contrib.langgraph import LangGraphPlugin

plugin = LangGraphPlugin(
    entrypoints={"my_entrypoint": my_entrypoint},
    tasks=[my_task],
    activity_options={"my_task": {"execute_in": "activity"}},
)
```

## Checkpointer

If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use `InMemorySaver`.
Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.

```python
import langgraph.checkpoint.memory
import typing

from temporalio.contrib.langgraph import graph
from temporalio import workflow

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, input: str) -> typing.Any:
        g = graph("my-graph").compile(
            checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
        )

        ...
```

## Execution Location

Every node (Graph API) and task (Functional API) must be labeled with `execute_in`, set to either `"activity"` or `"workflow"`. This is required per node/task; it cannot be set in `default_activity_options`.

```python
# Graph API
graph.add_node("my_node", my_node, metadata={"execute_in": "activity"})
graph.add_node("tool_node", tool_node, metadata={"execute_in": "workflow"})

# Functional API
plugin = LangGraphPlugin(
    tasks=[my_task, tool_task],
    activity_options={
        "my_task": {"execute_in": "activity"},
        "tool_task": {"execute_in": "workflow"},
    },
)
```

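As a rough illustration of the rule above, the following sketch validates an `execute_in` label. The helper `require_execute_in` and the constant `VALID_LOCATIONS` are hypothetical names made up for this example; this is not the plugin's actual validation code, only one way such a check could look.

```python
from typing import Any, Mapping

# Hypothetical constant: the two labels described in the text above.
VALID_LOCATIONS = ("activity", "workflow")


def require_execute_in(name: str, options: Mapping[str, Any]) -> str:
    """Return the execute_in label for a node or task, or raise ValueError."""
    location = options.get("execute_in")
    if location is None:
        raise ValueError(f"{name!r} is missing the required 'execute_in' option")
    if location not in VALID_LOCATIONS:
        raise ValueError(
            f"{name!r} has execute_in={location!r}; expected one of {VALID_LOCATIONS}"
        )
    return location
```

Failing at registration time, rather than when the node first runs, would surface a missing label before any Workflow starts.
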
## Activity Options

Options are passed through to [`workflow.execute_activity()`](https://python.temporal.io/temporalio.workflow.html#execute_activity), which supports parameters like `start_to_close_timeout`, `retry_policy`, `schedule_to_close_timeout`, `heartbeat_timeout`, and more.

### Graph API

Pass Activity options as node `metadata` when calling `add_node`:

```python
from datetime import timedelta
from temporalio.common import RetryPolicy

g = StateGraph(State)
g.add_node("my_node", my_node, metadata={
    "execute_in": "activity",
    "start_to_close_timeout": timedelta(seconds=30),
    "retry_policy": RetryPolicy(maximum_attempts=3),
})
```

### Functional API

Pass Activity options to the `LangGraphPlugin` constructor, keyed by task function name:

```python
from datetime import timedelta
from temporalio.common import RetryPolicy
from temporalio.contrib.langgraph import LangGraphPlugin

plugin = LangGraphPlugin(
    entrypoints={"my_entrypoint": my_entrypoint},
    tasks=[my_task],
    activity_options={
        "my_task": {
            "execute_in": "activity",
            "start_to_close_timeout": timedelta(seconds=30),
            "retry_policy": RetryPolicy(maximum_attempts=3),
        },
    },
)
```

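The option handling described in this section can be pictured with a small sketch. It assumes, without confirmation from the plugin source, that per-node options override `default_activity_options` and that `execute_in` is removed before the remaining keys are forwarded to `workflow.execute_activity()`. `resolve_activity_options` is a hypothetical helper, not part of the plugin.

```python
from datetime import timedelta
from typing import Any


def resolve_activity_options(
    defaults: dict[str, Any], per_node: dict[str, Any]
) -> tuple[str, dict[str, Any]]:
    """Merge defaults with per-node options and split off execute_in."""
    # Read execute_in from the node or task only, never from the defaults.
    execute_in = per_node["execute_in"]
    merged = {**defaults, **per_node}  # per-node values win on conflict
    merged.pop("execute_in", None)  # not an execute_activity parameter
    return execute_in, merged


execute_in, activity_kwargs = resolve_activity_options(
    {"start_to_close_timeout": timedelta(seconds=60)},
    {"execute_in": "activity", "start_to_close_timeout": timedelta(seconds=30)},
)
```

Under these assumptions, `activity_kwargs` holds only keys that `workflow.execute_activity()` accepts, with the 30-second per-node timeout taking precedence over the 60-second default.
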
### Runtime Context

LangGraph's run-scoped context (`context_schema`) is reconstructed on the Activity side, so nodes and tasks can read from and write to `runtime.context`:

```python
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

from temporalio.contrib.langgraph import graph

class Context(TypedDict):
    user_id: str

async def my_node(state: State, runtime: Runtime[Context]) -> dict:
    return {"user": runtime.context["user_id"]}

# In the Workflow:
g = graph("my-graph").compile()
await g.ainvoke({...}, context=Context(user_id="alice"))
```

Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.

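One way to sanity-check a context value before using it is a JSON round trip. This sketch assumes the default Temporal data converter, which encodes plain dictionaries as JSON; a custom payload converter may accept or reject different types. `check_json_round_trip` is a hypothetical helper written for illustration.

```python
import json
from typing import Any, TypedDict


class Context(TypedDict):
    user_id: str


def check_json_round_trip(value: Any) -> Any:
    """Encode value as JSON and decode it again.

    Raises TypeError if the value cannot be encoded as JSON.
    """
    return json.loads(json.dumps(value))


restored = check_json_round_trip(Context(user_id="alice"))
```

A value holding a live client, file handle, or lock would fail this check with a `TypeError`, which suggests it would not survive the Activity boundary under the same assumption.
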
## Tracing

We recommend the [Temporal LangSmith Plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/langsmith) to trace your LangGraph Workflows and Activities.

## Stores are not supported

LangGraph's `Store` (e.g. `InMemoryStore` passed via `graph.compile(store=...)` or `@entrypoint(store=...)`) isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and Activities may run on a different worker than the Workflow. If you pass a store, the plugin logs a warning on first use and `runtime.store` is `None` inside nodes.

Use Workflow state for per-run memory, or an external database (Postgres/Redis/etc.) configured on each worker if you need shared memory across runs.

## Running Tests

Install dependencies:

```sh
uv sync --all-extras
```

Run the test suite:

```sh
uv run pytest tests/contrib/langgraph
```

Tests start a local Temporal dev server automatically, so no external server is needed.
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
"""LangGraph plugin for Temporal SDK.

.. warning::
    This package is experimental and may change in future versions.
    Use with caution in production environments.

This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes
and tasks as Temporal Activities, giving your AI agent workflows durable
execution, automatic retries, and timeouts. It supports both the LangGraph Graph
API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).
"""

from temporalio.contrib.langgraph._plugin import (
    LangGraphPlugin,
    cache,
    entrypoint,
    graph,
)

__all__ = [
    "LangGraphPlugin",
    "entrypoint",
    "cache",
    "graph",
]
Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
"""Activity wrappers for executing LangGraph nodes and tasks."""

from collections.abc import Awaitable
from dataclasses import dataclass
from inspect import iscoroutinefunction, signature
from typing import Any, Callable

from langgraph.errors import GraphInterrupt
from langgraph.types import Command, Interrupt

from temporalio import workflow
from temporalio.contrib.langgraph._langgraph_config import (
    get_langgraph_config,
    set_langgraph_config,
    strip_runnable_config,
)
from temporalio.contrib.langgraph._task_cache import (
    cache_key,
    cache_lookup,
    cache_put,
)

# Per-run dedupe so we only warn once when a user passes a Store via
# graph.compile(store=...) / @entrypoint(store=...). Cleared by
# LangGraphInterceptor.execute_workflow on workflow exit.
_warned_store_runs: set[str] = set()


def clear_store_warning(run_id: str) -> None:
    """Drop the store-warning dedupe entry for a workflow run."""
    _warned_store_runs.discard(run_id)


@dataclass
class ActivityInput:
    """Input for a LangGraph activity, containing args, kwargs, and config."""

    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    langgraph_config: dict[str, Any]


@dataclass
class ActivityOutput:
    """Output from an Activity, containing result, command, or interrupts."""

    result: Any = None
    langgraph_command: Any = None
    langgraph_interrupts: tuple[Interrupt] | None = None


def wrap_activity(
    func: Callable,
) -> Callable[[ActivityInput], Awaitable[ActivityOutput]]:
    """Wrap a function as a Temporal activity that handles LangGraph config and interrupts."""
    # Graph nodes declare `runtime: Runtime[Ctx]` in their signature; tasks
    # don't and instead reach for Runtime via get_runtime(). We re-inject the
    # reconstructed Runtime only when the user function asks.
    accepts_runtime = "runtime" in signature(func).parameters

    async def wrapper(input: ActivityInput) -> ActivityOutput:
        runtime = set_langgraph_config(input.langgraph_config)
        kwargs = dict(input.kwargs)
        if accepts_runtime:
            kwargs["runtime"] = runtime
        try:
            if iscoroutinefunction(func):
                result = await func(*input.args, **kwargs)
            else:
                result = func(*input.args, **kwargs)
            if isinstance(result, Command):
                return ActivityOutput(langgraph_command=result)
            return ActivityOutput(result=result)
        except GraphInterrupt as e:
            return ActivityOutput(langgraph_interrupts=e.args[0])

    return wrapper


def wrap_execute_activity(
    afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]],
    task_id: str = "",
    **execute_activity_kwargs: Any,
) -> Callable[..., Any]:
    """Wrap an activity function to be called via workflow.execute_activity with caching."""

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        # LangGraph may inject a RunnableConfig as the 'config' kwarg. Strip it
        # down to a serializable subset so it can cross the activity boundary;
        # callbacks, stores, etc. aren't serializable.
        if "config" in kwargs:
            kwargs["config"] = strip_runnable_config(kwargs["config"])

        # LangGraph may inject a Runtime as the 'runtime' kwarg. It's
        # reconstructed on the activity side from the serialized langgraph
        # config, so drop the live Runtime from the kwargs that cross the
        # activity boundary (it holds non-serializable stream_writer, store).
        runtime = kwargs.pop("runtime", None)
        run_id = workflow.info().run_id
        if (
            getattr(runtime, "store", None) is not None
            and run_id not in _warned_store_runs
        ):
            _warned_store_runs.add(run_id)
            workflow.logger.warning(
                "LangGraph Store passed via compile(store=...) / @entrypoint(store=...) "
                "is not accessible inside activity-wrapped nodes and tasks: the Store "
                "object isn't serializable across the activity boundary, and activities "
                "may run on a different worker than the workflow. Use a backend-backed "
                "store (Postgres/Redis) configured on each worker if you need shared "
                "memory, or use workflow state for per-run memory."
            )

        langgraph_config = get_langgraph_config()

        # Check task result cache (for continue-as-new deduplication).
        key = (
            cache_key(task_id, args, kwargs, langgraph_config.get("context"))
            if task_id
            else ""
        )
        if task_id:
            found, cached = cache_lookup(key)
            if found:
                return cached

        input = ActivityInput(
            args=args, kwargs=kwargs, langgraph_config=langgraph_config
        )
        output = await workflow.execute_activity(
            afunc, input, **execute_activity_kwargs
        )
        if output.langgraph_interrupts is not None:
            raise GraphInterrupt(output.langgraph_interrupts)

        result = output.result
        if output.langgraph_command is not None:
            cmd = output.langgraph_command
            result = Command(
                graph=cmd["graph"],
                update=cmd["update"],
                resume=cmd["resume"],
                goto=cmd["goto"],
            )

        # Store in cache for future continue-as-new cycles.
        if task_id:
            cache_put(key, result)

        return result

    return wrapper
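
The dispatch logic in `wrap_activity` above can be tried in isolation. The sketch below reproduces only two of its ideas, using plain Python with no Temporal or LangGraph imports: `runtime` is passed only when the wrapped function declares that parameter, and the function is awaited only when it is a coroutine function. The names `call_with_optional_runtime`, `sync_node`, and `async_node` are made up for this example, and a plain dict stands in for the real `Runtime` object.

```python
import asyncio
from inspect import iscoroutinefunction, signature
from typing import Any, Callable


async def call_with_optional_runtime(
    func: Callable[..., Any], runtime: Any, *args: Any, **kwargs: Any
) -> Any:
    """Call func, injecting runtime only if its signature asks for it."""
    call_kwargs = dict(kwargs)
    if "runtime" in signature(func).parameters:
        call_kwargs["runtime"] = runtime
    if iscoroutinefunction(func):
        return await func(*args, **call_kwargs)
    return func(*args, **call_kwargs)


def sync_node(state: dict) -> dict:
    # Synchronous node that does not declare a runtime parameter.
    return {"seen": state["x"]}


async def async_node(state: dict, runtime: Any) -> dict:
    # Asynchronous node that declares a runtime parameter.
    return {"seen": state["x"], "user": runtime["user_id"]}


fake_runtime = {"user_id": "alice"}  # stand-in for a reconstructed Runtime
sync_result = asyncio.run(call_with_optional_runtime(sync_node, fake_runtime, {"x": 1}))
async_result = asyncio.run(call_with_optional_runtime(async_node, fake_runtime, {"x": 1}))
```

Unlike this sketch, the committed code inspects the signature once when the function is wrapped rather than on every call, which avoids repeating the inspection for each Activity execution.
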
