Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 263 additions & 0 deletions products/metrics/backend/anomaly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
"""Anomaly characterization for one metric.

Given an anomaly window (and an optional explicit baseline window), this
compares the windows statistically, finds the onset bucket, and drills into
candidate label keys to find which label values moved — the "what changed,
when, and where" an on-call investigator (or agent) needs before
correlating into logs and traces.
"""

from __future__ import annotations

import math
import datetime as dt
import statistics
from collections.abc import Callable
from typing import Any

from posthog.hogql import ast
from posthog.hogql.parser import parse_select
from posthog.hogql.query import execute_hogql_query

from posthog.clickhouse.client.connection import Workload
from posthog.models import Team

from products.metrics.backend.facade.contracts import (
MetricAnomalyDimension,
MetricAnomalyReport,
MetricFilter,
MetricGroupBy,
MetricPoint,
MetricSeries,
)
from products.metrics.backend.metric_names_query_runner import MetricNamesQueryRunner
from products.metrics.backend.metric_query_runner import MetricQueryRunner, _pick_interval

# Drill-down limits: at most MAX_CANDIDATE_KEYS label keys are grouped by, and
# at most MAX_TOP_MOVERS (key, label) movers are returned in the report.
MAX_CANDIDATE_KEYS = 4
MAX_TOP_MOVERS = 8

# Onset = first anomaly-window bucket beyond this many baseline stddevs
# (with a relative-change floor for near-constant baselines).
ONSET_STDDEV_THRESHOLD = 3.0
ONSET_RELATIVE_FLOOR = 0.5

# Default aggregation per metric type, looked up by `_default_aggregation`;
# a type missing from this table falls back to "avg" there. Only
# "histogram_quantile" is paired with a default quantile by that lookup.
_AGGREGATION_BY_TYPE = {
    "sum": "rate",
    "gauge": "avg",
    "histogram": "histogram_quantile",
    "exponential_histogram": "p95",
    "summary": "avg",
}


def _default_aggregation(team: Team, metric_name: str) -> tuple[str, float | None]:
    """Choose a default aggregation (and quantile) for `metric_name`.

    The metric is looked up by name and its `metric_type` is mapped through
    `_AGGREGATION_BY_TYPE`. Only `histogram_quantile` is paired with a
    quantile (0.95); every other aggregation gets `None`. When none of the
    returned rows has exactly this name, or its type is not in the table,
    the aggregation is `avg`.
    """
    # NOTE(review): the lookup is a search capped at 5 rows; presumably an
    # exact-name match is always among them — confirm against the runner.
    candidates = MetricNamesQueryRunner(team=team, search=metric_name, limit=5).run()
    match = next((row for row in candidates if row["name"] == metric_name), None)
    if match is None:
        return "avg", None

    chosen = _AGGREGATION_BY_TYPE.get(match["metric_type"], "avg")
    default_quantile = 0.95 if chosen == "histogram_quantile" else None
    return chosen, default_quantile


def _mean(values: list[float]) -> float:
    """Return the arithmetic mean of `values`, or 0.0 for an empty list.

    Uses `statistics.fmean` rather than `sum(values) / len(values)`: in
    CPython it totals the samples with `math.fsum`, so values of very
    different magnitude do not cancel each other out of the result.
    """
    # fmean raises on empty input, so keep the explicit empty-list fallback.
    return statistics.fmean(values) if values else 0.0


def _find_onset(
    points: list[MetricPoint], anomaly_from_iso: str, baseline_mean: float, baseline_stddev: float, direction: str
) -> str | None:
    """Return the time of the first anomaly-window point that breaks out.

    A point breaks out when it deviates from `baseline_mean`, in the given
    `direction`, by more than the largest of: `ONSET_STDDEV_THRESHOLD`
    baseline standard deviations, `ONSET_RELATIVE_FLOOR` times the absolute
    baseline mean, and a tiny absolute floor (1e-9). Points whose `time`
    sorts before `anomaly_from_iso` are ignored. Returns `None` when no
    point qualifies.
    """
    sigma_band = ONSET_STDDEV_THRESHOLD * baseline_stddev
    relative_band = ONSET_RELATIVE_FLOOR * abs(baseline_mean)
    cutoff = max(sigma_band, relative_band, 1e-9)
    falling = direction == "down"

    def _excess(value: float) -> float:
        # Signed distance from the baseline, flipped so that a drop counts
        # as positive when the anomaly direction is "down".
        delta = value - baseline_mean
        return -delta if falling else delta

    in_window = (sample for sample in points if sample.time >= anomaly_from_iso)
    return next((sample.time for sample in in_window if _excess(sample.value) > cutoff), None)


def characterize_anomaly(
    *,
    team: Team,
    metric_name: str,
    anomaly_from: dt.datetime,
    anomaly_to: dt.datetime,
    baseline_from: dt.datetime | None = None,
    baseline_to: dt.datetime | None = None,
    aggregation: str | None = None,
    quantile: float | None = None,
    filters: tuple[MetricFilter, ...] = (),
    candidate_keys: tuple[str, ...] | None = None,
) -> MetricAnomalyReport:
    """Compare a metric's anomaly window against a baseline window.

    Runs one ungrouped query over `baseline_from`..`anomaly_to`, splits the
    buckets into baseline and anomaly samples, derives summary statistics,
    the direction of change and the onset bucket, then drills into candidate
    label keys to rank the label values that moved the most.

    Args:
        team: Team whose metrics are queried.
        metric_name: Metric to characterize.
        anomaly_from: Start of the anomaly window.
        anomaly_to: End of the anomaly window; must be after `anomaly_from`.
        baseline_from: Start of the baseline window. Defaults to
            `baseline_to` minus the anomaly window's length.
        baseline_to: End of the baseline window. Defaults to `anomaly_from`.
        aggregation: Aggregation to query with. Defaults by metric type via
            `_default_aggregation`.
        quantile: Quantile for `histogram_quantile`; defaults to 0.95 there
            and is ignored for every other aggregation.
        filters: Filters applied to every query.
        candidate_keys: Label keys to drill into. When empty or `None` they
            are discovered from the metric's attributes over the combined
            baseline + anomaly window.

    Returns:
        The populated `MetricAnomalyReport`, including the full queried
        series.

    Raises:
        ValueError: If `anomaly_to` is not after `anomaly_from`, if
            `baseline_from` is not before `baseline_to`, or if the baseline
            window ends after `anomaly_from`.
    """
    if anomaly_to <= anomaly_from:
        raise ValueError("anomaly_to must be after anomaly_from")
    if baseline_to is None:
        baseline_to = anomaly_from
    if baseline_from is None:
        baseline_from = baseline_to - (anomaly_to - anomaly_from)
    # An empty or inverted baseline window would yield no baseline samples
    # and meaningless statistics, so reject it up front.
    if baseline_from >= baseline_to:
        raise ValueError("baseline_from must be before baseline_to")
    if baseline_to > anomaly_from:
        raise ValueError("the baseline window must end at or before anomaly_from")

    if aggregation is None:
        aggregation, default_quantile = _default_aggregation(team, metric_name)
        quantile = quantile if quantile is not None else default_quantile
    if aggregation == "histogram_quantile" and quantile is None:
        quantile = 0.95

    # One query over the combined window keeps baseline and anomaly on a
    # single grid; the interval comes from the combined span.
    # NOTE(review): no upper bound is enforced on the combined window, and
    # each request issues this query once ungrouped, once per candidate key
    # (up to MAX_CANDIDATE_KEYS) and possibly once more for key discovery —
    # consider a maximum span / cost guard at the API boundary.
    interval = _pick_interval(baseline_from, anomaly_to)

    anomaly_from_iso = anomaly_from.isoformat()
    # When the caller leaves a gap between baseline_to and anomaly_from, the
    # buckets inside that gap belong to neither window. Without a gap, reuse
    # the anomaly boundary string so both comparisons share one cut-off.
    has_gap = baseline_to < anomaly_from
    baseline_end_iso = baseline_to.isoformat() if has_gap else anomaly_from_iso

    def _query(group_by: tuple[MetricGroupBy, ...] = ()) -> list[dict[str, Any]]:
        return MetricQueryRunner(
            team=team,
            metric_name=metric_name,
            aggregation=aggregation,
            date_from=baseline_from,
            date_to=anomaly_to,
            filters=filters,
            group_by=group_by,
            interval=interval,
            quantile=quantile if aggregation == "histogram_quantile" else None,
        ).run()

    def _run(group_by: tuple[MetricGroupBy, ...] = ()) -> list[dict[str, Any]]:
        # Rows handed to the mover drill-down: everything before
        # anomaly_from is treated as baseline there, so gap buckets must be
        # dropped here to keep them out of the per-label baseline.
        rows = _query(group_by)
        if not has_gap:
            return rows
        return [row for row in rows if not (baseline_end_iso <= row["time"] < anomaly_from_iso)]

    # The reported series keeps every bucket (gap included); only the
    # statistics are restricted to the two windows.
    points = [MetricPoint(time=row["time"], value=row["value"]) for row in _query()]
    baseline_values = [p.value for p in points if p.time < baseline_end_iso]
    anomaly_values = [p.value for p in points if p.time >= anomaly_from_iso]

    baseline_mean = _mean(baseline_values)
    baseline_stddev = statistics.pstdev(baseline_values) if len(baseline_values) > 1 else 0.0
    anomaly_mean = _mean(anomaly_values)
    anomaly_peak = max(anomaly_values, default=0.0)
    # A zero baseline has no meaningful ratio; report the anomaly mean itself.
    change_ratio = anomaly_mean / baseline_mean if baseline_mean else anomaly_mean

    if math.isclose(anomaly_mean, baseline_mean, rel_tol=0.05, abs_tol=1e-12):
        direction = "flat"
    else:
        direction = "up" if anomaly_mean > baseline_mean else "down"

    onset_time = (
        None
        if direction == "flat"
        else _find_onset(points, anomaly_from_iso, baseline_mean, baseline_stddev, direction)
    )

    # Discover keys over the combined window so a key that only reported
    # during the baseline (e.g. a service that stopped emitting) is still
    # drilled into.
    drill_keys = candidate_keys or _discover_candidate_keys(team, metric_name, baseline_from, anomaly_to)

    movers = _find_top_movers(
        run=_run,
        team=team,
        metric_name=metric_name,
        anomaly_from_iso=anomaly_from_iso,
        anomaly_from=anomaly_from,
        anomaly_to=anomaly_to,
        filters=filters,
        candidate_keys=drill_keys,
    )

    return MetricAnomalyReport(
        metric_name=metric_name,
        aggregation=aggregation,
        interval=interval,
        baseline_from=baseline_from.isoformat(),
        baseline_to=baseline_to.isoformat(),
        anomaly_from=anomaly_from_iso,
        anomaly_to=anomaly_to.isoformat(),
        baseline_mean=baseline_mean,
        baseline_stddev=baseline_stddev,
        anomaly_mean=anomaly_mean,
        anomaly_peak=anomaly_peak,
        change_ratio=change_ratio,
        direction=direction,
        onset_time=onset_time,
        top_movers=movers,
        series=MetricSeries(labels={}, points=tuple(points), metric_name=metric_name, clause="anomaly"),
    )


def _discover_candidate_keys(
    team: Team, metric_name: str, date_from: dt.datetime, date_to: dt.datetime
) -> tuple[str, ...]:
    """Return the label keys worth drilling into for `metric_name`.

    Counts how often each attribute / resource-attribute key occurs on the
    metric's rows with `date_from <= timestamp < date_to` and keeps the
    `MAX_CANDIDATE_KEYS` most frequent. `service_name` is always part of the
    result: when the query did not return it, it is put first and the list
    is trimmed back to `MAX_CANDIDATE_KEYS` entries.
    """
    placeholders = {
        "metric_name": ast.Constant(value=metric_name),
        "date_from": ast.Constant(value=date_from),
        "date_to": ast.Constant(value=date_to),
        "limit": ast.Constant(value=MAX_CANDIDATE_KEYS),
    }
    query = parse_select(
        """
        SELECT key, count() AS occurrences
        FROM (
            SELECT arrayJoin(arrayConcat(mapKeys(attributes), mapKeys(resource_attributes))) AS key
            FROM posthog.metrics
            WHERE metric_name = {metric_name}
                AND timestamp >= {date_from}
                AND timestamp < {date_to}
        )
        GROUP BY key
        ORDER BY occurrences DESC
        LIMIT {limit}
        """,
        placeholders=placeholders,
    )
    response = execute_hogql_query(query_type="MetricQuery", query=query, team=team, workload=Workload.LOGS)

    discovered = [result[0] for result in response.results]
    if "service_name" in discovered:
        return tuple(discovered)
    return tuple(["service_name", *discovered][:MAX_CANDIDATE_KEYS])


def _find_top_movers(
    *,
    run: Callable[..., list[dict[str, Any]]],
    team: Team,
    metric_name: str,
    anomaly_from_iso: str,
    anomaly_from: dt.datetime,
    anomaly_to: dt.datetime,
    filters: tuple[MetricFilter, ...],
    candidate_keys: tuple[str, ...] | None,
) -> tuple[MetricAnomalyDimension, ...]:
    """Rank the label values whose level changed most between the windows.

    For each of the first `MAX_CANDIDATE_KEYS` keys, `run` is called grouped
    by that key. Each returned row is assigned to the anomaly window when
    its `time` is at or after `anomaly_from_iso`, otherwise to the baseline.
    A label whose two window means are within 5% of each other is dropped.
    The remaining movers are ordered by magnitude (ties broken by key, then
    label) and the top `MAX_TOP_MOVERS` are returned.

    NOTE(review): when `candidate_keys` is empty or `None`, keys are
    discovered from the anomaly window only, so a key that was reported
    solely during the baseline is never drilled into unless the caller
    passes it explicitly.
    """
    if candidate_keys:
        drill_keys = candidate_keys
    else:
        drill_keys = _discover_candidate_keys(team, metric_name, anomaly_from, anomaly_to)

    def _score(dimension: MetricAnomalyDimension) -> float:
        # Log-ratio weighted by level, so a big relative change on a tiny
        # series does not outrank a moderate change on a large one. Falls
        # back to the absolute difference when the ratio is unusable.
        before, after = dimension.baseline_value, dimension.anomaly_value
        if before == 0 or dimension.change_ratio <= 0:
            return abs(after - before)
        return abs(math.log(dimension.change_ratio)) * max(abs(after), abs(before))

    found: list[MetricAnomalyDimension] = []
    for key in drill_keys[:MAX_CANDIDATE_KEYS]:
        # label -> (baseline samples, anomaly samples), in first-seen order.
        samples: dict[str, tuple[list[float], list[float]]] = {}
        for row in run(group_by=(MetricGroupBy(key=key),)):
            label = row["labels"].get(key, "")
            baseline_samples, anomaly_samples = samples.setdefault(label, ([], []))
            target = anomaly_samples if row["time"] >= anomaly_from_iso else baseline_samples
            target.append(row["value"])

        for label, (baseline_samples, anomaly_samples) in samples.items():
            before = _mean(baseline_samples)
            after = _mean(anomaly_samples)
            if math.isclose(before, after, rel_tol=0.05, abs_tol=1e-12):
                continue
            found.append(
                MetricAnomalyDimension(
                    key=key,
                    label=label,
                    baseline_value=before,
                    anomaly_value=after,
                    # A zero baseline has no ratio; report the anomaly value.
                    change_ratio=after / before if before else after,
                )
            )

    ranked = sorted(found, key=lambda mover: (-_score(mover), mover.key, mover.label))
    return tuple(ranked[:MAX_TOP_MOVERS])
49 changes: 48 additions & 1 deletion products/metrics/backend/facade/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,20 @@
so import-linter's strict-mode contract holds.
"""

import datetime as dt
from typing import Any

from posthog.models import Team

from products.metrics.backend.facade.contracts import MetricPoint, MetricQueryClause, MetricQueryRequest, MetricSeries
from products.metrics.backend.anomaly import characterize_anomaly as _characterize_anomaly
from products.metrics.backend.facade.contracts import (
MetricAnomalyReport,
MetricFilter,
MetricPoint,
MetricQueryClause,
MetricQueryRequest,
MetricSeries,
)
from products.metrics.backend.facade.enums import MetricAggregation
from products.metrics.backend.formula import evaluate, parse_formula
from products.metrics.backend.has_metrics_query_runner import team_has_metrics as _team_has_metrics
Expand Down Expand Up @@ -183,3 +192,41 @@ def list_metric_names(
"""
runner = MetricNamesQueryRunner(team=team, search=search, limit=limit)
return runner.run()


def characterize_metric_anomaly(
    *,
    team: Team,
    metric_name: str,
    anomaly_from: dt.datetime,
    anomaly_to: dt.datetime,
    baseline_from: dt.datetime | None = None,
    baseline_to: dt.datetime | None = None,
    aggregation: str | None = None,
    quantile: float | None = None,
    filters: tuple[MetricFilter, ...] = (),
    candidate_keys: tuple[str, ...] | None = None,
) -> MetricAnomalyReport:
    """Characterize how a metric behaves in an anomaly window vs a baseline:
    summary statistics, change magnitude/direction, the onset bucket, and
    the label values that moved the most (drilling into up to four candidate
    keys, auto-discovered from the metric's attributes when not given).

    The baseline defaults to the window of equal length immediately before
    `anomaly_from`. `aggregation` defaults by the metric's OTel type
    (counter -> rate, gauge -> avg, histogram -> histogram_quantile 0.95).
    Raises `ValueError` for invalid windows/aggregations — the presentation
    layer surfaces these as 400s.

    This facade adds no logic of its own: every argument is keyword-only and
    is forwarded unchanged to the backend implementation, whose report is
    returned as-is.
    """
    return _characterize_anomaly(
        team=team,
        metric_name=metric_name,
        anomaly_from=anomaly_from,
        anomaly_to=anomaly_to,
        baseline_from=baseline_from,
        baseline_to=baseline_to,
        aggregation=aggregation,
        quantile=quantile,
        filters=filters,
        candidate_keys=candidate_keys,
    )
38 changes: 38 additions & 0 deletions products/metrics/backend/facade/contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,41 @@ class MetricSeries:
points: tuple[MetricPoint, ...]
metric_name: str | None = None
clause: str | None = None


@dataclass(frozen=True, slots=True)
class MetricAnomalyDimension:
    """One label value's behavior across the baseline/anomaly windows."""

    # Label key the metric was grouped by.
    key: str
    # Value of that key; "" for rows whose labels do not carry the key.
    label: str
    # Mean of this label's bucket values in each window; 0.0 when the label
    # has no buckets in that window.
    baseline_value: float
    anomaly_value: float
    # anomaly_value / baseline_value; 0.0 baselines yield the anomaly value
    # itself (treat as "new" traffic).
    change_ratio: float


@dataclass(frozen=True, slots=True)
class MetricAnomalyReport:
    """Everything an investigator needs to characterize 'metric X looks
    wrong': how the anomaly window compares to the baseline, when it
    started, and which label values moved the most."""

    metric_name: str
    # Aggregation actually used (the type-based default when none was given).
    aggregation: str
    # Bucket interval picked for the combined baseline + anomaly span.
    interval: str
    # Window bounds as ISO-8601 strings (`datetime.isoformat()`).
    baseline_from: str
    baseline_to: str
    anomaly_from: str
    anomaly_to: str
    # Mean and population standard deviation of the baseline buckets; the
    # stddev is 0.0 when there are fewer than two baseline buckets.
    baseline_mean: float
    baseline_stddev: float
    # Mean and maximum of the anomaly buckets (0.0 when there are none).
    anomaly_mean: float
    anomaly_peak: float
    # anomaly_mean / baseline_mean; 0.0 baselines yield anomaly_mean.
    change_ratio: float
    # "flat" when the two means are within 5% of each other.
    direction: str  # "up" | "down" | "flat"
    # Time of the first anomaly-window bucket past the onset threshold;
    # None when the direction is "flat" or no bucket crosses it.
    onset_time: str | None
    # Label values that moved the most, largest first.
    top_movers: tuple[MetricAnomalyDimension, ...]
    # The ungrouped series over the whole queried span.
    series: MetricSeries
Loading