Skip to content

Commit bec9643

Browse files
committed
implemnt plugin model for ResultSink
Signed-off-by: Peter Jausovec <[email protected]>
1 parent f12a891 commit bec9643

9 files changed

Lines changed: 470 additions & 28 deletions

File tree

examples/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,12 @@ The zero-code and SDK examples implement the same toy agent (dice rolling + prim
119119
|---------|-------------|
120120
| [kubernetes/](./kubernetes/) | Deploy agentevals with kagent on Kubernetes using native OTLP gRPC ingestion (or optionally an OTel Collector). Includes a walkthrough for comparing two kagent agents (different models) and evaluating them with tool trajectory and response match scores. |
121121

122+
## Custom result sinks
123+
124+
Plugins can deliver run results (partial metrics, final summary, errors) to arbitrary backends alongside the database. Install a package that declares `[project.entry-points."agentevals.sinks"]`, restart agentevals, then reference the plugin’s `kind` in `spec.sinks` on `POST /api/runs`.
125+
126+
See [custom_sink/README.md](./custom_sink/README.md) for a minimal setuptools plugin and configuration examples.
127+
122128
## Advanced: GenAI Semantic Convention Patterns
123129

124130
> [!TIP]

examples/custom_sink/README.md

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Custom result sink plugin
2+
3+
This folder is a tiny installable Python package that registers a result **sink** with agentevals via setuptools **entry points**. The worker fans out partial/final/error events to every configured sink in addition to the database.
4+
5+
## What gets implemented
6+
7+
- **`DemoNdjsonSink`** — subclasses `ResultSink` from `agentevals.run.sinks` and appends one JSON object per line to `path` from the run spec (same pattern as the built-in `file` sink, with a `"demo": true` marker on each line).
8+
- **`create_demo_sink(spec)`** — factory callable; must accept the full sink dict from the run spec and return a `ResultSink` (see return type in code).
9+
10+
The entry point **name** (`demo_ndjson` in `pyproject.toml`) is the **`kind`** string clients put under `spec.sinks`.
11+
12+
## Install (local dev)
13+
14+
From the agentevals repo root, install the framework first, then this example:
15+
16+
```bash
17+
uv pip install -e .
18+
uv pip install -e examples/custom_sink
19+
```
20+
21+
Restart the agentevals process so `importlib.metadata` picks up the new distribution.
22+
23+
PyPI-style usage is the same: depend on `agentevals-example-custom-sink` next to `agentevals-cli`, install both into the server environment, restart.
24+
25+
## Configure runs
26+
27+
Async runs are submitted with **`POST /api/runs`**. Put your sink in **`spec.sinks`** (requires Postgres storage — see main docs).
28+
29+
Example body (use **absolute** `path` on the host where the agentevals process runs when possible). **`path` must be a file path** (e.g. `/tmp/demo.ndjson`). If `path` is an **existing directory** (including `"."` for the process working directory), output goes to `<path>/agentevals-demo-sink.ndjson`, or `<path>/<filename>` if you add an optional `"filename"` field next to `path` in the sink dict.
30+
31+
The `inline` object must contain real trace data (Jaeger JSON or OTLP), not an empty object.
32+
33+
```json
34+
{
35+
"spec": {
36+
"approach": "trace_replay",
37+
"target": {
38+
"kind": "inline",
39+
"traceFormat": "jaeger-json",
40+
"inline": {
41+
"data": [
42+
{
43+
"traceID": "61646461646164646164616461646164",
44+
"spans": [
45+
{
46+
"traceID": "61646461646164646164616461646164",
47+
"spanID": "6164616461646164",
48+
"operationName": "demo-op",
49+
"startTime": 1000000,
50+
"duration": 100000,
51+
"tags": [],
52+
"logs": [],
53+
"references": [],
54+
"processID": "p1"
55+
}
56+
],
57+
"processes": { "p1": { "serviceName": "demo" } }
58+
}
59+
]
60+
}
61+
},
62+
"sinks": [{ "kind": "demo_ndjson", "path": "/tmp/agentevals-demo.ndjson" }]
63+
}
64+
}
65+
```
66+
67+
You can list several sinks; they run in parallel. Built-in kinds are `stdout`, `file`, and `http_webhook`.
68+
69+
## Publishing your own sink
70+
71+
1. Implement `ResultSink` from `agentevals.run.sinks` (subclass the protocol, or provide the three async methods).
72+
2. Expose a factory `def create_*(spec: dict) -> ResultSink`.
73+
3. Add the following to your `pyproject.toml`:
74+
75+
```toml
76+
[project.entry-points."agentevals.sinks"]
77+
your_kind = "your_package.module:your_factory"
78+
```
79+
80+
4. Install the package into the **same environment** as `agentevals serve`, restart, and reference `"kind": "your_kind"` in `spec.sinks`.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Example result sink plugin for agentevals (see README)."""
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""Minimal NDJSON sink registered via setuptools entry points."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import json
7+
from pathlib import Path
8+
from typing import Any
9+
from uuid import UUID
10+
11+
from agentevals.run.sinks import ResultSink
12+
from agentevals.storage.models import Result
13+
14+
15+
def _result_payload(r: Result) -> dict:
16+
return r.model_dump(mode="json", by_alias=True)
17+
18+
19+
_DEFAULT_FILENAME = "agentevals-demo-sink.ndjson"
20+
21+
22+
def _resolve_output_file(spec: dict[str, Any]) -> Path:
23+
"""If ``path`` is an existing directory (including ``.``), write NDJSON inside it."""
24+
p = Path(spec["path"]).expanduser()
25+
if p.exists() and p.is_dir():
26+
name = spec.get("filename") or _DEFAULT_FILENAME
27+
return p / name
28+
return p
29+
30+
31+
class DemoNdjsonSink(ResultSink):
32+
"""Concrete :class:`~agentevals.run.sinks.ResultSink`; append-only JSON lines with a ``demo`` marker."""
33+
34+
def __init__(self, path: Path) -> None:
35+
self._path = path
36+
self._lock = asyncio.Lock()
37+
38+
async def _write(self, payload: dict) -> None:
39+
async with self._lock:
40+
self._path.parent.mkdir(parents=True, exist_ok=True)
41+
with self._path.open("a") as f: # noqa: ASYNC230
42+
f.write(json.dumps(payload) + "\n")
43+
44+
async def emit_partial(self, run_id: UUID, results: list[Result], attempt: int) -> None:
45+
for r in results:
46+
await self._write(
47+
{
48+
"phase": "partial",
49+
"run_id": str(run_id),
50+
"attempt": attempt,
51+
"demo": True,
52+
"result": _result_payload(r),
53+
}
54+
)
55+
56+
async def emit_final(self, run_id: UUID, summary: dict, attempt: int) -> None:
57+
await self._write(
58+
{"phase": "final", "run_id": str(run_id), "attempt": attempt, "demo": True, "summary": summary}
59+
)
60+
61+
async def emit_error(self, run_id: UUID, error: str, attempt: int) -> None:
62+
await self._write({"phase": "error", "run_id": str(run_id), "attempt": attempt, "demo": True, "error": error})
63+
64+
65+
def create_demo_sink(spec: dict[str, Any]) -> ResultSink:
66+
"""Entry-point factory: returns a :class:`ResultSink`; ``kind`` must be ``demo_ndjson`` (see pyproject).
67+
68+
``path`` should normally be a **file** path. If it points at an existing directory (e.g. ``.`` or ``/tmp``),
69+
lines are appended to ``<path>/agentevals-demo-sink.ndjson``, or ``<path>/<filename>`` if ``filename`` is set.
70+
"""
71+
return DemoNdjsonSink(_resolve_output_file(spec))
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[build-system]
2+
requires = ["hatchling"]
3+
build-backend = "hatchling.build"
4+
5+
[project]
6+
name = "agentevals-example-custom-sink"
7+
version = "0.1.0"
8+
description = "Example setuptools plugin that registers an agentevals result sink"
9+
readme = "README.md"
10+
requires-python = ">=3.11"
11+
dependencies = [
12+
"agentevals-cli>=0.7.0",
13+
]
14+
15+
[project.entry-points."agentevals.sinks"]
16+
demo_ndjson = "agentevals_example_custom_sink.sink:create_demo_sink"
17+
18+
[tool.hatch.build.targets.wheel]
19+
packages = ["agentevals_example_custom_sink"]

src/agentevals/api/app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from agentevals import __version__
1818

1919
from ..run.service import RunService
20+
from ..run.sinks import log_registered_sinks
2021
from ..run.worker import AsyncRunWorker
2122
from ..storage import StorageSettings, build_repos
2223
from ..storage.postgres.migrator import Migrator
@@ -83,6 +84,7 @@ async def lifespan(app: FastAPI):
8384
worker = AsyncRunWorker(runs=repos.runs, results=repos.results, settings=storage_settings)
8485
await worker.start()
8586
app.state.run_worker = worker
87+
log_registered_sinks()
8688

8789
yield
8890

src/agentevals/run/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
Contents:
44
- :mod:`fetcher` resolves a run spec's ``target`` into a list of traces.
5-
- :mod:`sinks` fan-out result delivery (stdout, file, http_webhook).
5+
- :mod:`sinks` fan-out result delivery (built-ins plus setuptools plugins / :func:`~agentevals.run.sinks.register_sink_factory`).
66
- :mod:`service` is the synchronous control surface used by HTTP handlers.
77
- :mod:`worker` is the in-process loop that claims runs and drives the
88
existing :func:`agentevals.runner.run_evaluation_from_traces` pipeline.

src/agentevals/run/sinks.py

Lines changed: 110 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33
The :class:`agentevals.storage.repos.ResultRepository` is always written;
44
sinks are an additional delivery channel. Sink failures are logged with
55
``run_id`` / ``result_id`` but do not fail the run.
6+
7+
**Plugins:** third-party packages declare setuptools entry points in group
8+
``agentevals.sinks`` (entry **name** = ``kind`` string; **value** = ``module:factory``
9+
callable ``factory(spec: dict) -> ResultSink``). Built-in kinds
10+
(``stdout``, ``file``, ``http_webhook``) are not overridden by entry points;
11+
hosts may replace any kind via :func:`register_sink_factory` (highest precedence).
12+
13+
Tests may call :func:`clear_sink_plugin_registry` to drop programmatic registrations.
614
"""
715

816
from __future__ import annotations
@@ -12,8 +20,10 @@
1220
import logging
1321
import os
1422
import sys
23+
from collections.abc import Callable
24+
from importlib.metadata import entry_points
1525
from pathlib import Path
16-
from typing import Any, Protocol
26+
from typing import Any, Protocol, cast
1727
from uuid import UUID
1828

1929
import httpx
@@ -22,13 +32,20 @@
2232

2333
logger = logging.getLogger(__name__)
2434

35+
SINK_ENTRY_POINT_GROUP = "agentevals.sinks"
36+
2537

2638
class ResultSink(Protocol):
2739
async def emit_partial(self, run_id: UUID, results: list[Result], attempt: int) -> None: ...
2840
async def emit_final(self, run_id: UUID, summary: dict, attempt: int) -> None: ...
2941
async def emit_error(self, run_id: UUID, error: str, attempt: int) -> None: ...
3042

3143

44+
SinkFactory = Callable[[dict[str, Any]], ResultSink]
45+
46+
_PLUGIN_FACTORIES: dict[str, SinkFactory] = {}
47+
48+
3249
def _result_payload(r: Result) -> dict:
3350
return r.model_dump(mode="json", by_alias=True)
3451

@@ -187,33 +204,18 @@ async def _guard(coro: Any, phase: str) -> None:
187204
logger.exception("sink delivery failed in phase=%s", phase)
188205

189206

190-
def build_sinks(specs: list[dict]) -> SinkFanout:
191-
"""Construct a fan-out from the run spec's ``sinks`` array.
207+
def register_sink_factory(kind: str, factory: SinkFactory) -> None:
208+
"""Register or replace the factory for ``kind`` (overrides built-ins and entry points).
192209
193-
Each spec is a dict with ``kind`` plus kind-specific args. Unknown kinds
194-
are skipped with a warning so a future kind added by a host doesn't
195-
break older agentevals replicas mid-rollout.
210+
Call during process startup before run workers consume specs. The factory receives
211+
the full sink spec dict (including ``kind``) and returns a :class:`ResultSink`.
196212
"""
197-
sinks: list[ResultSink] = []
198-
for spec in specs:
199-
kind = spec.get("kind")
200-
if kind == "stdout":
201-
sinks.append(StdoutSink())
202-
elif kind == "file":
203-
sinks.append(FileSink(spec["path"]))
204-
elif kind == "http_webhook":
205-
sinks.append(
206-
HttpWebhookSink(
207-
url=spec["url"],
208-
headers=spec.get("headers"),
209-
headers_from_env=spec.get("headers_from_env") or _extract_env_headers(spec.get("auth")),
210-
timeout_s=float(spec.get("timeout_s", 10.0)),
211-
max_attempts=int(spec.get("max_attempts", 5)),
212-
)
213-
)
214-
else:
215-
logger.warning("unknown sink kind '%s'; skipping", kind)
216-
return SinkFanout(sinks)
213+
_PLUGIN_FACTORIES[kind] = factory
214+
215+
216+
def clear_sink_plugin_registry() -> None:
217+
"""Drop all registrations from :func:`register_sink_factory` (for tests)."""
218+
_PLUGIN_FACTORIES.clear()
217219

218220

219221
def _extract_env_headers(auth: Any) -> dict[str, str]:
@@ -228,3 +230,85 @@ def _extract_env_headers(auth: Any) -> dict[str, str]:
228230
if isinstance(value, dict) and "from_env" in value:
229231
result[header_name] = value["from_env"]
230232
return result
233+
234+
235+
def _http_webhook_from_spec(spec: dict[str, Any]) -> HttpWebhookSink:
236+
return HttpWebhookSink(
237+
url=spec["url"],
238+
headers=spec.get("headers"),
239+
headers_from_env=spec.get("headers_from_env") or _extract_env_headers(spec.get("auth")),
240+
timeout_s=float(spec.get("timeout_s", 10.0)),
241+
max_attempts=int(spec.get("max_attempts", 5)),
242+
)
243+
244+
245+
def _builtin_factories() -> dict[str, SinkFactory]:
246+
return {
247+
"stdout": lambda _spec: StdoutSink(),
248+
"file": lambda spec: FileSink(spec["path"]),
249+
"http_webhook": _http_webhook_from_spec,
250+
}
251+
252+
253+
def _merge_sink_factories() -> dict[str, SinkFactory]:
254+
"""Built-ins, then entry points (no built-in shadowing), then programmatic overrides."""
255+
merged: dict[str, SinkFactory] = dict(_builtin_factories())
256+
eps = entry_points(group=SINK_ENTRY_POINT_GROUP)
257+
for ep in eps:
258+
if ep.name in merged:
259+
logger.debug("skipping sink entry point %r; built-in kind takes precedence", ep.name)
260+
continue
261+
try:
262+
loaded = ep.load()
263+
if not callable(loaded):
264+
logger.warning("sink entry point %r is not callable; skipping", ep.name)
265+
continue
266+
merged[ep.name] = cast(SinkFactory, loaded)
267+
except Exception:
268+
logger.exception("failed to load sink entry point %r", ep.name)
269+
merged.update(_PLUGIN_FACTORIES)
270+
return merged
271+
272+
273+
def registered_sink_kinds() -> tuple[str, ...]:
274+
"""Sorted sink ``kind`` strings that would resolve if :func:`build_sinks` ran now.
275+
276+
Includes built-ins, successfully loaded setuptools entry points for group
277+
:data:`SINK_ENTRY_POINT_GROUP`, and registrations from
278+
:func:`register_sink_factory`. The tuple reflects current process state and
279+
can change if the programmatic registry is mutated after startup.
280+
"""
281+
return tuple(sorted(_merge_sink_factories().keys()))
282+
283+
284+
def log_registered_sinks() -> None:
285+
"""Emit one INFO line listing available sink kinds (for operator diagnostics)."""
286+
kinds = registered_sink_kinds()
287+
logger.info("Result sinks available (%d kinds): %s", len(kinds), ", ".join(kinds))
288+
289+
290+
def build_sinks(specs: list[dict]) -> SinkFanout:
291+
"""Construct a fan-out from the run spec's ``sinks`` array.
292+
293+
Each spec is a dict with ``kind`` plus kind-specific args. Unknown kinds
294+
are skipped with a warning so a future kind added by a host doesn't
295+
break older agentevals replicas mid-rollout.
296+
297+
Factory lookup starts from built-ins, adds setuptools entry points (group
298+
``agentevals.sinks``) for ``kind`` names not already built-in, then applies
299+
:func:`register_sink_factory` registrations, which override any prior factory
300+
for the same ``kind``. See :func:`_merge_sink_factories`.
301+
"""
302+
factories = _merge_sink_factories()
303+
sinks: list[ResultSink] = []
304+
for spec in specs:
305+
kind = spec.get("kind")
306+
factory = factories.get(kind) if kind is not None else None
307+
if factory is None:
308+
logger.warning("unknown sink kind '%s'; skipping", kind)
309+
continue
310+
try:
311+
sinks.append(factory(spec))
312+
except Exception:
313+
logger.exception("sink factory failed for kind=%s", kind)
314+
return SinkFanout(sinks)

0 commit comments

Comments
 (0)