Skip to content

Commit 9c06edd

Browse files
committed
Update LangGraph samples to current sdk-python main API
The plugin API has evolved past PR #1448: - LangGraphPlugin(graphs=...) now takes dict[str, StateGraph], not list - Same for entrypoints (dict[str, Pregel]) - Each node/task must declare execute_in: "activity" | "workflow" in metadata - Workflows look up registered graphs/entrypoints via graph(name) / entrypoint(name) instead of importing them directly - get_cache/set_cache replaced by cache() and graph(name, cache=...) / entrypoint(name, cache=...) Other changes: - State schemas converted from primitives (str, int) to TypedDict, since langgraph's StateT is bound to TypedDict / dataclass / pydantic models - Module-level graph instances replaced with make_<name>_graph() factories because LangGraphPlugin mutates node metadata in-place at construction time, which prevented reuse across tests - langgraph dep group now pulls langchain, langchain-anthropic, and temporalio[langgraph,langsmith] so the langsmith_tracing samples and human_in_the_loop's RunnableConfig usage actually resolve
1 parent a5e3871 commit 9c06edd

33 files changed

Lines changed: 293 additions & 141 deletions

langgraph_plugin/functional_api/continue_as_new/run_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
PipelineFunctionalWorkflow,
1212
activity_options,
1313
all_tasks,
14+
pipeline_entrypoint,
1415
)
1516

1617

1718
async def main() -> None:
1819
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
1920
plugin = LangGraphPlugin(
21+
entrypoints={"pipeline": pipeline_entrypoint},
2022
tasks=all_tasks,
2123
activity_options=activity_options,
2224
)

langgraph_plugin/functional_api/continue_as_new/workflow.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from langgraph.func import entrypoint as lg_entrypoint
1111
from langgraph.func import task
1212
from temporalio import workflow
13-
from temporalio.contrib.langgraph import get_cache, set_cache
13+
from temporalio.contrib.langgraph import cache, entrypoint
1414

1515

1616
@task
@@ -40,10 +40,13 @@ async def pipeline_entrypoint(data: int) -> dict:
4040
return {"result": loaded}
4141

4242

43-
all_tasks = [extract, transform, load]
43+
all_tasks: list[Any] = [extract, transform, load]
4444

4545
activity_options = {
46-
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
46+
t.func.__name__: {
47+
"execute_in": "activity",
48+
"start_to_close_timeout": timedelta(seconds=30),
49+
}
4750
for t in all_tasks
4851
}
4952

@@ -65,14 +68,14 @@ class PipelineFunctionalWorkflow:
6568

6669
@workflow.run
6770
async def run(self, input_data: PipelineInput) -> dict[str, Any]:
68-
set_cache(input_data.cache)
69-
result = await pipeline_entrypoint.ainvoke(input_data.data)
71+
app = entrypoint("pipeline", cache=input_data.cache)
72+
result = await app.ainvoke(input_data.data)
7073

7174
if input_data.phase < 3:
7275
workflow.continue_as_new(
7376
PipelineInput(
7477
data=input_data.data,
75-
cache=get_cache(),
78+
cache=cache(),
7679
phase=input_data.phase + 1,
7780
)
7881
)

langgraph_plugin/functional_api/control_flow/run_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
ControlFlowWorkflow,
1212
activity_options,
1313
all_tasks,
14+
control_flow_pipeline,
1415
)
1516

1617

1718
async def main() -> None:
1819
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
1920
plugin = LangGraphPlugin(
21+
entrypoints={"control-flow": control_flow_pipeline},
2022
tasks=all_tasks,
2123
activity_options=activity_options,
2224
)

langgraph_plugin/functional_api/control_flow/workflow.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
"""
88

99
from datetime import timedelta
10+
from typing import Any
1011

1112
from langgraph.func import entrypoint as lg_entrypoint
1213
from langgraph.func import task
1314
from temporalio import workflow
15+
from temporalio.contrib.langgraph import entrypoint
1416

1517

1618
@task
@@ -74,7 +76,7 @@ async def control_flow_pipeline(items: list[str]) -> dict:
7476
return {"results": results, "summary": summary_text, "total": len(results)}
7577

7678

77-
all_tasks = [
79+
all_tasks: list[Any] = [
7880
validate_item,
7981
classify_item,
8082
process_urgent,
@@ -83,7 +85,10 @@ async def control_flow_pipeline(items: list[str]) -> dict:
8385
]
8486

8587
activity_options = {
86-
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
88+
t.func.__name__: {
89+
"execute_in": "activity",
90+
"start_to_close_timeout": timedelta(seconds=30),
91+
}
8792
for t in all_tasks
8893
}
8994

@@ -92,4 +97,4 @@ async def control_flow_pipeline(items: list[str]) -> dict:
9297
class ControlFlowWorkflow:
9398
@workflow.run
9499
async def run(self, items: list[str]) -> dict:
95-
return await control_flow_pipeline.ainvoke(items)
100+
return await entrypoint("control-flow").ainvoke(items)

langgraph_plugin/functional_api/hello_world/run_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
HelloWorldFunctionalWorkflow,
1212
activity_options,
1313
all_tasks,
14+
hello_entrypoint,
1415
)
1516

1617

1718
async def main() -> None:
1819
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
1920
plugin = LangGraphPlugin(
21+
entrypoints={"hello-world": hello_entrypoint},
2022
tasks=all_tasks,
2123
activity_options=activity_options,
2224
)

langgraph_plugin/functional_api/hello_world/workflow.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44
"""
55

66
from datetime import timedelta
7+
from typing import Any
78

89
from langgraph.func import entrypoint as lg_entrypoint
910
from langgraph.func import task
1011
from temporalio import workflow
12+
from temporalio.contrib.langgraph import entrypoint
1113

1214

1315
@task
@@ -23,10 +25,13 @@ async def hello_entrypoint(query: str) -> dict:
2325
return {"result": result}
2426

2527

26-
all_tasks = [process_query]
28+
all_tasks: list[Any] = [process_query]
2729

2830
activity_options = {
29-
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=10)}
31+
t.func.__name__: {
32+
"execute_in": "activity",
33+
"start_to_close_timeout": timedelta(seconds=10),
34+
}
3035
for t in all_tasks
3136
}
3237

@@ -35,4 +40,4 @@ async def hello_entrypoint(query: str) -> dict:
3540
class HelloWorldFunctionalWorkflow:
3641
@workflow.run
3742
async def run(self, query: str) -> dict:
38-
return await hello_entrypoint.ainvoke(query)
43+
return await entrypoint("hello-world").ainvoke(query)

langgraph_plugin/functional_api/human_in_the_loop/run_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
ChatbotFunctionalWorkflow,
1212
activity_options,
1313
all_tasks,
14+
chatbot_entrypoint,
1415
)
1516

1617

1718
async def main() -> None:
1819
client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
1920
plugin = LangGraphPlugin(
21+
entrypoints={"chatbot": chatbot_entrypoint},
2022
tasks=all_tasks,
2123
activity_options=activity_options,
2224
)

langgraph_plugin/functional_api/human_in_the_loop/workflow.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from langgraph.func import task
1313
from langgraph.types import Command, interrupt
1414
from temporalio import workflow
15+
from temporalio.contrib.langgraph import entrypoint
1516

1617

1718
@task
@@ -40,10 +41,13 @@ async def chatbot_entrypoint(user_message: str) -> dict:
4041
return {"response": final_response}
4142

4243

43-
all_tasks = [generate_draft, request_human_review]
44+
all_tasks: list[Any] = [generate_draft, request_human_review]
4445

4546
activity_options = {
46-
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
47+
t.func.__name__: {
48+
"execute_in": "activity",
49+
"start_to_close_timeout": timedelta(seconds=30),
50+
}
4751
for t in all_tasks
4852
}
4953

@@ -66,21 +70,22 @@ def get_draft(self) -> str | None:
6670

6771
@workflow.run
6872
async def run(self, user_message: str) -> dict[str, Any]:
69-
chatbot_entrypoint.checkpointer = InMemorySaver()
73+
app = entrypoint("chatbot")
74+
app.checkpointer = InMemorySaver()
7075
config = RunnableConfig(
7176
{"configurable": {"thread_id": workflow.info().workflow_id}}
7277
)
7378

7479
# First invocation: runs until interrupt() pauses for human review
75-
result = await chatbot_entrypoint.ainvoke(user_message, config, version="v2")
80+
result = await app.ainvoke(user_message, config, version="v2")
7681

7782
self._draft = result.interrupts[0].value
7883

7984
# Wait for human feedback via Temporal signal
8085
await workflow.wait_condition(lambda: self._human_input is not None)
8186

8287
# Resume with the human's feedback
83-
resumed = await chatbot_entrypoint.ainvoke(
88+
resumed = await app.ainvoke(
8489
Command(resume=self._human_input), config, version="v2"
8590
)
8691
return resumed.value

langgraph_plugin/functional_api/langsmith_tracing/run_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
ChatFunctionalWorkflow,
1616
activity_options,
1717
all_tasks,
18+
chat_entrypoint,
1819
)
1920

2021

@@ -30,6 +31,7 @@ async def main() -> None:
3031
workflows=[ChatFunctionalWorkflow],
3132
plugins=[
3233
LangGraphPlugin(
34+
entrypoints={"chat": chat_entrypoint},
3335
tasks=all_tasks,
3436
activity_options=activity_options,
3537
)

langgraph_plugin/functional_api/langsmith_tracing/workflow.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,22 @@
88
"""
99

1010
from datetime import timedelta
11+
from typing import Any
1112

13+
from langchain.chat_models import init_chat_model
1214
from langgraph.func import entrypoint as lg_entrypoint
1315
from langgraph.func import task
1416
from langsmith import traceable
1517
from temporalio import workflow
16-
17-
from langchain.chat_models import init_chat_model
18+
from temporalio.contrib.langgraph import entrypoint
1819

1920

2021
@task
2122
@traceable(name="chat_task", run_type="chain")
2223
def chat(message: str) -> str:
2324
"""Call an LLM to respond to the message. Traced by LangSmith."""
2425
response = init_chat_model("claude-sonnet-4-6").invoke(message)
25-
return response.content
26+
return str(response.content)
2627

2728

2829
@lg_entrypoint()
@@ -32,10 +33,13 @@ async def chat_entrypoint(message: str) -> dict:
3233
return {"response": response}
3334

3435

35-
all_tasks = [chat]
36+
all_tasks: list[Any] = [chat]
3637

3738
activity_options = {
38-
t.func.__name__: {"start_to_close_timeout": timedelta(seconds=30)}
39+
t.func.__name__: {
40+
"execute_in": "activity",
41+
"start_to_close_timeout": timedelta(seconds=30),
42+
}
3943
for t in all_tasks
4044
}
4145

@@ -44,4 +48,4 @@ async def chat_entrypoint(message: str) -> dict:
4448
class ChatFunctionalWorkflow:
4549
@workflow.run
4650
async def run(self, message: str) -> dict:
47-
return await chat_entrypoint.ainvoke(message)
51+
return await entrypoint("chat").ainvoke(message)

0 commit comments

Comments
 (0)