3 changes: 0 additions & 3 deletions ci/lint/pydoclint-baseline.txt
@@ -1513,9 +1513,6 @@ python/ray/serve/_private/controller.py
python/ray/serve/_private/deploy_utils.py
DOC201: Function `get_app_code_version` does not have a return section in docstring
--------------------
python/ray/serve/_private/deployment_scheduler.py
DOC201: Method `DeploymentScheduler._schedule_replica` does not have a return section in docstring
--------------------
python/ray/serve/_private/deployment_state.py
DOC201: Method `ReplicaStateContainer.get` does not have a return section in docstring
DOC201: Method `ReplicaStateContainer.pop` does not have a return section in docstring
74 changes: 64 additions & 10 deletions python/ray/serve/_private/deployment_scheduler.py
@@ -533,7 +533,7 @@ def _schedule_replica(
default_scheduling_strategy: str,
target_node_id: Optional[str] = None,
target_labels: Optional[LabelMatchExpressionsT] = None,
):
) -> bool:
"""Schedule a replica from a scheduling request.

The following special scheduling strategies will be used, in
@@ -555,6 +555,9 @@ def _schedule_replica(
target node.
target_labels: Attempt to schedule this replica onto nodes
with these target labels.

Returns:
True if the replica was successfully scheduled, False otherwise.
"""

replica_id = scheduling_request.replica_id
@@ -588,7 +591,7 @@
scheduling_request.status = (
ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED
)
return
return False
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_capture_child_tasks=True,
@@ -629,7 +632,7 @@
scheduling_request.status = (
ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED
)
return
return False

del self._pending_replicas[deployment_id][replica_id]
self._on_replica_launching(
@@ -641,6 +644,7 @@ def _schedule_replica(

scheduling_request.status = ReplicaSchedulingRequestStatus.SUCCEEDED
scheduling_request.on_scheduled(actor_handle, placement_group=placement_group)
return True

@abstractmethod
def get_node_to_compact(
@@ -728,8 +732,31 @@ def _schedule_with_pack_strategy(self):
for node_id in active_nodes
}

# Compute available resources once upfront and update incrementally
# as replicas are scheduled.
# Complexity is O(launching + running + nodes + requests * nodes).
available_resources_per_node = self._get_available_resources_per_node()

# Running replicas don't change during scheduling, so compute once.
node_to_running_replicas = self._get_node_to_running_replicas()

for scheduling_request in all_scheduling_requests:
self._pack_schedule_replica(scheduling_request, all_node_labels)
target_node = self._pack_schedule_replica(
scheduling_request,
all_node_labels,
available_resources_per_node,
node_to_running_replicas,
)
# Incrementally update available resources for the target node.
# This is slightly conservative compared to recomputing from
# scratch (since we subtract from the min of GCS and calculated
# resources rather than only from the calculated side), but
# _get_available_resources_per_node is already best-effort.
if target_node and target_node in available_resources_per_node:
available_resources_per_node[target_node] = (
available_resources_per_node[target_node]
- scheduling_request.required_resources
)

def _schedule_with_spread_strategy(self):
"""Tries to schedule pending replicas using the SPREAD strategy."""
@@ -747,28 +774,45 @@ def _pack_schedule_replica(
self,
scheduling_request: ReplicaSchedulingRequest,
all_node_labels: Dict[str, Dict[str, str]],
):
"""Attempts to schedule a single request on the best available node."""
available_resources_per_node: Dict[str, Resources],
node_to_running_replicas: Dict[str, Set[ReplicaID]],
) -> Optional[str]:
"""Attempts to schedule a single request on the best available node.

Args:
scheduling_request: The replica scheduling request.
all_node_labels: Labels for all active nodes.
available_resources_per_node: Pre-computed available resources
per node.
node_to_running_replicas: Pre-computed mapping of node IDs to
running replica IDs.

Returns:
The target node ID if scheduling succeeded, None otherwise.
"""

placement_candidates = self._build_pack_placement_candidates(scheduling_request)

target_node = None
for required_resources, required_labels in placement_candidates:
target_node = self._find_best_fit_node_for_pack(
required_resources,
self._get_available_resources_per_node(),
available_resources_per_node,
node_to_running_replicas,
required_labels_list=required_labels,
node_labels=all_node_labels,
)
if target_node:
break

self._schedule_replica(
succeeded = self._schedule_replica(
scheduling_request,
default_scheduling_strategy="DEFAULT",
target_node_id=target_node,
)

return target_node if succeeded else None

def _build_pack_placement_candidates(
self, scheduling_request: ReplicaSchedulingRequest
) -> List[Tuple[Resources, List[Dict[str, str]]]]:
@@ -907,6 +951,7 @@ def _find_best_fit_node_for_pack(
self,
required_resources: Resources,
available_resources_per_node: Dict[str, Resources],
node_to_running_replicas: Dict[str, Set[ReplicaID]],
required_labels_list: Optional[List[Dict[str, str]]] = None,
node_labels: Optional[Dict[str, Dict[str, str]]] = None,
) -> Optional[str]:
Expand All @@ -915,6 +960,17 @@ def _find_best_fit_node_for_pack(
If there are available nodes, returns the node ID of the best
available node, minimizing fragmentation. Prefers non-idle nodes
over idle nodes.

Args:
required_resources: Resources needed for this replica.
available_resources_per_node: Available resources per node.
node_to_running_replicas: Pre-computed mapping of node IDs
to running replica IDs.
required_labels_list: Label selectors to filter nodes.
node_labels: Labels for each node.

Returns:
The best-fit node ID if a feasible node is found, None otherwise.
"""

# Filter feasible nodes by provided label selectors if provided.
@@ -926,8 +982,6 @@
if not available_resources_per_node:
return None

node_to_running_replicas = self._get_node_to_running_replicas()

non_idle_nodes = {
node_id: res
for node_id, res in available_resources_per_node.items()
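For reference, `_find_best_fit_node_for_pack` above picks a feasible node while minimizing fragmentation and preferring non-idle nodes; the change simply threads the pre-computed available resources and `node_to_running_replicas` into it instead of recomputing them on every call. Below is a standalone sketch of that selection logic under simplified assumptions: plain dicts model resources, and the score (smallest total leftover wins) is a generic best-fit heuristic, not necessarily the exact scoring used in Ray Serve.

```python
from typing import Dict, Optional, Set


def fits(required: Dict[str, float], available: Dict[str, float]) -> bool:
    return all(available.get(key, 0.0) >= amount for key, amount in required.items())


def find_best_fit_node(
    required: Dict[str, float],
    available_per_node: Dict[str, Dict[str, float]],
    nodes_with_running_replicas: Set[str],
) -> Optional[str]:
    """Return the feasible node that leaves the least slack after placement."""
    feasible = {
        node: free for node, free in available_per_node.items() if fits(required, free)
    }
    if not feasible:
        return None

    def leftover(node: str) -> float:
        # Less slack after placement means a tighter pack and less fragmentation.
        return sum(feasible[node].values()) - sum(required.values())

    # Prefer nodes that already host replicas so idle nodes stay free to scale down.
    non_idle = [node for node in feasible if node in nodes_with_running_replicas]
    candidates = non_idle or list(feasible)
    return min(candidates, key=leftover)
```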
156 changes: 156 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_scheduler.py
@@ -19,6 +19,7 @@
DeploymentDownscaleRequest,
DeploymentSchedulingInfo,
ReplicaSchedulingRequest,
ReplicaSchedulingRequestStatus,
Resources,
SpreadDeploymentSchedulingPolicy,
)
@@ -1424,6 +1425,161 @@ def on_scheduled(actor_handle, placement_group):
downscales={},
)

def test_actor_creation_failure_does_not_decrement_resources(self):
"""When actor creation fails for a replica, available resources
should not be decremented so subsequent replicas in the same
scheduling batch can still use that node.
"""

d_id = DeploymentID(name="deployment1")
node_id = NodeID.from_random().hex()

cluster_node_info_cache = MockClusterNodeInfoCache()
# Node has exactly 2 CPUs — enough for two 1-CPU replicas.
cluster_node_info_cache.add_node(node_id, {"CPU": 2})

scheduler = default_impl.create_deployment_scheduler(
cluster_node_info_cache,
head_node_id_override="fake-head-node-id",
create_placement_group_fn_override=None,
)
scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy())
scheduler.on_deployment_deployed(
d_id,
ReplicaConfig.create(dummy, ray_actor_options={"num_cpus": 1}),
)

# Create a mock actor class whose .options().remote() raises on the
# first call (simulating actor creation failure) and succeeds on later calls.
call_count = 0

class FailOnceMockActorClass(MockActorClass):
def remote(self, *args):
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("Simulated actor creation failure")
return super().remote(*args)

on_scheduled_mock = Mock()
r0_id = ReplicaID(unique_id="r0", deployment_id=d_id)
r1_id = ReplicaID(unique_id="r1", deployment_id=d_id)

req0 = ReplicaSchedulingRequest(
replica_id=r0_id,
actor_def=FailOnceMockActorClass(),
actor_resources={"CPU": 1},
actor_options={},
actor_init_args=(),
on_scheduled=on_scheduled_mock,
)
req1 = ReplicaSchedulingRequest(
replica_id=r1_id,
actor_def=MockActorClass(),
actor_resources={"CPU": 1},
actor_options={},
actor_init_args=(),
on_scheduled=on_scheduled_mock,
)

scheduler.schedule(
upscales={d_id: [req0, req1]},
downscales={},
)

# The first replica should have failed.
assert req0.status == ReplicaSchedulingRequestStatus.ACTOR_CREATION_FAILED

# The second replica should have succeeded and been scheduled to the
# node.
assert req1.status == ReplicaSchedulingRequestStatus.SUCCEEDED
assert on_scheduled_mock.call_count == 1
call = on_scheduled_mock.call_args_list[0]
scheduling_strategy = call.args[0]._options["scheduling_strategy"]
assert isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy)
assert scheduling_strategy.node_id == node_id

def test_pg_creation_failure_does_not_decrement_resources(self):
"""When placement group creation fails for a replica, available
resources should not be decremented so subsequent replicas in the
same scheduling batch can still use that node.
"""

d_id = DeploymentID(name="deployment1")
node_id = NodeID.from_random().hex()

cluster_node_info_cache = MockClusterNodeInfoCache()
# Node has exactly 2 CPUs — enough for two replicas with 1-CPU PGs.
cluster_node_info_cache.add_node(node_id, {"CPU": 2})

call_count = 0

def fail_once_create_pg(request):
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("Simulated PG creation failure")
return MockPlacementGroup(request)

scheduler = default_impl.create_deployment_scheduler(
cluster_node_info_cache,
head_node_id_override="fake-head-node-id",
create_placement_group_fn_override=fail_once_create_pg,
)
scheduler.on_deployment_created(d_id, SpreadDeploymentSchedulingPolicy())
scheduler.on_deployment_deployed(
d_id,
ReplicaConfig.create(
dummy,
ray_actor_options={"num_cpus": 0},
placement_group_bundles=[{"CPU": 1}],
placement_group_strategy="STRICT_PACK",
),
)

on_scheduled_mock = Mock()
r0_id = ReplicaID(unique_id="r0", deployment_id=d_id)
r1_id = ReplicaID(unique_id="r1", deployment_id=d_id)

req0 = ReplicaSchedulingRequest(
replica_id=r0_id,
actor_def=MockActorClass(),
actor_resources={"CPU": 0},
placement_group_bundles=[{"CPU": 1}],
placement_group_strategy="STRICT_PACK",
actor_options={"name": "r0"},
actor_init_args=(),
on_scheduled=on_scheduled_mock,
)
req1 = ReplicaSchedulingRequest(
replica_id=r1_id,
actor_def=MockActorClass(),
actor_resources={"CPU": 0},
placement_group_bundles=[{"CPU": 1}],
placement_group_strategy="STRICT_PACK",
actor_options={"name": "r1"},
actor_init_args=(),
on_scheduled=on_scheduled_mock,
)

scheduler.schedule(
upscales={d_id: [req0, req1]},
downscales={},
)

# The first replica should have failed at PG creation.
assert (
req0.status
== ReplicaSchedulingRequestStatus.PLACEMENT_GROUP_CREATION_FAILED
)

# The second replica should still succeed.
assert req1.status == ReplicaSchedulingRequestStatus.SUCCEEDED
assert on_scheduled_mock.call_count == 1
call = on_scheduled_mock.call_args_list[0]
scheduling_strategy = call.args[0]._options["scheduling_strategy"]
assert isinstance(scheduling_strategy, PlacementGroupSchedulingStrategy)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))