Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions vllm/distributed/kv_transfer/kv_connector/v1/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorPromMetrics,
KVConnectorStats,
PromMetric,
PromMetricT,
)
from vllm.forward_context import ForwardContext
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.backends import MetricBackend
from vllm.v1.request import Request

# s_tensor_list, d_tensor_list, s_indices, d_indices, direction
Expand Down Expand Up @@ -571,9 +570,9 @@ def set_xfer_handshake_metadata(
def build_prom_metrics(
cls,
vllm_config: "VllmConfig",
metric_types: dict[type["PromMetric"], type["PromMetricT"]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
) -> Optional["KVConnectorPromMetrics"]:
"""
Create a KVConnectorPromMetrics subclass which should register
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorPromMetrics,
KVConnectorStats,
PromMetric,
PromMetricT,
)
from vllm.forward_context import ForwardContext
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.core.kv_cache_utils import BlockHash
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.backends import MetricBackend
from vllm.v1.request import Request

logger = lmcache_init_logger(__name__)
Expand Down Expand Up @@ -804,9 +803,9 @@ def build_kv_connector_stats(
def build_prom_metrics(
cls,
vllm_config: "VllmConfig",
metric_types: dict[type["PromMetric"], type["PromMetricT"]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
) -> Optional["KVConnectorPromMetrics"]:
"""
Create a KVConnectorPromMetrics subclass which should register
Expand Down
43 changes: 21 additions & 22 deletions vllm/distributed/kv_transfer/kv_connector/v1/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
from dataclasses import dataclass, field
from typing import Any, TypeAlias, TypeVar

from prometheus_client import Counter, Gauge, Histogram

from vllm.config import KVTransferConfig, VllmConfig
from vllm.distributed.kv_transfer.kv_connector.factory import KVConnectorFactory
from vllm.logger import init_logger

PromMetric: TypeAlias = Gauge | Counter | Histogram
from vllm.v1.metrics.backends import (
AbstractCounter,
AbstractGauge,
AbstractHistogram,
MetricBackend,
PrometheusBackend,
)

PromMetric: TypeAlias = AbstractGauge | AbstractCounter | AbstractHistogram
PromMetricT = TypeVar("PromMetricT", bound=PromMetric)

logger = init_logger(__name__)
Expand Down Expand Up @@ -115,25 +120,23 @@ class KVConnectorPromMetrics:
def __init__(
self,
vllm_config: VllmConfig,
metric_types: dict[type[PromMetric], type[PromMetricT]],
backend: MetricBackend,
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
):
self._kv_transfer_config = vllm_config.kv_transfer_config
self._gauge_cls = metric_types[Gauge]
self._counter_cls = metric_types[Counter]
self._histogram_cls = metric_types[Histogram]
self._backend = backend
self._labelnames = labelnames
self._per_engine_labelvalues = per_engine_labelvalues

def make_per_engine(self, metric: PromMetric) -> dict[int, PromMetric]:
def make_per_engine(self, metric: PromMetricT) -> dict[int, PromMetricT]:
"""
Create a per-engine child of a prometheus_client.Metric with
the appropriate labels set. The parent metric must be created
using the labelnames list.
"""
return {
idx: metric.labels(*labelvalues)
idx: metric.labels(*labelvalues) # type: ignore[return-value,misc]
for idx, labelvalues in self._per_engine_labelvalues.items()
}

Expand All @@ -154,28 +157,24 @@ class KVConnectorPrometheus:
KVConnectorBase.build_prom_metrics().
"""

_gauge_cls = Gauge
_counter_cls = Counter
_histogram_cls = Histogram

def __init__(
self,
vllm_config: VllmConfig,
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
backend: MetricBackend | None = None,
):
self.prom_metrics: KVConnectorPromMetrics | None = None
kv_transfer_config = vllm_config.kv_transfer_config

# Use provided backend or default to Prometheus
backend = backend if backend is not None else PrometheusBackend()

if kv_transfer_config and kv_transfer_config.kv_connector:
connector_cls = KVConnectorFactory.get_connector_class(kv_transfer_config)
metric_types = {
Gauge: self._gauge_cls,
Counter: self._counter_cls,
Histogram: self._histogram_cls,
}
self.prom_metrics = connector_cls.build_prom_metrics(
vllm_config,
metric_types,
backend,
labelnames,
per_engine_labelvalues,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorPromMetrics,
KVConnectorStats,
PromMetric,
PromMetricT,
)
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput
Expand All @@ -32,6 +30,7 @@
from vllm.forward_context import ForwardContext
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.backends import MetricBackend
from vllm.v1.request import Request

logger = init_logger(__name__)
Expand Down Expand Up @@ -83,12 +82,12 @@ class MultiKVConnectorPromMetrics(KVConnectorPromMetrics):
def __init__(
self,
vllm_config: "VllmConfig",
metric_types: dict[type[PromMetric], type[PromMetricT]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
prom_metrics: dict[str, KVConnectorPromMetrics],
):
super().__init__(vllm_config, metric_types, labelnames, per_engine_labelvalues)
super().__init__(vllm_config, backend, labelnames, per_engine_labelvalues)
self._prom_metrics = prom_metrics

def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0):
Expand Down Expand Up @@ -438,22 +437,22 @@ def get_kv_connector_stats(self) -> MultiKVConnectorStats | None:
def build_prom_metrics(
cls,
vllm_config: "VllmConfig",
metric_types: dict[type["PromMetric"], type["PromMetricT"]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
) -> KVConnectorPromMetrics:
prom_metrics: dict[str, KVConnectorPromMetrics] = {}
for connector_cls, temp_config in cls._get_connector_classes_and_configs(
vllm_config
):
connector_prom = connector_cls.build_prom_metrics(
temp_config, metric_types, labelnames, per_engine_labelvalues
temp_config, backend, labelnames, per_engine_labelvalues
)
if connector_prom is not None:
prom_metrics[connector_cls.__name__] = connector_prom
return MultiKVConnectorPromMetrics(
vllm_config,
metric_types,
backend,
labelnames,
per_engine_labelvalues,
prom_metrics,
Expand Down
95 changes: 50 additions & 45 deletions vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import (
KVConnectorPromMetrics,
KVConnectorStats,
PromMetric,
PromMetricT,
)
from vllm.distributed.parallel_state import (
get_tensor_model_parallel_rank,
Expand All @@ -53,6 +51,11 @@
if TYPE_CHECKING:
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.metrics.backends import (
AbstractCounter,
AbstractHistogram,
MetricBackend,
)
from vllm.v1.request import Request

TransferHandle = int
Expand Down Expand Up @@ -398,13 +401,11 @@ def build_kv_connector_stats(
def build_prom_metrics(
cls,
vllm_config: VllmConfig,
metric_types: dict[type[PromMetric], type[PromMetricT]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
) -> KVConnectorPromMetrics:
return NixlPromMetrics(
vllm_config, metric_types, labelnames, per_engine_labelvalues
)
return NixlPromMetrics(vllm_config, backend, labelnames, per_engine_labelvalues)

def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
assert self.connector_worker is not None
Expand Down Expand Up @@ -2408,11 +2409,11 @@ class NixlPromMetrics(KVConnectorPromMetrics):
def __init__(
self,
vllm_config: VllmConfig,
metric_types: dict[type[PromMetric], type[PromMetricT]],
backend: "MetricBackend",
labelnames: list[str],
per_engine_labelvalues: dict[int, list[object]],
per_engine_labelvalues: dict[int, list[str]],
):
super().__init__(vllm_config, metric_types, labelnames, per_engine_labelvalues)
super().__init__(vllm_config, backend, labelnames, per_engine_labelvalues)

buckets = [
0.001,
Expand All @@ -2429,74 +2430,78 @@ def __init__(
1.0,
5.0,
]
nixl_histogram_xfer_time = self._histogram_cls(
nixl_histogram_xfer_time = self._backend.create_histogram(
name="vllm:nixl_xfer_time_seconds",
documentation="Histogram of transfer duration for NIXL KV Cache transfers.",
buckets=buckets[1:],
labelnames=labelnames,
)
self.nixl_histogram_xfer_time = self.make_per_engine(nixl_histogram_xfer_time)
nixl_histogram_post_time = self._histogram_cls(
self.nixl_histogram_xfer_time: dict[int, AbstractHistogram] = (
self.make_per_engine(nixl_histogram_xfer_time)
) # type: ignore[assignment]
nixl_histogram_post_time = self._backend.create_histogram(
name="vllm:nixl_post_time_seconds",
documentation="Histogram of transfer post time for NIXL KV"
" Cache transfers.",
buckets=buckets,
labelnames=labelnames,
)
self.nixl_histogram_post_time = self.make_per_engine(nixl_histogram_post_time)
self.nixl_histogram_post_time: dict[int, AbstractHistogram] = (
self.make_per_engine(nixl_histogram_post_time)
) # type: ignore[assignment]
# uniform 2kb to 16gb range
buckets = [2 ** (10 + i) for i in range(1, 25, 2)]
nixl_histogram_bytes_transferred = self._histogram_cls(
buckets = [float(2 ** (10 + i)) for i in range(1, 25, 2)]
nixl_histogram_bytes_transferred = self._backend.create_histogram(
name="vllm:nixl_bytes_transferred",
documentation="Histogram of bytes transferred per NIXL KV Cache transfers.",
buckets=buckets,
labelnames=labelnames,
)
self.nixl_histogram_bytes_transferred = self.make_per_engine(
nixl_histogram_bytes_transferred
)
self.nixl_histogram_bytes_transferred: dict[int, AbstractHistogram] = (
self.make_per_engine(nixl_histogram_bytes_transferred)
) # type: ignore[assignment]
buckets = [
10,
20,
30,
50,
75,
100,
200,
400,
1000,
2000,
4000,
10000,
20000,
50000,
10.0,
20.0,
30.0,
50.0,
75.0,
100.0,
200.0,
400.0,
1000.0,
2000.0,
4000.0,
10000.0,
20000.0,
50000.0,
]
nixl_histogram_num_descriptors = self._histogram_cls(
nixl_histogram_num_descriptors = self._backend.create_histogram(
name="vllm:nixl_num_descriptors",
documentation="Histogram of number of descriptors per NIXL"
" KV Cache transfers.",
buckets=buckets,
labelnames=labelnames,
)
self.nixl_histogram_num_descriptors = self.make_per_engine(
nixl_histogram_num_descriptors
)
counter_nixl_num_failed_transfers = self._counter_cls(
self.nixl_histogram_num_descriptors: dict[int, AbstractHistogram] = (
self.make_per_engine(nixl_histogram_num_descriptors)
) # type: ignore[assignment]
counter_nixl_num_failed_transfers = self._backend.create_counter(
name="vllm:nixl_num_failed_transfers",
documentation="Number of failed NIXL KV Cache transfers.",
labelnames=labelnames,
)
self.counter_nixl_num_failed_transfers = self.make_per_engine(
counter_nixl_num_failed_transfers
)
counter_nixl_num_failed_notifications = self._counter_cls(
self.counter_nixl_num_failed_transfers: dict[int, AbstractCounter] = (
self.make_per_engine(counter_nixl_num_failed_transfers)
) # type: ignore[assignment]
counter_nixl_num_failed_notifications = self._backend.create_counter(
name="vllm:nixl_num_failed_notifications",
documentation="Number of failed NIXL KV Cache notifications.",
labelnames=labelnames,
)
self.counter_nixl_num_failed_notifications = self.make_per_engine(
counter_nixl_num_failed_notifications
)
self.counter_nixl_num_failed_notifications: dict[int, AbstractCounter] = (
self.make_per_engine(counter_nixl_num_failed_notifications)
) # type: ignore[assignment]

def observe(self, transfer_stats_data: dict[str, Any], engine_idx: int = 0):
for prom_obj, list_item_key in zip(
Expand Down
29 changes: 29 additions & 0 deletions vllm/v1/metrics/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

"""Metrics backend abstraction layer.

This module provides a unified interface for metrics collection that supports
multiple backend implementations (Prometheus, OpenTelemetry, etc.).

The abstraction allows metrics to be defined once and exported to different
backends without code duplication.
"""

from vllm.v1.metrics.backends.abstract import (
AbstractCounter,
AbstractGauge,
AbstractHistogram,
MetricBackend,
)
from vllm.v1.metrics.backends.otel_backend import OTELBackend
from vllm.v1.metrics.backends.prometheus_backend import PrometheusBackend

__all__ = [
"MetricBackend",
"AbstractCounter",
"AbstractGauge",
"AbstractHistogram",
"PrometheusBackend",
"OTELBackend",
]
Loading