-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathclaude_link_tracker.py
More file actions
164 lines (137 loc) · 5.75 KB
/
claude_link_tracker.py
File metadata and controls
164 lines (137 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""Span link tracking for connecting LLM and tool spans in Claude Code traces.
Follows the same linking pattern as dd-trace-py's LinkTracker:
- LLM.output -> Tool.input (when LLM generates tool_use, link to the tool span)
- Tool.output -> LLM.input (when tool_result is sent to next LLM call)
"""
import logging
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
log = logging.getLogger(__name__)
class SpanLink:
"""A directional link between two spans in the agentic graph."""
def __init__(self, span_id: str, trace_id: str, from_io: str, to_io: str) -> None:
self.span_id = span_id
self.trace_id = trace_id
self.from_io = from_io
self.to_io = to_io
def to_dict(self) -> Dict[str, Any]:
return {
"span_id": self.span_id,
"trace_id": self.trace_id,
"attributes": {"from": self.from_io, "to": self.to_io},
}
class TrackedToolCall:
"""Tracks a tool call between LLM response and tool execution for linking."""
def __init__(
self,
tool_use_id: str,
tool_name: str,
arguments: str,
llm_span_id: str,
llm_trace_id: str,
) -> None:
self.tool_use_id = tool_use_id
self.tool_name = tool_name
self.arguments = arguments
self.llm_span_id = llm_span_id
self.llm_trace_id = llm_trace_id
self.tool_span_id: Optional[str] = None
self.tool_trace_id: Optional[str] = None
self.tool_parent_id: Optional[str] = None
class ClaudeLinkTracker:
"""Track tool_use_id correlations between LLM and tool spans.
The linking flow:
1. Proxy sees LLM response with tool_use blocks -> ``on_llm_tool_choice()``
stores ``{tool_use_id -> llm_span context}``.
2. Hook fires PostToolUse with tool_use_id -> ``on_tool_call()``
creates LLM.output -> Tool.input link on the tool span and
stores ``{tool_use_id -> tool_span context}``.
3. Proxy sees next LLM request with tool_result blocks -> ``on_tool_call_output_used()``
creates Tool.output -> LLM.input link on the LLM span and
consumes the tracked tool call.
"""
def __init__(self) -> None:
self._tool_calls: Dict[str, TrackedToolCall] = {}
self._llm_span_parents: Dict[str, str] = {} # llm_span_id → parent agent span_id
def on_llm_tool_choice(
self,
tool_use_id: str,
tool_name: str,
arguments: str,
llm_span_id: str,
llm_trace_id: str,
) -> None:
"""Record an LLM response tool_use block for span linking."""
self._tool_calls[tool_use_id] = TrackedToolCall(
tool_use_id=tool_use_id,
tool_name=tool_name,
arguments=arguments,
llm_span_id=llm_span_id,
llm_trace_id=llm_trace_id,
)
log.debug("Tracking tool choice: %s (%s)", tool_use_id, tool_name)
def on_tool_call(
self, tool_use_id: str, tool_span_id: str, tool_trace_id: str, tool_parent_id: str
) -> List[SpanLink]:
"""Create span links for a finished tool span (from PostToolUse hook).
Return span links to add to the tool span (LLM.output -> Tool.input).
Also store the tool span's parent_id so the proxy can use it to determine
the correct parent for subsequent LLM spans (handles concurrent subagents).
"""
tc = self._tool_calls.get(tool_use_id)
if not tc:
return []
tc.tool_span_id = tool_span_id
tc.tool_trace_id = tool_trace_id
tc.tool_parent_id = tool_parent_id
log.debug("Linking LLM(%s).output -> Tool(%s).input via %s", tc.llm_span_id, tool_span_id, tool_use_id)
return [
SpanLink(
span_id=tc.llm_span_id,
trace_id=tc.llm_trace_id,
from_io="output",
to_io="input",
)
]
def on_tool_call_output_used(self, tool_use_id: str) -> Tuple[List[SpanLink], Optional[str]]:
"""Create span links for an LLM request containing tool_result blocks.
Return ``(span_links, parent_id_hint)`` where *span_links* are
Tool.output -> LLM.input links for the LLM span and *parent_id_hint*
is the parent of the tool span that produced this result (allowing the
proxy to assign the correct parent even with concurrent subagents).
Consume the tracked tool call.
"""
tc = self._tool_calls.pop(tool_use_id, None)
if not tc or not tc.tool_span_id:
return [], None
log.debug(
"Linking Tool(%s).output -> LLM.input via %s (parent hint: %s)",
tc.tool_span_id,
tool_use_id,
tc.tool_parent_id,
)
links = [
SpanLink(
span_id=tc.tool_span_id,
trace_id=tc.tool_trace_id or "",
from_io="output",
to_io="input",
)
]
return links, tc.tool_parent_id
def set_llm_parent(self, llm_span_id: str, parent_span_id: str) -> None:
"""Record which agent span an LLM span belongs to."""
self._llm_span_parents[llm_span_id] = parent_span_id
def get_parent_for_tool(self, tool_use_id: str) -> Optional[str]:
"""Resolve the parent agent for a tool via: tool_use_id → LLM span → agent parent.
When an LLM response emits tool_use blocks, on_llm_tool_choice records which
LLM span produced each tool_use_id. This method walks that chain to find the
agent span that the LLM (and therefore the tool) belongs to.
"""
tc = self._tool_calls.get(tool_use_id)
if not tc:
return None
return self._llm_span_parents.get(tc.llm_span_id)