Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions claude_agent_sdk/07_Hosting_the_agent.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@
}
],
"source": [
"from research_agent.agent import RESEARCH_SYSTEM_PROMPT, DEFAULT_MODEL\n",
"from research_agent.agent import DEFAULT_MODEL, RESEARCH_SYSTEM_PROMPT\n",
"\n",
"print(f\"model: {DEFAULT_MODEL}\")\n",
"print(RESEARCH_SYSTEM_PROMPT)"
]
Expand Down Expand Up @@ -928,8 +929,8 @@
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"event: message\n",
"data: {\"subtype\": \"init\", \"data\": {\"type\": \"system\", \"subtype\": \"init\", \"cwd\": \"/app\", \"session_id\": \"77597263-a169-4236-b3d4-8ec14f90fd2b\", \"tools\": [\"Task\", \"TaskOutput\", \"Bash\", \"Glob\", \"Grep\", \"ExitPlanMode\", \"Read\", \"Edit\", \"Write\", \"N … [truncated]\n",
Expand Down Expand Up @@ -979,8 +980,8 @@
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"event: message\n",
"data: {\"subtype\": \"init\", \"data\": {\"type\": \"system\", \"subtype\": \"init\", \"cwd\": \"/app\", \"session_id\": \"77597263-a169-4236-b3d4-8ec14f90fd2b\", \"tools\": [\"Task\", \"TaskOutput\", \"Bash\", \"Glob\", \"Grep\", \"ExitPlanMode\", \"Read\", \"Edit\", \"Write\", \"N … [truncated]\n",
Expand Down Expand Up @@ -1924,8 +1925,8 @@
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"event: message\n",
"data: {\"subtype\": \"init\", \"data\": {\"type\": \"system\", \"subtype\": \"init\", \"cwd\": \"/app\", \"session_id\": \"1566ffe4-2b20-4a68-82ff-283984b64451\", \"tools\": [\"Task\", \"TaskOutput\", \"Bash\", \"Glob\", \"Grep\", \"ExitPlanMode\", \"Read\", \"Edit\", \"Write\", \"N … [truncated]\n",
Expand Down
27 changes: 13 additions & 14 deletions claude_agent_sdk/hosting/kubernetes/gateway/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
_k8s_available = False

# AGENT_IMAGE is the container image for agent pods (e.g. "us-docker.pkg.dev/…/agent:latest").
# Required when K8s is available; ignored otherwise.
# Required when K8s is available; ignored otherwise. Validated at startup
# (initialize_standby_pool), not import time, so a misconfigured gateway fails
# its readiness probe instead of crash-looping on import.
AGENT_IMAGE = os.environ.get("AGENT_IMAGE", "")
if _k8s_available and not AGENT_IMAGE:
raise RuntimeError("AGENT_IMAGE env var is required when running inside Kubernetes")

# The namespace where agent pods are created. Namespaces are K8s's way of
# partitioning resources — like folders for your cluster objects.
Expand All @@ -65,6 +65,7 @@
# Pod manifest builder
# ---------------------------------------------------------------------------


def _build_pod_manifest(
*,
pod_name: str,
Expand Down Expand Up @@ -213,7 +214,7 @@ def _build_pod_manifest(
client.V1Volume(
name="egress-proxy-tls",
secret=client.V1SecretVolumeSource(
secret_name="egress-proxy-tls",
secret_name="egress-proxy-tls", # noqa: S106 — k8s Secret name, not a credential
),
),
],
Expand All @@ -233,6 +234,7 @@ def _build_pod_manifest(
# Standby pool management
# ---------------------------------------------------------------------------


async def _create_standby_pod() -> str:
"""Create a single standby pod and return its name.

Expand Down Expand Up @@ -348,8 +350,7 @@ async def _count_standby_pods() -> int:
return sum(
1
for p in pods.items
if p.status.phase in ("Pending", "Running")
and p.metadata.deletion_timestamp is None
if p.status.phase in ("Pending", "Running") and p.metadata.deletion_timestamp is None
)


Expand Down Expand Up @@ -397,9 +398,7 @@ async def _claim_standby_pod(session_id: str) -> str | None:
body=body,
)
pod_ip = patched.status.pod_ip
logger.info(
f"Claimed standby pod {pod_name} for session {session_id} at {pod_ip}"
)
logger.info(f"Claimed standby pod {pod_name} for session {session_id} at {pod_ip}")
return pod_ip
except kubernetes.client.exceptions.ApiException as e:
# Another gateway instance may have claimed this pod first,
Expand Down Expand Up @@ -433,8 +432,7 @@ async def _replenish_pool() -> None:
return

logger.info(
f"Replenishing standby pool: {current_count}/{STANDBY_POOL_SIZE}, "
"creating 1 pod"
f"Replenishing standby pool: {current_count}/{STANDBY_POOL_SIZE}, creating 1 pod"
)
pod_name = None
try:
Expand Down Expand Up @@ -474,6 +472,7 @@ def _schedule_replenish() -> None:
# Public API
# ---------------------------------------------------------------------------


async def initialize_standby_pool() -> None:
"""Pre-warm the standby pool on gateway startup.

Expand All @@ -483,6 +482,8 @@ async def initialize_standby_pool() -> None:
if not _k8s_available:
logger.info("K8s unavailable — skipping standby pool initialization")
return
if not AGENT_IMAGE:
raise RuntimeError("AGENT_IMAGE env var is required when running inside Kubernetes")
logger.info(f"Initializing standby pool with {STANDBY_POOL_SIZE} pods")
await _replenish_pool()

Expand All @@ -504,9 +505,7 @@ async def create_agent_pod(session_id: str) -> str:
return pod_ip

# No standby pods available — fall back to creating one on-demand
logger.warning(
f"No standby pods available for session {session_id}, creating on-demand"
)
logger.warning(f"No standby pods available for session {session_id}, creating on-demand")
pod_name = f"agent-session-{session_id}"
labels = {
"app": "claude-agent",
Expand Down
17 changes: 12 additions & 5 deletions claude_agent_sdk/hosting/kubernetes/gateway/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,16 @@
# acceptable only for local poking, never for anything reachable by others.
# A pair with an empty token (":alice") is dropped rather than mapped — an
# empty key would make the empty Authorization header a valid credential.
_TOKEN_TO_TENANT: dict[str, str] = dict(
pair.split(":", 1)
for pair in os.getenv("GATEWAY_TENANTS", "").split(",")
if ":" in pair and not pair.startswith(":")
)
_TOKEN_TO_TENANT: dict[str, str] = {}
for _pair in os.getenv("GATEWAY_TENANTS", "").split(","):
_parts = _pair.split(":", 1)
if len(_parts) != 2:
continue
_token, _tenant = _parts[0].strip(), _parts[1].strip()
if not _token:
continue
_TOKEN_TO_TENANT[_token] = _tenant
logger.info("gateway: loaded %d tenant token(s)", len(_TOKEN_TO_TENANT))

# Session IDs must match this pattern to prevent path traversal / label abuse.
_SESSION_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_-]{0,63}$")
Expand Down Expand Up @@ -106,6 +111,8 @@ async def lifespan(_app: FastAPI):
app = FastAPI(title="Claude Agent Gateway (k8s)", lifespan=lifespan)


# Only enforces the cap for requests with Content-Length; chunked bodies
# pass through. In production put a real body-size limit on the gateway/LB.
@app.middleware("http")
async def _limit_body_size(request: Request, call_next):
length = request.headers.get("content-length")
Expand Down
1 change: 1 addition & 0 deletions claude_agent_sdk/hosting/kubernetes/gateway/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async def relay_sse(
payloads and the blank-line event delimiter exactly — no re-framing.
"""
url = f"http://{pod_ip}:{AGENT_PORT}/sessions/{session_id}/messages"
# Not async with on purpose — the client must outlive send(..., stream=True).
client = httpx.AsyncClient(timeout=_TIMEOUT)
try:
req = client.build_request("POST", url, json=body)
Expand Down
14 changes: 11 additions & 3 deletions claude_agent_sdk/hosting/kubernetes/kind-quickstart.sh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ EOF
kubectl wait --for=condition=ready node --all --timeout=120s
else
info "kind cluster '$CLUSTER' already exists — reusing"
REUSING_CLUSTER=1
fi

# 2. Images ─────────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -94,9 +95,8 @@ kubectl -n claude-agent create secret generic anthropic-api-key \

# Two demo tenants so you can watch the gateway enforce session ownership:
# alice can read/continue/delete her sessions, bob gets a 403 on them.
# Note: the gateway reads this at pod start — if you re-run this script
# against an existing cluster, `kubectl -n claude-agent rollout restart
# deploy/gateway` to pick up the regenerated tokens.
# The gateway reads this secret at pod start, so on a re-run we restart the
# deployment below once the manifests are applied.
ALICE_TOKEN=$(openssl rand -hex 16)
BOB_TOKEN=$(openssl rand -hex 16)
kubectl -n claude-agent create secret generic gateway-tenants \
Expand All @@ -121,6 +121,14 @@ for f in manifests/*.yaml; do
sed "s|REGISTRY_URL|${REG}|g" "$f" | kubectl apply -f -
done

# On a re-run the gateway pod predates the freshly regenerated GATEWAY_TENANTS
# secret — restart it so the new tokens take effect.
if [[ -n "${REUSING_CLUSTER:-}" ]]; then
info "Restarting gateway to pick up regenerated tenant tokens..."
kubectl -n claude-agent rollout restart deploy/gateway
kubectl -n claude-agent rollout status deploy/gateway --timeout=300s
fi

# 6. Wait ───────────────────────────────────────────────────────────────────
info "Waiting for gateway to be ready (this can take a minute on first run)..."
kubectl -n claude-agent wait --for=condition=available deploy/redis --timeout=180s
Expand Down
2 changes: 2 additions & 0 deletions claude_agent_sdk/hosting/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Hard-pinned so the cookbook doesn't silently break on a future release.
# Bump deliberately when you've re-run the notebook end-to-end.
# The npm @anthropic-ai/claude-code pin in the Dockerfile is part of the same
# pin set — bump together.
claude-agent-sdk==0.1.50
fastapi==0.118.3
uvicorn[standard]==0.37.0
Expand Down
15 changes: 10 additions & 5 deletions claude_agent_sdk/hosting/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@
import os
import re
import secrets
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import asdict, is_dataclass
from pathlib import Path
from typing import Any, AsyncIterator
from typing import Any

from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse, ServerSentEvent

from claude_agent_sdk import ClaudeAgentOptions, ResultMessage, query

# The agent we built in notebook 00. We import its system prompt so this server
# deploys *that* agent; the tool list and buffer size below mirror send_query().
from research_agent.agent import RESEARCH_SYSTEM_PROMPT
from sse_starlette.sse import EventSourceResponse, ServerSentEvent

from claude_agent_sdk import ClaudeAgentOptions, ResultMessage, query

# Same shape the k8s gateway enforces: must start alphanumeric so a session_id
# can never begin with "-" or "_" (keeps it safe as a filename or k8s label).
Expand Down Expand Up @@ -86,6 +87,8 @@ async def _lifespan(_app: FastAPI) -> AsyncIterator[None]:
app = FastAPI(title="Research Agent (Claude Agent SDK)", lifespan=_lifespan)


# Only enforces the cap for requests with Content-Length; chunked bodies
# pass through. In production put a real body-size limit on the gateway/LB.
@app.middleware("http")
async def _limit_body_size(request: Request, call_next):
length = request.headers.get("content-length")
Expand Down Expand Up @@ -161,7 +164,9 @@ async def _remember(external_id: str, sdk_session_id: str) -> None:
# Note: two concurrent *first* turns on the same external_id would each
# start a fresh SDK session and last-write-wins here. Fine for this
# cookbook's single-caller-per-session shape; a production server would
# lock around the read-create-write span, not just the write.
# lock around the read-create-write span, not just the write. The
# Kubernetes tier avoids this race with Redis SET NX — see
# kubernetes/gateway/k8s.py.
async with _map_lock:
if _session_map.get(external_id) == sdk_session_id:
return
Expand Down
Loading