Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/agentevals/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel

from ..config import EvalParams

T = TypeVar("T")


Expand Down Expand Up @@ -134,6 +136,14 @@ class ConvertTracesData(CamelModel):
traces: list[TraceConversionEntry]


class EvaluateJsonRequest(CamelModel):
"""Request body for JSON-based trace evaluation (``POST /evaluate/json``)."""

traces: dict = Field(description="OTLP JSON export with resourceSpans structure.")
config: EvalParams = Field(default_factory=EvalParams, description="Evaluation parameters.")
eval_set: dict | None = Field(default=None, description="Optional ADK EvalSet JSON.")

Comment thread
krisztianfekete marked this conversation as resolved.

# ---------------------------------------------------------------------------
# SSE evaluation event models
# ---------------------------------------------------------------------------
Expand Down
142 changes: 141 additions & 1 deletion src/agentevals/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,22 @@
)
from ..converter import convert_traces
from ..extraction import get_extractor
from ..runner import RunResult, get_loader, load_eval_set, run_evaluation
from ..loader.otlp import OtlpJsonLoader
from ..runner import (
RunResult,
get_loader,
load_eval_set,
load_eval_set_from_dict,
run_evaluation,
run_evaluation_from_traces,
)
from ..trace_metrics import extract_performance_metrics, extract_trace_metadata
from .models import (
ApiKeyStatus,
ConfigData,
ConvertTracesData,
EvalSetValidation,
EvaluateJsonRequest,
HealthData,
MetricInfo,
SSEDoneEvent,
Expand Down Expand Up @@ -729,3 +738,134 @@ async def run_with_progress():
"Connection": "keep-alive",
},
)


@router.post("/evaluate/json", response_model=StandardResponse[RunResult])
async def evaluate_traces_json(request: EvaluateJsonRequest):
"""Evaluate OTLP JSON traces passed in the request body."""
try:
Comment thread
krisztianfekete marked this conversation as resolved.
traces = OtlpJsonLoader().load_from_dict(request.traces)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc

if not traces:
raise HTTPException(status_code=400, detail="No traces found in OTLP JSON")
Comment thread
krisztianfekete marked this conversation as resolved.
Outdated

eval_set = None
if request.eval_set:
Comment thread
peterj marked this conversation as resolved.
try:
eval_set = load_eval_set_from_dict(request.eval_set)
except Exception as exc:
raise HTTPException(status_code=400, detail=f"Invalid eval set: {exc}") from exc

try:
result = await run_evaluation_from_traces(
traces=traces, config=request.config, eval_set=eval_set,
)
return StandardResponse(data=_camel_keys(result.model_dump(by_alias=True)))
except Exception as exc:
logger.exception("JSON evaluation failed")
raise HTTPException(status_code=500, detail=f"Internal error: {exc!s}") from exc


@router.post("/evaluate/json/stream")
async def evaluate_traces_json_stream(request: EvaluateJsonRequest):
Comment thread
krisztianfekete marked this conversation as resolved.
Outdated
"""Evaluate OTLP JSON traces with real-time progress via SSE."""

async def event_generator():
try:
try:
traces = OtlpJsonLoader().load_from_dict(request.traces)
except ValueError as exc:
yield f"data: {SSEErrorEvent(error=str(exc)).model_dump_json(by_alias=True)}\n\n"
Comment thread
krisztianfekete marked this conversation as resolved.
Outdated
return

if not traces:
yield f"data: {SSEErrorEvent(error='No traces found in OTLP JSON').model_dump_json(by_alias=True)}\n\n"
return

eval_set = None
if request.eval_set:
try:
eval_set = load_eval_set_from_dict(request.eval_set)
except Exception as exc:
yield f"data: {SSEErrorEvent(error=f'Invalid eval set: {exc}').model_dump_json(by_alias=True)}\n\n"
return

for trace in traces:
try:
extractor = get_extractor(trace)
perf_metrics = _camel_keys(extract_performance_metrics(trace, extractor))
trace_metadata = _camel_keys(extract_trace_metadata(trace, extractor))
evt = SSEPerformanceMetricsEvent(
trace_id=trace.trace_id,
performance_metrics=perf_metrics,
trace_metadata=trace_metadata,
)
yield f"event: performance_metrics\ndata: {evt.model_dump_json(by_alias=True)}\n\n"
except Exception as e:
logger.error(f"Failed to extract early performance metrics: {e}")
Comment thread
peterj marked this conversation as resolved.

queue: asyncio.Queue = asyncio.Queue()

async def progress_callback(message: str):
await queue.put(("progress", message))

async def trace_progress_callback(trace_result):
await queue.put(("trace_progress", trace_result))

async def run_with_progress():
result = await run_evaluation_from_traces(
traces=traces,
config=request.config,
eval_set=eval_set,
progress_callback=progress_callback,
trace_progress_callback=trace_progress_callback,
)
await queue.put(("done", result))

eval_task = asyncio.create_task(run_with_progress())

try:
while True:
msg = await queue.get()
tag, payload = msg

if tag == "done":
evt = SSEDoneEvent(
result=_camel_keys(payload.model_dump(by_alias=True)),
)
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
Comment thread
krisztianfekete marked this conversation as resolved.
break
elif tag == "trace_progress":
evt = SSETraceProgressEvent(
trace_progress=SSETraceProgress(
trace_id=payload.trace_id,
partial_result=_camel_keys(payload.model_dump(by_alias=True)),
)
)
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
elif tag == "progress":
evt = SSEProgressEvent(message=payload)
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"
finally:
if not eval_task.done():
eval_task.cancel()
try:
await eval_task
except asyncio.CancelledError:
pass

except Exception as exc:
logger.exception("JSON evaluation stream failed")
evt = SSEErrorEvent(error=str(exc))
yield f"data: {evt.model_dump_json(by_alias=True)}\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
47 changes: 30 additions & 17 deletions src/agentevals/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from pathlib import Path
from typing import Annotated, Any, Literal

from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic.alias_generators import to_camel


class BuiltinMetricDef(BaseModel):
Expand Down Expand Up @@ -99,13 +100,14 @@ def _validate_grader(cls, v: dict[str, Any]) -> dict[str, Any]:
]


class EvalRunConfig(BaseModel):
trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")
class EvalParams(BaseModel):
"""Evaluation parameters independent of how traces are provided.

eval_set_file: str | None = Field(
default=None,
description="Path to a golden eval set JSON file (ADK EvalSet format).",
)
Used by ``run_evaluation_from_traces`` for programmatic / API-driven
evaluation. ``EvalRunConfig`` inherits from this and adds file I/O fields.
"""

model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

metrics: list[str] = Field(
default_factory=lambda: ["tool_trajectory_avg_score"],
Expand All @@ -117,11 +119,6 @@ class EvalRunConfig(BaseModel):
description="Custom evaluator definitions.",
)

trace_format: str = Field(
default="jaeger-json",
description="Format of the trace files (jaeger-json or otlp-json).",
)

judge_model: str | None = Field(
default=None,
description="LLM model for judge-based metrics.",
Expand All @@ -145,11 +142,6 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None:
raise ValueError(f"Invalid trajectory_match_type '{v}'. Valid values: {sorted(valid)}")
return v.upper() if v is not None else v

output_format: str = Field(
default="table",
description="Output format: 'table', 'json', or 'summary'.",
)

max_concurrent_traces: int = Field(
default=10,
description="Maximum number of traces to evaluate concurrently.",
Expand All @@ -159,3 +151,24 @@ def _validate_trajectory_match_type(cls, v: str | None) -> str | None:
default=5,
description="Maximum number of concurrent metric evaluations (LLM API calls).",
)
Comment thread
krisztianfekete marked this conversation as resolved.


class EvalRunConfig(EvalParams):
"""Full configuration for file-based evaluation runs."""

trace_files: list[str] = Field(description="Paths to trace files (Jaeger JSON or OTLP JSON).")

eval_set_file: str | None = Field(
default=None,
description="Path to a golden eval set JSON file (ADK EvalSet format).",
)

trace_format: str = Field(
default="jaeger-json",
description="Format of the trace files (jaeger-json or otlp-json).",
)

output_format: str = Field(
default="table",
description="Output format: 'table', 'json', or 'summary'.",
)
68 changes: 55 additions & 13 deletions src/agentevals/loader/otlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ def load(self, source: str) -> list[Trace]:
logger.info("Loaded %d trace(s) from %s", len(traces), source)
return traces

def load_from_dict(self, data: dict) -> list[Trace]:
"""Load traces from an OTLP JSON dict (resourceSpans structure)."""
if "resourceSpans" not in data:
raise ValueError("Expected OTLP JSON with 'resourceSpans' key")
return self._parse_otlp_export(data)

def _parse_otlp_export(self, data: dict) -> list[Trace]:
"""Parse full OTLP export structure with resourceSpans."""
all_spans = []
Expand Down Expand Up @@ -122,23 +128,40 @@ def _promote_genai_event_attributes(self, span_data: dict, attributes: dict) ->
Some SDKs (e.g. Strands) store message content in span events rather
than span attributes. This promotes those values so the converter can
find them via normal attribute lookups.

Accepts events in OTLP array format or flat/nested dict format.
"""
for event in span_data.get("events", []):
for attr in event.get("attributes", []):
key = attr.get("key", "")
if key in self._GENAI_EVENT_KEYS and key not in attributes:
value_obj = attr.get("value", {})
if "stringValue" in value_obj:
attributes[key] = value_obj["stringValue"]

def _extract_attributes(self, attrs_list: list[dict]) -> dict:
"""Convert OTLP attributes array to flat dict.

OTLP attributes are [{key, value: {stringValue|intValue|...}}]
We flatten to {key: value} for easier use.
event_attrs = event.get("attributes", [])
if isinstance(event_attrs, dict):
flat = self._flatten_nested_dict(event_attrs)
for key in self._GENAI_EVENT_KEYS:
if key in flat and key not in attributes:
attributes[key] = flat[key]
else:
for attr in event_attrs:
key = attr.get("key", "")
if key in self._GENAI_EVENT_KEYS and key not in attributes:
value_obj = attr.get("value", {})
if "stringValue" in value_obj:
attributes[key] = value_obj["stringValue"]

def _extract_attributes(self, attrs) -> dict:
"""Convert attributes to a flat ``{key: value}`` dict.

Accepts three formats:
1. OTLP array: ``[{key, value: {stringValue|intValue|...}}]``
2. Flat dict: ``{"gen_ai.operation.name": "chat"}``
3. Nested dict (ClickHouse JSON column): ``{"gen_ai": {"operation": {"name": "chat"}}}``

Formats 2 and 3 are auto-detected by checking whether *attrs* is a dict.
Nested dicts are recursively flattened to dot-notation keys.
"""
if isinstance(attrs, dict):
return self._flatten_nested_dict(attrs)

result = {}
Comment thread
krisztianfekete marked this conversation as resolved.
for attr in attrs_list:
for attr in attrs:
key = attr.get("key", "")
value_obj = attr.get("value", {})

Expand All @@ -157,6 +180,25 @@ def _extract_attributes(self, attrs_list: list[dict]) -> dict:

return result

@staticmethod
def _flatten_nested_dict(d: dict, prefix: str = "") -> dict:
"""Recursively flatten a nested dict to dot-notation keys.

``{"gen_ai": {"operation": {"name": "chat"}}}``
becomes ``{"gen_ai.operation.name": "chat"}``.

Already-flat keys (e.g. ``{"service.name": "agent"}``) pass through
unchanged.
"""
result = {}
for key, value in d.items():
full_key = f"{prefix}{key}" if not prefix else f"{prefix}.{key}"
if isinstance(value, dict):
result.update(OtlpJsonLoader._flatten_nested_dict(value, full_key))
else:
result[full_key] = value
return result

def _build_traces(self, all_spans: list[Span]) -> list[Trace]:
"""Group spans by trace_id and build parent-child relationships."""
traces_by_id: dict[str, list[Span]] = {}
Expand Down
Loading
Loading