-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathclaude_hooks.py
More file actions
1349 lines (1170 loc) · 54.7 KB
/
claude_hooks.py
File metadata and controls
1349 lines (1170 loc) · 54.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Claude Code Hooks → LLM Observability Spans.
Receives Claude Code lifecycle hook events via HTTP and assembles them
into LLMObs-format spans that can be queried through the Event Platform APIs.
"""
import asyncio
import getpass
import gzip
import json
import logging
import os
from pathlib import Path
import random
import socket
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from urllib.parse import urlparse
from aiohttp import ClientSession
from aiohttp import web
from aiohttp.web import Request
import msgpack
from typing_extensions import cast
from .claude_link_tracker import ClaudeLinkTracker
from .llmobs_event_platform import with_cors
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Host name captured once at import time; attached to emitted spans as a "hostname:" tag.
_HOSTNAME = socket.gethostname()
# Prefer the HOST_USER environment variable; fall back to the current login name.
_USERNAME = os.environ.get("HOST_USER") or getpass.getuser()
# Optional user handle from DD_USER_HANDLE; when empty, the "user_handle:" tag is omitted.
_USER_HANDLE = os.environ.get("DD_USER_HANDLE", "")
# Per-model context window overrides, keyed by model name. Intentionally empty:
# _get_context_limit() falls back to its default for any model not listed here.
MODEL_CONTEXT_LIMITS: Dict[str, int] = {}
# Claude Code lifecycle events for which the forwarding hook is installed.
CLAUDE_CODE_EVENTS = [
    "PreToolUse",
    "PostToolUse",
    "PostToolUseFailure",
    "Notification",
    "Stop",
    "SubagentStart",
    "SubagentStop",
    "UserPromptSubmit",
    "SessionStart",
    "SessionEnd",
    "PreCompact",
    "PermissionRequest",
]

# Hook entry installed for every event: POSTs the hook payload read from stdin
# (`-d @-`) to the local /claude/hooks endpoint with a 2 second timeout. Output
# is discarded and failures are swallowed (`|| true`).
CLAUDE_CODE_HOOK = {
    "type": "command",
    "command": "curl -s --max-time 2 -X POST -H 'Content-Type: application/json' -d @- http://localhost:8126/claude/hooks >/dev/null 2>&1 || true",
    "async": True,
}

# Matcher entry with an empty matcher string, carrying only the forwarding hook.
CLAUDE_CODE_DEFAULT_MATCHER = {"matcher": "", "hooks": [CLAUDE_CODE_HOOK]}


def write_claude_code_hooks(claude_settings_path: Path) -> None:
    """Install the forwarding hook for every event in ``CLAUDE_CODE_EVENTS``.

    Reads the JSON settings file at ``claude_settings_path``, makes sure each
    event has an empty-string matcher whose ``hooks`` list contains
    ``CLAUDE_CODE_HOOK``, and writes the settings back. Other settings keys,
    matchers and hooks already present are kept. Running it again on its own
    output changes nothing.

    A missing file, a file that is not valid JSON, or a file whose top-level
    value is not an object is treated as empty settings (and is overwritten).

    Args:
        claude_settings_path: Path of the settings file to update in place.

    Raises:
        OSError: If the file cannot be read (other than not existing) or written.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale encoding.
        with open(claude_settings_path, "r", encoding="utf-8") as claude_settings:
            try:
                claude_code_settings = json.load(claude_settings)
            except json.JSONDecodeError:
                claude_code_settings = {"hooks": {}}
    except FileNotFoundError:
        claude_code_settings = {"hooks": {}}

    # Valid JSON is not necessarily an object, and "hooks" may be null.
    if not isinstance(claude_code_settings, dict):
        claude_code_settings = {"hooks": {}}
    hooks = claude_code_settings.get("hooks")
    if not isinstance(hooks, dict):
        hooks = {}

    for event in CLAUDE_CODE_EVENTS:
        existing_hooks: Optional[List[Dict[str, Any]]] = hooks.get(event)
        if existing_hooks is None:
            hooks[event] = [CLAUDE_CODE_DEFAULT_MATCHER]
            continue

        star_matcher_hook = next(
            (
                hook_matcher
                for hook_matcher in existing_hooks
                if isinstance(hook_matcher, dict) and hook_matcher.get("matcher", None) == ""
            ),
            None,
        )
        if star_matcher_hook is None:
            existing_hooks.append(CLAUDE_CODE_DEFAULT_MATCHER)
            continue

        # Attach the list to the matcher before appending: a matcher without a
        # usable "hooks" list would otherwise receive the hook in a throwaway
        # list that never reaches the written file.
        all_hooks_for_star_matcher = star_matcher_hook.get("hooks")
        if not isinstance(all_hooks_for_star_matcher, list):
            all_hooks_for_star_matcher = []
            star_matcher_hook["hooks"] = all_hooks_for_star_matcher
        if CLAUDE_CODE_HOOK not in all_hooks_for_star_matcher:
            all_hooks_for_star_matcher.append(CLAUDE_CODE_HOOK)

    claude_code_settings["hooks"] = hooks
    with open(claude_settings_path, "w", encoding="utf-8") as claude_settings:
        json.dump(claude_code_settings, claude_settings, indent=2)
def _get_context_limit(model: str) -> int:
    """Look up the context window size for ``model``.

    Models listed in ``MODEL_CONTEXT_LIMITS`` use their configured size; every
    other model gets the default of 200,000.
    """
    default_limit = 200_000  # all current Claude models
    return MODEL_CONTEXT_LIMITS.get(model, default_limit)
class PendingToolSpan:
    """A tool call that has started (PreToolUse) but not yet finished (PostToolUse).

    Plain record holding what is known at PreToolUse time so the tool span can
    be completed when the matching PostToolUse arrives.
    """

    def __init__(self, span_id: str, tool_name: str, tool_input: Any, parent_id: str, start_ns: int) -> None:
        # ID reserved for the span that will be emitted for this tool call.
        self.span_id = span_id
        # Tool name and raw input as reported by the PreToolUse event.
        self.tool_name = tool_name
        self.tool_input = tool_input
        # Span that was active (agent or root) when the tool call started.
        self.parent_id = parent_id
        # Start timestamp in nanoseconds.
        self.start_ns = start_ns
class SessionState:
    """Tracks the state of a single Claude Code session.

    Holds the identifiers of the trace currently being assembled for the
    session together with the bookkeeping the hook handlers need between
    events (agent stack, pending tools, prompts, and so on).
    """

    def __init__(self, session_id: str, trace_id: str, root_span_id: str, start_ns: int) -> None:
        self.session_id = session_id
        self.trace_id = trace_id
        self.root_span_id = root_span_id
        self.start_ns = start_ns
        # Stack of active subagents; the last element is the innermost one.
        self.agent_span_stack: List[Dict[str, Any]] = []
        # Tool calls seen at PreToolUse and not yet completed, keyed by tool_use_id.
        self.pending_tools: Dict[str, PendingToolSpan] = {}
        self.user_prompts: List[str] = []
        self.model: str = ""
        self.tools_used: Set[str] = set()
        self.root_span_emitted: bool = False
        # Deferred agent spans waiting for PostToolUse(Task) to provide their output.
        # Keyed by tool_use_id of the Task tool that spawned the subagent.
        self.deferred_agent_spans: Dict[str, Dict[str, Any]] = {}
        # Task tool_use_ids that have already been claimed by a SubagentStart,
        # so they are not matched again when a second SubagentStart fires.
        self.claimed_task_tools: Set[str] = set()
        self.conversation_title: str = ""
        # Persists across turns so each turn's context_delta reflects growth from
        # the previous turn's final context size.
        self.last_known_input_tokens: int = 0
        # Tracks the time when the permission dialog appeared, so we can compute the elapsed time.
        self.pending_permission_at_ns: Optional[int] = None
        # whether the session is instrumented with the claude_intercept.mjs script for LLM calls
        self.instrumented: bool = False
        # Reference to the eagerly-emitted root span dict of the current turn.
        # Declared here (as None) so the attribute always exists; handlers that
        # read it with getattr(session, "_root_span_ref", None) see the same value.
        self._root_span_ref: Optional[Dict[str, Any]] = None
_MAX_UINT_64 = (1 << 64) - 1
def _rand64bits() -> int:
"""Generate a random 64-bit unsigned integer."""
return random.getrandbits(64)
def _rand128bits() -> int:
    """Build a 128-bit trace ID in Datadog's layout.

    Layout, high to low: <32-bit unix seconds><32 zero bits><64 random bits>.
    """
    unix_seconds = int(time.time())
    # Shifting by 96 places the timestamp in the top 32 bits and leaves bits 64-95 zero.
    timestamp_part = unix_seconds << 96
    return timestamp_part | _rand64bits()
def _format_span_id() -> str:
    """Return a new span ID: a random 64-bit unsigned integer rendered in decimal."""
    span_id = _rand64bits()
    return f"{span_id}"
def _format_trace_id() -> str:
    """Return a new trace ID string, formatted like Datadog's format_trace_id.

    IDs that do not fit in 64 bits are rendered as 32 zero-padded lowercase hex
    characters; IDs that fit in 64 bits are rendered in decimal.
    """
    raw_id = _rand128bits()
    if raw_id <= _MAX_UINT_64:
        return str(raw_id)
    return format(raw_id, "032x")
def _is_user_prompt_entry(entry: Dict[str, Any]) -> bool:
"""Check if a transcript entry is an actual user prompt (not a tool_result)."""
if entry.get("type") != "user":
return False
content = entry.get("message", {}).get("content", [])
if isinstance(content, str):
return True
if isinstance(content, list):
return any(isinstance(b, dict) and b.get("type") == "text" for b in content)
return False
def _read_console_output(transcript_path: str) -> str:
    """Read assistant text responses after the last user prompt from a Claude Code transcript JSONL file.

    Args:
        transcript_path: Path to the transcript; each non-blank line is expected
            to hold one JSON object.

    Returns:
        The non-empty assistant text blocks that follow the last user prompt,
        stripped and joined with blank lines. Returns ``""`` when the path is
        empty, is not a file, or reading fails (best effort; the failure is
        logged at debug level).
    """
    if not transcript_path or not os.path.isfile(transcript_path):
        return ""
    try:
        entries: List[Dict[str, Any]] = []
        # The transcript is JSON lines; decode as UTF-8 rather than with the
        # locale's default encoding.
        with open(transcript_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A line can be valid JSON without being an object; skip it
                # instead of letting it abort the whole read further down.
                if isinstance(entry, dict):
                    entries.append(entry)

        # Find the last actual user prompt (not tool_result entries)
        last_prompt_idx = -1
        for i, entry in enumerate(entries):
            if _is_user_prompt_entry(entry):
                last_prompt_idx = i

        # Collect assistant text after the last user prompt. With no prompt at
        # all, last_prompt_idx is -1 and the scan starts from the first entry.
        responses: List[str] = []
        for entry in entries[last_prompt_idx + 1 :]:
            if entry.get("type") != "assistant":
                continue
            message = entry.get("message")
            content = message.get("content", []) if isinstance(message, dict) else []
            if not isinstance(content, list):
                continue
            for block in content:
                if not (isinstance(block, dict) and block.get("type") == "text"):
                    continue
                # Tolerate text blocks whose "text" is missing or not a string.
                text = block.get("text")
                if isinstance(text, str) and text.strip():
                    responses.append(text.strip())
        return "\n\n".join(responses)
    except Exception as e:
        # Deliberately broad: the transcript is optional enrichment, so any
        # failure degrades to an empty output instead of failing the hook.
        log.debug("Failed to read transcript %s: %s", transcript_path, e)
        return ""
def _extract_intent(tool_name: str, tool_input: Any) -> str:
"""Extract a concise human-readable intent from tool input."""
if not isinstance(tool_input, dict):
return ""
def _g(key: str) -> str:
return str(tool_input.get(key, ""))
if tool_name == "Bash":
desc = _g("description")
if desc:
return desc
cmd = _g("command")
return cmd[:80] + "..." if len(cmd) > 80 else cmd
if tool_name in ("Read", "Write", "Edit"):
return os.path.basename(_g("file_path"))
if tool_name == "Glob":
return _g("pattern")
if tool_name == "Grep":
pattern = _g("pattern")
path = _g("path")
if path:
return f'"{pattern}" in {os.path.basename(path)}'
return f'"{pattern}"'
if tool_name == "Task":
return _g("description")
if tool_name == "WebFetch":
url = _g("url")
if url:
return urlparse(url).hostname or ""
return ""
if tool_name == "WebSearch":
return _g("query")
if tool_name == "TaskCreate":
return _g("subject")
if tool_name == "TaskUpdate":
return _g("status")
for key in ("description", "query", "pattern", "file_path", "command", "subject"):
val = _g(key)
if val:
return val[:80] + "..." if len(val) > 80 else val
return ""
class ClaudeHooksAPI:
"""Handler for Claude Code hook events."""
def __init__(self, link_tracker: Optional[ClaudeLinkTracker] = None) -> None:
self._sessions: Dict[str, SessionState] = {}
self._assembled_spans: List[Dict[str, Any]] = []
self._raw_events: List[Dict[str, Any]] = []
self._link_tracker = link_tracker
self._app: Optional[web.Application] = None
def set_app(self, app: web.Application) -> None:
"""Set the aiohttp app reference for backend forwarding."""
self._app = app
def _get_or_create_session(self, session_id: str) -> SessionState:
"""Get existing session or create a new one."""
if session_id not in self._sessions:
trace_id = _format_trace_id()
root_span_id = _format_span_id()
now_ns = int(time.time() * 1_000_000_000)
self._sessions[session_id] = SessionState(
session_id=session_id,
trace_id=trace_id,
root_span_id=root_span_id,
start_ns=now_ns,
)
return self._sessions[session_id]
def _current_parent_id(self, session: SessionState) -> str:
"""Return the span_id of the current active agent (top of stack), or root."""
if session.agent_span_stack:
return str(session.agent_span_stack[-1]["span_id"])
return session.root_span_id
def _current_span_ref(self, session: SessionState) -> Optional[Dict[str, Any]]:
"""Return the span dict of the current active agent (top of stack), or root."""
if session.agent_span_stack:
return session.agent_span_stack[-1].get("_span_ref")
return getattr(session, "_root_span_ref", None)
def _set_hidden_metadata(self, span: Dict[str, Any], **kwargs: Any) -> None:
"""Merge key-value pairs into span['meta']['metadata']['_dd'], preserving existing values."""
span["meta"].setdefault("metadata", {}).setdefault("_dd", {}).update(kwargs)
def _handle_session_start(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle SessionStart: register the session and record its model.

    The session's model is only overwritten when the event carries a non-empty
    ``model``. The ``instrumented`` flag is always refreshed from the event's
    ``lapdog_instrumented`` field (default ``False``).
    """
    session = self._get_or_create_session(session_id)
    reported_model = body.get("model", "")
    if reported_model:
        session.model = reported_model
    session.instrumented = body.get("lapdog_instrumented", False)
    log.info("Claude session started: %s (model=%s)", session_id, reported_model)
def _finalize_interrupted_turn(self, session: SessionState) -> None:
    """Close out a turn whose Stop hook never fired (e.g. user Ctrl+C).

    Invoked when a new UserPromptSubmit or SessionEnd arrives while the
    previous turn is still open. Every in-progress agent span and the root
    span are given their duration so far and marked as errors, and all
    per-turn tracking of pending work is dropped.
    """
    now_ns = int(time.time() * 1_000_000_000)
    turn_duration = now_ns - session.start_ns

    # Drop any pending permission wait: the turn was interrupted, so it must
    # not leak into the next turn's accumulated total.
    session.pending_permission_at_ns = None
    tracked_root: Optional[Dict[str, Any]] = getattr(session, "_root_span_ref", None)
    # NOTE(review): -1 looks like a sentinel written by code outside this
    # method (nothing visible here assigns it) — confirm where it is set.
    if tracked_root is not None and tracked_root["duration"] == -1:
        tracked_root["duration"] = 0

    # Unwind the agent stack innermost-first, closing each subagent span.
    while session.agent_span_stack:
        agent_info = session.agent_span_stack.pop()
        agent_span = agent_info.get("_span_ref")
        if agent_span:
            agent_span["duration"] = now_ns - agent_info["start_ns"]
            agent_span["status"] = "error"
            agent_span["meta"]["error"] = {"message": "interrupted"}

    # Pending tools will never receive their PostToolUse; forget them along
    # with the Task/subagent matching state.
    session.pending_tools.clear()
    session.deferred_agent_spans.clear()
    session.claimed_task_tools.clear()

    # Locate the root span: the tracked reference, else a lookup by span ID.
    root_span = tracked_root or next(
        (candidate for candidate in self._assembled_spans if candidate.get("span_id") == session.root_span_id),
        None,
    )
    token_usage = self._compute_token_usage(session.trace_id)
    input_value = "\n\n".join(session.user_prompts) if session.user_prompts else ""
    if root_span:
        root_span["duration"] = turn_duration
        root_span["status"] = "error"
        root_span["meta"]["input"]["value"] = input_value
        root_span["meta"]["error"] = {"message": "interrupted by user"}
        root_span["meta"]["model_name"] = session.model
        root_span["meta"]["model_provider"] = "anthropic"
        root_span["metrics"] = token_usage

    session.root_span_emitted = True
    log.info("Finalized interrupted turn for session %s (trace %s)", session.session_id, session.trace_id)
def _handle_user_prompt_submit(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle UserPromptSubmit: begin a new turn and emit its root span right away.

    The root span is appended in a preliminary state (duration 0, empty
    output) so the trace has a root node before the turn finishes. A reference
    to it is kept on the session so later handlers can update it in place.
    """
    session = self._get_or_create_session(session_id)

    # A root span that exists but was never finalized means the previous
    # turn's Stop never fired (e.g. Ctrl+C); close it out as interrupted.
    previous_root = getattr(session, "_root_span_ref", None)
    if previous_root is not None and not session.root_span_emitted:
        self._finalize_interrupted_turn(session)

    if session.root_span_emitted:
        # The previous turn is complete: start a fresh trace and reset all
        # per-turn state.
        now_ns = int(time.time() * 1_000_000_000)
        session.trace_id = _format_trace_id()
        session.root_span_id = _format_span_id()
        session.start_ns = now_ns
        session.user_prompts = []
        session.tools_used = set()
        session.agent_span_stack = []
        session.pending_tools = {}
        session.deferred_agent_spans = {}
        session.claimed_task_tools = set()
        # conversation_title is deliberately NOT reset: it persists across
        # turns so follow-up prompts on the same topic reuse the title. The
        # haiku summarization call will update it when the topic changes.
        session.root_span_emitted = False

    prompt = body.get("user_prompt", body.get("prompt", ""))
    if prompt:
        session.user_prompts.append(prompt)

    # NOTE(review): unlike the fallback root span built at Stop time, this
    # tag list carries no "user_name:" tag — confirm that is intended.
    tags = [
        "ml_app:claude-code",
        f"session_id:{session.session_id}",
        "service:claude-code",
        "env:local",
        "source:claude-code-hooks",
        "language:python",
        f"hostname:{_HOSTNAME}",
    ]
    if _USER_HANDLE:
        tags.append(f"user_handle:{_USER_HANDLE}")
    if session.conversation_title:
        tags.append(f"topic:{session.conversation_title}")

    # Preliminary root span, so the trace has a root node immediately.
    root_span: Dict[str, Any] = {
        "span_id": session.root_span_id,
        "trace_id": session.trace_id,
        "parent_id": "undefined",
        "name": "claude-code-request",
        "status": "ok",
        "start_ns": session.start_ns,
        "duration": 0,
        "ml_app": "claude-code",
        "service": "claude-code",
        "env": "local",
        "session_id": session.session_id,
        "tags": tags,
        "meta": {
            "span": {"kind": "agent"},
            "input": {"value": prompt},
            "output": {"value": ""},
        },
        "metrics": {},
    }
    self._assembled_spans.append(root_span)
    session._root_span_ref = root_span  # type: ignore[attr-defined]
def _handle_pre_tool_use(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle PreToolUse: remember the tool call until its PostToolUse arrives.

    No span is emitted here. The pending record is keyed by ``tool_use_id``,
    falling back to the tool name when the event carries no ID.
    """
    session = self._get_or_create_session(session_id)
    tool_name = body.get("tool_name", "unknown_tool")
    pending_key = body.get("tool_use_id", tool_name)
    session.tools_used.add(tool_name)

    session.pending_tools[pending_key] = PendingToolSpan(
        span_id=_format_span_id(),
        tool_name=tool_name,
        tool_input=body.get("tool_input", {}),
        # The tool runs under whichever agent is active right now.
        parent_id=self._current_parent_id(session),
        start_ns=int(time.time() * 1_000_000_000),
    )
def _handle_post_tool_use(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle PostToolUse: complete a tool call.

    Two outcomes are possible:

    * The tool is a Task whose subagent span was deferred by SubagentStop:
      the agent span receives the Task's input/output and span links, and no
      separate tool span is emitted.
    * Otherwise a regular tool span is appended.
    """
    session = self._get_or_create_session(session_id)
    tool_name = body.get("tool_name", "unknown_tool")
    tool_use_id = body.get("tool_use_id", tool_name)
    tool_output = body.get("tool_response", body.get("tool_output", ""))
    now_ns = int(time.time() * 1_000_000_000)

    pending = session.pending_tools.pop(tool_use_id, None)
    # Release the claim (if any) so claimed_task_tools doesn't grow unbounded.
    session.claimed_task_tools.discard(tool_use_id)

    if not pending:
        # No matching PreToolUse was recorded: synthesize a zero-length span
        # under the currently active agent.
        span_id = _format_span_id()
        parent_id = self._current_parent_id(session)
        start_ns = now_ns
        input_value = ""
        actual_tool_name = tool_name
        tool_input_dict = {}
    else:
        span_id = pending.span_id
        parent_id = pending.parent_id
        start_ns = pending.start_ns
        input_value = str(pending.tool_input) if pending.tool_input else ""
        actual_tool_name = pending.tool_name
        tool_input_dict = pending.tool_input if isinstance(pending.tool_input, dict) else {}

    intent = _extract_intent(actual_tool_name, tool_input_dict)
    # Output is capped at 4096 characters.
    output_str = str(tool_output)[:4096] if tool_output else ""

    estimated_permission_wait_ms: Optional[int] = None
    permission_shown_ns = session.pending_permission_at_ns
    if permission_shown_ns is not None:
        # A permission dialog was open: attribute the elapsed time to this tool.
        estimated_permission_wait_ms = (now_ns - permission_shown_ns) // 1_000_000
        session.pending_permission_at_ns = None
        root_span_ref: Optional[Dict[str, Any]] = getattr(session, "_root_span_ref", None)
        # NOTE(review): -1 looks like a sentinel written by code outside this
        # method (nothing visible here assigns it) — confirm where it is set.
        if root_span_ref is not None and root_span_ref["duration"] == -1:
            root_span_ref["duration"] = 0

    # Tags shared by every span emitted from this handler.
    common_tags = [
        "ml_app:claude-code",
        f"session_id:{session.session_id}",
        "service:claude-code",
        "env:local",
        "source:claude-code-hooks",
        "language:python",
        f"hostname:{_HOSTNAME}",
    ]

    # Task tool with a deferred agent span: finish the agent span instead.
    deferred = session.deferred_agent_spans.pop(tool_use_id, None)
    if deferred:
        agent_span_id = deferred["span_id"]
        span_links = []
        if self._link_tracker:
            links = self._link_tracker.on_tool_call(
                tool_use_id, agent_span_id, session.trace_id, deferred["parent_id"]
            )
            span_links = [link.to_dict() for link in links]

        agent_span = deferred.get("_span_ref")
        context_delta = deferred.get("context_delta")
        if agent_span:
            # Update the eagerly-emitted agent span in place.
            agent_span["duration"] = deferred["duration"]
            agent_span["meta"]["input"] = {"value": input_value}
            agent_span["meta"]["output"] = {"value": output_str}
            agent_span["span_links"] = span_links
            if context_delta:
                self._set_hidden_metadata(agent_span, context_delta=context_delta)
        else:
            # Fallback: no preliminary span exists, so append a complete one.
            span = {
                "span_id": agent_span_id,
                "trace_id": deferred["trace_id"],
                "parent_id": deferred["parent_id"],
                "name": deferred["name"],
                "status": "ok",
                "start_ns": deferred["start_ns"],
                "duration": deferred["duration"],
                "ml_app": "claude-code",
                "service": "claude-code",
                "env": "local",
                "session_id": session.session_id,
                "tags": list(common_tags),
                "meta": {
                    "span": {"kind": "agent"},
                    "input": {"value": input_value},
                    "output": {"value": output_str},
                },
                "metrics": {},
                "span_links": span_links,
            }
            if context_delta:
                self._set_hidden_metadata(span, context_delta=context_delta)
            self._assembled_spans.append(span)
        return

    # Regular tool span.
    duration = now_ns - start_ns
    span_links = []
    if self._link_tracker:
        links = self._link_tracker.on_tool_call(tool_use_id, span_id, session.trace_id, parent_id)
        span_links = [link.to_dict() for link in links]

    span_name = f"{actual_tool_name} - {intent}" if intent else actual_tool_name
    span = {
        "span_id": span_id,
        "trace_id": session.trace_id,
        "parent_id": parent_id,
        "name": span_name,
        "status": "ok",
        "start_ns": start_ns,
        "duration": duration,
        "ml_app": "claude-code",
        "service": "claude-code",
        "env": "local",
        "session_id": session.session_id,
        "tags": common_tags + [f"tool_name:{actual_tool_name}"],
        "meta": {
            "span": {"kind": "tool"},
            "input": {"value": input_value},
            "output": {"value": output_str},
        },
        "metrics": {},
        "span_links": span_links,
    }
    if estimated_permission_wait_ms is not None:
        self._set_hidden_metadata(span, estimated_permission_wait_ms=estimated_permission_wait_ms)
    self._assembled_spans.append(span)
def _handle_subagent_start(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle SubagentStart: open a subagent and push it onto the agent stack.

    A preliminary agent span (duration 0, empty input/output) is appended
    immediately so the in-progress subagent is visible; it is updated in
    place later by _handle_subagent_stop, or by _handle_post_tool_use for
    subagents spawned through the Task tool.

    When an unclaimed pending "Task" tool exists, it is treated as the tool
    that spawned this subagent: its tool_use_id and input are recorded on the
    stack entry, and its description is appended to the agent's name.
    """
    session = self._get_or_create_session(session_id)
    span_id = _format_span_id()
    parent_id = self._current_parent_id(session)
    now_ns = int(time.time() * 1_000_000_000)
    agent_name = body.get("agent_type", body.get("agent_name", "subagent"))

    # Pick the first pending Task tool not already claimed by an earlier
    # SubagentStart, so that with several Tasks pending each subagent gets
    # its own.
    task_tool_use_id = ""
    task_tool_input: Any = None
    unclaimed_task = next(
        (
            (candidate_id, candidate)
            for candidate_id, candidate in session.pending_tools.items()
            if candidate.tool_name == "Task" and candidate_id not in session.claimed_task_tools
        ),
        None,
    )
    if unclaimed_task is not None:
        task_tool_use_id, task_pending = unclaimed_task
        task_tool_input = task_pending.tool_input
        session.claimed_task_tools.add(task_tool_use_id)

    # Make the agent name more specific with the Task's description, if any.
    task_desc = task_tool_input.get("description", "") if isinstance(task_tool_input, dict) else ""
    if task_desc:
        agent_name = f"{agent_name} - {task_desc}"

    # Preliminary agent span so the trace shows the subagent immediately.
    preliminary_span: Dict[str, Any] = {
        "span_id": span_id,
        "trace_id": session.trace_id,
        "parent_id": parent_id,
        "name": agent_name,
        "status": "ok",
        "start_ns": now_ns,
        "duration": 0,
        "ml_app": "claude-code",
        "service": "claude-code",
        "env": "local",
        "session_id": session.session_id,
        "tags": [
            "ml_app:claude-code",
            f"session_id:{session.session_id}",
            "service:claude-code",
            "env:local",
            "source:claude-code-hooks",
            "language:python",
            f"hostname:{_HOSTNAME}",
        ],
        "meta": {
            "span": {"kind": "agent"},
            "input": {},
            "output": {},
        },
        "metrics": {},
    }
    self._assembled_spans.append(preliminary_span)

    stack_entry = {
        "span_id": span_id,
        "parent_id": parent_id,
        "name": agent_name,
        "start_ns": now_ns,
        "task_tool_use_id": task_tool_use_id,
        "task_tool_input": task_tool_input,
        "_span_ref": preliminary_span,
    }
    session.agent_span_stack.append(stack_entry)
def _handle_subagent_stop(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle SubagentStop: close the innermost active subagent.

    The agent is popped from the stack and its eagerly-emitted span receives
    its final duration. If the agent was spawned by a Task tool, the rest of
    the update is deferred until PostToolUse(Task) arrives, so the agent span
    can also carry the Task's output.
    """
    session = self._get_or_create_session(session_id)
    now_ns = int(time.time() * 1_000_000_000)
    if not session.agent_span_stack:
        log.warning("SubagentStop with empty agent stack for session %s", session_id)
        return

    agent_info = session.agent_span_stack.pop()
    duration = now_ns - agent_info["start_ns"]
    task_tool_use_id = agent_info.get("task_tool_use_id", "")
    task_tool_input = agent_info.get("task_tool_input")
    span_ref = agent_info.get("_span_ref")
    # A subagent's context is measured from zero input tokens.
    context_delta = self._compute_context_delta(
        session.trace_id,
        agent_info["span_id"],
        first_input_tokens=0,
    )
    # _sum_estimated_permission_wait_ms is defined elsewhere in this class.
    estimated_perm_wait = self._sum_estimated_permission_wait_ms(parent_id=str(agent_info["span_id"]))

    # SubagentStart may have fired before PreToolUse(Task); if no Task was
    # matched back then, try again now that it may be pending.
    if not task_tool_use_id:
        late_match = next(
            (
                (candidate_id, candidate)
                for candidate_id, candidate in session.pending_tools.items()
                if candidate.tool_name == "Task" and candidate_id not in session.claimed_task_tools
            ),
            None,
        )
        if late_match is not None:
            task_tool_use_id, task_pending = late_match
            task_tool_input = task_pending.tool_input
            session.claimed_task_tools.add(task_tool_use_id)
            desc = _extract_intent("Task", task_tool_input)
            if desc and span_ref:
                span_ref["name"] = f"{agent_info['name']} - {desc}"

    if task_tool_use_id:
        # Deferred: PostToolUse(Task) will set I/O and span links. Record the
        # duration on the preliminary span now so the UI shows progress.
        if span_ref:
            span_ref["duration"] = duration
            if estimated_perm_wait > 0:
                self._set_hidden_metadata(span_ref, estimated_permission_wait_ms=estimated_perm_wait)
        session.deferred_agent_spans[task_tool_use_id] = {
            "span_id": agent_info["span_id"],
            "trace_id": session.trace_id,
            "parent_id": agent_info["parent_id"],
            "name": agent_info["name"],
            "start_ns": agent_info["start_ns"],
            "duration": duration,
            "input": str(task_tool_input) if task_tool_input else "",
            "context_delta": context_delta,
            "_span_ref": span_ref,
        }
    elif span_ref:
        # Not Task-spawned: finish the eagerly-emitted span in place.
        span_ref["duration"] = duration
        dd_fields: Dict[str, Any] = {}
        if context_delta:
            dd_fields["context_delta"] = context_delta
        if estimated_perm_wait > 0:
            dd_fields["estimated_permission_wait_ms"] = estimated_perm_wait
        if dd_fields:
            self._set_hidden_metadata(span_ref, **dd_fields)
    else:
        # Fallback: no preliminary span (shouldn't happen); emit a full one.
        span = {
            "span_id": agent_info["span_id"],
            "trace_id": session.trace_id,
            "parent_id": agent_info["parent_id"],
            "name": agent_info["name"],
            "status": "ok",
            "start_ns": agent_info["start_ns"],
            "duration": duration,
            "ml_app": "claude-code",
            "service": "claude-code",
            "env": "local",
            "session_id": session.session_id,
            "tags": [
                "ml_app:claude-code",
                f"session_id:{session.session_id}",
                "service:claude-code",
                "env:local",
                "source:claude-code-hooks",
                "language:python",
                f"hostname:{_HOSTNAME}",
            ],
            "meta": {
                "span": {"kind": "agent"},
                "input": {},
                "output": {},
            },
            "metrics": {},
        }
        if context_delta:
            self._set_hidden_metadata(span, context_delta=context_delta)
        self._assembled_spans.append(span)
def _compute_token_usage(self, trace_id: str) -> Dict[str, int]:
"""Sum token metrics from all LLM spans in the given trace."""
total_input = 0
total_output = 0
for span in self._assembled_spans:
if span.get("trace_id") != trace_id:
continue
if span.get("meta", {}).get("span", {}).get("kind") != "llm":
continue
metrics = span.get("metrics", {})
total_input += metrics.get("input_tokens", 0)
total_output += metrics.get("output_tokens", 0)
return {
"input_tokens": total_input,
"output_tokens": total_output,
"total_tokens": total_input + total_output,
}
def _compute_context_delta(
self, trace_id: str, parent_span_id: str, first_input_tokens: int = 0
) -> Optional[Dict[str, Any]]:
"""Return context delta for an agent span.
parent_span_id: only consider LLM spans whose parent is this span.
first_input_tokens: context size at the beginning of this span.
- Root agent: session.last_known_input_tokens (persists across turns)
- Subagent: 0 (each subagent has its own fresh context window)
"""
llm_spans = sorted(
[
s
for s in self._assembled_spans
if s.get("trace_id") == trace_id
and s.get("meta", {}).get("span", {}).get("kind") == "llm"
and s.get("parent_id") == parent_span_id
],
key=lambda s: s.get("start_ns", 0),
)
last_span = next(
(s for s in reversed(llm_spans) if s.get("metrics", {}).get("input_tokens", 0) > 0),
None,
)
if last_span is None:
return None
last_input_tokens = last_span.get("metrics", {}).get("input_tokens", 0)
primary_model = last_span.get("meta", {}).get("model_name", "")
window = _get_context_limit(primary_model)
delta_tokens = max(last_input_tokens - first_input_tokens, 0)
return {
"first_input_tokens": first_input_tokens,
"last_input_tokens": last_input_tokens,
"delta_tokens": delta_tokens,
"context_window_size": window,
"first_usage_pct": round(first_input_tokens / window * 100, 1) if window else 0.0,
"last_usage_pct": round(last_input_tokens / window * 100, 1) if window else 0.0,
}
def _handle_stop(self, session_id: str, body: Dict[str, Any]) -> None:
    """Finalize the current turn: fill in the eagerly-emitted root span.

    The root span receives its duration, the joined user prompts as input,
    the assistant text read from the transcript as output, model
    information, aggregate token metrics and hidden ``_dd`` metadata. When
    no root span can be found, a complete one is created and appended
    instead. Unknown sessions are ignored with a warning.
    """
    session = self._sessions.get(session_id)
    if not session:
        log.warning("Stop event for unknown session %s", session_id)
        return

    now_ns = int(time.time() * 1_000_000_000)
    duration = now_ns - session.start_ns
    input_value = "\n\n".join(session.user_prompts) if session.user_prompts else ""
    output_value = _read_console_output(body.get("transcript_path", ""))

    agent_manifest = {
        "name": "claude-code",
        "instructions": "",
        "handoff_description": "",
        "model": session.model,
        "model_provider": "anthropic",
        "model_settings": {},
        "tools": [{"name": name} for name in sorted(session.tools_used)],
        "handoffs": [],
        "guardrails": [],
    }

    # Prefer the reference kept on the session; otherwise look the root span
    # up by ID among the assembled spans.
    root_span: Optional[Dict[str, Any]] = getattr(session, "_root_span_ref", None)
    if not root_span:
        root_span = next(
            (candidate for candidate in self._assembled_spans if candidate.get("span_id") == session.root_span_id),
            None,
        )

    # Aggregate token usage over the LLM spans of this trace.
    token_usage = self._compute_token_usage(session.trace_id)
    context_delta = self._compute_context_delta(
        session.trace_id, session.root_span_id, session.last_known_input_tokens
    )
    if context_delta:
        # Carry the final context size over as the next turn's starting point.
        session.last_known_input_tokens = context_delta["last_input_tokens"]

    root_span_name = "claude-code-request"
    # _sum_estimated_permission_wait_ms is defined elsewhere in this class.
    estimated_permission_wait_ms = self._sum_estimated_permission_wait_ms(trace_id=session.trace_id)

    # Hidden metadata is the same whether the span is updated or created.
    dd_fields: Dict[str, Any] = {"agent_manifest": agent_manifest}
    if context_delta:
        dd_fields["context_delta"] = context_delta
    if estimated_permission_wait_ms > 0:
        dd_fields["estimated_permission_wait_ms"] = estimated_permission_wait_ms

    if root_span:
        root_span["name"] = root_span_name
        if session.conversation_title:
            # Replace the topic tag set on the preliminary span, or add one.
            without_topic = [tag for tag in root_span["tags"] if not tag.startswith("topic:")]
            root_span["tags"] = without_topic + [f"topic:{session.conversation_title}"]
        root_span["duration"] = duration
        root_span["meta"]["input"]["value"] = input_value
        root_span["meta"]["output"]["value"] = output_value
        root_span["meta"]["model_name"] = session.model
        root_span["meta"]["model_provider"] = "anthropic"
        root_meta = root_span["meta"].setdefault("metadata", {})
        root_meta.update(
            {
                "model_name": session.model,
                "model_provider": "anthropic",
            }
        )
        self._set_hidden_metadata(root_span, **dd_fields)
        root_span["metrics"] = token_usage
    else:
        # No eagerly-emitted root span found: build a complete one instead.
        fallback_tags = [
            "ml_app:claude-code",
            f"session_id:{session.session_id}",
            "service:claude-code",
            "env:local",
            "source:claude-code-hooks",
            "language:python",
            f"hostname:{_HOSTNAME}",
            f"user_name:{_USERNAME}",
        ]
        if _USER_HANDLE:
            fallback_tags.append(f"user_handle:{_USER_HANDLE}")
        if session.conversation_title:
            fallback_tags.append(f"topic:{session.conversation_title}")
        root_span = {
            "span_id": session.root_span_id,
            "trace_id": session.trace_id,
            "parent_id": "undefined",
            "name": root_span_name,
            "status": "ok",
            "start_ns": session.start_ns,
            "duration": duration,
            "ml_app": "claude-code",
            "service": "claude-code",
            "env": "local",
            "session_id": session.session_id,
            "tags": fallback_tags,
            "meta": {
                "span": {"kind": "agent"},
                "input": {"value": input_value},
                "output": {"value": output_value},
                "model_name": session.model,
                "model_provider": "anthropic",
                "metadata": {
                    "model_name": session.model,
                    "model_provider": "anthropic",
                },
            },
            "metrics": token_usage,
        }
        self._set_hidden_metadata(root_span, **dd_fields)
        self._assembled_spans.append(root_span)

    session.root_span_emitted = True
def _handle_session_end(self, session_id: str, body: Dict[str, Any]) -> None:
"""Handle SessionEnd hook event.
If the current turn was already finalized by Stop, this is a no-op.
If Stop never fired (e.g. user Ctrl+C), finalize the turn as interrupted.
"""
session = self._sessions.get(session_id)
if not session:
return
if not session.root_span_emitted:
self._finalize_interrupted_turn(session)
def _handle_notification(self, session_id: str, body: Dict[str, Any]) -> None:
    """Handle Notification: log the event's message; no span is emitted."""
    message = body.get("message", "")
    log.info("Claude notification for session %s: %s", session_id, message)
def _handle_post_tool_use_failure(self, session_id: str, body: Dict[str, Any]) -> None:
"""Handle PostToolUseFailure hook event — emits a tool span with error status.