7 changes: 7 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -412,6 +412,13 @@
"RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS", "0"
)

# How often (in seconds) the controller re-records an unchanged health-check
# gauge value for each replica. Setting this to 0 disables caching (every loop
# iteration records the gauge, matching pre-optimization behavior).
RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S = get_env_float_non_negative(
"RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S", 10.0
)

# Initial deadline for queue length responses in the router.
RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S = get_env_float(
"RAY_SERVE_QUEUE_LENGTH_RESPONSE_DEADLINE_S", 0.1
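The new interval is read from the environment when the Serve constants module is imported, so it has to be visible to the controller process (which is what records the gauge). A minimal sketch of overriding it, assuming the usual pattern of exporting the variable before the cluster or controller starts (values here are illustrative):

import os

# Must be set before the Ray Serve controller imports its constants; in practice
# this means exporting the variable in the environment used to launch the cluster.
os.environ["RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S"] = "0"   # 0 disables the caching entirely
# os.environ["RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S"] = "30"  # or skip re-recording unchanged values for up to 30s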
56 changes: 38 additions & 18 deletions python/ray/serve/_private/deployment_state.py
@@ -42,6 +42,7 @@
RAY_SERVE_ENABLE_TASK_EVENTS,
RAY_SERVE_FAIL_ON_RANK_ERROR,
RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS,
RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S,
RAY_SERVE_USE_PACK_SCHEDULING_STRATEGY,
REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD,
REPLICA_STARTUP_SHUTDOWN_LATENCY_BUCKETS_MS,
@@ -2178,6 +2179,14 @@ def __init__(

self.replica_average_ongoing_requests: Dict[str, float] = {}

# Cache the last-reported health gauge value and timestamp per replica.
# This avoids redundant Gauge.set() calls on every control loop
# iteration, which are expensive at scale (O(num_replicas) Cython
# FFI calls per loop). We only call Gauge.set() when the value changes
# or the cache entry is older than RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S
# (to ensure the metric is re-exported within each Prometheus scrape window).
self._health_gauge_cache: Dict[str, Tuple[int, float]] = {}

self.health_check_gauge = metrics.Gauge(
"serve_deployment_replica_healthy",
description=(
@@ -3146,6 +3155,31 @@ def record_replica_startup_failure(self, error_msg: str):
)
self._curr_status_info = self._curr_status_info.update_message(message)

def _set_health_gauge(self, replica_unique_id: str, value: int) -> None:
"""Set the health-check gauge for *replica_unique_id*, skipping the
(expensive) Cython ``Gauge.set()`` call when the value hasn't changed
and was recently reported.

In large clusters this avoids O(num_replicas) redundant FFI calls on
every control-loop iteration while still refreshing the metric often
enough for Prometheus export.
"""
now = time.time()
Review comment (Contributor, severity: medium):
time.time() is not guaranteed to be monotonic. If the system clock is adjusted backwards, now - cached[1] could be negative, which could lead to the gauge not being updated for a long time, even past the intended interval. It's recommended to use time.monotonic() for measuring time durations to avoid this issue.

Suggested change
now = time.time()
now = time.monotonic()
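As an aside, a minimal self-contained illustration of the duration-measurement pattern the suggestion points at (the names below are illustrative and not part of this PR):

import time

REPORT_INTERVAL_S = 10.0  # hypothetical threshold standing in for the Serve constant

last_reported = time.monotonic()  # monotonic clock is unaffected by wall-clock adjustments

def needs_report() -> bool:
    # A duration computed from time.monotonic() can never be negative, so a
    # backwards system-clock jump cannot silently suppress re-reporting.
    return (time.monotonic() - last_reported) >= REPORT_INTERVAL_S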

cached = self._health_gauge_cache.get(replica_unique_id)
if (
cached is not None
and cached[0] == value
and (now - cached[1]) < RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S
):
return
self.health_check_gauge.set(value, tags={"replica": replica_unique_id})
self._health_gauge_cache[replica_unique_id] = (value, now)

def _clear_health_gauge_cache(self, replica_unique_id: str) -> None:
"""Remove a replica from the health-gauge cache (after it has
fully stopped and been removed from tracking)."""
self._health_gauge_cache.pop(replica_unique_id, None)

def stop_replicas(self, replicas_to_stop) -> None:
for replica in self._replicas.pop():
if replica.replica_id in replicas_to_stop:
@@ -3163,12 +3197,7 @@ def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
replica.stop(graceful=graceful_stop)
self._replicas.add(ReplicaState.STOPPING, replica)
self._deployment_scheduler.on_replica_stopping(replica.replica_id)
self.health_check_gauge.set(
0,
tags={
"replica": replica.replica_id.unique_id,
},
)
self._set_health_gauge(replica.replica_id.unique_id, 0)

def check_and_update_replicas(self):
"""
@@ -3195,24 +3224,14 @@ def check_and_update_replicas(self):

if is_healthy:
self._replicas.add(replica.actor_details.state, replica)
self.health_check_gauge.set(
1,
tags={
"replica": replica.replica_id.unique_id,
},
)
self._set_health_gauge(replica.replica_id.unique_id, 1)
routing_stats = replica.pull_routing_stats()
replica.record_routing_stats(routing_stats)
else:
logger.warning(
f"Replica {replica.replica_id} failed health check, stopping it."
)
self.health_check_gauge.set(
0,
tags={
"replica": replica.replica_id.unique_id,
},
)
self._set_health_gauge(replica.replica_id.unique_id, 0)
self._stop_replica(
replica, graceful_stop=not self.FORCE_STOP_UNHEALTHY_REPLICAS
)
@@ -3319,6 +3338,7 @@ def check_and_update_replicas(self):
# Release rank only after replica is successfully stopped
# This ensures rank is available during draining/graceful shutdown
replica_id = replica.replica_id.unique_id
self._clear_health_gauge_cache(replica_id)
if self._rank_manager.has_replica_rank(replica_id):
# Only release rank if assigned. Replicas that failed allocation
# or never reached RUNNING state won't have ranks.
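Stripped of Serve-specific types, the change above amounts to a small per-replica (value, timestamp) cache placed in front of an expensive setter. A simplified, self-contained sketch of the same pattern (not the actual Serve classes; the real change is in deployment_state.py above):

import time
from typing import Callable, Dict, Tuple

REPORT_INTERVAL_S = 10.0  # stands in for RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S


class ThrottledGauge:
    """Skip redundant set() calls unless the value changed or the cached entry is stale."""

    def __init__(self, set_fn: Callable[..., None]):
        self._set_fn = set_fn  # the expensive call (Gauge.set in Serve)
        self._cache: Dict[str, Tuple[int, float]] = {}

    def set(self, key: str, value: int) -> None:
        now = time.time()
        cached = self._cache.get(key)
        if (
            cached is not None
            and cached[0] == value
            and (now - cached[1]) < REPORT_INTERVAL_S
        ):
            return  # unchanged and recently reported: skip the expensive call
        self._set_fn(value, tags={"replica": key})
        self._cache[key] = (value, now)

    def clear(self, key: str) -> None:
        # Drop the entry once the replica is fully removed so the cache
        # does not grow without bound.
        self._cache.pop(key, None)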
1 change: 1 addition & 0 deletions python/ray/serve/tests/BUILD.bazel
@@ -248,6 +248,7 @@ py_test_module_list(
"RAY_SERVE_REPLICA_UTILIZATION_WINDOW_S": "1",
"RAY_SERVE_REPLICA_UTILIZATION_REPORT_INTERVAL_S": "1",
"RAY_SERVE_REPLICA_UTILIZATION_NUM_BUCKETS": "10",
"RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S": "0.1",
},
files = [
"test_deploy_app.py",
1 change: 1 addition & 0 deletions python/ray/serve/tests/test_metrics.py
@@ -892,6 +892,7 @@ def h():
wait_for_condition(
lambda: len(get_metric_dictionaries("ray_serve_deployment_replica_healthy"))
== 3,
timeout=40,
)
health_metrics = get_metric_dictionaries("ray_serve_deployment_replica_healthy")
expected_output = {
85 changes: 85 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_state.py
@@ -29,6 +29,7 @@
DEFAULT_HEALTH_CHECK_TIMEOUT_S,
DEFAULT_MAX_ONGOING_REQUESTS,
RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE,
RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve._private.deployment_scheduler import ReplicaSchedulingRequest
@@ -1947,6 +1948,90 @@ def test_health_check(
assert ds.curr_status_info.status_trigger == DeploymentStatusTrigger.UNSPECIFIED


def test_health_gauge_caching(mock_deployment_state_manager):
"""Test that the health gauge is only set when the value changes.

The _health_gauge_cache avoids redundant Gauge.set() calls on every
control-loop iteration, which are expensive at scale.
"""
create_dsm, timer, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()

b_info_1, v1 = deployment_info(num_replicas=2, version="1")
assert dsm.deploy(TEST_DEPLOYMENT_ID, b_info_1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

dsm.update()
check_counts(ds, total=2, by_state=[(ReplicaState.STARTING, 2, v1)])

for replica in ds._replicas.get():
replica._actor.set_ready()

# First update: _check_startup_replicas transitions STARTING -> RUNNING.
# check_and_update_replicas hasn't seen them as RUNNING yet.
dsm.update()
check_counts(ds, total=2, by_state=[(ReplicaState.RUNNING, 2, v1)])

# Second update: check_and_update_replicas processes the RUNNING replicas
# for the first time, calling check_health() and setting the gauge.
dsm.update()

replica_ids = [r.replica_id.unique_id for r in ds._replicas.get()]
# After the second update the cache should have (value=1, timestamp) for both.
for rid in replica_ids:
cached_value, cached_time = ds._health_gauge_cache[rid]
assert cached_value == 1

# Track how many times Gauge.set is called using a wrapper.
original_set = ds.health_check_gauge.set
call_count = 0

def counting_set(*args, **kwargs):
nonlocal call_count
call_count += 1
return original_set(*args, **kwargs)

ds.health_check_gauge.set = counting_set

# Subsequent updates with all-healthy replicas should NOT call Gauge.set
# because the cache already has value 1 for each replica (within TTL).
dsm.update()
dsm.update()
dsm.update()
assert call_count == 0, (
f"Gauge.set was called {call_count} times for already-healthy replicas; "
"expected 0 (should be cached)"
)

# After the TTL expires, the gauge should be re-reported even though
# the value hasn't changed.
timer.advance(RAY_SERVE_HEALTH_GAUGE_REPORT_INTERVAL_S + 1)
dsm.update()
assert call_count == len(replica_ids), (
f"Gauge.set was called {call_count} times after TTL expired; "
f"expected {len(replica_ids)} (one per replica)"
)

# Mark one replica unhealthy — gauge should transition to 0.
call_count = 0
ds._replicas.get()[0]._actor.set_unhealthy()
dsm.update()
# Gauge.set should have been called at least once (for the now-unhealthy replica).
assert call_count >= 1
# The stopping replica should have cache value 0.
stopping = ds._replicas.get(states=[ReplicaState.STOPPING])
assert len(stopping) == 1
cached_value, _ = ds._health_gauge_cache[stopping[0].replica_id.unique_id]
assert cached_value == 0

# After the stopped replica is fully removed, its cache entry should be cleaned up.
stopped_id = stopping[0].replica_id.unique_id
stopping[0]._actor.set_done_stopping()
call_count = 0
dsm.update()
assert stopped_id not in ds._health_gauge_cache


def test_update_while_unhealthy(mock_deployment_state_manager):
create_dsm, _, _, _ = mock_deployment_state_manager
dsm: DeploymentStateManager = create_dsm()