Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/agent-sre/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,27 @@ All notable changes to Agent SRE will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- `MeasurementStore` ABC with `InMemoryMeasurementStore` (thread-safe, default) and
`SQLiteMeasurementStore` (durable, survives agent restarts) backends for SLI
measurement persistence — closes #645.
- `CalibrationDeltaSLI`: new built-in SLI that tracks the running gap between an
agent's stated confidence and its empirical success rate (calibration drift).
Registered in `SLIRegistry` by default. Reference: PDR DOI 10.5281/zenodo.19339987.
- `SLI.__init__` and all built-in SLI subclasses now accept an optional `store`
keyword argument. Omitting it preserves identical backward-compatible behaviour.
- `_validate_db_path()` utility function rejects non-file URI schemes (e.g. `http://`)
passed to `SQLiteMeasurementStore`.
- 26 new tests in `tests/unit/test_sli_persistence.py` covering both stores,
thread-safety, SQLite durability, input validation, and `CalibrationDeltaSLI`.

### Changed
- `InMemoryMeasurementStore` is now thread-safe (uses `threading.Lock`).
- `SLI._measurements` is preserved as a backward-compatible alias pointing into
the in-memory store's row list when the default backend is used.

## [0.3.0] - 2026-02-19

### Added
Expand Down
7 changes: 6 additions & 1 deletion packages/agent-sre/src/agent_sre/slo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# Licensed under the MIT License.
"""SLO Engine — Define what 'reliable' means for agents."""

from agent_sre.slo.indicators import SLI, SLIRegistry, SLIValue
from agent_sre.slo.indicators import CalibrationDeltaSLI, SLI, SLIRegistry, SLIValue
from agent_sre.slo.persistence import InMemoryMeasurementStore, MeasurementStore, SQLiteMeasurementStore
from agent_sre.slo.objectives import SLO, ErrorBudget
from agent_sre.slo.spec import SLOSpec, load_slo_specs, resolve_inheritance
from agent_sre.slo.validator import SLODiff, diff_specs, validate_spec
Expand All @@ -11,6 +12,10 @@
"SLI",
"SLIValue",
"SLIRegistry",
"CalibrationDeltaSLI",
"MeasurementStore",
"InMemoryMeasurementStore",
"SQLiteMeasurementStore",
"SLO",
"ErrorBudget",
"SLOSpec",
Expand Down
221 changes: 203 additions & 18 deletions packages/agent-sre/src/agent_sre/slo/indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from enum import Enum
from typing import Any

from agent_sre.slo.persistence import InMemoryMeasurementStore, MeasurementStore


class TimeWindow(Enum):
"""Standard time windows for SLI aggregation."""
Expand Down Expand Up @@ -53,30 +55,50 @@ class SLI(ABC):
tool call accuracy, response latency).
"""

def __init__(self, name: str, target: float, window: TimeWindow | str) -> None:
def __init__(
self,
name: str,
target: float,
window: TimeWindow | str,
store: MeasurementStore | None = None,
) -> None:
self.name = name
self.target = target
self.window = TimeWindow(window) if isinstance(window, str) else window
self._measurements: list[SLIValue] = []
self._store: MeasurementStore = store if store is not None else InMemoryMeasurementStore()
# Backward-compat alias — external code that appends directly to ._measurements
# still works when using the default InMemoryMeasurementStore.
self._measurements: list[SLIValue] = (
self._store._rows # type: ignore[attr-defined]
if isinstance(self._store, InMemoryMeasurementStore)
else []
)

@abstractmethod
def collect(self) -> SLIValue:
"""Collect a new measurement."""

def record(self, value: float, metadata: dict[str, Any] | None = None) -> SLIValue:
"""Record a measurement value."""
ts = time.time()
full_meta: dict[str, Any] = {"target": self.target, **(metadata or {})}
measurement = SLIValue(
name=self.name,
value=value,
metadata={"target": self.target, **(metadata or {})},
timestamp=ts,
metadata=full_meta,
)
self._measurements.append(measurement)
self._store.append(self.name, value, ts, full_meta)
return measurement

def values_in_window(self) -> list[SLIValue]:
"""Get measurements within the current time window."""
cutoff = time.time() - self.window.seconds
return [m for m in self._measurements if m.timestamp >= cutoff]
rows = self._store.query(self.name, cutoff)
return [
SLIValue(name=r.name, value=r.value, timestamp=r.timestamp, metadata=r.metadata)
for r in rows
]

def current_value(self) -> float | None:
"""Get the current aggregated value within the window."""
Expand Down Expand Up @@ -111,8 +133,13 @@ def to_dict(self) -> dict[str, Any]:
class TaskSuccessRate(SLI):
"""Measures the fraction of tasks completed successfully."""

def __init__(self, target: float = 0.995, window: TimeWindow | str = "30d") -> None:
super().__init__("task_success_rate", target, window)
def __init__(
self,
target: float = 0.995,
window: TimeWindow | str = "30d",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the task-success-rate SLI.

Args:
    target: Minimum acceptable success fraction in [0, 1].
        Defaults to 0.995 (99.5 %).
    window: Aggregation window for measurements. Defaults to "30d".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("task_success_rate", target, window, store=store)
# Running tallies — updated by methods outside this view (see class body).
self._total = 0
self._success = 0

Expand All @@ -132,8 +159,13 @@ def collect(self) -> SLIValue:
class ToolCallAccuracy(SLI):
"""Measures the fraction of tool calls that selected the correct tool."""

def __init__(self, target: float = 0.999, window: TimeWindow | str = "7d") -> None:
super().__init__("tool_call_accuracy", target, window)
def __init__(
self,
target: float = 0.999,
window: TimeWindow | str = "7d",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the tool-call-accuracy SLI.

Args:
    target: Minimum acceptable fraction of correct tool selections
        in [0, 1]. Defaults to 0.999.
    window: Aggregation window for measurements. Defaults to "7d".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("tool_call_accuracy", target, window, store=store)
# Running tallies — updated by methods outside this view (see class body).
self._total = 0
self._correct = 0

Expand All @@ -158,8 +190,9 @@ def __init__(
target_ms: float = 5000.0,
percentile: float = 0.95,
window: TimeWindow | str = "1h",
store: MeasurementStore | None = None,
) -> None:
super().__init__(f"response_latency_p{int(percentile * 100)}", target_ms, window)
super().__init__(f"response_latency_p{int(percentile * 100)}", target_ms, window, store=store)
self.percentile = percentile
self._latencies: list[float] = []

Expand All @@ -185,8 +218,13 @@ def collect(self) -> SLIValue:
class CostPerTask(SLI):
"""Measures the average cost per task in USD."""

def __init__(self, target_usd: float = 0.50, window: TimeWindow | str = "24h") -> None:
super().__init__("cost_per_task", target_usd, window)
def __init__(
self,
target_usd: float = 0.50,
window: TimeWindow | str = "24h",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the cost-per-task SLI.

Args:
    target_usd: Maximum acceptable average cost per task in USD.
        Defaults to 0.50.
    window: Aggregation window for measurements. Defaults to "24h".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("cost_per_task", target_usd, window, store=store)
# Running totals — updated by methods outside this view (see class body).
self._total_cost = 0.0
self._task_count = 0

Expand All @@ -205,8 +243,13 @@ def collect(self) -> SLIValue:
class PolicyCompliance(SLI):
"""Measures adherence to Agent OS policies (100% target by default)."""

def __init__(self, target: float = 1.0, window: TimeWindow | str = "24h") -> None:
super().__init__("policy_compliance", target, window)
def __init__(
self,
target: float = 1.0,
window: TimeWindow | str = "24h",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the policy-compliance SLI.

Args:
    target: Minimum acceptable compliance fraction in [0, 1].
        Defaults to 1.0 (100 % — every action policy-compliant).
    window: Aggregation window for measurements. Defaults to "24h".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("policy_compliance", target, window, store=store)
# Running tallies — updated by methods outside this view (see class body).
self._total = 0
self._compliant = 0

Expand All @@ -226,8 +269,13 @@ def collect(self) -> SLIValue:
class DelegationChainDepth(SLI):
"""Measures scope chain depth (lower is better, inverted comparison)."""

def __init__(self, max_depth: int = 3, window: TimeWindow | str = "24h") -> None:
super().__init__("scope_chain_depth", float(max_depth), window)
def __init__(
self,
max_depth: int = 3,
window: TimeWindow | str = "24h",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the scope-chain-depth SLI.

Note the inverted comparison for this indicator: lower measured depth
is better, and *max_depth* acts as the upper bound (it is converted to
float and passed as the SLI target).

Args:
    max_depth: Maximum acceptable delegation chain depth. Defaults to 3.
    window: Aggregation window for measurements. Defaults to "24h".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("scope_chain_depth", float(max_depth), window, store=store)
# Kept as an int alongside the float target for callers that compare depths.
self.max_depth = max_depth

def record_depth(self, depth: int, metadata: dict[str, Any] | None = None) -> SLIValue:
Expand All @@ -253,8 +301,13 @@ class HallucinationRate(SLI):
that scores agent outputs for factual accuracy. Lower is better.
"""

def __init__(self, target: float = 0.05, window: TimeWindow | str = "24h") -> None:
super().__init__("hallucination_rate", target, window)
def __init__(
self,
target: float = 0.05,
window: TimeWindow | str = "24h",
store: MeasurementStore | None = None,
) -> None:
"""Initialise the hallucination-rate SLI (lower is better).

Args:
    target: Maximum acceptable hallucination fraction in [0, 1].
        Defaults to 0.05 (5 %).
    window: Aggregation window for measurements. Defaults to "24h".
    store: Optional persistence backend. Defaults to the in-memory
        store when omitted (backward-compatible behaviour).
"""
super().__init__("hallucination_rate", target, window, store=store)
# Running tallies — updated by methods outside this view (see class body).
self._total = 0
self._hallucinated = 0

Expand Down Expand Up @@ -284,6 +337,137 @@ def collect(self) -> SLIValue:
return self.record(rate)


class CalibrationDeltaSLI(SLI):
    """Tracks calibration drift — the gap between predicted confidence and actual success rate.

    An agent is *well-calibrated* when its stated confidence for a claim matches the
    empirical success rate for that claim. For example, a claim made with 0.80
    confidence should succeed ~80 % of the time. A growing |avg_predicted − success_rate|
    over successive measurements signals systematic over- or under-confidence.

    The SLI records one ``SLIValue`` per ``record_prediction()`` call. Each recorded
    value is the aggregate calibration delta up to that point (i.e. the running
    ``|mean_predicted_confidence − mean_actual_success_rate|``). ``current_value()``
    therefore returns the latest aggregate delta within the measurement window.

    Compliance is defined as the fraction of recorded aggregates that are at or below
    *target_delta* — a lower value is better.

    Reference:
        PDR: Persistent Delivery Reliability for AI Agents,
        DOI 10.5281/zenodo.19339987 — calibration_delta axis, §3.

    Example::

        sli = CalibrationDeltaSLI(target_delta=0.05)
        # High-confidence prediction that succeeds — good calibration
        sli.record_prediction(predicted_confidence=0.90, actual_success=True)
        # Overconfident prediction that fails — calibration gap widens
        sli.record_prediction(predicted_confidence=0.90, actual_success=False)
        print(sli.current_value())  # Running |avg_pred − avg_success|
    """

    def __init__(
        self,
        target_delta: float = 0.05,
        window: TimeWindow | str = "30d",
        store: MeasurementStore | None = None,
    ) -> None:
        """Initialise the CalibrationDeltaSLI.

        Args:
            target_delta: Maximum acceptable calibration gap (lower is better).
                Compliance is the fraction of measurements at or below
                this threshold. Defaults to 0.05 (5 percentage points).
            window: Measurement window for ``values_in_window()`` and
                ``compliance()``. Defaults to ``"30d"``.
            store: Optional persistence backend. Defaults to
                ``InMemoryMeasurementStore`` (same behaviour as before
                persistence support was added). Pass a
                ``SQLiteMeasurementStore`` to survive agent restarts.
        """
        super().__init__("calibration_delta", target_delta, window, store=store)
        # Running sums for the aggregate delta. NOTE(review): these live only
        # in memory — after a restart with a durable store the recorded
        # history survives but the running sums restart from zero.
        self._sum_predicted: float = 0.0
        self._sum_actual: float = 0.0
        self._count: int = 0

    def current_value(self) -> float | None:
        """Return the most recent aggregate calibration delta within the window.

        Unlike the base ``SLI.current_value()`` which averages all window
        measurements, CalibrationDelta tracks a *running aggregate* — so the
        most recent recorded value IS the current aggregate. Averaging the
        history would give a misleadingly high number during a well-calibrated
        convergence phase.

        Returns:
            The latest aggregate ``|mean_predicted − mean_actual_success_rate|``,
            or ``None`` if no measurements exist in the window.
        """
        values = self.values_in_window()
        if not values:
            return None
        # The store's query() ordering contract is not guaranteed here, so
        # select the newest measurement explicitly by timestamp rather than
        # trusting list position. Identical result when the store is ordered.
        return max(values, key=lambda v: v.timestamp).value

    def record_prediction(
        self,
        predicted_confidence: float,
        actual_success: bool,
        metadata: dict[str, Any] | None = None,
    ) -> SLIValue:
        """Record one prediction and its outcome.

        Accumulates running sums so that each call records the *aggregate*
        ``|mean_predicted_confidence − mean_actual_success_rate|`` over all
        predictions seen so far. This makes ``current_value()`` meaningful after
        just a single call while naturally smoothing out noise over many calls.

        Args:
            predicted_confidence: Agent's stated confidence in [0, 1] before
                observing the outcome.
            actual_success: Whether the prediction/action ultimately
                succeeded (True) or failed (False).
            metadata: Optional key/value annotations stored alongside
                the measurement.

        Returns:
            The ``SLIValue`` representing the current aggregate calibration delta.

        Raises:
            ValueError: If *predicted_confidence* lies outside [0, 1] —
                out-of-range values would silently corrupt the running
                aggregate for every subsequent measurement.
        """
        if not 0.0 <= predicted_confidence <= 1.0:
            raise ValueError(
                f"predicted_confidence must be in [0, 1], got {predicted_confidence!r}"
            )
        self._sum_predicted += predicted_confidence
        self._sum_actual += float(actual_success)
        self._count += 1
        avg_pred = self._sum_predicted / self._count
        avg_actual = self._sum_actual / self._count
        delta = abs(avg_pred - avg_actual)
        return self.record(delta, metadata)

    def compliance(self) -> float | None:
        """Fraction of recorded aggregate deltas at or below *target_delta*.

        A measurement is *good* when the running calibration gap (the recorded
        value) is ≤ ``self.target``. Because each recorded value is an aggregate
        over all preceding predictions, the compliance score reflects how often
        the agent has been within the target calibration envelope throughout the
        measurement window.

        Returns:
            A float in [0, 1], or ``None`` if no measurements exist in the window.
        """
        values = self.values_in_window()
        if not values:
            return None
        good = sum(1 for v in values if v.value <= self.target)
        return good / len(values)

    def collect(self) -> SLIValue:
        """Emit the current aggregate calibration delta as a new measurement.

        With no predictions recorded yet, a 0.0 delta is emitted — this
        preserves the original behaviour (an empty history reads as
        "no observed drift").
        """
        if self._count == 0:
            return self.record(0.0)
        avg_pred = self._sum_predicted / self._count
        avg_actual = self._sum_actual / self._count
        return self.record(abs(avg_pred - avg_actual))


# --- Registry ---


Expand All @@ -302,6 +486,7 @@ def __init__(self) -> None:
PolicyCompliance,
DelegationChainDepth,
HallucinationRate,
CalibrationDeltaSLI,
):
self.register_type(cls)

Expand Down
Loading
Loading