Skip to content

Commit 3fcfafc

Browse files
improve trace load logic
1 parent d0955cc commit 3fcfafc

5 files changed

Lines changed: 295 additions & 59 deletions

File tree

src/agentevals/api/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
from pydantic import BaseModel, ConfigDict, Field
1212
from pydantic.alias_generators import to_camel
1313

14+
from ..config import EvalParams
15+
1416
T = TypeVar("T")
1517

1618

@@ -134,6 +136,14 @@ class ConvertTracesData(CamelModel):
134136
traces: list[TraceConversionEntry]
135137

136138

139+
class EvaluateJsonRequest(CamelModel):
140+
"""Request body for JSON-based trace evaluation (``POST /evaluate/json``)."""
141+
142+
traces: dict = Field(description="OTLP JSON export with resourceSpans structure.")
143+
config: EvalParams = Field(default_factory=EvalParams, description="Evaluation parameters.")
144+
eval_set: dict | None = Field(default=None, description="Optional ADK EvalSet JSON.")
145+
146+
137147
# ---------------------------------------------------------------------------
138148
# SSE evaluation event models
139149
# ---------------------------------------------------------------------------

src/agentevals/api/routes.py

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,22 @@
2727
)
2828
from ..converter import convert_traces
2929
from ..extraction import get_extractor
30-
from ..runner import RunResult, get_loader, load_eval_set, run_evaluation
30+
from ..loader.otlp import OtlpJsonLoader
31+
from ..runner import (
32+
RunResult,
33+
get_loader,
34+
load_eval_set,
35+
load_eval_set_from_dict,
36+
run_evaluation,
37+
run_evaluation_from_traces,
38+
)
3139
from ..trace_metrics import extract_performance_metrics, extract_trace_metadata
3240
from .models import (
3341
ApiKeyStatus,
3442
ConfigData,
3543
ConvertTracesData,
3644
EvalSetValidation,
45+
EvaluateJsonRequest,
3746
HealthData,
3847
MetricInfo,
3948
SSEDoneEvent,
@@ -729,3 +738,134 @@ async def run_with_progress():
729738
"Connection": "keep-alive",
730739
},
731740
)
741+
742+
743+
@router.post("/evaluate/json", response_model=StandardResponse[RunResult])
744+
async def evaluate_traces_json(request: EvaluateJsonRequest):
745+
"""Evaluate OTLP JSON traces passed in the request body."""
746+
try:
747+
traces = OtlpJsonLoader().load_from_dict(request.traces)
748+
except ValueError as exc:
749+
raise HTTPException(status_code=400, detail=str(exc)) from exc
750+
751+
if not traces:
752+
raise HTTPException(status_code=400, detail="No traces found in OTLP JSON")
753+
754+
eval_set = None
755+
if request.eval_set:
756+
try:
757+
eval_set = load_eval_set_from_dict(request.eval_set)
758+
except Exception as exc:
759+
raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc
760+
761+
try:
762+
result = await run_evaluation_from_traces(
763+
traces=traces, config=request.config, eval_set=eval_set,
764+
)
765+
return StandardResponse(data=_camel_keys(result.model_dump(by_alias=True)))
766+
except Exception as exc:
767+
logger.exception("JSON evaluation failed")
768+
raise HTTPException(status_code=500, detail=f"Internal error: {exc!s}") from exc
769+
770+
771+
@router.post("/evaluate/json/stream")
772+
async def evaluate_traces_json_stream(request: EvaluateJsonRequest):
773+
"""Evaluate OTLP JSON traces with real-time progress via SSE."""
774+
775+
async def event_generator():
776+
try:
777+
try:
778+
traces = OtlpJsonLoader().load_from_dict(request.traces)
779+
except ValueError as exc:
780+
yield f"data: {SSEErrorEvent(error=str(exc)).model_dump_json(by_alias=True)}\n\n"
781+
return
782+
783+
if not traces:
784+
yield f"data: {SSEErrorEvent(error='No traces found in OTLP JSON').model_dump_json(by_alias=True)}\n\n"
785+
return
786+
787+
eval_set = None
788+
if request.eval_set:
789+
try:
790+
eval_set = load_eval_set_from_dict(request.eval_set)
791+
except Exception as exc:
792+
yield f"data: {SSEErrorEvent(error=f'Invalid eval set: {exc}').model_dump_json(by_alias=True)}\n\n"
793+
return
794+
795+
for trace in traces:
796+
try:
797+
extractor = get_extractor(trace)
798+
perf_metrics = _camel_keys(extract_performance_metrics(trace, extractor))
799+
trace_metadata = _camel_keys(extract_trace_metadata(trace, extractor))
800+
evt = SSEPerformanceMetricsEvent(
801+
trace_id=trace.trace_id,
802+
performance_metrics=perf_metrics,
803+
trace_metadata=trace_metadata,
804+
)
805+
yield f"event: performance_metrics\ndata: {evt.model_dump_json(by_alias=True)}\n\n"
806+
except Exception as e:
807+
logger.error(f"Failed to extract early performance metrics: {e}")
808+
809+
queue: asyncio.Queue = asyncio.Queue()
810+
811+
async def progress_callback(message: str):
812+
await queue.put(("progress", message))
813+
814+
async def trace_progress_callback(trace_result):
815+
await queue.put(("trace_progress", trace_result))
816+
817+
async def run_with_progress():
818+
result = await run_evaluation_from_traces(
819+
traces=traces,
820+
config=request.config,
821+
eval_set=eval_set,
822+
progress_callback=progress_callback,
823+
trace_progress_callback=trace_progress_callback,
824+
)
825+
await queue.put(("done", result))
826+
827+
eval_task = asyncio.create_task(run_with_progress())
828+
829+
try:
830+
while True:
831+
msg = await queue.get()
832+
tag, payload = msg
833+
834+
if tag == "done":
835+
evt = SSEDoneEvent(
836+
result=_camel_keys(payload.model_dump(by_alias=True)),
837+
)
838+
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
839+
break
840+
elif tag == "trace_progress":
841+
evt = SSETraceProgressEvent(
842+
trace_progress=SSETraceProgress(
843+
trace_id=payload.trace_id,
844+
partial_result=_camel_keys(payload.model_dump(by_alias=True)),
845+
)
846+
)
847+
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
848+
elif tag == "progress":
849+
evt = SSEProgressEvent(message=payload)
850+
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
851+
finally:
852+
if not eval_task.done():
853+
eval_task.cancel()
854+
try:
855+
await eval_task
856+
except asyncio.CancelledError:
857+
pass
858+
859+
except Exception as exc:
860+
logger.exception("JSON evaluation stream failed")
861+
evt = SSEErrorEvent(error=str(exc))
862+
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
863+
864+
return StreamingResponse(
865+
event_generator(),
866+
media_type="text/event-stream",
867+
headers={
868+
"Cache-Control": "no-cache",
869+
"Connection": "keep-alive",
870+
},
871+
)

src/agentevals/config.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from pathlib import Path
66
from typing import Annotated, Any, Literal
77

8-
from pydantic import BaseModel, Field, field_validator
8+
from pydantic import BaseModel, ConfigDict, Field, field_validator
9+
from pydantic.alias_generators import to_camel
910

1011

1112
class BuiltinMetricDef(BaseModel):
@@ -99,13 +100,14 @@ def _validate_grader(cls, v: dict[str, Any]) -> dict[str, Any]:
99100
]
100101

101102

102-
class EvalRunConfig(BaseModel):
103-
trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")
103+
class EvalParams(BaseModel):
104+
"""Evaluation parameters independent of how traces are provided.
104105
105-
eval_set_file: str | None = Field(
106-
default=None,
107-
description="Path to a golden eval set JSON file (ADK EvalSet format).",
108-
)
106+
Used by ``run_evaluation_from_traces`` for programmatic / API-driven
107+
evaluation. ``EvalRunConfig`` inherits from this and adds file I/O fields.
108+
"""
109+
110+
model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
109111

110112
metrics: list[str] = Field(
111113
default_factory=lambda: ["tool_trajectory_avg_score"],
@@ -117,11 +119,6 @@ class EvalRunConfig(BaseModel):
117119
description="Custom evaluator definitions.",
118120
)
119121

120-
trace_format: str = Field(
121-
default="jaeger-json",
122-
description="Format of the trace files (jaeger-json or otlp-json).",
123-
)
124-
125122
judge_model: str | None = Field(
126123
default=None,
127124
description="LLM model for judge-based metrics.",
@@ -145,11 +142,6 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None:
145142
raise ValueError(f"Invalid trajectory_match_type '{v}'. Valid values: {sorted(valid)}")
146143
return v.upper() if v is not None else v
147144

148-
output_format: str = Field(
149-
default="table",
150-
description="Output format: 'table', 'json', or 'summary'.",
151-
)
152-
153145
max_concurrent_traces: int = Field(
154146
default=10,
155147
description="Maximum number of traces to evaluate concurrently.",
@@ -159,3 +151,24 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None:
159151
default=5,
160152
description="Maximum number of concurrent metric evaluations (LLM API calls).",
161153
)
154+
155+
156+
class EvalRunConfig(EvalParams):
157+
"""Full configuration for file-based evaluation runs."""
158+
159+
trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")
160+
161+
eval_set_file: str | None = Field(
162+
default=None,
163+
description="Path to a golden eval set JSON file (ADK EvalSet format).",
164+
)
165+
166+
trace_format: str = Field(
167+
default="jaeger-json",
168+
description="Format of the trace files (jaeger-json or otlp-json).",
169+
)
170+
171+
output_format: str = Field(
172+
default="table",
173+
description="Output format: 'table', 'json', or 'summary'.",
174+
)

src/agentevals/loader/otlp.py

Lines changed: 55 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ def load(self, source: str) -> list[Trace]:
5656
logger.info("Loaded %d trace(s) from %s", len(traces), source)
5757
return traces
5858

59+
def load_from_dict(self, data: dict) -> list[Trace]:
60+
"""Load traces from an OTLP JSON dict (resourceSpans structure)."""
61+
if "resourceSpans" not in data:
62+
raise ValueError("Expected OTLP JSON with 'resourceSpans' key")
63+
return self._parse_otlp_export(data)
64+
5965
def _parse_otlp_export(self, data: dict) -> list[Trace]:
6066
"""Parse full OTLP export structure with resourceSpans."""
6167
all_spans = []
@@ -122,23 +128,40 @@ def _promote_genai_event_attributes(self, span_data: dict, attributes: dict) ->
122128
Some SDKs (e.g. Strands) store message content in span events rather
123129
than span attributes. This promotes those values so the converter can
124130
find them via normal attribute lookups.
131+
132+
Accepts events in OTLP array format or flat/nested dict format.
125133
"""
126134
for event in span_data.get("events", []):
127-
for attr in event.get("attributes", []):
128-
key = attr.get("key", "")
129-
if key in self._GENAI_EVENT_KEYS and key not in attributes:
130-
value_obj = attr.get("value", {})
131-
if "stringValue" in value_obj:
132-
attributes[key] = value_obj["stringValue"]
133-
134-
def _extract_attributes(self, attrs_list: list[dict]) -> dict:
135-
"""Convert OTLP attributes array to flat dict.
136-
137-
OTLP attributes are [{key, value: {stringValue|intValue|...}}]
138-
We flatten to {key: value} for easier use.
135+
event_attrs = event.get("attributes", [])
136+
if isinstance(event_attrs, dict):
137+
flat = self._flatten_nested_dict(event_attrs)
138+
for key in self._GENAI_EVENT_KEYS:
139+
if key in flat and key not in attributes:
140+
attributes[key] = flat[key]
141+
else:
142+
for attr in event_attrs:
143+
key = attr.get("key", "")
144+
if key in self._GENAI_EVENT_KEYS and key not in attributes:
145+
value_obj = attr.get("value", {})
146+
if "stringValue" in value_obj:
147+
attributes[key] = value_obj["stringValue"]
148+
149+
def _extract_attributes(self, attrs) -> dict:
150+
"""Convert attributes to a flat ``{key: value}`` dict.
151+
152+
Accepts three formats:
153+
1. OTLP array: ``[{key, value: {stringValue|intValue|...}}]``
154+
2. Flat dict: ``{"gen_ai.operation.name": "chat"}``
155+
3. Nested dict (ClickHouse JSON column): ``{"gen_ai": {"operation": {"name": "chat"}}}``
156+
157+
Formats 2 and 3 are auto-detected by checking whether *attrs* is a dict.
158+
Nested dicts are recursively flattened to dot-notation keys.
139159
"""
160+
if isinstance(attrs, dict):
161+
return self._flatten_nested_dict(attrs)
162+
140163
result = {}
141-
for attr in attrs_list:
164+
for attr in attrs:
142165
key = attr.get("key", "")
143166
value_obj = attr.get("value", {})
144167

@@ -157,6 +180,25 @@ def _extract_attributes(self, attrs_list: list[dict]) -> dict:
157180

158181
return result
159182

183+
@staticmethod
184+
def _flatten_nested_dict(d: dict, prefix: str = "") -> dict:
185+
"""Recursively flatten a nested dict to dot-notation keys.
186+
187+
``{"gen_ai": {"operation": {"name": "chat"}}}``
188+
becomes ``{"gen_ai.operation.name": "chat"}``.
189+
190+
Already-flat keys (e.g. ``{"service.name": "agent"}``) pass through
191+
unchanged.
192+
"""
193+
result = {}
194+
for key, value in d.items():
195+
full_key = f"{prefix}{key}" if not prefix else f"{prefix}.{key}"
196+
if isinstance(value, dict):
197+
result.update(OtlpJsonLoader._flatten_nested_dict(value, full_key))
198+
else:
199+
result[full_key] = value
200+
return result
201+
160202
def _build_traces(self, all_spans: list[Span]) -> list[Trace]:
161203
"""Group spans by trace_id and build parent-child relationships."""
162204
traces_by_id: dict[str, list[Span]] = {}

0 commit comments

Comments
 (0)