Skip to content

Commit dd50181

Browse files
seedspiritclaude
andauthored
refactor(BA-5803): consolidate metric service layer (#11238)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d24e1ec commit dd50181

33 files changed

Lines changed: 715 additions & 763 deletions

File tree

changes/11238.enhance.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Consolidate metric service layer into standard single-service structure and move Prometheus data-fetching to repository layer

src/ai/backend/manager/api/gql_legacy/metric/base.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ async def get_object(
3131
info: graphene.ResolveInfo,
3232
) -> Self:
3333
graph_ctx: GraphQueryContext = info.context
34-
action_result = await graph_ctx.processors.utilization_metric.query_container_metadata.wait_for_complete(
35-
ContainerMetricMetadataAction()
34+
action_result = (
35+
await graph_ctx.processors.metric.query_container_metadata.wait_for_complete(
36+
ContainerMetricMetadataAction()
37+
)
3638
)
3739
return cls(
3840
metric_names=action_result.metric_names,

src/ai/backend/manager/api/gql_legacy/metric/user.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import graphene
1010

1111
from ai.backend.common.dto.clients.prometheus.request import QueryTimeRange
12+
from ai.backend.manager.data.metric.types import ContainerMetricOptionalLabel
1213
from ai.backend.manager.services.metric.actions.container import (
1314
ContainerMetricAction,
1415
)
1516
from ai.backend.manager.services.metric.types import (
16-
ContainerMetricOptionalLabel,
1717
MetricQueryParameter,
1818
)
1919

@@ -70,15 +70,11 @@ async def get_object(
7070
param: MetricQueryParameter,
7171
) -> Self:
7272
graph_ctx: GraphQueryContext = info.context
73-
action_result = (
74-
await graph_ctx.processors.utilization_metric.query_container.wait_for_complete(
75-
ContainerMetricAction(
76-
metric_name=param.metric_name,
77-
labels=ContainerMetricOptionalLabel(
78-
user_id=user_id, value_type=param.value_type
79-
),
80-
time_range=QueryTimeRange(start=param.start, end=param.end, step=param.step),
81-
)
73+
action_result = await graph_ctx.processors.metric.query_container.wait_for_complete(
74+
ContainerMetricAction(
75+
metric_name=param.metric_name,
76+
labels=ContainerMetricOptionalLabel(user_id=user_id, value_type=param.value_type),
77+
time_range=QueryTimeRange(start=param.start, end=param.end, step=param.step),
8278
)
8379
)
8480
metrics = []
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .fixed_query_builder import FixedQueryBuilder, LabelValuesQuery
2+
3+
__all__ = [
4+
"FixedQueryBuilder",
5+
"LabelValuesQuery",
6+
]
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import re
2+
from collections.abc import Sequence
3+
from dataclasses import dataclass
4+
from typing import Final
5+
6+
from ai.backend.common.clients.prometheus.preset import LabelMatcher, MetricPreset
7+
from ai.backend.common.clients.prometheus.querier import ContainerMetricQuerier
8+
from ai.backend.common.clients.prometheus.types import ValueType
9+
from ai.backend.common.exception import UnreachableError
10+
from ai.backend.common.metrics.types import (
11+
CONTAINER_UTILIZATION_METRIC_LABEL_NAME,
12+
CONTAINER_UTILIZATION_METRIC_NAME,
13+
UTILIZATION_METRIC_INTERVAL,
14+
)
15+
from ai.backend.common.types import KernelId
16+
from ai.backend.manager.data.metric.types import (
17+
DIFF_METRICS,
18+
RATE_METRICS,
19+
ContainerLiveStatQueries,
20+
ContainerMetricOptionalLabel,
21+
MetricType,
22+
)
23+
24+
_LIVE_STAT_GROUP_BY: Final[frozenset[str]] = frozenset({
25+
"kernel_id",
26+
"container_metric_name",
27+
"value_type",
28+
})
29+
30+
_GAUGE_TEMPLATE: Final[str] = (
31+
f"sum by ({{group_by}})({CONTAINER_UTILIZATION_METRIC_NAME}{{{{{{labels}}}}}})"
32+
)
33+
_RATE_TEMPLATE: Final[str] = (
34+
"sum by ({group_by})(rate("
35+
f"{CONTAINER_UTILIZATION_METRIC_NAME}{{{{{{labels}}}}}}[{{window}}]))"
36+
f" / {UTILIZATION_METRIC_INTERVAL}"
37+
)
38+
_DIFF_TEMPLATE: Final[str] = (
39+
"sum by ({group_by})(rate("
40+
f"{CONTAINER_UTILIZATION_METRIC_NAME}{{{{{{labels}}}}}}[{{window}}]))"
41+
)
42+
43+
44+
@dataclass(frozen=True)
class LabelValuesQuery:
    """Immutable description of a label-values lookup.

    Pairs the name of the label whose values are wanted with the metric
    match expression the lookup is restricted to.
    """

    # Name of the label whose values should be listed.
    label_name: str
    # Metric match expression that scopes the lookup.
    metric_match: str
48+
49+
50+
def _regex_union(values: Sequence[str]) -> str:
51+
return "|".join(re.escape(value) for value in values)
52+
53+
54+
class FixedQueryBuilder:
    """Builds the predefined query presets used for container metrics.

    The builder holds a single piece of state, the time-window string, which
    is copied into the ``window`` field of every ``MetricPreset`` it returns.
    """

    _timewindow: str

    def __init__(self, timewindow: str) -> None:
        """Remember *timewindow* for use in every preset built later."""
        self._timewindow = timewindow

    def get_container_metric_metadata_query(self) -> LabelValuesQuery:
        """Return the label-values query for container metric metadata.

        The query asks for the values of the container utilization metric
        label, scoped to the container utilization metric.
        """
        return LabelValuesQuery(
            metric_match=CONTAINER_UTILIZATION_METRIC_NAME,
            label_name=CONTAINER_UTILIZATION_METRIC_LABEL_NAME,
        )

    def get_container_metric_type(
        self,
        metric_name: str,
        label: ContainerMetricOptionalLabel,
    ) -> MetricType:
        """Classify *metric_name* as DIFF, RATE or GAUGE.

        DIFF is chosen only for names in ``DIFF_METRICS`` whose requested
        value type is ``ValueType.CURRENT``.  Otherwise names in
        ``RATE_METRICS`` are RATE, and everything else is GAUGE.
        """
        wants_diff = metric_name in DIFF_METRICS and label.value_type == ValueType.CURRENT
        if wants_diff:
            return MetricType.DIFF
        return MetricType.RATE if metric_name in RATE_METRICS else MetricType.GAUGE

    def get_container_metric_query(
        self,
        metric_name: str,
        label: ContainerMetricOptionalLabel,
    ) -> MetricPreset:
        """Build the preset for one container metric filtered by *label*.

        The template follows the metric's classification; the label matchers
        and group-by labels are both supplied by ``ContainerMetricQuerier``.
        """
        kind = self.get_container_metric_type(metric_name, label)
        querier = ContainerMetricQuerier(
            metric_name=metric_name,
            value_type=ValueType(label.value_type.value),
            kernel_id=label.kernel_id,
            session_id=label.session_id,
            agent_id=label.agent_id,
            user_id=label.user_id,
            project_id=label.project_id,
        )
        template = self._get_template(kind)
        matchers = querier.labels()
        grouping = querier.group_by_labels()
        return MetricPreset(
            template=template,
            labels=matchers,
            group_by=grouping,
            window=self._timewindow,
        )

    def get_container_live_stat_queries(
        self,
        kernel_ids: Sequence[KernelId],
    ) -> ContainerLiveStatQueries:
        """Build the gauge / diff / rate preset bundle for *kernel_ids*.

        The gauge preset is restricted by kernel id only.  The diff and rate
        presets are further restricted to ``DIFF_METRICS`` / ``RATE_METRICS``
        respectively, and to ``ValueType.CURRENT``.
        """
        gauge_preset = self._get_container_live_stat_query(
            kernel_ids,
            metric_type=MetricType.GAUGE,
        )
        diff_preset = self._get_container_live_stat_query(
            kernel_ids,
            metric_type=MetricType.DIFF,
            metric_name_filter=DIFF_METRICS,
            value_type_filter=ValueType.CURRENT,
        )
        rate_preset = self._get_container_live_stat_query(
            kernel_ids,
            metric_type=MetricType.RATE,
            metric_name_filter=RATE_METRICS,
            value_type_filter=ValueType.CURRENT,
        )
        return ContainerLiveStatQueries(gauge=gauge_preset, diff=diff_preset, rate=rate_preset)

    def _get_container_live_stat_query(
        self,
        kernel_ids: Sequence[KernelId],
        *,
        metric_type: MetricType,
        metric_name_filter: frozenset[str] | None = None,
        value_type_filter: ValueType | None = None,
    ) -> MetricPreset:
        """Build one live-stat preset of *metric_type* for *kernel_ids*.

        ``kernel_id`` is always matched by a regex alternation of the given
        ids.  The optional filters add a regex matcher on
        ``container_metric_name`` and an exact matcher on ``value_type``.
        """
        kernel_id_pattern = _regex_union(tuple(map(str, kernel_ids)))
        matchers: dict[str, LabelMatcher] = {"kernel_id": LabelMatcher.regex(kernel_id_pattern)}

        if metric_name_filter is not None:
            # Sorting keeps the generated pattern independent of set order.
            name_pattern = _regex_union(sorted(metric_name_filter))
            matchers["container_metric_name"] = LabelMatcher.regex(name_pattern)

        if value_type_filter is not None:
            matchers["value_type"] = LabelMatcher.exact(value_type_filter.value)

        return MetricPreset(
            template=self._get_template(metric_type),
            labels=matchers,
            group_by=_LIVE_STAT_GROUP_BY,
            window=self._timewindow,
        )

    def _get_template(self, metric_type: MetricType) -> str:
        """Return the query template registered for *metric_type*.

        Raises:
            UnreachableError: if *metric_type* is none of GAUGE, RATE, DIFF.
        """
        known_templates = (
            (MetricType.GAUGE, _GAUGE_TEMPLATE),
            (MetricType.RATE, _RATE_TEMPLATE),
            (MetricType.DIFF, _DIFF_TEMPLATE),
        )
        for candidate, template in known_templates:
            if metric_type == candidate:
                return template
        raise UnreachableError(f"Unknown metric type: {metric_type}")

src/ai/backend/manager/data/metric/types.py

Lines changed: 126 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,36 @@
33
from collections.abc import Mapping, Sequence
44
from dataclasses import dataclass
55
from enum import StrEnum
6-
from typing import Final, Self
6+
from typing import Final, Self, cast
7+
from uuid import UUID
78

9+
from ai.backend.common.clients.prometheus.preset import MetricPreset
810
from ai.backend.common.clients.prometheus.types import MetricValue, ValueType
11+
from ai.backend.common.dto.clients.prometheus.response import (
12+
MetricResponseInfo,
13+
PrometheusResponse,
14+
)
15+
from ai.backend.common.exception import InvalidAPIParameters
916
from ai.backend.common.types import KernelId
1017

1118
__all__ = [
19+
"ContainerLiveStatQueries",
20+
"ContainerMetricOptionalLabel",
21+
"ContainerMetricResponseInfo",
22+
"ContainerMetricResult",
1223
"DIFF_METRICS",
1324
"KernelLiveStatBatchResult",
1425
"KernelLiveStatEntry",
26+
"KernelMetricValuesByKernel",
1527
"MetricValue",
28+
"MetricResultValue",
1629
"RATE_METRICS",
17-
"UtilizationMetricType",
30+
"MetricType",
1831
"ValueType",
1932
]
2033

2134

22-
class UtilizationMetricType(StrEnum):
35+
class MetricType(StrEnum):
2336
"""
2437
Specifies the type of a metric value.
2538
"""
@@ -41,13 +54,81 @@ class UtilizationMetricType(StrEnum):
4154
"""
4255

4356

44-
# Metric-name -> UtilizationMetricType classification rules.
57+
# Metric-name -> MetricType classification rules.
4558
# TODO: Refactor to query metric metadata from the repository layer once
4659
# the metadata persistence is available.
60+
61+
62+
@dataclass(frozen=True)
63+
class ContainerLiveStatQueries:
64+
"""Gauge / diff / rate query preset bundle for container live stats."""
65+
66+
gauge: MetricPreset
67+
diff: MetricPreset
68+
rate: MetricPreset
69+
70+
def to_list(self) -> list[MetricPreset]:
71+
return [self.gauge, self.diff, self.rate]
72+
73+
4774
DIFF_METRICS: Final[frozenset[str]] = frozenset({"cpu_util"})
4875
RATE_METRICS: Final[frozenset[str]] = frozenset({"net_rx", "net_tx"})
4976

5077

78+
@dataclass
79+
class ContainerMetricResponseInfo:
80+
value_type: str
81+
container_metric_name: str | None
82+
agent_id: str | None
83+
instance: str | None
84+
job: str | None
85+
kernel_id: str | None
86+
owner_project_id: str | None
87+
owner_user_id: str | None
88+
session_id: str | None
89+
90+
@classmethod
91+
def from_metric_response_info(cls, info: MetricResponseInfo) -> Self:
92+
if info.value_type is None:
93+
raise InvalidAPIParameters(
94+
f"Missing required label 'value_type' for container metric (metric={info.name!r})"
95+
)
96+
return cls(
97+
value_type=info.value_type,
98+
container_metric_name=info.container_metric_name,
99+
agent_id=info.agent_id,
100+
instance=info.instance,
101+
job=info.job,
102+
kernel_id=info.kernel_id,
103+
owner_project_id=info.owner_project_id,
104+
owner_user_id=info.owner_user_id,
105+
session_id=info.session_id,
106+
)
107+
108+
109+
@dataclass
class MetricResultValue:
    """One sample of a metric series: a timestamp and its value."""

    # Sample time as a float.
    timestamp: float
    # Sample value, kept as a string.
    value: str
113+
114+
115+
@dataclass
class ContainerMetricOptionalLabel:
    """Label filter for a container metric query.

    ``value_type`` must always be given; each of the remaining labels
    defaults to ``None``.
    """

    # Required label.
    value_type: ValueType

    # Optional labels, all defaulting to None.
    agent_id: str | None = None
    kernel_id: UUID | None = None
    session_id: UUID | None = None
    user_id: UUID | None = None
    project_id: UUID | None = None
124+
125+
126+
@dataclass
class ContainerMetricResult:
    """A container metric series: its label info plus its sampled values."""

    # Labels identifying the series.
    metric: ContainerMetricResponseInfo
    # Samples belonging to the series.
    values: list[MetricResultValue]
130+
131+
51132
@dataclass(frozen=True)
52133
class KernelLiveStatEntry:
53134
"""All live_stat samples belonging to a single kernel.
@@ -61,7 +142,7 @@ class KernelLiveStatEntry:
61142

62143
@dataclass(frozen=True)
63144
class KernelLiveStatBatchResult:
64-
# Per-kernel batch result for `query_kernel_live_stat_batch`
145+
# Per-kernel bulk result for `query_container_live_stats`
65146

66147
entries: dict[KernelId, KernelLiveStatEntry]
67148

@@ -84,3 +165,43 @@ def from_metric_values(
84165
for kid in kernel_ids
85166
}
86167
)
168+
169+
170+
@dataclass(frozen=True)
171+
class KernelMetricValuesByKernel:
172+
values_by_kernel: dict[KernelId, list[MetricValue]]
173+
174+
@classmethod
175+
def from_prometheus_response(cls, response: PrometheusResponse) -> Self:
176+
grouped: dict[KernelId, list[MetricValue]] = {}
177+
for metric in response.data.result:
178+
info = metric.metric
179+
if not info.has_container_metric_labels or not metric.values:
180+
continue
181+
# Non-None guaranteed by has_container_metric_labels above;
182+
# cast needed because property checks don't narrow types.
183+
kernel_id_str = cast(str, info.kernel_id)
184+
container_metric_name = cast(str, info.container_metric_name)
185+
value_type_str = cast(str, info.value_type)
186+
try:
187+
value_type = ValueType(value_type_str)
188+
kernel_id = KernelId(UUID(kernel_id_str))
189+
except ValueError:
190+
continue
191+
# Instant queries are normalized into a one-element list, and range
192+
# queries are ordered by time, so the last sample is the newest one.
193+
_, raw_value = metric.values[-1]
194+
grouped.setdefault(kernel_id, []).append(
195+
MetricValue(
196+
metric_name=container_metric_name,
197+
value_type=value_type,
198+
value=raw_value,
199+
)
200+
)
201+
return cls(values_by_kernel=grouped)
202+
203+
def merged_with(self, other: Self) -> Self:
204+
merged = {kernel_id: list(values) for kernel_id, values in self.values_by_kernel.items()}
205+
for kernel_id, values in other.values_by_kernel.items():
206+
merged.setdefault(kernel_id, []).extend(values)
207+
return type(self)(values_by_kernel=merged)

0 commit comments

Comments
 (0)