-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtracing.py
More file actions
206 lines (164 loc) · 6.75 KB
/
tracing.py
File metadata and controls
206 lines (164 loc) · 6.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import json
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from quotientai.exceptions import logger
@dataclass
class Trace:
"""
Represents a trace from the QuotientAI API
"""
trace_id: str
root_span: Optional[Dict[str, Any]] = None
total_duration_ms: float = 0
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
span_list: List[Dict[str, Any]] = None
def __post_init__(self):
if self.span_list is None:
self.span_list = []
def __rich_repr__(self): # pragma: no cover
yield "id", self.trace_id
yield "total_duration_ms", self.total_duration_ms
if self.start_time:
yield "start_time", self.start_time
if self.end_time:
yield "end_time", self.end_time
class Traces:
"""
Container for traces that matches the API response schema.
"""
def __init__(self, data: List[Trace], count: int):
self.data = data
self.count = count
def __repr__(self):
return f"Traces(count={self.count}, data=[{type(self.data[0] if self.data else None)}])"
def to_jsonl(self, filename: Optional[str] = None) -> str:
"""
Export traces to JSON Lines format.
Args:
filename: Optional filename to save the JSON Lines data to
Returns:
String containing JSON Lines data
"""
jsonl_lines = []
for trace in self.data:
# Convert Trace object to dict for JSON serialization
trace_dict = {
"trace_id": trace.trace_id,
"root_span": trace.root_span,
"total_duration_ms": trace.total_duration_ms,
"start_time": trace.start_time.isoformat() if trace.start_time else None,
"end_time": trace.end_time.isoformat() if trace.end_time else None,
"span_list": trace.span_list,
}
jsonl_lines.append(json.dumps(trace_dict))
jsonl_data = "\n".join(jsonl_lines)
if filename:
with open(filename, 'w') as f:
f.write(jsonl_data)
return jsonl_data
class TracesResource:
"""
Resource for interacting with traces in the Quotient API.
"""
def __init__(self, client):
self._client = client
def list(
self,
*,
time_range: Optional[str] = None,
app_name: Optional[str] = None,
environments: Optional[List[str]] = None,
compress: bool = True,
) -> Traces:
"""
List traces with optional filtering parameters.
Args:
time_range: Optional time range filter (e.g., "1d", "1h", "1m")
app_name: Optional app name filter
environments: Optional list of environments to filter by
compress: Whether to request compressed response
Returns:
Traces object containing traces and total count
"""
try:
params = {}
if time_range:
params["time_range"] = time_range
# convert time range from 1d / 1h / 1m to 1 DAY / 1 HOUR / 1 MINUTE, months to MONTHS
params["time_range"] = params["time_range"].replace("d", " DAY").replace("h", " HOUR").replace("m", " MINUTE").replace("M", " MONTHS")
# add a space between the number and the unit
params["time_range"] = re.sub(r'(\d+)([a-zA-Z]+)', r'\1 \2', params["time_range"])
if app_name:
params["app_name"] = app_name
if environments:
params["environments"] = environments
if compress:
params["compress"] = "true"
headers = {}
if compress:
headers["Accept-Encoding"] = "gzip"
# the response is already decompressed by httpx
# https://www.python-httpx.org/quickstart/#binary-response-content
response = self._client._get("/traces", params=params)
# Convert trace dictionaries to Trace objects
trace_objects = []
for trace_dict in response.get("traces", []):
# Parse datetime fields
start_time = None
if trace_dict.get("start_time"):
start_time = datetime.fromisoformat(trace_dict["start_time"].replace('Z', '+00:00'))
end_time = None
if trace_dict.get("end_time"):
end_time = datetime.fromisoformat(trace_dict["end_time"].replace('Z', '+00:00'))
trace = Trace(
trace_id=trace_dict["trace_id"],
root_span=trace_dict.get("root_span"),
total_duration_ms=trace_dict.get("total_duration_ms", 0),
start_time=start_time,
end_time=end_time,
span_list=trace_dict.get("span_list", []),
)
trace_objects.append(trace)
traces = Traces(
data=trace_objects,
count=len(trace_objects),
)
except Exception as e:
logger.error(f"Error listing traces: {str(e)}")
raise
# Return Traces object with structured response
return traces
def get(self, trace_id: str) -> Trace:
"""
Get a specific trace by its ID.
Args:
trace_id: The ID of the trace to retrieve
Returns:
Trace object containing the trace data
"""
try:
response = self._client._get(f"/traces/{trace_id}")
# Response is already parsed JSON from @handle_errors decorator
trace_dict = response
# Parse datetime fields
start_time = None
if trace_dict.get("start_time"):
start_time = datetime.fromisoformat(trace_dict["start_time"].replace('Z', '+00:00'))
end_time = None
if trace_dict.get("end_time"):
end_time = datetime.fromisoformat(trace_dict["end_time"].replace('Z', '+00:00'))
trace = Trace(
trace_id=trace_dict["trace_id"],
root_span=trace_dict.get("root_span"),
total_duration_ms=trace_dict.get("total_duration_ms", 0),
start_time=start_time,
end_time=end_time,
span_list=trace_dict.get("span_list", []),
)
except Exception as e:
logger.error(f"Error getting trace {trace_id}: {str(e)}")
raise
return trace