
Commit 935160a

ryanaoleary, zcin, abrarsheikh, and MengjinYan authored and committed
[Serve] Add label_selector and bundle_label_selector to Serve API (ray-project#57694)
## Why are these changes needed?

This PR adds the `label_selector` option to the supported list of Actor options for a Serve deployment. Additionally, we add `bundle_label_selector` to specify label selectors for bundles when `placement_group_bundles` are specified for the deployment. These two options are already supported for Tasks/Actors and placement groups, respectively.

Example use case:

```
llm_config = LLMConfig(
    model_loading_config={
        "model_id": "meta-llama/Meta-Llama-3-70B-Instruct",
        "model_source": "huggingface",
    },
    engine_kwargs=tpu_engine_config,
    resources_per_bundle={"TPU": 4},
    runtime_env={"env_vars": {"VLLM_USE_V1": "1"}},
    deployment_config={
        "num_replicas": 4,
        "ray_actor_options": {
            # In a GKE cluster with multiple TPU node-pools, schedule
            # only to the desired slice.
            "label_selector": {
                "ray.io/tpu-topology": "4x4"  # added by default by Ray
            }
        }
    }
)
```

The expected behavior of these new fields is as follows:

**Pack scheduling enabled**
----------------------------------------

**PACK/STRICT_PACK PG strategy:**

- Standard PG without bundle_label_selector or fallback:
  - Sorts replicas by resource size (descending). Attempts to find the "best fit" node (minimizing fragmentation) that has available resources. Creates a Placement Group on that target node.
- PG node label selector provided:
  - Same behavior as a regular placement group, but filters the list of candidate nodes to only those matching the label selector before finding the best fit.
- PG node label selector and fallback:
  - Same as above, but scheduling tries the following:
    1. Tries to find a node matching the primary placement_group_bundles and bundle_label_selector.
    2. If no node fits, iterates through the placement_group_fallback_strategy. For each fallback entry, tries to find a node matching that entry's bundles and labels.
    3. If a node is found, creates a PG on it.

**SPREAD/STRICT_SPREAD PG strategy:**

- If any deployment uses these strategies, the global logic falls back to "Spread Scheduling" (see below).

**Spread scheduling enabled**
----------------------------------------

- Standard PG without bundle_label_selector or fallback:
  - Creates a Placement Group via Ray Core without specifying a target_node_id. Ray Core decides placement based on the strategy.
- PG node label selector provided:
  - Serve passes the bundle_label_selector to the CreatePlacementGroupRequest. Ray Core handles the soft/hard constraint logic during PG creation.
- PG node label selector and fallback:
  - Serve passes the bundle_label_selector to the CreatePlacementGroupRequest. Because fallback_strategy is not yet supported in the placement group options, that field is neither passed nor considered. It is only used in the "best fit" node selection logic, which is skipped for Spread scheduling.

## Related issue number

ray-project#51564

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Ryan O'Leary <ryanaoleary@google.com>
Signed-off-by: ryanaoleary <ryanaoleary@google.com>
Signed-off-by: Ryan O'Leary <113500783+ryanaoleary@users.noreply.github.com>
Co-authored-by: Cindy Zhang <cindyzyx9@gmail.com>
Co-authored-by: Abrar Sheikh <abrar2002as@gmail.com>
Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
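The pack-scheduling steps in the description (filter candidate nodes by label, pick the best fit, then walk the fallback entries) can be sketched in plain Python. This is only an illustration under stated assumptions, not Serve's scheduler: the function names, the node dictionary layout, the equality-only label matching, the "sum of leftover resources" fit metric, and the `bundles` / `bundle_label_selector` keys on each fallback entry are all hypothetical.

```python
from typing import Dict, List, Optional, Sequence

Bundle = Dict[str, float]


def labels_match(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
    # Assumption: equality-only matching. Real Ray selectors also accept operators.
    return all(node_labels.get(key) == value for key, value in selector.items())


def total_resources(bundles: List[Bundle]) -> Dict[str, float]:
    # Sum the bundles, since a packed placement group must fit on one node.
    total: Dict[str, float] = {}
    for bundle in bundles:
        for name, amount in bundle.items():
            total[name] = total.get(name, 0.0) + amount
    return total


def best_fit_node(
    nodes: Dict[str, dict],
    bundles: List[Bundle],
    selectors: Sequence[Dict[str, str]],
) -> Optional[str]:
    """Return the label-matching node with the least leftover capacity, if any."""
    required = total_resources(bundles)
    best_id, best_leftover = None, None
    for node_id, node in nodes.items():
        if not all(labels_match(node["labels"], s) for s in selectors):
            continue  # filtered out by the label selector
        available = node["available"]
        if any(available.get(r, 0.0) < amt for r, amt in required.items()):
            continue  # not enough resources on this node
        leftover = sum(available.get(r, 0.0) - amt for r, amt in required.items())
        if best_leftover is None or leftover < best_leftover:
            best_id, best_leftover = node_id, leftover
    return best_id


def pick_target_node(nodes, bundles, selectors, fallback_strategy=()):
    # Step 1: try the primary bundles and selector.
    node_id = best_fit_node(nodes, bundles, selectors)
    if node_id is not None:
        return node_id
    # Step 2: try each fallback entry in order (hypothetical entry keys).
    for entry in fallback_strategy:
        node_id = best_fit_node(
            nodes,
            entry.get("bundles", bundles),
            entry.get("bundle_label_selector", []),
        )
        if node_id is not None:
            return node_id  # Step 3: a placement group would be created here.
    return None
```

With an empty selector list every node is a candidate, which corresponds to the "standard PG" case in the description.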
1 parent ddd44fe commit 935160a

29 files changed (+1678 -119 lines changed)

python/ray/_raylet.pyx

Lines changed: 20 additions & 0 deletions
@@ -103,6 +103,8 @@ from ray.includes.common cimport (
     CLabelMatchExpression,
     CLabelIn,
     CLabelNotIn,
+    CLabelSelector,
+    CNodeResources,
     CRayFunction,
     CWorkerType,
     CJobConfig,
@@ -141,6 +143,7 @@ from ray.includes.common cimport (
     PersistPort,
     WaitForPersistedPort,
     CWaitForPersistedPortResult,
+    SetNodeResourcesLabels,
 )
 from ray.includes.unique_ids cimport (
     CActorID,
@@ -597,6 +600,23 @@ cdef int prepare_label_selector(
 
     return 0
 
+def node_labels_match_selector(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
+    """
+    Checks if the given node labels satisfy the label selector. This helper function exposes
+    the C++ logic for determining if a node satisfies a label selector to the Python layer.
+    """
+    cdef:
+        CNodeResources c_node_resources
+        CLabelSelector c_label_selector
+        unordered_map[c_string, c_string] c_labels_map
+
+    prepare_labels(node_labels, &c_labels_map)
+    SetNodeResourcesLabels(c_node_resources, c_labels_map)
+    prepare_label_selector(selector, &c_label_selector)
+
+    # Return whether the node resources satisfy the label constraint.
+    return c_node_resources.HasRequiredLabels(c_label_selector)
+
 cdef int prepare_fallback_strategy(
         list fallback_strategy,
         c_vector[CFallbackOption] *fallback_strategy_vector) except -1:
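To make the intent of `node_labels_match_selector` easier to follow without a compiled Ray build, here is a pure-Python approximation of label-selector matching. It is a sketch, not the C++ `HasRequiredLabels` logic: the function names are hypothetical, and it assumes the selector value syntax documented for Ray label selectors (`"value"`, `"!value"`, `"in(a, b)"`, `"!in(a, b)"`) and that a missing label satisfies the negated forms.

```python
from typing import Dict, Set


def _parse_values(expr: str) -> Set[str]:
    # Extract the comma-separated values between the outer parentheses.
    inner = expr[expr.index("(") + 1 : expr.rindex(")")]
    return {value.strip() for value in inner.split(",") if value.strip()}


def label_matches(node_labels: Dict[str, str], key: str, expr: str) -> bool:
    value = node_labels.get(key)  # None when the node lacks the label
    if expr.startswith("!in(") and expr.endswith(")"):
        return value not in _parse_values(expr)
    if expr.startswith("in(") and expr.endswith(")"):
        return value in _parse_values(expr)
    if expr.startswith("!"):
        return value != expr[1:]
    return value == expr


def node_labels_match_selector_py(
    node_labels: Dict[str, str], selector: Dict[str, str]
) -> bool:
    # Every constraint in the selector must hold for the node to match.
    return all(label_matches(node_labels, k, e) for k, e in selector.items())
```

An empty selector matches every node, which is why an unset `label_selector` leaves the candidate list unchanged.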

python/ray/includes/common.pxd

Lines changed: 8 additions & 0 deletions
@@ -275,6 +275,14 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
         CLineageReconstructionTask()
         const c_string &SerializeAsString() const
 
+cdef extern from "ray/common/scheduling/cluster_resource_data.h" namespace "ray" nogil:
+    cdef cppclass CNodeResources "ray::NodeResources":
+        CNodeResources()
+        unordered_map[c_string, c_string] labels
+        c_bool HasRequiredLabels(const CLabelSelector &label_selector) const
+
+    void SetNodeResourcesLabels(CNodeResources& resources, const unordered_map[c_string, c_string]& labels)
+
 cdef extern from "ray/common/scheduling/label_selector.h" namespace "ray":
     cdef cppclass CLabelSelector "ray::LabelSelector":
         CLabelSelector() nogil except +

python/ray/serve/_private/application_state.py

Lines changed: 10 additions & 0 deletions
@@ -1707,6 +1707,14 @@ def override_deployment_info(
         override_max_replicas_per_node = options.pop(
             "max_replicas_per_node", replica_config.max_replicas_per_node
         )
+        override_bundle_label_selector = options.pop(
+            "placement_group_bundle_label_selector",
+            replica_config.placement_group_bundle_label_selector,
+        )
+        override_fallback_strategy = options.pop(
+            "placement_group_fallback_strategy",
+            replica_config.placement_group_fallback_strategy,
+        )
 
         # Record telemetry for container runtime env feature at deployment level
         if override_actor_options.get("runtime_env") and (
@@ -1725,6 +1733,8 @@ def override_deployment_info(
             placement_group_bundles=override_placement_group_bundles,
             placement_group_strategy=override_placement_group_strategy,
             max_replicas_per_node=override_max_replicas_per_node,
+            placement_group_bundle_label_selector=override_bundle_label_selector,
+            placement_group_fallback_strategy=override_fallback_strategy,
         )
         override_options["replica_config"] = replica_config
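The override logic above relies on `dict.pop(key, default)`: a value present in the per-deployment options wins, otherwise the value already on the replica config is kept. A minimal sketch of that pattern follows; `apply_overrides` and the plain-dict stand-in for `replica_config` are hypothetical and only illustrate the semantics.

```python
from typing import Any, Dict

OVERRIDABLE_KEYS = (
    "placement_group_bundle_label_selector",
    "placement_group_fallback_strategy",
)


def apply_overrides(options: Dict[str, Any], current: Dict[str, Any]) -> Dict[str, Any]:
    # pop() removes the key from `options`, so the remaining entries can be
    # forwarded elsewhere without duplicating the placement-group settings.
    return {key: options.pop(key, current[key]) for key in OVERRIDABLE_KEYS}
```

One consequence of this pattern is that the default applies only when the key is absent. An explicit `None` in the options replaces the existing value instead of falling back to it.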

python/ray/serve/_private/cluster_node_info_cache.py

Lines changed: 4 additions & 0 deletions
@@ -89,6 +89,10 @@ def get_available_resources_per_node(self) -> Dict[str, Union[float, Dict]]:
 
         return self._cached_available_resources_per_node
 
+    def get_node_labels(self, node_id: str) -> Dict[str, str]:
+        """Get the labels for a specific node from the cache."""
+        return self._cached_node_labels.get(node_id, {})
+
 
 class DefaultClusterNodeInfoCache(ClusterNodeInfoCache):
     def __init__(self, gcs_client: GcsClient):

python/ray/serve/_private/common.py

Lines changed: 2 additions & 0 deletions
@@ -855,6 +855,8 @@ class CreatePlacementGroupRequest:
     target_node_id: str
     name: str
     runtime_env: Optional[str] = None
+    bundle_label_selector: Optional[List[Dict[str, str]]] = None
+    fallback_strategy: Optional[List[Dict[str, Any]]] = None
 
 
 # This error is used to raise when a by-value DeploymentResponse is converted to an

python/ray/serve/_private/config.py

Lines changed: 132 additions & 27 deletions
@@ -480,6 +480,8 @@ def __init__(
         ray_actor_options: Dict,
         placement_group_bundles: Optional[List[Dict[str, float]]] = None,
         placement_group_strategy: Optional[str] = None,
+        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
+        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
         max_replicas_per_node: Optional[int] = None,
         needs_pickle: bool = True,
     ):
@@ -505,9 +507,14 @@ def __init__(
 
         self.placement_group_bundles = placement_group_bundles
         self.placement_group_strategy = placement_group_strategy
+        self.placement_group_bundle_label_selector = (
+            placement_group_bundle_label_selector
+        )
+        self.placement_group_fallback_strategy = placement_group_fallback_strategy
 
         self.max_replicas_per_node = max_replicas_per_node
 
+        self._normalize_bundle_label_selector()
         self._validate()
 
         # Create resource_dict. This contains info about the replica's resource
@@ -516,6 +523,21 @@ def __init__(
         self.resource_dict = resources_from_ray_options(self.ray_actor_options)
         self.needs_pickle = needs_pickle
 
+    def _normalize_bundle_label_selector(self):
+        """If a single selector is provided for multiple bundles, it is broadcasted
+        uniformly to all bundles.
+        """
+        if (
+            self.placement_group_bundles
+            and self.placement_group_bundle_label_selector
+            and len(self.placement_group_bundle_label_selector) == 1
+            and len(self.placement_group_bundles) > 1
+        ):
+            single_selector = self.placement_group_bundle_label_selector[0]
+            self.placement_group_bundle_label_selector = [
+                single_selector.copy() for _ in range(len(self.placement_group_bundles))
+            ]
+
     def _validate(self):
         self._validate_ray_actor_options()
         self._validate_placement_group_options()
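The broadcast rule in `_normalize_bundle_label_selector` can be tried in isolation. The standalone function below is a sketch that mirrors the condition in the hunk above; its name and signature are hypothetical, and it returns the normalized list instead of mutating an object.

```python
from typing import Dict, List, Optional


def normalize_bundle_label_selector(
    bundles: Optional[List[Dict[str, float]]],
    selector: Optional[List[Dict[str, str]]],
) -> Optional[List[Dict[str, str]]]:
    # Broadcast only when exactly one selector is given for several bundles.
    if bundles and selector and len(selector) == 1 and len(bundles) > 1:
        # Copy per bundle so later edits to one entry do not affect the others.
        return [selector[0].copy() for _ in bundles]
    return selector
```

For example, two bundles with the single selector `[{"ray.io/tpu-topology": "4x4"}]` yield a two-element list of equal but independent dictionaries. A selector list whose length is neither 1 nor the bundle count passes through unchanged, so catching that mismatch is left to later validation.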
@@ -535,15 +557,22 @@ def update(
         ray_actor_options: dict,
         placement_group_bundles: Optional[List[Dict[str, float]]] = None,
         placement_group_strategy: Optional[str] = None,
+        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
+        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
         max_replicas_per_node: Optional[int] = None,
     ):
         self.ray_actor_options = ray_actor_options
 
         self.placement_group_bundles = placement_group_bundles
         self.placement_group_strategy = placement_group_strategy
+        self.placement_group_bundle_label_selector = (
+            placement_group_bundle_label_selector
+        )
+        self.placement_group_fallback_strategy = placement_group_fallback_strategy
 
         self.max_replicas_per_node = max_replicas_per_node
 
+        self._normalize_bundle_label_selector()
         self._validate()
 
         self.resource_dict = resources_from_ray_options(self.ray_actor_options)
@@ -557,6 +586,8 @@ def create(
         ray_actor_options: Optional[Dict] = None,
         placement_group_bundles: Optional[List[Dict[str, float]]] = None,
         placement_group_strategy: Optional[str] = None,
+        placement_group_bundle_label_selector: Optional[List[Dict[str, str]]] = None,
+        placement_group_fallback_strategy: Optional[List[Dict[str, Any]]] = None,
         max_replicas_per_node: Optional[int] = None,
         deployment_def_name: Optional[str] = None,
     ):
@@ -597,17 +628,23 @@ def create(
             deployment_def_name = deployment_def.__name__
 
         config = cls(
-            deployment_def_name,
-            pickle_dumps(
+            deployment_def_name=deployment_def_name,
+            serialized_deployment_def=pickle_dumps(
                 deployment_def,
                 f"Could not serialize the deployment {repr(deployment_def)}",
             ),
-            pickle_dumps(init_args, "Could not serialize the deployment init args"),
-            pickle_dumps(init_kwargs, "Could not serialize the deployment init kwargs"),
-            ray_actor_options,
-            placement_group_bundles,
-            placement_group_strategy,
-            max_replicas_per_node,
+            serialized_init_args=pickle_dumps(
+                init_args, "Could not serialize the deployment init args"
+            ),
+            serialized_init_kwargs=pickle_dumps(
+                init_kwargs, "Could not serialize the deployment init kwargs"
+            ),
+            ray_actor_options=ray_actor_options,
+            placement_group_bundles=placement_group_bundles,
+            placement_group_strategy=placement_group_strategy,
+            placement_group_bundle_label_selector=placement_group_bundle_label_selector,
+            placement_group_fallback_strategy=placement_group_fallback_strategy,
+            max_replicas_per_node=max_replicas_per_node,
         )
 
         config._deployment_def = deployment_def
@@ -633,6 +670,8 @@ def _validate_ray_actor_options(self):
             "resources",
             # Other options
             "runtime_env",
+            "label_selector",
+            "fallback_strategy",
         }
 
         for option in self.ray_actor_options:
@@ -674,11 +713,37 @@ def _validate_placement_group_options(self) -> None:
                 "`placement_group_bundles` must also be provided."
             )
 
+        if self.placement_group_fallback_strategy is not None:
+            if self.placement_group_bundles is None:
+                raise ValueError(
+                    "If `placement_group_fallback_strategy` is provided, "
+                    "`placement_group_bundles` must also be provided."
+                )
+            if not isinstance(self.placement_group_fallback_strategy, list):
+                raise TypeError(
+                    "placement_group_fallback_strategy must be a list of dictionaries. "
+                    f"Got: {type(self.placement_group_fallback_strategy)}."
+                )
+            for i, strategy in enumerate(self.placement_group_fallback_strategy):
+                if not isinstance(strategy, dict):
+                    raise TypeError(
+                        f"placement_group_fallback_strategy entry at index {i} must be a dictionary. "
+                        f"Got: {type(strategy)}."
+                    )
+
+        if self.placement_group_bundle_label_selector is not None:
+            if self.placement_group_bundles is None:
+                raise ValueError(
+                    "If `placement_group_bundle_label_selector` is provided, "
+                    "`placement_group_bundles` must also be provided."
+                )
+
         if self.placement_group_bundles is not None:
             validate_placement_group(
                 bundles=self.placement_group_bundles,
                 strategy=self.placement_group_strategy or "PACK",
                 lifetime="detached",
+                bundle_label_selector=self.placement_group_bundle_label_selector,
             )
 
         resource_error_prefix = (
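The new validation rules are easiest to see as a standalone function. The sketch below mirrors only the checks added in this hunk; the function name is hypothetical, the messages are shortened, and the final `validate_placement_group` call from the real method is deliberately left out so the example runs without Ray.

```python
from typing import Any, Dict, List, Optional


def validate_placement_group_options(
    bundles: Optional[List[Dict[str, float]]],
    bundle_label_selector: Optional[List[Dict[str, str]]] = None,
    fallback_strategy: Optional[List[Dict[str, Any]]] = None,
) -> None:
    if fallback_strategy is not None:
        # A fallback strategy is meaningless without primary bundles.
        if bundles is None:
            raise ValueError("fallback strategy requires placement_group_bundles")
        if not isinstance(fallback_strategy, list):
            raise TypeError(f"expected a list, got {type(fallback_strategy)}")
        for i, entry in enumerate(fallback_strategy):
            if not isinstance(entry, dict):
                raise TypeError(f"entry {i} must be a dict, got {type(entry)}")
    # A bundle label selector also depends on bundles being present.
    if bundle_label_selector is not None and bundles is None:
        raise ValueError("bundle label selector requires placement_group_bundles")
```

Calling it with `bundles=None` and a non-`None` selector or fallback list raises `ValueError`, while a malformed fallback list raises `TypeError`.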
@@ -772,19 +837,37 @@ def init_kwargs(self) -> Optional[Tuple[Any]]:
     @classmethod
     def from_proto(cls, proto: ReplicaConfigProto, needs_pickle: bool = True):
         return ReplicaConfig(
-            proto.deployment_def_name,
-            proto.deployment_def,
-            proto.init_args if proto.init_args != b"" else None,
-            proto.init_kwargs if proto.init_kwargs != b"" else None,
-            json.loads(proto.ray_actor_options),
-            json.loads(proto.placement_group_bundles)
-            if proto.placement_group_bundles
-            else None,
-            proto.placement_group_strategy
-            if proto.placement_group_strategy != ""
-            else None,
-            proto.max_replicas_per_node if proto.max_replicas_per_node else None,
-            needs_pickle,
+            deployment_def_name=proto.deployment_def_name,
+            serialized_deployment_def=proto.deployment_def,
+            serialized_init_args=(proto.init_args if proto.init_args != b"" else None),
+            serialized_init_kwargs=(
+                proto.init_kwargs if proto.init_kwargs != b"" else None
+            ),
+            ray_actor_options=json.loads(proto.ray_actor_options),
+            placement_group_bundles=(
+                json.loads(proto.placement_group_bundles)
+                if proto.placement_group_bundles
+                else None
+            ),
+            placement_group_strategy=(
+                proto.placement_group_strategy
+                if proto.placement_group_strategy != ""
+                else None
+            ),
+            placement_group_bundle_label_selector=(
+                json.loads(proto.placement_group_bundle_label_selector)
+                if proto.placement_group_bundle_label_selector
+                else None
+            ),
+            placement_group_fallback_strategy=(
+                json.loads(proto.placement_group_fallback_strategy)
+                if proto.placement_group_fallback_strategy
+                else None
+            ),
+            max_replicas_per_node=(
+                proto.max_replicas_per_node if proto.max_replicas_per_node else None
+            ),
+            needs_pickle=needs_pickle,
         )
 
     @classmethod
@@ -793,19 +876,39 @@ def from_proto_bytes(cls, proto_bytes: bytes, needs_pickle: bool = True):
         return cls.from_proto(proto, needs_pickle)
 
     def to_proto(self):
+        placement_group_bundles = (
+            json.dumps(self.placement_group_bundles)
+            if self.placement_group_bundles is not None
+            else ""
+        )
+
+        bundle_label_selector = (
+            json.dumps(self.placement_group_bundle_label_selector)
+            if self.placement_group_bundle_label_selector is not None
+            else ""
+        )
+
+        fallback_strategy = (
+            json.dumps(self.placement_group_fallback_strategy)
+            if self.placement_group_fallback_strategy is not None
+            else ""
+        )
+
+        max_replicas_per_node = (
+            self.max_replicas_per_node if self.max_replicas_per_node is not None else 0
+        )
+
         return ReplicaConfigProto(
             deployment_def_name=self.deployment_def_name,
             deployment_def=self.serialized_deployment_def,
             init_args=self.serialized_init_args,
             init_kwargs=self.serialized_init_kwargs,
             ray_actor_options=json.dumps(self.ray_actor_options),
-            placement_group_bundles=json.dumps(self.placement_group_bundles)
-            if self.placement_group_bundles is not None
-            else "",
+            placement_group_bundles=placement_group_bundles,
             placement_group_strategy=self.placement_group_strategy,
-            max_replicas_per_node=self.max_replicas_per_node
-            if self.max_replicas_per_node is not None
-            else 0,
+            placement_group_bundle_label_selector=bundle_label_selector,
+            placement_group_fallback_strategy=fallback_strategy,
+            max_replicas_per_node=max_replicas_per_node,
         )
 
     def to_proto_bytes(self):
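Both `to_proto` and `from_proto` use the empty string as the "unset" sentinel for the JSON-encoded fields. The helpers below are a sketch of that convention with hypothetical names; they use only the standard `json` module and no protobuf types.

```python
import json
from typing import Any, Optional


def encode_optional_json(value: Optional[Any]) -> str:
    # None is stored as "", matching a proto string field's default value.
    return json.dumps(value) if value is not None else ""


def decode_optional_json(encoded: str) -> Optional[Any]:
    # "" is falsy and maps back to None; any JSON text is non-empty.
    return json.loads(encoded) if encoded else None
```

Because `json.dumps([])` produces the non-empty string `"[]"`, an explicitly empty list survives the round trip and stays distinct from `None`.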
@@ -818,6 +921,8 @@ def to_dict(self):
             "ray_actor_options": self.ray_actor_options,
             "placement_group_bundles": self.placement_group_bundles,
             "placement_group_strategy": self.placement_group_strategy,
+            "placement_group_bundle_label_selector": self.placement_group_bundle_label_selector,
+            "placement_group_fallback_strategy": self.placement_group_fallback_strategy,
             "max_replicas_per_node": self.max_replicas_per_node,
         }

python/ray/serve/_private/default_impl.py

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ def _default_create_placement_group(
         _soft_target_node_id=request.target_node_id,
         name=request.name,
         lifetime="detached",
+        bundle_label_selector=request.bundle_label_selector,
     )
