Skip to content

Commit 55da6cc

Browse files
Implement RESTART_GANG runtime failure policy
Signed-off-by: jeffreywang <jeffreywang@anyscale.com>
1 parent 1a4f124 commit 55da6cc

File tree

7 files changed

+241
-80
lines changed

7 files changed

+241
-80
lines changed

python/ray/serve/_private/common.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -879,11 +879,7 @@ class GangContext:
879879

880880
@dataclass
881881
class GangPlacementGroupRequest:
882-
"""Request to prepare gang placement groups for a deployment.
883-
884-
Used in Step 3.5 of DeploymentStateManager.update() to pre-create
885-
placement groups before replicas are created in Step 4.
886-
"""
882+
"""Request to prepare gang placement groups for a deployment."""
887883

888884
deployment_id: DeploymentID
889885
gang_size: int
@@ -894,10 +890,7 @@ class GangPlacementGroupRequest:
894890

895891
@dataclass
896892
class GangPreparationResult:
897-
"""Result of gang placement group preparation.
898-
899-
Contains either successfully created placement groups or error information.
900-
"""
893+
"""Result of gang placement group preparation."""
901894

902895
success: bool
903896
error_message: Optional[str] = None

python/ray/serve/_private/deployment_scheduler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import copy
22
import logging
33
import sys
4+
import uuid
45
import warnings
56
from abc import ABC, abstractmethod
67
from collections import defaultdict
@@ -1028,7 +1029,8 @@ def _prepare_gangs_for_deployment(
10281029
]
10291030

10301031
pg_name = (
1031-
f"gang_{deployment_id.app_name}_{deployment_id.name}_{gang_index}"
1032+
f"gang_{deployment_id.app_name}_{deployment_id.name}"
1033+
f"_{gang_index}_{uuid.uuid4().hex[:8]}"
10321034
)
10331035
strategy = request.gang_placement_strategy
10341036

python/ray/serve/_private/deployment_state.py

Lines changed: 101 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
RunningReplicaInfo,
3838
)
3939
from ray.serve._private.config import DeploymentConfig
40+
from ray.serve.config import GangRuntimeFailurePolicy
4041
from ray.serve._private.constants import (
4142
DEFAULT_LATENCY_BUCKET_MS,
4243
MAX_PER_REPLICA_RETRY_COUNT,
@@ -999,7 +1000,13 @@ def check_stopped(self) -> bool:
9991000
# Remove the placement group both if the actor has already been deleted or
10001001
# it was just killed above.
10011002
if stopped and self._placement_group is not None:
1002-
ray.util.remove_placement_group(self._placement_group)
1003+
try:
1004+
ray.util.remove_placement_group(self._placement_group)
1005+
except Exception:
1006+
# Gang PGs are shared across multiple replicas.
1007+
# Another replica in the same gang may have already
1008+
# removed this PG.
1009+
pass
10031010

10041011
return stopped
10051012

@@ -3043,13 +3050,18 @@ def _add_replicas_with_gang_scheduling(
30433050
"""
30443051
upscale = []
30453052

3046-
# Check if gang PG preparation failed
3047-
if gang_prep_result is None or not gang_prep_result.success:
3048-
error_msg = (
3049-
gang_prep_result.error_message
3050-
if gang_prep_result
3051-
else "Gang placement groups were not prepared"
3053+
# PG prep was not attempted (e.g. replicas still stopping).
3054+
# Skip replica creation and retry in the next reconciliation loop.
3055+
if gang_prep_result is None:
3056+
logger.info(
3057+
f"Gang PG preparation was skipped for {self._id}. "
3058+
"Will retry in the next reconciliation loop."
30523059
)
3060+
return upscale
3061+
3062+
# PG prep was attempted but failed (resources insufficient).
3063+
if not gang_prep_result.success:
3064+
error_msg = gang_prep_result.error_message or "Unknown error"
30533065
logger.error(
30543066
f"Gang scheduling failed for {self._id}: {error_msg}. "
30553067
"Skipping replica creation."
@@ -3357,7 +3369,22 @@ def check_and_update_replicas(self):
33573369
transition happened.
33583370
"""
33593371

3360-
# TODO (jeffreywang): Implement gang health check and runtime failure policy here.
3372+
gang_config = self.get_gang_config()
3373+
restart_gang = (
3374+
gang_config is not None
3375+
and gang_config.runtime_failure_policy
3376+
== GangRuntimeFailurePolicy.RESTART_GANG
3377+
)
3378+
3379+
# --- Gang health check: two-pass approach ---
3380+
# Pass 1: Check health of all replicas. Collect which gang_ids
3381+
# have at least one unhealthy member.
3382+
# Pass 2: Process results. Healthy replicas whose gang has an
3383+
# unhealthy member are forcefully stopped too.
3384+
healthy_replicas: List[DeploymentReplica] = []
3385+
unhealthy_replicas: List[DeploymentReplica] = []
3386+
gang_ids_to_restart: Set[str] = set()
3387+
33613388
for replica in self._replicas.pop(
33623389
states=[ReplicaState.RUNNING, ReplicaState.PENDING_MIGRATION]
33633390
):
@@ -3375,6 +3402,38 @@ def check_and_update_replicas(self):
33753402
self.health_check_failures_counter.inc(tags=metric_tags)
33763403

33773404
if is_healthy:
3405+
healthy_replicas.append(replica)
3406+
else:
3407+
unhealthy_replicas.append(replica)
3408+
if restart_gang and replica.gang_context is not None:
3409+
gang_ids_to_restart.add(replica.gang_context.gang_id)
3410+
3411+
# Pass 2: process healthy replicas.
3412+
for replica in healthy_replicas:
3413+
if (
3414+
restart_gang
3415+
and replica.gang_context is not None
3416+
and replica.gang_context.gang_id in gang_ids_to_restart
3417+
):
3418+
# Healthy replica whose gang has an unhealthy member.
3419+
# Forcefully stop it so the entire gang is rescheduled.
3420+
logger.warning(
3421+
f"Replica {replica.replica_id} is healthy but its gang "
3422+
f"(gang_id={replica.gang_context.gang_id}) has an "
3423+
"unhealthy replica. Forcefully stopping it because "
3424+
"RESTART_GANG runtime failure policy is enabled."
3425+
)
3426+
self._stop_replica(replica, graceful_stop=False)
3427+
if replica.version == self._target_state.version:
3428+
self._curr_status_info = (
3429+
self._curr_status_info.handle_transition(
3430+
trigger=DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED,
3431+
message="A replica's health check failed. This "
3432+
"deployment will be UNHEALTHY until the replica "
3433+
"recovers or a new deploy happens.",
3434+
)
3435+
)
3436+
else:
33783437
self._replicas.add(replica.actor_details.state, replica)
33793438
self.health_check_gauge.set(
33803439
1,
@@ -3384,29 +3443,33 @@ def check_and_update_replicas(self):
33843443
)
33853444
routing_stats = replica.pull_routing_stats()
33863445
replica.record_routing_stats(routing_stats)
3387-
else:
3388-
logger.warning(
3389-
f"Replica {replica.replica_id} failed health check, stopping it."
3390-
)
3391-
self.health_check_gauge.set(
3392-
0,
3393-
tags={
3394-
"replica": replica.replica_id.unique_id,
3395-
},
3396-
)
3397-
self._stop_replica(
3398-
replica, graceful_stop=not self.FORCE_STOP_UNHEALTHY_REPLICAS
3446+
3447+
# Process unhealthy replicas with force-stop for gang replicas under
3448+
# RESTART_GANG policy.
3449+
for replica in unhealthy_replicas:
3450+
logger.warning(
3451+
f"Replica {replica.replica_id} failed health check, stopping it."
3452+
)
3453+
self.health_check_gauge.set(
3454+
0,
3455+
tags={
3456+
"replica": replica.replica_id.unique_id,
3457+
},
3458+
)
3459+
graceful = not self.FORCE_STOP_UNHEALTHY_REPLICAS
3460+
if restart_gang and replica.gang_context is not None:
3461+
graceful = False
3462+
self._stop_replica(replica, graceful_stop=graceful)
3463+
# If this is a replica of the target version, the deployment
3464+
# enters the "UNHEALTHY" status until the replica is
3465+
# recovered or a new deploy happens.
3466+
if replica.version == self._target_state.version:
3467+
self._curr_status_info = self._curr_status_info.handle_transition(
3468+
trigger=DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED,
3469+
message="A replica's health check failed. This "
3470+
"deployment will be UNHEALTHY until the replica "
3471+
"recovers or a new deploy happens.",
33993472
)
3400-
# If this is a replica of the target version, the deployment
3401-
# enters the "UNHEALTHY" status until the replica is
3402-
# recovered or a new deploy happens.
3403-
if replica.version == self._target_state.version:
3404-
self._curr_status_info = self._curr_status_info.handle_transition(
3405-
trigger=DeploymentStatusInternalTrigger.HEALTH_CHECK_FAILED,
3406-
message="A replica's health check failed. This "
3407-
"deployment will be UNHEALTHY until the replica "
3408-
"recovers or a new deploy happens.",
3409-
)
34103473

34113474
slow_start_replicas = []
34123475
slow_start = self._check_startup_replicas(ReplicaState.STARTING)
@@ -4316,6 +4379,14 @@ def _prepare_gang_placement_groups(
43164379
if deployment_state._terminally_failed():
43174380
continue
43184381

4382+
# Skip if deployment has replicas still stopping. Their resources
4383+
# haven't been released yet, so PG creation would likely fail or
4384+
# block waiting for resources. We'll retry next reconciliation loop.
4385+
if deployment_state._replicas.count(
4386+
states=[ReplicaState.STOPPING]
4387+
) > 0:
4388+
continue
4389+
43194390
gang_requests[deployment_id] = GangPlacementGroupRequest(
43204391
deployment_id=deployment_id,
43214392
gang_size=gang_config.gang_size,

python/ray/serve/config.py

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -797,37 +797,17 @@ class GangRuntimeFailurePolicy(str, Enum):
797797
"""Policy for handling runtime failures of replicas in a gang."""
798798

799799
RESTART_GANG = "RESTART_GANG"
800-
"""Kill and restart entire gang atomically when any replica fails.
801-
Use for: Tightly coupled systems where partial gang is useless.
802-
Ensures consistency but higher recovery time."""
800+
"""Tear down and restart entire gang atomically when any replica fails."""
803801

804802
RESTART_REPLICA = "RESTART_REPLICA"
805-
"""Kill and restart individual replica when it fails.
806-
Use for: Systems that can tolerate partial gang availability.
807-
Faster recovery but may result in inconsistent state."""
803+
"""Tear down and restart individual replica when it fails. Other replicas in the gang will continue running."""
808804

809805

810806
@PublicAPI(stability="alpha")
811807
class GangSchedulingConfig(BaseModel):
812-
"""Configuration for gang scheduling of deployment replicas.
813-
Gang scheduling ensures that groups of replicas are scheduled together
814-
atomically, which is essential for distributed workloads that require
815-
coordination between replicas.
816-
Example:
817-
.. code-block:: python
818-
from ray import serve
819-
from ray.serve.config import GangSchedulingConfig, GangPlacementStrategy
820-
@serve.deployment(
821-
num_replicas=8,
822-
gang_scheduling_config=GangSchedulingConfig(
823-
gang_size=4,
824-
gang_placement_strategy=GangPlacementStrategy.STRICT_PACK,
825-
runtime_failure_policy=GangRuntimeFailurePolicy.RESTART_GANG
826-
)
827-
)
828-
class MyDeployment:
829-
pass
830-
"""
808+
"""Configuration for gang scheduling of deployment replicas."""
809+
810+
# Please keep these options in sync with those in `src/ray/protobuf/serve.proto`.
831811

832812
gang_size: int = Field(
833813
description=(
@@ -854,3 +834,11 @@ class MyDeployment:
854834
"RESTART_REPLICA: kill and restart individual replica."
855835
),
856836
)
837+
838+
@validator("runtime_failure_policy", always=True)
839+
def _validate_runtime_failure_policy(cls, v):
840+
if v == GangRuntimeFailurePolicy.RESTART_REPLICA:
841+
raise NotImplementedError(
842+
"RESTART_REPLICA policy is not yet implemented."
843+
)
844+
return v

python/ray/serve/tests/test_deployment_scheduler.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -951,25 +951,18 @@ def __call__(self):
951951
assert len(gangs) == 2
952952

953953
for gang_id, members in gangs.items():
954-
# Each gang should have exactly 2 replicas
955954
assert len(members) == 2
956-
957-
# All members should have the same world_size
958955
assert all(member["world_size"] == 2 for member in members)
959-
960-
# All members should have the same member_replica_ids
961956
assert members[0]["member_replica_ids"] == members[1]["member_replica_ids"]
962957

963-
# member_replica_ids should contain exactly the 2 replica IDs in this gang
964958
expected_ids = sorted([m["replica_id"] for m in members])
965959
actual_ids = sorted(members[0]["member_replica_ids"])
966960
assert actual_ids == expected_ids
967961

968-
# Ranks within the gang should be {0, 1}
969962
ranks = sorted([m["rank"] for m in members])
970963
assert ranks == [0, 1]
971964

972-
# Across gangs: gang_ids should be different (already guaranteed by dict keys)
965+
# Across gangs: gang_ids should be different
973966
gang_ids = list(gangs.keys())
974967
assert gang_ids[0] != gang_ids[1]
975968

Comments (0)