Skip to content

Commit 5ee5122

Browse files
cleanup and simplify
1 parent 5c6d499 commit 5ee5122

12 files changed

Lines changed: 218 additions & 247 deletions

File tree

src/agentevals/api/routes.py

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -95,39 +95,22 @@ async def _maybe_persist_evaluate_run(
9595
upload_filenames: list[str] | None,
9696
run_result: "RunResult",
9797
) -> str | None:
98-
"""Persist a synchronously-completed eval as a Run + Result rows when
99-
``app.state.run_service`` is configured (i.e. ``backend=postgres``).
100-
101-
Returns the synthesized ``run_id`` so the caller can attach it to the
102-
response (UI / SSE clients can then ``GET /api/runs/{id}/results`` to
103-
pull historical context). Returns None on the memory backend so callers
104-
keep their existing zero-config behavior. Errors are logged but never
105-
propagated; if persistence fails the eval result is still returned to
106-
the caller.
107-
"""
98+
"""Best-effort: persist the just-completed eval as a Run row + Result
99+
rows when ``app.state.run_service`` is configured (postgres backend).
100+
Returns the synthesized ``run_id`` for inclusion in the response, or
101+
``None`` on the memory backend or on persistence failure (eval result
102+
is still returned to the caller in that case)."""
108103
service = getattr(request.app.state, "run_service", None)
109104
if service is None:
110105
return None
111106
try:
112-
from ..run.service import RunService
113-
from ..storage.models import RunSpec, TraceTarget
114-
115-
filenames = list(upload_filenames or [])
116-
target = TraceTarget(
117-
kind="uploaded",
118-
trace_format=trace_format if trace_format in ("jaeger-json", "otlp-json") else None,
119-
trace_count=len(filenames),
120-
trace_files=filenames,
121-
)
122-
spec_payload = params.model_dump(by_alias=False)
123-
spec = RunSpec(
124-
approach="trace_replay",
125-
target=target,
126-
eval_config=spec_payload,
127-
eval_set=eval_set_dict,
107+
run = await service.record_eval_run(
108+
params=params,
109+
eval_set_dict=eval_set_dict,
110+
trace_format=trace_format,
111+
upload_filenames=upload_filenames,
112+
run_result=run_result,
128113
)
129-
assert isinstance(service, RunService)
130-
run = await service.record_completed_eval(spec=spec, params=params, run_result=run_result)
131114
return str(run.run_id)
132115
except Exception:
133116
logger.exception("failed to persist /api/evaluate run; eval result still returned to caller")

src/agentevals/api/runs_routes.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,11 @@ async def list_run_results(run_id: UUID, request: Request):
103103

104104
@runs_router.post("/runs/{run_id}/cancel", response_model=StandardResponse[Run])
105105
async def cancel_run(run_id: UUID, request: Request):
106+
"""Mark a run cancel-requested. Idempotent: cancelling an already-terminal
107+
run is a no-op and the current state is returned to the caller."""
106108
service = _service(request)
107-
cancelled = await service.cancel(run_id)
109+
await service.cancel(run_id)
108110
run = await service.get(run_id)
109111
if run is None:
110112
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"run {run_id} not found")
111-
if not cancelled and run.status not in (RunStatus.QUEUED, RunStatus.RUNNING):
112-
# Already terminal; surface that to the caller without an error.
113-
return StandardResponse(data=run)
114113
return StandardResponse(data=run)

src/agentevals/run/result_builder.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
from uuid import UUID
1414

1515
from ..config import EvalParams
16-
from ..runner import RunResult
17-
from ..storage.models import Result
16+
from ..runner import MetricResult, RunResult
17+
from ..storage.models import Result, ResultStatus, compute_result_id
1818

1919
EvaluatorType = Literal["builtin", "code", "remote", "openai_eval"]
2020

@@ -29,6 +29,50 @@ def classify_evaluator(metric_name: str, params: EvalParams) -> EvaluatorType:
2929
return "builtin"
3030

3131

32+
def result_from_metric_result(
33+
*,
34+
run_id: UUID,
35+
eval_set_item_id: str,
36+
eval_set_item_name: str,
37+
trace_id: str | None,
38+
evaluator_type: EvaluatorType,
39+
metric_result: MetricResult,
40+
) -> Result:
41+
"""Project an ADK :class:`MetricResult` onto a persistable :class:`Result`.
42+
43+
The status mapping treats a non-empty ``error`` field as ``ERRORED`` even
44+
when ``eval_status`` would have been ``PASSED`` / ``FAILED``, so
45+
downstream consumers can filter on status alone without special-casing
46+
the error column.
47+
"""
48+
if metric_result.error:
49+
status = ResultStatus.ERRORED
50+
else:
51+
raw = (metric_result.eval_status or "NOT_EVALUATED").upper()
52+
status = {
53+
"PASSED": ResultStatus.PASSED,
54+
"FAILED": ResultStatus.FAILED,
55+
}.get(raw, ResultStatus.SKIPPED)
56+
57+
latency_ms = int(metric_result.duration_ms) if metric_result.duration_ms is not None else None
58+
59+
return Result(
60+
result_id=compute_result_id(run_id, eval_set_item_id, metric_result.metric_name),
61+
run_id=run_id,
62+
eval_set_item_id=eval_set_item_id,
63+
eval_set_item_name=eval_set_item_name,
64+
evaluator_name=metric_result.metric_name,
65+
evaluator_type=evaluator_type,
66+
status=status,
67+
score=metric_result.score,
68+
per_invocation_scores=list(metric_result.per_invocation_scores or []),
69+
trace_id=trace_id,
70+
details=metric_result.details or {},
71+
error_text=metric_result.error,
72+
latency_ms=latency_ms,
73+
)
74+
75+
3276
def build_results(run_id: UUID, params: EvalParams, run_result: RunResult) -> list[Result]:
3377
"""Flatten ``run_result.trace_results[*].metric_results[*]`` into a list
3478
of persistable :class:`Result` rows.
@@ -41,13 +85,12 @@ def build_results(run_id: UUID, params: EvalParams, run_result: RunResult) -> li
4185
out: list[Result] = []
4286
for trace_result in run_result.trace_results:
4387
item_id = trace_result.trace_id
44-
item_name = trace_result.trace_id
4588
for mr in trace_result.metric_results:
4689
out.append(
47-
Result.from_metric_result(
90+
result_from_metric_result(
4891
run_id=run_id,
4992
eval_set_item_id=item_id,
50-
eval_set_item_name=item_name,
93+
eval_set_item_name=item_id,
5194
trace_id=trace_result.trace_id,
5295
evaluator_type=classify_evaluator(mr.metric_name, params),
5396
metric_result=mr,

src/agentevals/run/service.py

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,28 @@
1-
"""Synchronous control surface used by ``/api/runs`` HTTP handlers.
1+
"""Synchronous control surface used by ``/api/runs`` and ``/api/evaluate``.
22
33
Wraps the :class:`agentevals.storage.repos.RunRepository` with submit
4-
idempotency, list pagination, and the 409 spec-mismatch path.
5-
6-
Also provides :meth:`RunService.record_completed_eval` for the
7-
``/api/evaluate`` path: that handler executes synchronously (the trace was
8-
already supplied as multipart and the result is being streamed back over
9-
SSE), so we synthesize a Run row for visibility in run history rather than
10-
queueing work for the worker.
4+
idempotency, list pagination, and the 409 spec-mismatch path. Also exposes
5+
:meth:`RunService.record_eval_run` for the ``/api/evaluate`` path, which
6+
executes synchronously and synthesizes a Run row for visibility in run
7+
history rather than queueing work for the worker.
118
"""
129

1310
from __future__ import annotations
1411

15-
import json
1612
import logging
1713
from datetime import datetime, timezone
14+
from typing import Any
1815
from uuid import UUID, uuid4
1916

2017
from ..config import EvalParams
2118
from ..runner import RunResult
22-
from ..storage.models import Run, RunSpec, RunStatus
19+
from ..storage.models import Run, RunSpec, RunStatus, TraceTarget
2320
from ..storage.repos import ResultRepository, RunRepository
2421
from .result_builder import build_results, summarize_run_result
2522

2623
logger = logging.getLogger(__name__)
2724

2825

29-
def _now() -> datetime:
30-
return datetime.now(timezone.utc)
31-
32-
3326
class RunSubmitConflict(Exception):
3427
"""Raised when a re-submission's spec differs from the persisted one.
3528
@@ -54,7 +47,7 @@ async def submit(self, *, run_id: UUID | None, spec: RunSpec) -> Run:
5447
spec=spec,
5548
)
5649
persisted = await self._runs.create(run)
57-
if persisted.run_id == run.run_id and not _specs_equal(persisted.spec, spec):
50+
if persisted.run_id == run.run_id and persisted.spec != spec:
5851
raise RunSubmitConflict(persisted)
5952
return persisted
6053

@@ -76,30 +69,46 @@ async def list_results(self, run_id: UUID):
7669
async def cancel(self, run_id: UUID) -> bool:
7770
return await self._runs.cancel(run_id)
7871

79-
async def record_completed_eval(
72+
async def record_eval_run(
8073
self,
8174
*,
82-
spec: RunSpec,
8375
params: EvalParams,
76+
eval_set_dict: dict[str, Any] | None,
77+
trace_format: str | None,
78+
upload_filenames: list[str] | None,
8479
run_result: RunResult,
8580
) -> Run:
86-
"""Persist a synchronously-completed eval as a Run row plus Result rows.
87-
88-
The run is created already in ``running`` state (so the row passes the
89-
``run_running_has_worker`` check is sidestepped via a synthetic worker
90-
id), then transitioned to a terminal state in the same call. Two
91-
writes per eval, but using the public :class:`RunRepository` API
92-
avoids leaking an executor-only schema requirement into this layer.
81+
"""Persist a synchronously-completed ``/api/evaluate`` call as a Run
82+
row plus Result rows.
83+
84+
Builds an ``uploaded`` :class:`TraceTarget` from the request metadata,
85+
creates a queued run, persists results, then transitions the run to
86+
a terminal status. Two writes (create + update_status), but the
87+
public :class:`RunRepository` API stays clean of executor-only
88+
schema knowledge.
9389
"""
90+
filenames = list(upload_filenames or [])
91+
target = TraceTarget(
92+
kind="uploaded",
93+
trace_format=trace_format if trace_format in ("jaeger-json", "otlp-json") else None,
94+
trace_count=len(filenames),
95+
trace_files=filenames,
96+
)
97+
spec = RunSpec(
98+
approach="trace_replay",
99+
target=target,
100+
eval_config=params.model_dump(by_alias=False),
101+
eval_set=eval_set_dict,
102+
)
103+
94104
run_id = uuid4()
95-
worker_id = "sync:/api/evaluate"
96105
run = Run(
97106
run_id=run_id,
98107
status=RunStatus.QUEUED,
99108
spec=spec,
100109
attempt=1,
101-
worker_id=worker_id,
102-
started_at=_now(),
110+
worker_id="sync:/api/evaluate",
111+
started_at=datetime.now(timezone.utc),
103112
)
104113
await self._runs.create(run)
105114

@@ -117,11 +126,3 @@ async def record_completed_eval(
117126
run.status = RunStatus.SUCCEEDED
118127
run.summary = summary
119128
return run
120-
121-
122-
def _specs_equal(a: RunSpec, b: RunSpec) -> bool:
123-
"""Deep equality on the JSON projection. Pydantic equality compares model
124-
instances by class identity, which trips up the round-trip from JSONB."""
125-
return json.dumps(a.model_dump(by_alias=False), sort_keys=True) == json.dumps(
126-
b.model_dump(by_alias=False), sort_keys=True
127-
)

src/agentevals/run/worker.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import asyncio
1616
import logging
1717
import socket
18-
from datetime import datetime, timedelta, timezone
18+
from datetime import timedelta
1919
from uuid import UUID
2020

2121
from google.adk.evaluation.eval_set import EvalSet
@@ -32,10 +32,6 @@
3232
logger = logging.getLogger(__name__)
3333

3434

35-
def _now() -> datetime:
36-
return datetime.now(timezone.utc)
37-
38-
3935
class _CancelledByRequest(Exception):
4036
"""Raised inside the worker task when the heartbeat observes cancel_requested."""
4137

src/agentevals/storage/models.py

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -121,49 +121,3 @@ class Result(BaseModel):
121121
tokens_used: dict[str, Any] | None = None
122122
latency_ms: int | None = None
123123
created_at: datetime = Field(default_factory=_now)
124-
125-
@classmethod
126-
def from_metric_result(
127-
cls,
128-
*,
129-
run_id: UUID,
130-
eval_set_item_id: str,
131-
eval_set_item_name: str,
132-
trace_id: str | None,
133-
evaluator_type: Literal["builtin", "code", "remote", "openai_eval"],
134-
metric_result: Any,
135-
) -> Result:
136-
"""Project an in-pipeline MetricResult onto the persisted shape.
137-
138-
ADK emits ``eval_status`` strings ``PASSED`` / ``FAILED`` /
139-
``NOT_EVALUATED``; we additionally map presence of ``error`` to
140-
``errored`` so downstream consumers don't have to special-case
141-
evaluator failures.
142-
"""
143-
if metric_result.error:
144-
status = ResultStatus.ERRORED
145-
else:
146-
raw = (metric_result.eval_status or "NOT_EVALUATED").upper()
147-
status = {
148-
"PASSED": ResultStatus.PASSED,
149-
"FAILED": ResultStatus.FAILED,
150-
}.get(raw, ResultStatus.SKIPPED)
151-
152-
scores: list[float | None] = list(metric_result.per_invocation_scores or [])
153-
latency_ms = int(metric_result.duration_ms) if metric_result.duration_ms is not None else None
154-
155-
return cls(
156-
result_id=compute_result_id(run_id, eval_set_item_id, metric_result.metric_name),
157-
run_id=run_id,
158-
eval_set_item_id=eval_set_item_id,
159-
eval_set_item_name=eval_set_item_name,
160-
evaluator_name=metric_result.metric_name,
161-
evaluator_type=evaluator_type,
162-
status=status,
163-
score=metric_result.score,
164-
per_invocation_scores=scores,
165-
trace_id=trace_id,
166-
details=metric_result.details or {},
167-
error_text=metric_result.error,
168-
latency_ms=latency_ms,
169-
)

src/agentevals/storage/repos/memory.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@
2020
from ...streaming.session import TraceSession
2121

2222

23-
def _now() -> datetime:
24-
return datetime.now(timezone.utc)
25-
26-
2723
class MemorySessionRepository:
2824
def __init__(self) -> None:
2925
self._sessions: dict[str, TraceSession] = {}
@@ -87,7 +83,7 @@ async def list(
8783
return runs[:limit]
8884

8985
async def claim_next(self, *, worker_id: str, lease: timedelta, max_attempts: int) -> Run | None:
90-
now = _now()
86+
now = datetime.now(timezone.utc)
9187
async with self._lock:
9288
candidates = [r for r in self._runs.values() if r.status == RunStatus.QUEUED and r.attempt < max_attempts]
9389
candidates.sort(key=lambda r: r.created_at)
@@ -125,7 +121,7 @@ async def update_status(
125121
if summary is not None:
126122
run.summary = summary
127123
if status in (RunStatus.SUCCEEDED, RunStatus.FAILED, RunStatus.CANCELLED):
128-
run.finished_at = _now()
124+
run.finished_at = datetime.now(timezone.utc)
129125

130126
async def cancel(self, run_id: UUID) -> bool:
131127
async with self._lock:
@@ -135,7 +131,7 @@ async def cancel(self, run_id: UUID) -> bool:
135131
run.cancel_requested = True
136132
if run.status == RunStatus.QUEUED:
137133
run.status = RunStatus.CANCELLED
138-
run.finished_at = _now()
134+
run.finished_at = datetime.now(timezone.utc)
139135
return True
140136

141137

src/agentevals/storage/repos/postgres.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import json
1212
import logging
13-
from datetime import datetime, timedelta, timezone
13+
from datetime import datetime, timedelta
1414
from typing import TYPE_CHECKING
1515
from uuid import UUID
1616

@@ -25,10 +25,6 @@
2525
logger = logging.getLogger(__name__)
2626

2727

28-
def _now() -> datetime:
29-
return datetime.now(timezone.utc)
30-
31-
3228
def _row_to_session(row: "asyncpg.Record") -> "TraceSession":
3329
from ...streaming.session import TraceSession
3430

0 commit comments

Comments
 (0)