13 changes: 11 additions & 2 deletions python/ray/serve/_private/deployment_state.py
@@ -997,8 +997,10 @@ def __init__(
state=ReplicaState.STARTING,
start_time_s=0,
)
self._multiplexed_model_ids: List = []
self._multiplexed_model_ids: List[str] = []
self._routing_stats: Dict[str, Any] = {}
self._cached_start_status: Optional[ReplicaStartupStatus] = None

def get_running_replica_info(
self, cluster_node_info_cache: ClusterNodeInfoCache
@@ -1131,6 +1133,7 @@ def check_started(
querying actor obj ref
"""
is_ready = self._actor.check_ready()
self._cached_start_status = is_ready[0]
self.update_actor_details(
pid=self._actor.pid,
node_id=self._actor.node_id,
@@ -2199,12 +2202,18 @@ def stop_replicas(self, replicas_to_stop) -> None:

def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
"""Stop replica
1. Stop the replica.
1. Stop the replica - hard stop if unallocated, graceful if allocated.
2. Change the replica into stopping state.
3. Set the health replica stats to 0.
"""
logger.debug(f"Adding STOPPING to replica: {replica.replica_id}.")
replica.stop(graceful=graceful_stop)

start_status = replica._cached_start_status
if start_status == ReplicaStartupStatus.PENDING_ALLOCATION:
replica.stop(graceful=False)
else:
replica.stop(graceful=graceful_stop)

self._replicas.add(ReplicaState.STOPPING, replica)
self._deployment_scheduler.on_replica_stopping(replica.replica_id)
self.health_check_gauge.set(
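Taken together, the hunks above introduce a small caching pattern: check_started() records the most recent ReplicaStartupStatus on the replica, and _stop_replica reads that cache to decide between a hard stop and a graceful one. Below is a minimal, self-contained sketch of that pattern; the class and function names are illustrative stand-ins, not the actual DeploymentReplica/DeploymentState APIs.

```python
from enum import Enum
from typing import Optional, Tuple


class ReplicaStartupStatus(Enum):
    PENDING_ALLOCATION = "PENDING_ALLOCATION"
    PENDING_INITIALIZATION = "PENDING_INITIALIZATION"
    SUCCEEDED = "SUCCEEDED"


class FakeActorWrapper:
    """Stand-in for the replica's actor wrapper; returns a canned status."""

    def __init__(self, status: ReplicaStartupStatus):
        self._status = status

    def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
        return self._status, None


class ReplicaSketch:
    """Illustrative stand-in for DeploymentReplica (not the real class)."""

    def __init__(self, actor: FakeActorWrapper):
        self._actor = actor
        # Cached on every check_started() call so the stop path can branch
        # on it without another round trip to the actor.
        self._cached_start_status: Optional[ReplicaStartupStatus] = None

    def check_started(self) -> ReplicaStartupStatus:
        status, _ = self._actor.check_ready()
        self._cached_start_status = status
        return status

    def stop(self, graceful: bool = True) -> None:
        print(f"stop(graceful={graceful})")


def stop_replica_sketch(replica: ReplicaSketch, graceful_stop: bool = True) -> None:
    # A replica stuck waiting for resources has no in-flight work to drain,
    # so skip the graceful-shutdown wait and hard-stop it instead.
    if replica._cached_start_status == ReplicaStartupStatus.PENDING_ALLOCATION:
        replica.stop(graceful=False)
    else:
        replica.stop(graceful=graceful_stop)


if __name__ == "__main__":
    pending = ReplicaSketch(FakeActorWrapper(ReplicaStartupStatus.PENDING_ALLOCATION))
    pending.check_started()
    stop_replica_sketch(pending)  # -> stop(graceful=False)

    started = ReplicaSketch(FakeActorWrapper(ReplicaStartupStatus.SUCCEEDED))
    started.check_started()
    stop_replica_sketch(started)  # -> stop(graceful=True)
```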
68 changes: 65 additions & 3 deletions python/ray/serve/tests/test_deploy_2.py
@@ -11,10 +11,14 @@

import ray
from ray import serve
from ray._common.test_utils import SignalActor, wait_for_condition
from ray.serve._private.common import DeploymentStatus
from ray._private.test_utils import SignalActor, wait_for_condition
from ray.serve._private.common import DeploymentID, DeploymentStatus
from ray.serve._private.logging_utils import get_serve_logs_dir
from ray.serve._private.test_utils import check_deployment_status, check_num_replicas_eq
from ray.serve._private.test_utils import (
check_deployment_status,
check_num_replicas_eq,
check_replica_counts,
)
from ray.serve._private.utils import get_component_file_name
from ray.serve.schema import ApplicationStatus
from ray.util.state import list_actors
@@ -377,5 +381,63 @@ def check_num_waiters(target: int):
print(time.time(), f"Confirmed number of replicas are at {i+1}.")


def test_unallocated_replica_shutdown(serve_instance):
"""Test that unallocated replicas are stopped immediately without waiting for graceful shutdown.

When a replica is in PENDING_ALLOCATION state (due to requiring resources that don't exist),
stopping it should be immediate and not wait for the graceful shutdown timeout.
"""

application_name = "CustomApplication"
deployment_name = "CustomResourceDeployment"

@serve.deployment(
ray_actor_options={"resources": {"custom": 1}}, # Require non-existent resource
graceful_shutdown_timeout_s=100,
)
class CustomResourceDeployment:
def __init__(self):
self.ready = False

def __call__(self):
return "ready"

serve._run(CustomResourceDeployment.bind(), name=application_name, _blocking=False)
wait_for_condition(
check_deployment_status,
app_name=application_name,
name=deployment_name,
expected_status=DeploymentStatus.UPDATING,
)
wait_for_condition(
check_replica_counts,
controller=serve_instance._controller,
deployment_id=DeploymentID(name=deployment_name, app_name=application_name),
total=1,
)

start_time = time.time()
serve.delete(application_name, _blocking=False)

def check_actor_stopped():
current_actors = [
actor
for actor in list_actors(filters=[("state", "=", "DEAD")])
if deployment_name in actor["name"]
]
return len(current_actors) == 1

wait_for_condition(check_actor_stopped, timeout=15)
deletion_time = time.time() - start_time

# deletion_time should be under the graceful shutdown timeout
assert deletion_time < 15, f"Deletion took {deletion_time}s, expected < 15s"

def check_application_removed():
return application_name not in serve.status().applications

wait_for_condition(check_application_removed, timeout=15)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
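For context on why the test above reliably keeps its replica in PENDING_ALLOCATION: a Ray actor that requests a custom resource no node advertises is created but never scheduled. A minimal standalone reproduction outside Serve (assuming a local cluster with no node exporting a `custom` resource):

```python
import ray

ray.init()


@ray.remote(resources={"custom": 1})  # no node in this cluster has "custom"
class NeverScheduled:
    def ping(self):
        return "pong"


# The handle comes back immediately, but the actor-creation task stays
# pending until a node advertising the "custom" resource joins the cluster.
actor = NeverScheduled.remote()
```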
43 changes: 43 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_state.py
@@ -4800,6 +4800,49 @@ def test_in_place_update_during_draining(self, mock_deployment_state_manager):
],
)

def test_stop_unallocated_replica(self, mock_deployment_state_manager):
"""Test that replicas are stopped with correct graceful value based on their state.

When stopping a replica:
- If it's in PENDING_ALLOCATION state, stop() should be called with graceful=False
- Otherwise, stop() should be called with the provided graceful_stop value
"""
create_dsm, _, _, _ = mock_deployment_state_manager
dsm = create_dsm()
info1, v1 = deployment_info(version="1")
assert dsm.deploy(TEST_DEPLOYMENT_ID, info1)
ds = dsm._deployment_states[TEST_DEPLOYMENT_ID]

dsm.update()
check_counts(ds, total=1, by_state=[(ReplicaState.STARTING, 1, v1)])
replica = ds._replicas.get(states=[ReplicaState.STARTING])[0]

stop_graceful_value = None
original_stop = replica.stop

def mock_stop(graceful):
nonlocal stop_graceful_value
stop_graceful_value = graceful
return original_stop(graceful=graceful)

replica.stop = mock_stop

# Test Case 1: PENDING_ALLOCATION state
def mock_pending_allocation():
return ReplicaStartupStatus.PENDING_ALLOCATION, None

replica.check_started = mock_pending_allocation
ds.stop_replicas([replica.replica_id])
assert stop_graceful_value is False

# Test Case 2: PENDING_INITIALIZATION state (allocated), so the provided graceful value is used
def mock_pending_initialization():
return ReplicaStartupStatus.PENDING_INITIALIZATION, None

replica.check_started = mock_pending_initialization
ds.stop_replicas([replica.replica_id])
assert stop_graceful_value is True


def test_docs_path_not_updated_for_different_version(mock_deployment_state_manager):
# Create deployment state manager