Skip to content

Commit 496316e

Browse files
Febin .Satharclaude
authored andcommitted
feat: add Claude Agent SDK example with Temporal
Add a new cookbook example demonstrating how to run the Claude Agent SDK (claude-code-sdk) inside a Temporal workflow for durable agent execution. Key patterns demonstrated: - Background heartbeats via asyncio.create_task() (independent of event stream) - Staleness guard to detect hung agents (15-min idle threshold) - Response deduplication (AssistantMessage vs StreamEvent) - Errors modeled as completions (not exceptions) Includes workflow tests (mocked activities) and activity tests (mocked SDK). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bcfbbdd commit 496316e

13 files changed

Lines changed: 2188 additions & 0 deletions

File tree

agents/claude_agent_sdk_python/README.md

Lines changed: 404 additions & 0 deletions
Large diffs are not rendered by default.

agents/claude_agent_sdk_python/activities/__init__.py

Whitespace-only changes.
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
"""
2+
Agent Execution Activity — Claude Agent SDK with Temporal
3+
4+
This activity wraps the Claude Agent SDK (claude-agent-sdk) to provide durable,
5+
observable agent execution inside a Temporal workflow.
6+
7+
Key patterns demonstrated:
8+
1. Background heartbeats — keeps Temporal informed during long-running tool calls
9+
2. Staleness guard — stops heartbeating when the agent appears hung
10+
3. Response deduplication — only accumulates text from AssistantMessage events
11+
"""
12+
13+
import asyncio
14+
import os
15+
import time
16+
from datetime import datetime, timezone
17+
18+
from temporalio import activity
19+
20+
from models import AgentInput, AgentOutput
21+
22+
23+
# How often to send a heartbeat to Temporal (seconds).
24+
HEARTBEAT_INTERVAL = 60
25+
26+
# If no SDK events arrive for this long, stop heartbeating and let Temporal
27+
# kill the activity. This prevents a truly hung agent from blocking the
28+
# full start_to_close_timeout (30 min).
29+
MAX_IDLE_SECONDS = 15 * 60 # 15 minutes
30+
31+
32+
@activity.defn
33+
async def execute_agent_activity(input_data: AgentInput) -> AgentOutput:
34+
"""
35+
Execute a Claude agent via the Claude Agent SDK and collect results.
36+
37+
The Claude Agent SDK manages the agentic loop internally — tool selection,
38+
execution, and multi-turn conversation are all handled by the SDK. This
39+
activity simply streams events from the SDK and collects the final response.
40+
41+
Heartbeat pattern:
42+
A background asyncio task sends heartbeats every 60 seconds, independent
43+
of the SDK event stream. This is critical because the SDK may execute
44+
long-running tools (e.g. git clone, large file reads) that emit no events
45+
for extended periods. Without background heartbeats, Temporal would kill
46+
the activity for missing its heartbeat_timeout.
47+
48+
Staleness guard:
49+
If no SDK events arrive for MAX_IDLE_SECONDS (15 min), the heartbeat
50+
loop exits. Temporal's heartbeat_timeout (10 min) then fires ~25 min
51+
after the last event, killing a truly hung agent instead of letting it
52+
block the full 30-minute start_to_close_timeout.
53+
54+
Response deduplication:
55+
The Claude Agent SDK emits both StreamEvent (incremental text chunks)
56+
and AssistantMessage (complete text blocks). Both contain the same text,
57+
so we only accumulate from AssistantMessage to avoid duplication.
58+
"""
59+
# Lazy import — avoids loading the SDK at module level, which keeps the
60+
# Temporal worker startup fast and avoids sandbox issues.
61+
from claude_agent_sdk import query
62+
from claude_agent_sdk.types import ClaudeAgentOptions, AssistantMessage, ResultMessage
63+
64+
activity.logger.info(
65+
f"Starting agent execution: prompt_length={len(input_data.prompt)}, "
66+
f"model={input_data.model}"
67+
)
68+
69+
start_time = datetime.now(timezone.utc)
70+
71+
try:
72+
# Build SDK options
73+
#
74+
# NOTE: The SDK merges os.environ with options.env ({**os.environ, **env}).
75+
# If the worker runs inside Claude Code, CLAUDECODE will be set, and the
76+
# bundled CLI binary refuses to launch ("cannot nest Claude Code sessions").
77+
# We override it to empty string so the subprocess doesn't see it.
78+
options = ClaudeAgentOptions(
79+
model=input_data.model,
80+
max_turns=input_data.max_turns,
81+
permission_mode=input_data.permission_mode,
82+
env={"CLAUDECODE": ""},
83+
)
84+
if input_data.system_prompt:
85+
options.system_prompt = input_data.system_prompt
86+
87+
# Heartbeat state shared between the event loop and the background task.
88+
heartbeat_state = {
89+
"event_count": 0,
90+
"last_event_time": time.time(),
91+
"done": False,
92+
}
93+
94+
async def _heartbeat_loop():
95+
"""
96+
Background task that sends Temporal heartbeats at a fixed interval.
97+
98+
This runs independently of the SDK event stream so that heartbeats
99+
continue even when the SDK is executing a long-running tool that
100+
produces no events.
101+
"""
102+
while not heartbeat_state["done"]:
103+
await asyncio.sleep(HEARTBEAT_INTERVAL)
104+
if heartbeat_state["done"]:
105+
break
106+
107+
idle_seconds = time.time() - heartbeat_state["last_event_time"]
108+
109+
# Staleness guard: if no events for too long, the agent may be
110+
# stuck. Stop heartbeating and let Temporal's heartbeat_timeout
111+
# kill the activity.
112+
if idle_seconds > MAX_IDLE_SECONDS:
113+
activity.logger.warning(
114+
f"No events for {idle_seconds:.0f}s — stopping heartbeat "
115+
f"(agent may be stuck)"
116+
)
117+
break
118+
119+
activity.heartbeat(
120+
f"events={heartbeat_state['event_count']}, "
121+
f"idle={idle_seconds:.0f}s"
122+
)
123+
124+
heartbeat_task = asyncio.create_task(_heartbeat_loop())
125+
126+
# Collect response and events
127+
response_text = ""
128+
total_tokens = 0
129+
event_count = 0
130+
131+
try:
132+
async for event in query(
133+
prompt=input_data.prompt,
134+
options=options,
135+
):
136+
event_count += 1
137+
heartbeat_state["event_count"] = event_count
138+
heartbeat_state["last_event_time"] = time.time()
139+
140+
# Response deduplication: The SDK emits both StreamEvent
141+
# (incremental chunks) and AssistantMessage (complete blocks).
142+
# Both contain the same text, so we ONLY accumulate from
143+
# AssistantMessage to avoid duplicating the response.
144+
if isinstance(event, AssistantMessage):
145+
# AssistantMessage.content is a list of content blocks
146+
for block in event.content:
147+
if hasattr(block, "text"):
148+
response_text += block.text
149+
150+
# Capture token usage from the final result event
151+
if isinstance(event, ResultMessage):
152+
total_tokens = getattr(event, "total_tokens", 0) or 0
153+
154+
finally:
155+
# Always clean up the heartbeat task
156+
heartbeat_state["done"] = True
157+
heartbeat_task.cancel()
158+
try:
159+
await heartbeat_task
160+
except asyncio.CancelledError:
161+
pass
162+
163+
end_time = datetime.now(timezone.utc)
164+
processing_time = (end_time - start_time).total_seconds()
165+
166+
activity.logger.info(
167+
f"Agent execution completed: events={event_count}, "
168+
f"response_length={len(response_text)}, "
169+
f"processing_time={processing_time:.2f}s"
170+
)
171+
172+
return AgentOutput(
173+
status="success",
174+
response=response_text,
175+
total_tokens=total_tokens,
176+
num_events=event_count,
177+
processing_time_seconds=processing_time,
178+
)
179+
180+
except Exception as e:
181+
activity.logger.error(f"Agent execution failed: {e}", exc_info=True)
182+
183+
end_time = datetime.now(timezone.utc)
184+
processing_time = (end_time - start_time).total_seconds()
185+
186+
return AgentOutput(
187+
status="error",
188+
response="",
189+
error_message=str(e),
190+
processing_time_seconds=processing_time,
191+
)
192+
193+
194+
@activity.defn
195+
async def log_result_activity(output: AgentOutput) -> None:
196+
"""
197+
Log the agent execution result.
198+
199+
In a production system, this would persist results to a database.
200+
Here we keep it simple for the cookbook example.
201+
"""
202+
if output.status == "success":
203+
activity.logger.info(
204+
f"Agent succeeded: {len(output.response)} chars, "
205+
f"{output.total_tokens} tokens, "
206+
f"{output.processing_time_seconds:.2f}s"
207+
)
208+
else:
209+
activity.logger.error(f"Agent failed: {output.error_message}")
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
Pydantic Models for Agent Execution
3+
4+
Defines the input/output contract between the workflow and activity.
5+
"""
6+
7+
from typing import Literal, Optional
8+
from pydantic import BaseModel, Field
9+
10+
11+
class AgentInput(BaseModel):
12+
"""Input for the agent execution activity."""
13+
14+
prompt: str = Field(..., description="User message to send to the agent")
15+
model: str = Field(
16+
default="claude-sonnet-4-5-20250929",
17+
description="Claude model to use",
18+
)
19+
system_prompt: Optional[str] = Field(
20+
default=None,
21+
description="Optional system prompt for the agent",
22+
)
23+
max_turns: int = Field(
24+
default=30,
25+
description="Maximum number of agentic turns (tool call rounds)",
26+
)
27+
permission_mode: str = Field(
28+
default="bypassPermissions",
29+
description="Claude Code permission mode (e.g. 'bypassPermissions', 'default')",
30+
)
31+
32+
33+
class AgentOutput(BaseModel):
34+
"""Output from the agent execution activity."""
35+
36+
status: Literal["success", "error"] = Field(
37+
..., description="Overall execution status"
38+
)
39+
response: str = Field(default="", description="Final assistant response text")
40+
total_tokens: int = Field(default=0, description="Total tokens used")
41+
num_events: int = Field(default=0, description="Number of SDK events processed")
42+
processing_time_seconds: Optional[float] = Field(
43+
default=None, description="Wall-clock time in seconds"
44+
)
45+
error_message: Optional[str] = Field(
46+
default=None, description="Error details if status is 'error'"
47+
)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[project]
2+
name = "cookbook-claude-agent-sdk-python"
3+
version = "0.1"
4+
description = "Durable agent execution using Claude Agent SDK with Temporal"
5+
authors = [{ name = "Temporal Technologies Inc", email = "sdk@temporal.io" }]
6+
requires-python = ">=3.10"
7+
readme = "README.md"
8+
license = "MIT"
9+
dependencies = [
10+
"temporalio>=1.15.0,<2",
11+
"claude-agent-sdk==0.1.6",
12+
"pydantic>=2.0.0",
13+
]
14+
15+
[project.optional-dependencies]
16+
test = [
17+
"pytest>=7.0",
18+
"pytest-asyncio>=0.21",
19+
]
20+
21+
[tool.pytest.ini_options]
22+
asyncio_mode = "strict"
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
Start an Agent Execution Workflow
3+
4+
Submits a prompt to the Claude Agent SDK via Temporal and prints the result.
5+
6+
Usage:
7+
uv run python -m start_workflow "explain how binary search works"
8+
uv run python -m start_workflow "what files are in the current directory?"
9+
"""
10+
11+
import asyncio
12+
import sys
13+
import uuid
14+
15+
from temporalio.client import Client
16+
from temporalio.contrib.pydantic import pydantic_data_converter
17+
18+
from workflows.agent import AgentExecutionWorkflow
19+
from models import AgentInput
20+
21+
22+
async def main():
23+
client = await Client.connect(
24+
"localhost:7233",
25+
data_converter=pydantic_data_converter,
26+
)
27+
28+
prompt = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
29+
30+
input_data = AgentInput(prompt=prompt)
31+
32+
result = await client.execute_workflow(
33+
AgentExecutionWorkflow.run,
34+
input_data,
35+
id=f"claude-agent-sdk-{uuid.uuid4()}",
36+
task_queue="claude-agent-sdk-task-queue",
37+
)
38+
39+
print(f"\nStatus: {result.status}")
40+
print(f"Tokens: {result.total_tokens}")
41+
print(f"Events: {result.num_events}")
42+
print(f"Time: {result.processing_time_seconds:.2f}s")
43+
print(f"\nResponse:\n{result.response}")
44+
45+
46+
if __name__ == "__main__":
47+
asyncio.run(main())

agents/claude_agent_sdk_python/tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)