From c48b5d9dcc75c4e3767b8b99187eaff1f50eb263 Mon Sep 17 00:00:00 2001
From: abrar
Date: Sat, 7 Feb 2026 07:48:42 +0000
Subject: [PATCH 1/2] [Serve] Optimize stop_replicas() to avoid pop-all/re-add cycle

Signed-off-by: abrar
---
 python/ray/serve/_private/deployment_state.py | 26 ++++++++++++++++---
 1 file changed, 22 insertions(+), 4 deletions(-)

diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py
index 37c7a5f90d4c..f3a4d102553f 100644
--- a/python/ray/serve/_private/deployment_state.py
+++ b/python/ray/serve/_private/deployment_state.py
@@ -1634,6 +1634,25 @@ def count(
                 "Only one of `version` or `exclude_version` may be provided."
             )
 
+    def remove(self, replica_id: ReplicaID) -> Optional[DeploymentReplica]:
+        """Remove and return a replica by its ID.
+
+        Searches across all states for the replica with the given ID and
+        removes it from the container.
+
+        Args:
+            replica_id: the ReplicaID of the replica to remove.
+
+        Returns:
+            The removed DeploymentReplica, or None if not found.
+        """
+        for state in ALL_REPLICA_STATES:
+            replicas = self._replicas[state]
+            for i, replica in enumerate(replicas):
+                if replica.replica_id == replica_id:
+                    return replicas.pop(i)
+        return None
+
     def __str__(self):
         return str(self._replicas)
 
@@ -3147,11 +3166,10 @@ def record_replica_startup_failure(self, error_msg: str):
         self._curr_status_info = self._curr_status_info.update_message(message)
 
     def stop_replicas(self, replicas_to_stop) -> None:
-        for replica in self._replicas.pop():
-            if replica.replica_id in replicas_to_stop:
+        for replica_id in replicas_to_stop:
+            replica = self._replicas.remove(replica_id)
+            if replica is not None:
                 self._stop_replica(replica)
-            else:
-                self._replicas.add(replica.actor_details.state, replica)
 
     def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
         """Stop replica

From 7530aa74328eb6d8fad7f51893fcbaba9fe9764d Mon Sep 17 00:00:00 2001
From: abrar
Date: Sat, 7 Feb 2026 08:14:02 +0000
Subject: [PATCH 2/2] add types

Signed-off-by: abrar
---
 python/ray/serve/_private/deployment_state.py | 43 ++++++++++++-------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py
index f3a4d102553f..ab6605aa40b1 100644
--- a/python/ray/serve/_private/deployment_state.py
+++ b/python/ray/serve/_private/deployment_state.py
@@ -1634,24 +1634,37 @@ def count(
                 "Only one of `version` or `exclude_version` may be provided."
             )
 
-    def remove(self, replica_id: ReplicaID) -> Optional[DeploymentReplica]:
-        """Remove and return a replica by its ID.
+    def remove(self, replica_ids: Set[ReplicaID]) -> List[DeploymentReplica]:
+        """Remove and return all replicas whose IDs are in the given set.
 
-        Searches across all states for the replica with the given ID and
-        removes it from the container.
+        Performs a single pass over the container. Non-matching replicas
+        stay in place without being re-added (so no spurious
+        ``update_state`` / ``update_actor_details`` calls).
 
         Args:
-            replica_id: the ReplicaID of the replica to remove.
+            replica_ids: collection of ReplicaIDs to remove.
 
         Returns:
-            The removed DeploymentReplica, or None if not found.
+            The list of removed DeploymentReplicas.
         """
+        replica_ids = set(replica_ids)
+        removed = []
+        remaining_to_find = len(replica_ids)
         for state in ALL_REPLICA_STATES:
-            replicas = self._replicas[state]
-            for i, replica in enumerate(replicas):
-                if replica.replica_id == replica_id:
-                    return replicas.pop(i)
-        return None
+            if remaining_to_find == 0:
+                break
+            found_any = False
+            remaining = []
+            for replica in self._replicas[state]:
+                if remaining_to_find > 0 and replica.replica_id in replica_ids:
+                    removed.append(replica)
+                    remaining_to_find -= 1
+                    found_any = True
+                else:
+                    remaining.append(replica)
+            if found_any:
+                self._replicas[state] = remaining
+        return removed
 
     def __str__(self):
         return str(self._replicas)
@@ -3165,11 +3178,9 @@ def record_replica_startup_failure(self, error_msg: str):
         )
         self._curr_status_info = self._curr_status_info.update_message(message)
 
-    def stop_replicas(self, replicas_to_stop) -> None:
-        for replica_id in replicas_to_stop:
-            replica = self._replicas.remove(replica_id)
-            if replica is not None:
-                self._stop_replica(replica)
+    def stop_replicas(self, replicas_to_stop: Set[ReplicaID]) -> None:
+        for replica in self._replicas.remove(replicas_to_stop):
+            self._stop_replica(replica)
 
     def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
         """Stop replica