Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions python/ray/serve/_private/deployment_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,8 +728,31 @@ def _schedule_with_pack_strategy(self):
for node_id in active_nodes
}

# Compute available resources once upfront and update incrementally
# as replicas are scheduled.
# Complexity is O(launching + running + nodes + requests * nodes).
available_resources_per_node = self._get_available_resources_per_node()

# Running replicas don't change during scheduling, so compute once.
node_to_running_replicas = self._get_node_to_running_replicas()

for scheduling_request in all_scheduling_requests:
self._pack_schedule_replica(scheduling_request, all_node_labels)
target_node = self._pack_schedule_replica(
scheduling_request,
all_node_labels,
available_resources_per_node,
node_to_running_replicas,
)
# Incrementally update available resources for the target node.
# This is slightly conservative compared to recomputing from
# scratch (since we subtract from the min of GCS and calculated
# resources rather than only from the calculated side), but
# _get_available_resources_per_node is already best-effort.
if target_node and target_node in available_resources_per_node:
available_resources_per_node[target_node] = (
available_resources_per_node[target_node]
- scheduling_request.required_resources
)

def _schedule_with_spread_strategy(self):
"""Tries to schedule pending replicas using the SPREAD strategy."""
Expand All @@ -747,16 +770,31 @@ def _pack_schedule_replica(
self,
scheduling_request: ReplicaSchedulingRequest,
all_node_labels: Dict[str, Dict[str, str]],
):
"""Attempts to schedule a single request on the best available node."""
available_resources_per_node: Dict[str, Resources],
node_to_running_replicas: Dict[str, Set[ReplicaID]],
) -> Optional[str]:
"""Attempts to schedule a single request on the best available node.

Args:
scheduling_request: The replica scheduling request.
all_node_labels: Labels for all active nodes.
available_resources_per_node: Pre-computed available resources
per node.
node_to_running_replicas: Pre-computed mapping of node IDs to
running replica IDs.

Returns:
The target node ID if scheduling succeeded, None otherwise.
"""

placement_candidates = self._build_pack_placement_candidates(scheduling_request)

target_node = None
for required_resources, required_labels in placement_candidates:
target_node = self._find_best_fit_node_for_pack(
required_resources,
self._get_available_resources_per_node(),
available_resources_per_node,
node_to_running_replicas,
required_labels_list=required_labels,
node_labels=all_node_labels,
)
Expand All @@ -769,6 +807,8 @@ def _pack_schedule_replica(
target_node_id=target_node,
)

return target_node

def _build_pack_placement_candidates(
self, scheduling_request: ReplicaSchedulingRequest
) -> List[Tuple[Resources, List[Dict[str, str]]]]:
Expand Down Expand Up @@ -907,6 +947,7 @@ def _find_best_fit_node_for_pack(
self,
required_resources: Resources,
available_resources_per_node: Dict[str, Resources],
node_to_running_replicas: Dict[str, Set[ReplicaID]],
required_labels_list: Optional[List[Dict[str, str]]] = None,
node_labels: Optional[Dict[str, Dict[str, str]]] = None,
) -> Optional[str]:
Expand All @@ -915,6 +956,17 @@ def _find_best_fit_node_for_pack(
If there are available nodes, returns the node ID of the best
available node, minimizing fragmentation. Prefers non-idle nodes
over idle nodes.

Args:
required_resources: Resources needed for this replica.
available_resources_per_node: Available resources per node.
node_to_running_replicas: Pre-computed mapping of node IDs
to running replica IDs.
required_labels_list: Label selectors to filter nodes.
node_labels: Labels for each node.

Returns:
The node ID of the best-fit node, or None if no suitable node is found.
"""

# Filter feasible nodes by label selectors, if any were provided.
Expand All @@ -926,8 +978,6 @@ def _find_best_fit_node_for_pack(
if not available_resources_per_node:
return None

node_to_running_replicas = self._get_node_to_running_replicas()

non_idle_nodes = {
node_id: res
for node_id, res in available_resources_per_node.items()
Expand Down