Commit 521cb0e
[Serve] Add replica utilization metric for Ray Serve (ray-project#60758)
### Summary

Adds a new `ray_serve_replica_utilization_percent` Gauge metric that measures what percentage of a replica's capacity is being used over a rolling time window. This metric is useful for capacity planning and for identifying underutilized or overloaded replicas.

**Formula:** `(total_user_code_execution_time / (window_duration × max_ongoing_requests)) × 100`

### Changes

- **New metric**: `ray_serve_replica_utilization_percent` with tags `deployment`, `replica`, `application`.
- **New module**: `rolling_window_accumulator.py` - a lock-free, thread-safe rolling window implementation using thread-local storage for minimal overhead on the request hot path (~0.5µs per `add()`).
- **Configurable** via environment variables:
  - `RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S` (default: 600s)
  - `RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S` (default: 10s)
  - `RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS` (default: 60)

### Test plan

- [x] Unit tests for `RollingWindowAccumulator` (32 tests covering single-threaded, multi-threaded, edge cases, and thread isolation)
- [x] End-to-end integration test in `test_metrics.py`
- [x] Benchmark confirms sub-microsecond overhead with no degradation under concurrent load

<img width="285" height="168" alt="image" src="https://github.com/user-attachments/assets/ff59d071-56cc-4aab-8547-22b9780a318e" />

### Related issue

Closes ray-project#60755

---------

Signed-off-by: abrar <abrar@anyscale.com>
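For a concrete sense of the formula, here is a small standalone sketch. The function name and the numbers are illustrative only, not part of the PR:

```python
def replica_utilization_percent(
    total_user_code_time_ms: float,
    window_duration_s: float,
    max_ongoing_requests: int,
) -> float:
    """(total_user_code_execution_time / (window_duration * max_ongoing_requests)) * 100."""
    max_capacity_ms = window_duration_s * 1000 * max_ongoing_requests
    if max_capacity_ms <= 0:
        return 0.0
    # Cap at 100%: heavily overlapping requests can push raw utilization above capacity.
    return min(total_user_code_time_ms / max_capacity_ms * 100, 100.0)


# A replica with max_ongoing_requests=5 that executed 150s of user code over a
# 600s window sits at roughly 5% utilization: 150_000 / (600_000 * 5) * 100.
print(replica_utilization_percent(150_000, 600, 5))
```

Note that the result is a fraction of *theoretical* capacity, so a replica serving mostly I/O-bound requests can look underutilized even while fully loaded.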
1 parent aec74f0 commit 521cb0e

File tree

7 files changed: +1029, -0 lines changed

doc/source/serve/monitoring.md

Lines changed: 1 addition & 0 deletions
@@ -662,6 +662,7 @@ These metrics track request throughput, errors, and latency at the replica level
  | Metric | Type | Tags | Description |
  |--------|------|------|-------------|
  | `ray_serve_replica_processing_queries` **[D]** | Gauge | `deployment`, `replica`, `application` | Current number of requests being processed by the replica. |
+ | `ray_serve_replica_utilization_percent` **[D]** | Gauge | `deployment`, `replica`, `application` | Percentage of replica capacity used over a rolling window. Calculated as total user code execution time divided by maximum capacity (`window_duration × max_ongoing_requests`). Useful for capacity planning and identifying underutilized or overloaded replicas. Configure with `RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S` (default: 600s), `RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S` (default: 10s), and `RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS` (default: 60). |
  | `ray_serve_deployment_request_counter_total` **[D]** | Counter | `deployment`, `replica`, `route`, `application` | Total number of requests processed by the replica. |
  | `ray_serve_deployment_processing_latency_ms` **[D]** | Histogram | `deployment`, `replica`, `route`, `application` | Histogram of request processing time in milliseconds (excludes queue wait time). |
  | `ray_serve_deployment_error_counter_total` **[D]** | Counter | `deployment`, `replica`, `route`, `application` | Total number of exceptions raised while processing requests. |

python/ray/serve/_private/constants.py

Lines changed: 14 additions & 0 deletions
@@ -165,6 +165,20 @@
     DEFAULT_BATCH_UTILIZATION_BUCKETS_PERCENT,
 )

+#: Replica utilization metric configuration.
+#: Rolling window duration for calculating replica utilization (in seconds).
+RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S = float(
+    get_env_str("RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S", "600")
+)
+#: Interval for reporting the replica utilization metric (in seconds).
+RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S = float(
+    get_env_str("RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S", "10")
+)
+#: Number of buckets for the rolling window (determines granularity).
+RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS = int(
+    get_env_str("RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS", "60")
+)
+
 #: Histogram buckets for actual batch size.
 DEFAULT_BATCH_SIZE_BUCKETS = [
     1,
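The constants above follow a common pattern: read the raw value from the environment as a string, then coerce it to the right type. A rough standalone equivalent using `os.environ` directly (the `_env_float` helper here is hypothetical, standing in for Serve's `get_env_str`):

```python
import os


def _env_float(name: str, default: str) -> float:
    # Read the env var as a string (falling back to the default), then coerce.
    return float(os.environ.get(name, default))


# Override one setting, leave the other at its default.
os.environ["RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S"] = "300"
window_s = _env_float("RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S", "600")
report_interval_s = _env_float("RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S", "10")
print(window_s, report_interval_s)  # 300.0 10.0
```

Because the real constants are evaluated at module import time, these env vars must be set before Ray Serve is imported to take effect.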

python/ray/serve/_private/replica.py

Lines changed: 78 additions & 0 deletions
@@ -70,6 +70,9 @@
     RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S,
     RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S,
     RAY_SERVE_REPLICA_GRPC_MAX_MESSAGE_LENGTH,
+    RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS,
+    RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S,
+    RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S,
     RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE,
     RAY_SERVE_RUN_SYNC_IN_THREADPOOL,
     RAY_SERVE_RUN_SYNC_IN_THREADPOOL_WARNING,

@@ -121,6 +124,7 @@
 from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher
 from ray.serve._private.proxy_request_response import ResponseStatus
 from ray.serve._private.replica_response_generator import ReplicaResponseGenerator
+from ray.serve._private.rolling_window_accumulator import RollingWindowAccumulator
 from ray.serve._private.serialization import RPCSerializer
 from ray.serve._private.task_consumer import TaskConsumerWrapper
 from ray.serve._private.thirdparty.get_asgi_route_name import (

@@ -299,6 +303,7 @@ def __init__(
         event_loop: asyncio.BaseEventLoop,
         autoscaling_config: Optional[AutoscalingConfig],
         ingress: bool,
+        max_ongoing_requests: int,
     ):
         self._replica_id = replica_id
         self._deployment_id = replica_id.deployment_id

@@ -309,6 +314,7 @@
             SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE
         )
         self._num_ongoing_requests = 0
+        self._max_ongoing_requests = max_ongoing_requests
         # Store event loop for scheduling async tasks from sync context
         self._event_loop = event_loop or asyncio.get_event_loop()

@@ -384,6 +390,26 @@
             boundaries=REQUEST_LATENCY_BUCKETS_MS,
         )

+        # Replica utilization tracking with rolling window.
+        # Tracks total user code execution time over a rolling window to calculate
+        # utilization as: user_code_time / (window_duration * max_ongoing_requests).
+        self._user_code_time_accumulator = RollingWindowAccumulator(
+            window_duration_s=RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S,
+            num_buckets=RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS,
+        )
+        self._replica_utilization_gauge = metrics.Gauge(
+            "serve_replica_utilization_percent",
+            description=(
+                "Percentage of replica capacity utilized by user code execution "
+                "over a rolling window. Calculated as: "
+                "user_code_time / (window_duration * max_ongoing_requests)."
+            ),
+        )
+        self._utilization_report_interval_s = (
+            RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S
+        )
+        self._event_loop.create_task(self._report_utilization_forever())
+
         self.set_autoscaling_config(autoscaling_config)

         if self._is_direct_ingress:

@@ -701,8 +727,56 @@ def get_num_ongoing_requests(self) -> int:
         """Get current total queue length of requests for this replica."""
         return self._num_ongoing_requests

+    def set_max_ongoing_requests(self, max_ongoing_requests: int) -> None:
+        """Update max_ongoing_requests when deployment config changes."""
+        self._max_ongoing_requests = max_ongoing_requests
+
+    async def _report_utilization_forever(self) -> None:
+        """Background task to emit utilization gauge continuously."""
+        consecutive_errors = 0
+        while True:
+            try:
+                await asyncio.sleep(self._utilization_report_interval_s)
+                utilization = self._calculate_utilization()
+                self._replica_utilization_gauge.set(utilization)
+                consecutive_errors = 0
+            except Exception:
+                logger.exception("Unexpected error reporting utilization metrics.")
+
+                # Exponential backoff starting at 1s and capping at 10s.
+                backoff_time_s = min(10, 2**consecutive_errors)
+                consecutive_errors += 1
+                await asyncio.sleep(backoff_time_s)
+
+    def _calculate_utilization(self) -> float:
+        """Calculate current utilization percentage based on rolling window.
+
+        Utilization is calculated as:
+            user_code_time / (window_duration * max_ongoing_requests)
+
+        This represents the percentage of the replica's theoretical maximum
+        capacity that was used for executing user code.
+        """
+        total_user_code_time_ms = self._user_code_time_accumulator.get_total()
+
+        # Max capacity = window_duration_ms * max_ongoing_requests
+        window_duration_ms = RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S * 1000
+        max_capacity_ms = window_duration_ms * self._max_ongoing_requests
+
+        if max_capacity_ms > 0:
+            utilization_percent = (total_user_code_time_ms / max_capacity_ms) * 100
+            # Cap at 100% (can theoretically exceed if requests overlap heavily)
+            utilization_percent = min(utilization_percent, 100.0)
+        else:
+            utilization_percent = 0.0
+
+        return utilization_percent
+
     def record_request_metrics(self, *, route: str, latency_ms: float, was_error: bool):
         """Records per-request metrics."""
+        # Track latency for utilization calculation (rolling window).
+        self._user_code_time_accumulator.add(latency_ms)
+
         if self._cached_metrics_enabled:
             self._cached_latencies[route].append(latency_ms)
             if was_error:

@@ -948,6 +1022,7 @@ def __init__(
             event_loop=self._event_loop,
             autoscaling_config=self._deployment_config.autoscaling_config,
             ingress=ingress,
+            max_ongoing_requests=self._deployment_config.max_ongoing_requests,
         )

         # Start event loop monitoring for the replica's main event loop.

@@ -1460,6 +1535,9 @@ async def reconfigure(
             self._metrics_manager.set_autoscaling_config(
                 deployment_config.autoscaling_config
             )
+            self._metrics_manager.set_max_ongoing_requests(
+                deployment_config.max_ongoing_requests
+            )
         if logging_config_changed:
             self._configure_logger_and_profilers(deployment_config.logging_config)
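The reporting loop in `_report_utilization_forever` backs off exponentially on repeated failures via `min(10, 2**consecutive_errors)`. A minimal standalone sketch of that schedule (the `backoff_schedule` helper is illustrative, not part of the Serve code):

```python
def backoff_schedule(num_failures: int, cap_s: float = 10.0) -> list:
    """Sleep times for n consecutive failures: 2**k seconds, capped at cap_s."""
    return [min(cap_s, 2.0**k) for k in range(num_failures)]


# The first failure waits 1s, then 2s, 4s, 8s; after that the cap holds at 10s.
print(backoff_schedule(6))  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

Resetting `consecutive_errors` to 0 on each successful report means a transient failure does not permanently slow the metric down.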

python/ray/serve/_private/rolling_window_accumulator.py

Lines changed: 219 additions & 0 deletions (new file)

import threading
import time
from typing import List, Optional


class _ThreadBuckets:
    """Per-thread bucket storage for the rolling window accumulator.

    Each thread gets its own instance to avoid lock contention on the hot path.
    """

    # Performance optimization: avoid creating a per-instance __dict__.
    __slots__ = ("buckets", "current_bucket_idx", "last_rotation_time")

    def __init__(self, num_buckets: int):
        self.buckets = [0.0] * num_buckets
        self.current_bucket_idx = 0
        self.last_rotation_time = time.time()


class _ThreadLocalRef(threading.local):
    """Thread-local reference to the thread's _ThreadBuckets instance."""

    def __init__(self):
        super().__init__()
        # By subclassing threading.local, each thread sees its own `data` attribute.
        self.data: Optional[_ThreadBuckets] = None


class RollingWindowAccumulator:
    """Tracks cumulative values over a rolling time window.

    Uses bucketing for memory efficiency: divides the window into N buckets
    and rotates them as time passes. This allows efficient tracking of values
    over a sliding window without storing individual data points.

    Uses thread-local storage for lock-free writes on the hot path (add()).
    Only get_total() requires synchronization to aggregate across threads.

    Example:
        # Create a 10-minute rolling window with 60 buckets (10s each)
        accumulator = RollingWindowAccumulator(
            window_duration_s=600.0,
            num_buckets=60,
        )

        # Add values (lock-free, safe from multiple threads)
        accumulator.add(100.0)
        accumulator.add(50.0)

        # Get total (aggregates across all threads)
        total = accumulator.get_total()

    Thread Safety:
        - add() is lock-free after the first call from each thread
        - get_total() acquires a lock to aggregate across threads
        - Safe to call from multiple threads concurrently
    """

    def __init__(
        self,
        window_duration_s: float,
        num_buckets: int = 60,
    ):
        """Initialize the rolling window accumulator.

        Args:
            window_duration_s: Total duration of the rolling window in seconds.
                Values older than this are automatically expired.
            num_buckets: Number of buckets to divide the window into. More buckets
                give finer granularity but use slightly more memory. Default is 60,
                which for a 10-minute window gives 10-second granularity.

        Raises:
            ValueError: If window_duration_s <= 0 or num_buckets <= 0.
        """
        if window_duration_s <= 0:
            raise ValueError(
                f"window_duration_s must be positive, got {window_duration_s}"
            )
        if num_buckets <= 0:
            raise ValueError(f"num_buckets must be positive, got {num_buckets}")

        self._window_duration_s = window_duration_s
        self._num_buckets = num_buckets
        self._bucket_duration_s = window_duration_s / num_buckets

        # Thread-local reference to per-thread bucket data.
        self._local = _ThreadLocalRef()

        # Track all per-thread bucket instances for aggregation.
        self._all_thread_data: List[_ThreadBuckets] = []
        self._registry_lock = threading.Lock()

    @property
    def window_duration_s(self) -> float:
        """The total duration of the rolling window in seconds."""
        return self._window_duration_s

    @property
    def num_buckets(self) -> int:
        """The number of buckets in the rolling window."""
        return self._num_buckets

    @property
    def bucket_duration_s(self) -> float:
        """The duration of each bucket in seconds."""
        return self._bucket_duration_s

    def _ensure_initialized(self) -> _ThreadBuckets:
        """Ensure thread-local storage is initialized for the current thread.

        This is called on every add(), but the fast path (already initialized)
        is just a single attribute check with no locking.

        Returns:
            The _ThreadBuckets instance for the current thread.
        """
        data = self._local.data
        if data is not None:
            return data

        # Slow path: first call from this thread.
        data = _ThreadBuckets(self._num_buckets)
        self._local.data = data

        # Register for aggregation (only happens once per thread).
        with self._registry_lock:
            self._all_thread_data.append(data)

        return data

    def _rotate_buckets_if_needed(self, data: _ThreadBuckets) -> None:
        """Rotate buckets for the given thread's storage.

        Advances the current bucket index and clears old buckets as time passes.
        """
        now = time.time()
        elapsed = now - data.last_rotation_time
        buckets_to_advance = int(elapsed / self._bucket_duration_s)

        if buckets_to_advance > 0:
            if buckets_to_advance >= self._num_buckets:
                # All buckets have expired, reset everything.
                data.buckets = [0.0] * self._num_buckets
                data.current_bucket_idx = 0
            else:
                # Clear old buckets as we advance.
                for _ in range(buckets_to_advance):
                    data.current_bucket_idx = (
                        data.current_bucket_idx + 1
                    ) % self._num_buckets
                    data.buckets[data.current_bucket_idx] = 0.0

            data.last_rotation_time = now

    def add(self, value: float) -> None:
        """Add a value to the current bucket.

        This operation is lock-free for the calling thread after the first call.
        Safe to call from multiple threads concurrently.

        Args:
            value: The value to add to the accumulator.
        """
        # Fast path: just check if initialized (no lock).
        data = self._ensure_initialized()

        # Lock-free: only touches thread-local data.
        self._rotate_buckets_if_needed(data)
        data.buckets[data.current_bucket_idx] += value

    def get_total(self) -> float:
        """Get total value across all buckets in the window.

        This aggregates values from all threads that have called add().
        Expired buckets (older than window_duration_s) are not included.

        Note: Per-thread buckets are read without synchronizing against
        concurrent add() calls, so the total may be slightly inaccurate. This
        is acceptable for a utilization metric, which is not critical to
        system correctness: with the default 600s window and 10s report
        interval, the inaccuracy is less than 0.16%.

        Returns:
            The sum of all non-expired values in the rolling window.
        """
        total = 0.0
        now = time.time()

        with self._registry_lock:
            for data in self._all_thread_data:
                # Calculate which buckets are still valid for this thread's data.
                elapsed = now - data.last_rotation_time
                buckets_expired = int(elapsed / self._bucket_duration_s)

                if buckets_expired >= self._num_buckets:
                    # All buckets have expired for this thread.
                    continue

                # Sum buckets that haven't expired. Buckets form a circular
                # buffer with current_bucket_idx being the most recent, so we
                # walk backwards and skip buckets that have expired.
                for i in range(self._num_buckets - buckets_expired):
                    idx = (data.current_bucket_idx - i) % self._num_buckets
                    total += data.buckets[idx]

        return total

    def get_num_registered_threads(self) -> int:
        """Get the number of threads that have called add().

        Useful for debugging and testing.

        Returns:
            The number of threads registered with this accumulator.
        """
        with self._registry_lock:
            return len(self._all_thread_data)
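The bucket-rotation idea can be exercised in isolation. Below is a simplified single-threaded sketch with an injectable clock for determinism; `MiniRollingWindow` is a toy stand-in for illustration, not the Ray class above:

```python
class MiniRollingWindow:
    """Single-threaded sketch of a bucketed rolling window."""

    def __init__(self, window_s: float, num_buckets: int, clock):
        self.bucket_s = window_s / num_buckets
        self.num_buckets = num_buckets
        self.buckets = [0.0] * num_buckets
        self.idx = 0
        self.clock = clock
        self.last_rotation = clock()

    def _rotate(self):
        # Advance the circular buffer, zeroing buckets that fell out of the window.
        advance = int((self.clock() - self.last_rotation) / self.bucket_s)
        if advance >= self.num_buckets:
            self.buckets = [0.0] * self.num_buckets
            self.idx = 0
        else:
            for _ in range(advance):
                self.idx = (self.idx + 1) % self.num_buckets
                self.buckets[self.idx] = 0.0
        if advance > 0:
            self.last_rotation = self.clock()

    def add(self, value: float):
        self._rotate()
        self.buckets[self.idx] += value

    def total(self) -> float:
        self._rotate()
        return sum(self.buckets)


now = [0.0]  # mutable fake clock
w = MiniRollingWindow(window_s=60.0, num_buckets=6, clock=lambda: now[0])
w.add(5.0)        # lands in the first 10s bucket
now[0] = 30.0     # three buckets later, the value is still inside the 60s window
print(w.total())  # 5.0
now[0] = 70.0     # past the full window: the value's bucket gets cleared
print(w.total())  # 0.0
```

Unlike this toy, the real class keeps one such buffer per thread and only sums them under a lock, which is what makes `add()` lock-free on the hot path.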

python/ray/serve/tests/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -244,6 +244,10 @@ py_test_module_list(
     timeout = "long",
     env = {
         "RAY_SERVE_ROUTER_QUEUE_LEN_GAUGE_THROTTLE_S": "0",
+        "RAY_SERVE_RUN_SYNC_IN_THREADPOOL": "1",
+        "RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S": "1",
+        "RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S": "1",
+        "RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS": "10",
     },
     files = [
         "test_deploy_app.py",
