|
| 1 | +from collections.abc import Iterable |
| 2 | +from typing import Final |
| 3 | + |
| 4 | +from ai.backend.common.clients.prometheus.metric_types import KernelLiveStatBatchResult |
| 5 | +from ai.backend.common.clients.prometheus.types import MetricValue as PrometheusMetricValue |
| 6 | +from ai.backend.common.clients.prometheus.types import ValueType |
| 7 | +from ai.backend.common.metrics.types import UTILIZATION_METRIC_INTERVAL |
| 8 | +from ai.backend.common.types import KernelId, MetricValue |
| 9 | + |
| 10 | +# Metric-name classification used only while adapting Prometheus samples back |
| 11 | +# into the legacy live_stat dict that Graphene/WebUI still expects. |
| 12 | +_RATE_STAT_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"}) |
| 13 | +_DIFF_STAT_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"}) |
| 14 | + |
| 15 | +# Per-metric unit hint emitted by the agent (source of truth: |
| 16 | +# src/ai/backend/agent/docker/intrinsic.py). |
| 17 | +_METRIC_UNIT_HINTS: Final[dict[str, str]] = { |
| 18 | + "cpu_used": "msec", |
| 19 | + "cpu_util": "percent", |
| 20 | + "mem": "bytes", |
| 21 | + "net_rx": "bps", |
| 22 | + "net_tx": "bps", |
| 23 | + "io_read": "bytes", |
| 24 | + "io_write": "bytes", |
| 25 | + "io_scratch_size": "bytes", |
| 26 | +} |
| 27 | + |
| 28 | + |
| 29 | +def _make_default_metric_value(unit_hint: str) -> MetricValue: |
| 30 | + return MetricValue({ |
| 31 | + "current": "0", |
| 32 | + "capacity": "0", |
| 33 | + "pct": "0", |
| 34 | + "unit_hint": unit_hint, |
| 35 | + "stats.min": "0", |
| 36 | + "stats.max": "0", |
| 37 | + "stats.sum": "0", |
| 38 | + "stats.avg": "0", |
| 39 | + "stats.diff": "0", |
| 40 | + "stats.rate": "0", |
| 41 | + "stats.version": None, |
| 42 | + }) |
| 43 | + |
| 44 | + |
| 45 | +def _resolve_unit_hint(metric_name: str) -> str: |
| 46 | + if metric_name in _METRIC_UNIT_HINTS: |
| 47 | + return _METRIC_UNIT_HINTS[metric_name] |
| 48 | + if metric_name.endswith("_util"): |
| 49 | + return "percent" |
| 50 | + if metric_name == "mem" or metric_name.endswith("_mem"): |
| 51 | + return "bytes" |
| 52 | + if metric_name.startswith("io_"): |
| 53 | + return "bytes" |
| 54 | + if metric_name.startswith("net_"): |
| 55 | + return "bps" |
| 56 | + return metric_name |
| 57 | + |
| 58 | + |
| 59 | +class LegacyLiveStatConverter: |
| 60 | + """Adapt `KernelLiveStatBatchResult` into the legacy |
| 61 | + `dict[metric_name, MetricValue]` shape consumed by GQL/WebUI. |
| 62 | +
|
| 63 | + Merge order from upstream is gauge -> diff -> rate, so for |
| 64 | + RATE/DIFF metrics the same `(name, CURRENT)` tuple appears twice; |
| 65 | + `currents[0]` is the raw gauge sample, `currents[-1]` is the |
| 66 | + rate/diff query result. |
| 67 | +
|
| 68 | + `stats.max` / `stats.avg` are not populated |
| 69 | + """ |
| 70 | + |
| 71 | + @classmethod |
| 72 | + def convert( |
| 73 | + cls, result: KernelLiveStatBatchResult |
| 74 | + ) -> dict[KernelId, dict[str, MetricValue] | None]: |
| 75 | + out: dict[KernelId, dict[str, MetricValue] | None] = {} |
| 76 | + for kernel_id, entry in result.entries.items(): |
| 77 | + if not entry.values: |
| 78 | + out[kernel_id] = None |
| 79 | + continue |
| 80 | + out[kernel_id] = cls._convert_one_kernel(entry.values) |
| 81 | + return out |
| 82 | + |
| 83 | + @classmethod |
| 84 | + def _convert_one_kernel(cls, values: Iterable[PrometheusMetricValue]) -> dict[str, MetricValue]: |
| 85 | + grouped: dict[str, list[PrometheusMetricValue]] = {} |
| 86 | + for v in values: |
| 87 | + grouped.setdefault(v.metric_name, []).append(v) |
| 88 | + |
| 89 | + per_metric: dict[str, MetricValue] = {} |
| 90 | + for name, samples in grouped.items(): |
| 91 | + per_metric[name] = cls._convert_metric_samples(name, samples) |
| 92 | + return per_metric |
| 93 | + |
| 94 | + @staticmethod |
| 95 | + def _convert_metric_samples( |
| 96 | + metric_name: str, samples: list[PrometheusMetricValue] |
| 97 | + ) -> MetricValue: |
| 98 | + # `_resolve_unit_hint` falls back to naming conventions and finally |
| 99 | + # the metric_name itself for unregistered plugin metrics. |
| 100 | + unit_hint = _resolve_unit_hint(metric_name) |
| 101 | + out = _make_default_metric_value(unit_hint=unit_hint) |
| 102 | + |
| 103 | + currents = [s.value for s in samples if s.value_type is ValueType.CURRENT] |
| 104 | + capacities = [s.value for s in samples if s.value_type is ValueType.CAPACITY] |
| 105 | + pcts = [s.value for s in samples if s.value_type is ValueType.PCT] |
| 106 | + |
| 107 | + is_rate_metric = metric_name in _RATE_STAT_METRICS |
| 108 | + is_diff_metric = metric_name in _DIFF_STAT_METRICS |
| 109 | + |
| 110 | + if currents: |
| 111 | + # RATE/DIFF: prefer the rate/diff query result over the raw gauge, |
| 112 | + # mirroring the legacy `current_hook=stats.rate|diff` behavior. |
| 113 | + if (is_rate_metric or is_diff_metric) and len(currents) > 1: |
| 114 | + out["current"] = currents[-1] |
| 115 | + else: |
| 116 | + out["current"] = currents[0] |
| 117 | + if capacities: |
| 118 | + out["capacity"] = capacities[-1] |
| 119 | + |
| 120 | + if is_rate_metric and currents: |
| 121 | + # RATE template applies `/ UTILIZATION_METRIC_INTERVAL`; undo it |
| 122 | + # here to recover the per-second magnitude legacy `stats.rate` had. |
| 123 | + # TODO: separate the rate query from the gauge query so this |
| 124 | + # hack-multiply isn't needed. |
| 125 | + try: |
| 126 | + rate_value = float(currents[-1]) * UTILIZATION_METRIC_INTERVAL |
| 127 | + out["stats.rate"] = f"{rate_value:.6f}" |
| 128 | + except ValueError: |
| 129 | + out["stats.rate"] = currents[-1] |
| 130 | + if is_diff_metric and currents: |
| 131 | + # Per-second rate, not the legacy per-5s delta — GQL consumers |
| 132 | + # only read `cpu_util.pct`, so magnitude mismatch is acceptable. |
| 133 | + out["stats.diff"] = currents[-1] |
| 134 | + |
| 135 | + # Derive pct from current/capacity when no PCT sample was emitted. |
| 136 | + if pcts: |
| 137 | + out["pct"] = pcts[-1] |
| 138 | + else: |
| 139 | + try: |
| 140 | + current_value = float(out["current"]) |
| 141 | + capacity = out["capacity"] |
| 142 | + if capacity is None: |
| 143 | + return out |
| 144 | + capacity_value = float(capacity) |
| 145 | + if capacity_value > 0: |
| 146 | + out["pct"] = f"{current_value / capacity_value * 100:.2f}" |
| 147 | + except ValueError: |
| 148 | + pass |
| 149 | + |
| 150 | + return out |
0 commit comments