diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py
index 37c7a5f90d4c..ab6605aa40b1 100644
--- a/python/ray/serve/_private/deployment_state.py
+++ b/python/ray/serve/_private/deployment_state.py
@@ -1634,6 +1634,38 @@ def count(
                 "Only one of `version` or `exclude_version` may be provided."
             )
 
+    def remove(self, replica_ids: Set[ReplicaID]) -> List[DeploymentReplica]:
+        """Remove and return all replicas whose IDs are in the given set.
+
+        Performs a single pass over the container. Non-matching replicas
+        stay in place without being re-added (so no spurious
+        ``update_state`` / ``update_actor_details`` calls).
+
+        Args:
+            replica_ids: collection of ReplicaIDs to remove.
+
+        Returns:
+            The list of removed DeploymentReplicas.
+        """
+        replica_ids = set(replica_ids)
+        removed = []
+        remaining_to_find = len(replica_ids)
+        for state in ALL_REPLICA_STATES:
+            if remaining_to_find == 0:
+                break
+            found_any = False
+            remaining = []
+            for replica in self._replicas[state]:
+                if remaining_to_find > 0 and replica.replica_id in replica_ids:
+                    removed.append(replica)
+                    remaining_to_find -= 1
+                    found_any = True
+                else:
+                    remaining.append(replica)
+            if found_any:
+                self._replicas[state] = remaining
+        return removed
+
     def __str__(self):
         return str(self._replicas)
 
@@ -3146,12 +3178,9 @@ def record_replica_startup_failure(self, error_msg: str):
             )
             self._curr_status_info = self._curr_status_info.update_message(message)
 
-    def stop_replicas(self, replicas_to_stop) -> None:
-        for replica in self._replicas.pop():
-            if replica.replica_id in replicas_to_stop:
-                self._stop_replica(replica)
-            else:
-                self._replicas.add(replica.actor_details.state, replica)
+    def stop_replicas(self, replicas_to_stop: Set[ReplicaID]) -> None:
+        for replica in self._replicas.remove(replicas_to_stop):
+            self._stop_replica(replica)
 
     def _stop_replica(self, replica: DeploymentReplica, graceful_stop=True):
         """Stop replica