Commit 38571f1

[Serve] Expose health metrics from controller (ray-project#60473)
1 parent: cb31660

7 files changed: +854, -7 lines

python/ray/serve/_private/controller.py

Lines changed: 51 additions & 6 deletions
@@ -42,6 +42,9 @@
     SERVE_LOGGER_NAME,
     SERVE_NAMESPACE,
 )
+from ray.serve._private.controller_health_metrics_tracker import (
+    ControllerHealthMetricsTracker,
+)
 from ray.serve._private.default_impl import create_cluster_node_info_cache
 from ray.serve._private.deployment_info import DeploymentInfo
 from ray.serve._private.deployment_state import DeploymentStateManager
@@ -230,6 +233,11 @@ async def __init__(
         self._shutdown_event = asyncio.Event()
         self._shutdown_start_time = None
 
+        # Initialize health metrics tracker
+        self._health_metrics_tracker = ControllerHealthMetricsTracker(
+            controller_start_time=time.time()
+        )
+
         self._create_control_loop_metrics()
         run_background_task(self.run_control_loop())
 
@@ -304,6 +312,8 @@ def record_autoscaling_metrics_from_replica(
                 "replica": replica_metric_report.replica_id.unique_id,
             },
         )
+        # Track in health metrics
+        self._health_metrics_tracker.record_replica_metrics_delay(latency_ms)
         if latency_ms > RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS:
             logger.warning(
                 f"Received autoscaling metrics from replica {replica_metric_report.replica_id} with timestamp {replica_metric_report.timestamp} "
@@ -329,6 +339,8 @@ def record_autoscaling_metrics_from_handle(
                 "handle": handle_metric_report.handle_id,
             },
         )
+        # Track in health metrics
+        self._health_metrics_tracker.record_handle_metrics_delay(latency_ms)
         if latency_ms > RAY_SERVE_RPC_LATENCY_WARNING_THRESHOLD_MS:
             logger.warning(
                 f"Received autoscaling metrics from handle {handle_metric_report.handle_id} for deployment {handle_metric_report.deployment_id} with timestamp {handle_metric_report.timestamp} "
@@ -507,13 +519,18 @@ async def run_control_loop(self) -> None:
                 extra={"log_to_stderr": False},
             )
             self.control_loop_duration_gauge_s.set(loop_duration)
+            # Track in health metrics
+            self._health_metrics_tracker.record_loop_duration(loop_duration)
 
             num_loops += 1
             self.num_control_loops_gauge.set(num_loops)
+            self._health_metrics_tracker.num_control_loops = num_loops
 
             sleep_start_time = time.time()
             await asyncio.sleep(CONTROL_LOOP_INTERVAL_S)
-            self.sleep_duration_gauge_s.set(time.time() - sleep_start_time)
+            sleep_duration = time.time() - sleep_start_time
+            self.sleep_duration_gauge_s.set(sleep_duration)
+            self._health_metrics_tracker.last_sleep_duration_s = sleep_duration
 
     async def run_control_loop_step(
         self, start_time: float, recovering_timeout: float, num_loops: int
@@ -546,7 +563,9 @@ async def run_control_loop_step(
         dsm_update_start_time = time.time()
         any_recovering = self.deployment_state_manager.update()
 
-        self.dsm_update_duration_gauge_s.set(time.time() - dsm_update_start_time)
+        dsm_duration = time.time() - dsm_update_start_time
+        self.dsm_update_duration_gauge_s.set(dsm_duration)
+        self._health_metrics_tracker.record_dsm_update_duration(dsm_duration)
         if not self.done_recovering_event.is_set() and not any_recovering:
             self.done_recovering_event.set()
             if num_loops > 0:
@@ -564,7 +583,9 @@ async def run_control_loop_step(
             any_target_state_changed = self.application_state_manager.update()
             if any_recovering or any_target_state_changed:
                 self._refresh_autoscaling_deployments_cache()
-            self.asm_update_duration_gauge_s.set(time.time() - asm_update_start_time)
+            asm_duration = time.time() - asm_update_start_time
+            self.asm_update_duration_gauge_s.set(asm_duration)
+            self._health_metrics_tracker.record_asm_update_duration(asm_duration)
         except Exception:
             logger.exception("Exception updating application state.")
 
@@ -577,7 +598,9 @@ async def run_control_loop_step(
         # so they are more consistent.
         node_update_start_time = time.time()
         self._update_proxy_nodes()
-        self.node_update_duration_gauge_s.set(time.time() - node_update_start_time)
+        node_update_duration = time.time() - node_update_start_time
+        self.node_update_duration_gauge_s.set(node_update_duration)
+        self._health_metrics_tracker.record_node_update_duration(node_update_duration)
 
         # Don't update proxy_state until after the done recovering event is set,
         # otherwise we may start a new proxy but not broadcast it any
@@ -586,8 +609,10 @@ async def run_control_loop_step(
         try:
             proxy_update_start_time = time.time()
             self.proxy_state_manager.update(proxy_nodes=self._proxy_nodes)
-            self.proxy_update_duration_gauge_s.set(
-                time.time() - proxy_update_start_time
+            proxy_update_duration = time.time() - proxy_update_start_time
+            self.proxy_update_duration_gauge_s.set(proxy_update_duration)
+            self._health_metrics_tracker.record_proxy_update_duration(
+                proxy_update_duration
             )
         except Exception:
             logger.exception("Exception updating proxy state.")
@@ -725,6 +750,26 @@ def get_actor_details(self) -> ServeActorDetails:
         """
         return self._actor_details
 
+    def get_health_metrics(self) -> Dict[str, Any]:
+        """Returns comprehensive health metrics for the controller.
+
+        This method provides detailed performance metrics to help diagnose
+        controller health issues, especially as cluster size increases.
+
+        Returns:
+            Dictionary containing health metrics including:
+            - Control loop performance (iteration speed, durations)
+            - Event loop health (task count, scheduling delay)
+            - Component update latencies
+            - Autoscaling metrics latency (handle/replica)
+            - Memory usage
+        """
+        try:
+            return self._health_metrics_tracker.collect_metrics().dict()
+        except Exception:
+            logger.exception("Exception collecting controller health metrics.")
+            raise
+
     def get_proxy_details(self, node_id: str) -> Optional[ProxyDetails]:
         """Returns the proxy details for the proxy on the given node.
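
For reference, a minimal sketch (not part of this diff) of how the new get_health_metrics RPC could be queried from a driver process. The controller actor name and namespace constants used below are assumptions about existing Serve internals, not something introduced here:

# Sketch only: fetch the controller's health metrics from a driver process.
# SERVE_CONTROLLER_NAME and SERVE_NAMESPACE are assumed to exist in
# ray.serve._private.constants; adjust if the actual constants differ.
import ray
from ray.serve._private.constants import SERVE_CONTROLLER_NAME, SERVE_NAMESPACE

ray.init(address="auto")
controller = ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE)
health = ray.get(controller.get_health_metrics.remote())
print(health["loops_per_second"], health["event_loop_delay_s"])
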
python/ray/serve/_private/controller_health_metrics_tracker.py

Lines changed: 217 additions & 0 deletions
@@ -0,0 +1,217 @@
+import asyncio
+import math
+import sys
+import time
+from collections import deque
+from dataclasses import dataclass, field
+from typing import Deque, List, Optional
+
+from ray._common.pydantic_compat import BaseModel
+from ray.serve._private.constants import CONTROL_LOOP_INTERVAL_S
+
+# Number of recent loop iterations to track for rolling averages
+_HEALTH_METRICS_HISTORY_SIZE = 100
+
+
+class DurationStats(BaseModel):
+    """Statistics for a collection of duration/latency measurements."""
+
+    mean: float = 0.0
+    std: float = 0.0
+    min: float = 0.0
+    max: float = 0.0
+
+    @classmethod
+    def from_values(cls, values: List[float]) -> "DurationStats":
+        """Compute statistics from a list of values."""
+        if not values:
+            return cls()
+
+        n = len(values)
+        mean = sum(values) / n
+        min_val = min(values)
+        max_val = max(values)
+
+        # Compute standard deviation
+        if n > 1:
+            variance = sum((x - mean) ** 2 for x in values) / n
+            std = math.sqrt(variance)
+        else:
+            std = 0.0
+
+        return cls(mean=mean, std=std, min=min_val, max=max_val)
+
+
+class ControllerHealthMetrics(BaseModel):
+    """Health metrics for the Ray Serve controller.
+
+    These metrics help diagnose controller performance issues, especially
+    as cluster size increases.
+    """
+
+    # Timestamps
+    timestamp: float = 0.0  # When these metrics were collected
+    controller_start_time: float = 0.0  # When the controller started
+    uptime_s: float = 0.0  # Controller uptime in seconds
+
+    # Control loop metrics
+    num_control_loops: int = 0  # Total number of control loops executed
+    loop_duration_s: Optional[
+        DurationStats
+    ] = None  # Loop duration stats (rolling window)
+    loops_per_second: float = 0.0  # Control loop iterations per second
+
+    # Sleep/scheduling metrics
+    last_sleep_duration_s: float = 0.0  # Actual sleep duration of last iteration
+    expected_sleep_duration_s: float = 0.0  # Expected sleep (CONTROL_LOOP_INTERVAL_S)
+    event_loop_delay_s: float = 0.0  # Delay = actual - expected (positive = overloaded)
+
+    # Event loop health
+    num_asyncio_tasks: int = 0  # Number of pending asyncio tasks
+
+    # Component update durations (rolling window stats)
+    deployment_state_update_duration_s: Optional[DurationStats] = None
+    application_state_update_duration_s: Optional[DurationStats] = None
+    proxy_state_update_duration_s: Optional[DurationStats] = None
+    node_update_duration_s: Optional[DurationStats] = None
+
+    # Autoscaling metrics latency tracking (rolling window stats)
+    # These track the delay between when metrics are generated and when they reach controller
+    handle_metrics_delay_ms: Optional[DurationStats] = None
+    replica_metrics_delay_ms: Optional[DurationStats] = None
+
+    # Memory usage (in MB)
+    process_memory_mb: float = 0.0
+
+
+@dataclass
+class ControllerHealthMetricsTracker:
+    """Tracker for collecting controller health metrics over time."""
+
+    controller_start_time: float = field(default_factory=time.time)
+
+    # Rolling history of loop durations
+    loop_durations: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+
+    # Rolling history of metrics delays
+    handle_metrics_delays: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+    replica_metrics_delays: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+
+    # Rolling history of component update durations
+    dsm_update_durations: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+    asm_update_durations: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+    proxy_update_durations: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+    node_update_durations: Deque[float] = field(
+        default_factory=lambda: deque(maxlen=_HEALTH_METRICS_HISTORY_SIZE)
+    )
+
+    # Latest values (used in collect_metrics)
+    last_sleep_duration_s: float = 0.0
+    num_control_loops: int = 0
+
+    def record_loop_duration(self, duration: float):
+        self.loop_durations.append(duration)
+
+    def record_handle_metrics_delay(self, delay_ms: float):
+        self.handle_metrics_delays.append(delay_ms)
+
+    def record_replica_metrics_delay(self, delay_ms: float):
+        self.replica_metrics_delays.append(delay_ms)
+
+    def record_dsm_update_duration(self, duration: float):
+        self.dsm_update_durations.append(duration)
+
+    def record_asm_update_duration(self, duration: float):
+        self.asm_update_durations.append(duration)
+
+    def record_proxy_update_duration(self, duration: float):
+        self.proxy_update_durations.append(duration)
+
+    def record_node_update_duration(self, duration: float):
+        self.node_update_durations.append(duration)
+
+    def collect_metrics(self) -> ControllerHealthMetrics:
+        """Collect and return current health metrics."""
+        now = time.time()
+
+        # Calculate loop statistics from rolling history
+        loop_duration_stats = DurationStats.from_values(list(self.loop_durations))
+
+        # Calculate loops per second based on uptime and total loops
+        uptime = now - self.controller_start_time
+        loops_per_second = self.num_control_loops / uptime if uptime > 0 else 0.0
+
+        # Calculate event loop delay (actual sleep - expected sleep)
+        # Positive values indicate the event loop is overloaded
+        event_loop_delay = max(
+            0.0, self.last_sleep_duration_s - CONTROL_LOOP_INTERVAL_S
+        )
+
+        # Get asyncio task count
+        try:
+            loop = asyncio.get_event_loop()
+            num_asyncio_tasks = len(asyncio.all_tasks(loop))
+        except RuntimeError:
+            num_asyncio_tasks = 0
+
+        # Calculate metrics delay statistics
+        handle_delay_stats = DurationStats.from_values(list(self.handle_metrics_delays))
+        replica_delay_stats = DurationStats.from_values(
+            list(self.replica_metrics_delays)
+        )
+
+        # Calculate component update duration statistics
+        dsm_update_stats = DurationStats.from_values(list(self.dsm_update_durations))
+        asm_update_stats = DurationStats.from_values(list(self.asm_update_durations))
+        proxy_update_stats = DurationStats.from_values(
+            list(self.proxy_update_durations)
+        )
+        node_update_stats = DurationStats.from_values(list(self.node_update_durations))
+
+        # Get memory usage in MB
+        # Note: ru_maxrss is in bytes on macOS but kilobytes on Linux
+        # The resource module is Unix-only, so we handle Windows gracefully
+        try:
+            import resource
+
+            rusage = resource.getrusage(resource.RUSAGE_SELF)
+            process_memory_mb = (
+                rusage.ru_maxrss / (1024 * 1024)  # Convert bytes to MB on macOS
+                if sys.platform == "darwin"
+                else rusage.ru_maxrss / 1024  # Convert KB to MB on Linux
+            )
+        except ImportError:
+            # resource module not available on Windows
+            process_memory_mb = 0.0
+
+        return ControllerHealthMetrics(
+            timestamp=now,
+            controller_start_time=self.controller_start_time,
+            uptime_s=uptime,
+            num_control_loops=self.num_control_loops,
+            loop_duration_s=loop_duration_stats,
+            loops_per_second=loops_per_second,
+            last_sleep_duration_s=self.last_sleep_duration_s,
+            expected_sleep_duration_s=CONTROL_LOOP_INTERVAL_S,
+            event_loop_delay_s=event_loop_delay,
+            num_asyncio_tasks=num_asyncio_tasks,
+            deployment_state_update_duration_s=dsm_update_stats,
+            application_state_update_duration_s=asm_update_stats,
+            proxy_state_update_duration_s=proxy_update_stats,
+            node_update_duration_s=node_update_stats,
+            handle_metrics_delay_ms=handle_delay_stats,
+            replica_metrics_delay_ms=replica_delay_stats,
+            process_memory_mb=process_memory_mb,
+        )
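
A minimal sketch (also not part of the diff) of exercising the tracker outside the controller, assuming the module lands at the import path used in controller.py; in production the control loop drives these calls:

# Sketch only: record a few synthetic durations and collect a summary.
import time

from ray.serve._private.controller_health_metrics_tracker import (
    ControllerHealthMetricsTracker,
)

tracker = ControllerHealthMetricsTracker(controller_start_time=time.time())
for duration_s in (0.010, 0.015, 0.020):
    tracker.record_loop_duration(duration_s)
tracker.num_control_loops = 3
tracker.last_sleep_duration_s = 0.25  # pretend the sleep overran the interval

metrics = tracker.collect_metrics()
print(metrics.loop_duration_s.mean)  # 0.015 for the three samples above
print(metrics.event_loop_delay_s)    # positive if sleep exceeded CONTROL_LOOP_INTERVAL_S
print(metrics.dict()["process_memory_mb"])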
