From cb7cb96adf2196d317cc5c1617bf95c624d4eb4a Mon Sep 17 00:00:00 2001 From: Kyle Verhoog Date: Tue, 17 Mar 2026 08:16:06 -0400 Subject: [PATCH] feat: integrate claude_sessions and task_segmentation into agent Wire up SessionCache to scan ~/.claude/projects/ JSONL files and inject session events into the LLMObs span pool. Add dedicated /sessions endpoint and task segmentation API routes. Session events include slug, cwd, project, num_turns, and end_ns for sorting by recency. Co-Authored-By: Claude Opus 4.6 (1M context) --- ddapm_test_agent/agent.py | 4 + ddapm_test_agent/claude_sessions.py | 345 +++++++++++++++++++++ ddapm_test_agent/llmobs_event_platform.py | 21 ++ ddapm_test_agent/task_segmentation.py | 348 ++++++++++++++++++++++ 4 files changed, 718 insertions(+) create mode 100644 ddapm_test_agent/claude_sessions.py create mode 100644 ddapm_test_agent/task_segmentation.py diff --git a/ddapm_test_agent/agent.py b/ddapm_test_agent/agent.py index bf86cbf6..1d7e2acb 100644 --- a/ddapm_test_agent/agent.py +++ b/ddapm_test_agent/agent.py @@ -59,6 +59,7 @@ from .claude_hooks import write_claude_code_hooks from .claude_link_tracker import ClaudeLinkTracker from .claude_proxy import ClaudeProxyAPI +from .task_segmentation import TaskSegmentationAPI from .integration import Integration from .llmobs_event_platform import LLMObsEventPlatformAPI from .logs import LOGS_ENDPOINT @@ -1991,6 +1992,9 @@ def make_app( claude_proxy_api = ClaudeProxyAPI(hooks_api=claude_hooks_api, link_tracker=claude_link_tracker) app.add_routes(claude_proxy_api.get_routes()) + task_seg_api = TaskSegmentationAPI(llmobs_event_platform_api) + app.add_routes(task_seg_api.get_routes()) + async def _cleanup_claude_proxy(app: web.Application) -> None: await claude_proxy_api.close() diff --git a/ddapm_test_agent/claude_sessions.py b/ddapm_test_agent/claude_sessions.py new file mode 100644 index 00000000..84a224b7 --- /dev/null +++ b/ddapm_test_agent/claude_sessions.py @@ -0,0 +1,345 @@ +"""Claude Code 
"""Claude Code Session Scanner.

Reads Claude Code session files from ~/.claude/projects/ and converts them
to Event Platform events so they can be displayed in the Sessions view.
"""

from collections import deque
from datetime import datetime
import json
import logging
import os
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .claude_hooks import ClaudeHooksAPI

log = logging.getLogger(__name__)

# Only read first/last N lines of each JSONL file for performance
_HEAD_LINES = 20
_TAIL_LINES = 20


def _read_head_tail(filepath: str, head: int = _HEAD_LINES, tail: int = _TAIL_LINES) -> List[str]:
    """Return the first `head` and last `tail` lines of `filepath`, newline-stripped.

    Streams the file so at most ``head + tail`` lines are held in memory (the
    previous implementation read the entire file into a list before slicing,
    which contradicted its "efficiently" claim for large session files).
    Files with ``head + tail`` lines or fewer are returned in full, each line
    exactly once. Returns [] on any I/O or decode error.
    """
    head_lines: List[str] = []
    # deque(maxlen=tail) keeps only the most recent `tail` lines seen.
    tail_lines: "deque[str]" = deque(maxlen=tail)
    try:
        with open(filepath, "r") as f:
            for idx, raw in enumerate(f):
                if idx < head:
                    head_lines.append(raw.rstrip("\n"))
                else:
                    tail_lines.append(raw.rstrip("\n"))
    except Exception:
        return []
    return head_lines + list(tail_lines)


def _parse_jsonl_lines(lines: List[str]) -> List[Dict[str, Any]]:
    """Parse JSONL lines into dicts, skipping blank and malformed lines."""
    entries: List[Dict[str, Any]] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            # Session files may contain partial writes; ignore bad lines.
            continue
    return entries


def _extract_first_user_message(entries: List[Dict[str, Any]]) -> str:
    """Return the text of the first user message, or "" if none is found.

    Handles both plain-string content and content-block lists (the first
    text block, or the first plain-string block, wins).
    """
    for entry in entries:
        if entry.get("type") != "user":
            continue
        content = entry.get("message", {}).get("content", "")
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            for block in content:
                if isinstance(block, dict) and block.get("type") == "text":
                    return block.get("text", "")
                if isinstance(block, str):
                    return block
    return ""


def _count_user_turns(filepath: str) -> int:
    """Count user turns via a raw substring scan of the whole file.

    Deliberately avoids JSON parsing for speed. Lines that also mention
    "tool_result" are excluded so tool responses echoed back as user entries
    are not counted as turns. Returns 0 on I/O errors.
    """
    count = 0
    try:
        with open(filepath, "r") as f:
            for line in f:
                if '"type":"user"' in line or '"type": "user"' in line:
                    # Quick check - also verify it's not a tool_result
                    if '"tool_result"' not in line:
                        count += 1
    except Exception:
        pass
    return count


def _dir_name_to_project_path(dir_name: str) -> str:
    """Convert a directory name like -Users-kyle-dd to /Users/kyle/dd.

    NOTE(review): every "-" becomes "/", so a project path that itself
    contains hyphens (e.g. /Users/kyle/my-repo) is reconstructed
    incorrectly. The encoding is lossy; confirm this is acceptable for
    display-only use.
    """
    if dir_name.startswith("-"):
        return "/" + dir_name[1:].replace("-", "/")
    return dir_name.replace("-", "/")


def scan_all_sessions(claude_dir: str = "~/.claude") -> List[Dict[str, Any]]:
    """Scan all Claude Code session files and return session summaries.

    Walks <claude_dir>/projects/*/ for *.jsonl files (top level only, no
    recursion). Files that cannot be parsed are skipped with a debug log;
    a missing or unreadable projects directory yields [].
    """
    claude_dir = os.path.expanduser(claude_dir)
    projects_dir = os.path.join(claude_dir, "projects")
    if not os.path.isdir(projects_dir):
        return []

    sessions: List[Dict[str, Any]] = []

    try:
        project_dirs = os.listdir(projects_dir)
    except OSError:
        return []

    for project_name in project_dirs:
        project_path = os.path.join(projects_dir, project_name)
        if not os.path.isdir(project_path):
            continue

        try:
            files = os.listdir(project_path)
        except OSError:
            continue

        for filename in files:
            if not filename.endswith(".jsonl"):
                continue
            filepath = os.path.join(project_path, filename)
            if not os.path.isfile(filepath):
                continue

            session_id = filename[:-6]  # strip ".jsonl"

            try:
                session = _parse_session_file(filepath, session_id, project_name)
                if session:
                    sessions.append(session)
            except Exception as e:
                # Best-effort scan: one bad file must not break the listing.
                log.debug("Failed to parse session file %s: %s", filepath, e)

    return sessions
def _iso_to_ms(timestamp: str) -> int:
    """Convert an ISO-8601 timestamp (optionally 'Z'-suffixed) to epoch ms.

    Returns 0 for an empty or unparseable timestamp.
    """
    if not timestamp:
        return 0
    try:
        dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        return int(dt.timestamp() * 1000)
    except (ValueError, TypeError):
        return 0


def _parse_session_file(filepath: str, session_id: str, project_name: str) -> Optional[Dict[str, Any]]:
    """Parse a single session JSONL file and return a session summary dict.

    Only a head/tail sample of the file is read (see _read_head_tail), so
    metadata is best-effort. Returns None if the file is empty, contains no
    parseable entries, or has no user input.
    """
    lines = _read_head_tail(filepath)
    if not lines:
        return None

    entries = _parse_jsonl_lines(lines)
    if not entries:
        return None

    start_timestamp = ""
    end_timestamp = ""
    model = ""
    cwd = ""
    git_branch = ""
    version = ""
    slug = ""

    for entry in entries:
        # Session-level metadata: first value seen wins.
        if not start_timestamp and entry.get("timestamp"):
            start_timestamp = entry["timestamp"]
        if not cwd and entry.get("cwd"):
            cwd = entry["cwd"]
        if not git_branch and entry.get("gitBranch"):
            git_branch = entry["gitBranch"]
        if not version and entry.get("version"):
            version = entry["version"]
        if not slug and entry.get("slug"):
            slug = entry["slug"]

        # NOTE(review): the original code detected entries whose sessionId
        # differs from the filename (subagent files) but did nothing with
        # them; that dead no-op check has been removed. Confirm whether such
        # files should be skipped entirely instead.

        # Model comes from assistant messages; the last one seen wins.
        if entry.get("type", "") == "assistant":
            msg_model = entry.get("message", {}).get("model", "")
            if msg_model:
                model = msg_model

        # The last timestamp seen approximates the session end.
        if entry.get("timestamp"):
            end_timestamp = entry["timestamp"]

    first_input = _extract_first_user_message(entries)
    if not first_input:
        return None  # Skip sessions with no user input

    num_turns = _count_user_turns(filepath)

    start_ms = _iso_to_ms(start_timestamp)
    end_ms = _iso_to_ms(end_timestamp)
    duration_ns = (end_ms - start_ms) * 1_000_000 if end_ms > start_ms else 0

    return {
        "session_id": session_id,
        "first_input": first_input,
        "status": "ok",
        "start_ms": start_ms,
        "end_ms": end_ms,
        "duration_ns": duration_ns,
        "model": model,
        "num_turns": num_turns,
        "project": _dir_name_to_project_path(project_name),
        "cwd": cwd,
        "git_branch": git_branch,
        "version": version,
        "slug": slug,
        # Filled in later by enrich_with_hook_data when live state exists.
        "has_error": False,
        "conversation_title": "",
    }


def enrich_with_hook_data(sessions: List[Dict[str, Any]], hooks_api: "ClaudeHooksAPI") -> List[Dict[str, Any]]:
    """Enrich session summaries in place with live state from the hooks API.

    - A session whose hook state has not yet emitted its root span is marked
      "active".
    - Any error span belonging to the session sets has_error; the session
      status becomes "error" unless it is currently active.
    - The live conversation title, when present, overrides the scanned one.

    Returns the same list for chaining.
    """
    for session in sessions:
        sid = session["session_id"]
        hook_state = hooks_api._sessions.get(sid)
        if not hook_state:
            continue
        if not hook_state.root_span_emitted:
            session["status"] = "active"
        # Check for error spans in this session
        for span in hooks_api._assembled_spans:
            if span.get("session_id") == sid and span.get("status") == "error":
                session["has_error"] = True
                if session["status"] != "active":
                    session["status"] = "error"
                break
        # Use conversation title if available
        if hook_state.conversation_title:
            session["conversation_title"] = hook_state.conversation_title
    return sessions
+ """ + spans: List[Dict[str, Any]] = [] + for session in sessions: + session_id = session["session_id"] + first_input = session.get("first_input", "") + status = session.get("status", "ok") + start_ms = session.get("start_ms", 0) + end_ms = session.get("end_ms", 0) + duration_ns = session.get("duration_ns", 0) + model = session.get("model", "") + num_turns = session.get("num_turns", 0) + project = session.get("project", "") + cwd = session.get("cwd", "") + slug = session.get("slug", "") + name = session.get("conversation_title") or (first_input[:80] + "..." if len(first_input) > 80 else first_input) + + # Build a span-like dict that build_event_platform_list_response can consume + span: Dict[str, Any] = { + "span_id": session_id, + "trace_id": session_id, + "session_id": session_id, + "parent_id": "undefined", + "name": name, + "status": status, + "start_ns": start_ms * 1_000_000, + "end_ns": end_ms * 1_000_000, + "duration": duration_ns, + "ml_app": "claude-code", + "service": "claude-code", + "env": "local", + "tags": [ + "ml_app:claude-code", + f"session_id:{session_id}", + "service:claude-code", + "env:local", + "source:claude-code-sessions", + f"project:{project}", + f"cwd:{cwd}", + f"slug:{slug}", + ], + "meta": { + "span": {"kind": "session"}, + "input": {"value": first_input}, + "output": {}, + "model_name": model, + "metadata": { + "project": project, + "cwd": cwd, + "slug": slug, + }, + }, + "metrics": { + "num_turns": num_turns, + }, + "_event_type": "session", + } + spans.append(span) + + return spans + + +class SessionCache: + """Cache for session scan results with TTL.""" + + def __init__(self, ttl_seconds: float = 5.0) -> None: + self._ttl = ttl_seconds + self._cached_sessions: List[Dict[str, Any]] = [] + self._last_scan: float = 0.0 + self._claude_dir: str = "~/.claude" + + def get_sessions(self, claude_dir: str = "~/.claude") -> List[Dict[str, Any]]: + """Get sessions, rescanning if cache is stale.""" + now = time.time() + if now - self._last_scan 
> self._ttl or claude_dir != self._claude_dir: + self._claude_dir = claude_dir + self._cached_sessions = scan_all_sessions(claude_dir) + self._last_scan = now + return self._cached_sessions diff --git a/ddapm_test_agent/llmobs_event_platform.py b/ddapm_test_agent/llmobs_event_platform.py index 11d23117..6b3f6d6b 100644 --- a/ddapm_test_agent/llmobs_event_platform.py +++ b/ddapm_test_agent/llmobs_event_platform.py @@ -20,6 +20,9 @@ import msgpack from . import llmobs_query_parser +from .claude_sessions import SessionCache +from .claude_sessions import enrich_with_hook_data +from .claude_sessions import sessions_to_event_platform_events if TYPE_CHECKING: from .agent import Agent @@ -654,6 +657,7 @@ def __init__(self, agent: "Agent"): self._query_results: Dict[str, Dict[str, Any]] = {} self.decoded_llmobs_span_events: Dict[int, List[Dict[str, Any]]] = {} self._claude_hooks_api: Optional["ClaudeHooksAPI"] = None + self._session_cache = SessionCache() def set_claude_hooks_api(self, api: "ClaudeHooksAPI") -> None: """Wire up the Claude hooks API so its spans appear in LLMObs queries.""" @@ -683,6 +687,12 @@ def get_llmobs_spans(self, token: Optional[str] = None) -> List[Dict[str, Any]]: if self._claude_hooks_api: all_spans.extend(self._claude_hooks_api._assembled_spans) + # Inject session-scanned spans from Claude Code session files + session_spans = self._session_cache.get_sessions() + if self._claude_hooks_api: + session_spans = enrich_with_hook_data(session_spans, self._claude_hooks_api) + all_spans.extend(sessions_to_event_platform_events(session_spans)) + all_spans.sort(key=lambda s: s.get("start_ns", 0), reverse=True) return all_spans @@ -1123,6 +1133,15 @@ async def handle_query_scalar(self, request: Request) -> web.Response: } ) + async def handle_sessions_list(self, request: Request) -> web.Response: + """Handle GET /api/ui/llm-obs/v1/sessions — return session-level events only.""" + session_spans = self._session_cache.get_sessions() + if self._claude_hooks_api: 
+ session_spans = enrich_with_hook_data(session_spans, self._claude_hooks_api) + events = sessions_to_event_platform_events(session_spans) + events.sort(key=lambda s: s.get("start_ns", 0), reverse=True) + return web.json_response({"sessions": events, "total": len(events)}) + def get_routes(self) -> List[web.RouteDef]: """Return the routes for this API (all handlers wrapped with CORS support).""" return [ @@ -1148,4 +1167,6 @@ def get_routes(self) -> List[web.RouteDef]: web.route("*", "/api/ui/llm-obs/v1/trace/{trace_id}", with_cors(self.handle_trace)), # Query scalar endpoint web.route("*", "/api/ui/query/scalar", with_cors(self.handle_query_scalar)), + # Sessions list endpoint + web.route("*", "/api/ui/llm-obs/v1/sessions", with_cors(self.handle_sessions_list)), ] diff --git a/ddapm_test_agent/task_segmentation.py b/ddapm_test_agent/task_segmentation.py new file mode 100644 index 00000000..9fad1d53 --- /dev/null +++ b/ddapm_test_agent/task_segmentation.py @@ -0,0 +1,348 @@ +"""Task segmentation for LLM Observability sessions. + +Takes session spans from the test agent, compresses them into a transcript, +and calls Claude Haiku to identify task boundaries and outcomes. + +Adapted from mrlee-plugins trajectory/src/analysis/task-segmentation/segment.py. 
+""" + +import json +import logging +import os +import time +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple + +import aiohttp +from aiohttp import web +from aiohttp.web import Request + +from .llmobs_event_platform import LLMObsEventPlatformAPI +from .llmobs_event_platform import with_cors + + +log = logging.getLogger(__name__) + +ANTHROPIC_API_BASE = "https://api.anthropic.com" +MODEL = "claude-haiku-4-5-20251001" +MAX_PROMPT_CHARS = 12000 + +# --------------------------------------------------------------------------- +# Span compression (mirrors segment.py from mrlee-plugins) +# --------------------------------------------------------------------------- + + +def _get_input_preview(meta: Dict[str, Any]) -> str: + """Extract a short input preview from span meta.""" + inp = meta.get("input", {}) + if not inp: + return "" + messages = inp.get("messages", []) + if messages: + last = messages[-1] if isinstance(messages[-1], dict) else {} + return str(last.get("content", ""))[:300] + return str(inp.get("value", ""))[:300] + + +def _get_output_preview(meta: Dict[str, Any]) -> str: + """Extract a short output preview from span meta.""" + out = meta.get("output", {}) + if not out: + return "" + messages = out.get("messages", []) + if messages: + last = messages[-1] if isinstance(messages[-1], dict) else {} + return str(last.get("content", ""))[:200] + return str(out.get("value", ""))[:200] + + +def compress_span_to_turn(span: Dict[str, Any], turn_idx: int) -> Dict[str, Any]: + """Compress a root-level span into a turn-like structure for the LLM.""" + meta = span.get("meta", {}) + name = span.get("name", "") + status = span.get("status", "ok") + duration_ns = span.get("duration", 0) + span_kind = meta.get("span", {}).get("kind", "llm") + + turn: Dict[str, Any] = {"turn": turn_idx} + + # User input + user_input = _get_input_preview(meta) + if user_input: + turn["user"] = user_input + + # Agent 
response + output = _get_output_preview(meta) + if output: + turn["response"] = output + + # Metadata + turn["name"] = name + turn["kind"] = span_kind + turn["status"] = status + turn["duration_ms"] = round(duration_ns / 1_000_000, 1) if duration_ns else 0 + + return turn + + +def compress_session_spans(spans: List[Dict[str, Any]], session_id: str) -> Dict[str, Any]: + """Compress session spans into an LLM-friendly transcript.""" + # Sort by start time ascending + sorted_spans = sorted(spans, key=lambda s: s.get("start_ns", 0)) + + # Only include root spans (agent/workflow level, not child LLM calls) + root_spans = [s for s in sorted_spans if not s.get("parent_id") or s.get("parent_id") in ("0", "", "undefined")] + + # If no root spans, fall back to all spans + if not root_spans: + root_spans = sorted_spans + + turns = [] + for i, span in enumerate(root_spans): + turns.append(compress_span_to_turn(span, i)) + + return { + "session_id": session_id, + "total_turns": len(turns), + "turns": turns, + } + + +# --------------------------------------------------------------------------- +# Segmentation prompt (adapted from segment.py) +# --------------------------------------------------------------------------- + +SEGMENTATION_PROMPT = """\ +You are a task segmentation system. Given a compressed agent session transcript, +identify the distinct tasks the user worked on and their outcomes. + +## What is a task? + +A **task** is a coherent unit of user intent. It has a goal (what the user wants), +boundaries (which turns), and an outcome (did it work?). + +## Rules + +1. **Don't over-segment.** A multi-turn conversation about one topic is ONE task. + Only create a new task when the user's goal clearly changes. + +2. **Don't under-segment.** Sessions with 6+ turns almost always have 3+ tasks. + +3. **Turn boundaries must not overlap and must not have gaps.** Every turn should + belong to exactly one task. + +4. 
# ---------------------------------------------------------------------------
# Segmentation prompt (adapted from segment.py)
# ---------------------------------------------------------------------------

SEGMENTATION_PROMPT = """\
You are a task segmentation system. Given a compressed agent session transcript,
identify the distinct tasks the user worked on and their outcomes.

## What is a task?

A **task** is a coherent unit of user intent. It has a goal (what the user wants),
boundaries (which turns), and an outcome (did it work?).

## Rules

1. **Don't over-segment.** A multi-turn conversation about one topic is ONE task.
   Only create a new task when the user's goal clearly changes.

2. **Don't under-segment.** Sessions with 6+ turns almost always have 3+ tasks.

3. **Turn boundaries must not overlap and must not have gaps.** Every turn should
   belong to exactly one task.

4. **Short sessions (1-2 turns) are usually 1 task.**

## Outcome labels

- **success**: Goal fully achieved.
- **mostly**: Goal substantially achieved with minor gaps (~80%+ done).
- **partial**: Some progress but significant work remains.
- **failure**: Attempted but did not achieve goal.
- **interrupted**: Task did not complete because session ended or user pivoted.

## Session Transcript

```json
{transcript}
```
"""

# JSON schema handed to the model as a forced tool call so the reply is
# guaranteed to be structured task data.
TASK_SCHEMA = {
    "type": "object",
    "properties": {
        "tasks": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "task_id": {
                        "type": "integer",
                        "description": "Sequential task number starting at 1",
                    },
                    "goal": {
                        "type": "string",
                        "description": "Short description of what the user wanted accomplished",
                    },
                    "start_turn": {
                        "type": "integer",
                        "description": "First turn index (0-based)",
                    },
                    "end_turn": {
                        "type": "integer",
                        "description": "Last turn index (0-based)",
                    },
                    "outcome": {
                        "type": "string",
                        "enum": ["success", "mostly", "partial", "failure", "interrupted"],
                    },
                    "outcome_score": {
                        "type": "number",
                        "description": "0.0 to 1.0 quality score",
                    },
                },
                "required": ["task_id", "goal", "start_turn", "end_turn", "outcome", "outcome_score"],
            },
        }
    },
    "required": ["tasks"],
}


def build_prompt(session_data: Dict[str, Any]) -> str:
    """Build the segmentation prompt from compressed session data.

    When the serialized transcript exceeds MAX_PROMPT_CHARS, per-turn
    user/response previews are shortened further. The truncation is applied
    to a deep copy — the previous version mutated the caller's dict in
    place, clobbering the previews the caller might still need.

    Uses str.replace (not str.format) because the prompt contains literal
    braces in its JSON fence.
    """
    transcript_json = json.dumps(session_data, indent=2)

    if len(transcript_json) > MAX_PROMPT_CHARS:
        # json round-trip gives a cheap deep copy of the plain-data dict.
        trimmed = json.loads(transcript_json)
        for turn in trimmed.get("turns", []):
            if "response" in turn:
                turn["response"] = turn["response"][:100]
            if "user" in turn:
                turn["user"] = turn["user"][:150]
        transcript_json = json.dumps(trimmed, indent=2)

    return SEGMENTATION_PROMPT.replace("{transcript}", transcript_json)
# ---------------------------------------------------------------------------
# Anthropic API call
# ---------------------------------------------------------------------------


async def call_anthropic(prompt: str, api_key: str) -> Tuple[Dict[str, Any], float]:
    """Call the Anthropic API with structured JSON output.

    Forces the model to answer via the `output_tasks` tool so the response
    conforms to TASK_SCHEMA.

    Returns (result_dict, cost_usd); ({"tasks": []}, 0.0) on any API error
    or if the forced tool call is missing from the response.
    """
    headers = {
        "x-api-key": api_key,
        "content-type": "application/json",
        "anthropic-version": "2023-06-01",
    }

    body = {
        "model": MODEL,
        "max_tokens": 4096,
        "messages": [{"role": "user", "content": prompt}],
        "tools": [
            {
                "name": "output_tasks",
                "description": "Output the task segmentation result",
                "input_schema": TASK_SCHEMA,
            }
        ],
        "tool_choice": {"type": "tool", "name": "output_tasks"},
    }

    t0 = time.time()
    # Explicit timeout so a hung upstream call cannot stall the endpoint
    # indefinitely (the previous version used the library default only).
    timeout = aiohttp.ClientTimeout(total=120)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(
            f"{ANTHROPIC_API_BASE}/v1/messages",
            headers=headers,
            json=body,
        ) as resp:
            if resp.status != 200:
                error_text = await resp.text()
                # Lazy %-args: message is only built if the record is emitted.
                log.error("Anthropic API error %s: %s", resp.status, error_text)
                return {"tasks": []}, 0.0

            data = await resp.json()

    duration_ms = int((time.time() - t0) * 1000)
    log.info("[task_segmentation] Anthropic call took %dms", duration_ms)

    # Extract the forced tool-use result from the content blocks.
    for block in data.get("content", []):
        if block.get("type") == "tool_use" and block.get("name") == "output_tasks":
            usage = data.get("usage", {})
            input_tokens = usage.get("input_tokens", 0)
            output_tokens = usage.get("output_tokens", 0)
            # Haiku pricing: $0.80/MTok input, $4/MTok output
            cost = (input_tokens * 0.80 + output_tokens * 4.0) / 1_000_000
            return block.get("input", {}), cost

    return {"tasks": []}, 0.0


# ---------------------------------------------------------------------------
# API handler
# ---------------------------------------------------------------------------


class TaskSegmentationAPI:
    """Handler for task segmentation endpoints."""

    def __init__(self, llmobs_api: LLMObsEventPlatformAPI) -> None:
        self.llmobs_api = llmobs_api
        # session_id -> cached segmentation response; invalidated when the
        # session's span count changes or ?force=true is passed.
        self._cache: Dict[str, Dict[str, Any]] = {}

    async def handle_session_tasks(self, request: Request) -> web.Response:
        """Handle GET /api/ui/llm-obs/v1/session/{session_id}/tasks.

        Serves a cached result when available; otherwise compresses the
        session's spans into a transcript and asks the LLM to segment it.
        Returns 400 for a missing session_id, 503 when no API key is
        configured, 500 on unexpected errors.
        """
        try:
            session_id = request.match_info.get("session_id", "")
            if not session_id:
                return web.json_response({"error": "session_id required"}, status=400)

            # Check for force refresh
            force = request.query.get("force", "false").lower() == "true"

            # Get all spans and filter by session_id
            all_spans = self.llmobs_api.get_llmobs_spans()
            session_spans = [s for s in all_spans if s.get("session_id") == session_id]

            if not session_spans:
                return web.json_response({"tasks": [], "session_id": session_id, "span_count": 0})

            # Serve from cache unless forced or the span count changed.
            cached = self._cache.get(session_id)
            if cached and not force and cached.get("span_count") == len(session_spans):
                return web.json_response(cached)

            # Get API key from env or app config
            api_key = os.environ.get("ANTHROPIC_API_KEY", "")
            if not api_key:
                return web.json_response(
                    {"error": "ANTHROPIC_API_KEY not set. Set it to enable task segmentation."},
                    status=503,
                )

            # Compress spans into transcript, then ask the LLM to segment.
            session_data = compress_session_spans(session_spans, session_id)
            prompt = build_prompt(session_data)
            result, cost = await call_anthropic(prompt, api_key)

            response_data = {
                "tasks": result.get("tasks", []),
                "session_id": session_id,
                "span_count": len(session_spans),
                "total_turns": session_data["total_turns"],
                "cost_usd": cost,
            }

            self._cache[session_id] = response_data

            return web.json_response(response_data)

        except Exception as e:
            # Boundary handler: keep the traceback in the logs, return 500.
            log.exception("Error handling session tasks")
            return web.json_response({"error": str(e)}, status=500)

    def get_routes(self) -> List[web.RouteDef]:
        """Return routes for this API."""
        return [
            web.route(
                "*",
                "/api/ui/llm-obs/v1/session/{session_id}/tasks",
                with_cors(self.handle_session_tasks),
            ),
        ]