Skip to content

Commit 51971fa

Browse files
authored
Split task/resource preparation utils from _raylet.pyx (#63088)
## Description Splits the task/resource preparation helpers out of python/ray/_raylet.pyx into a new Cython include file python/ray/includes/task_options_utils.pxi, following the existing .pxi split pattern (e.g. raylet_client.pxi, function_descriptor.pxi). Helpers moved: - prepare_labels - prepare_label_selector - node_labels_match_selector - prepare_fallback_strategy - prepare_resources - prepare_function_descriptors - prepare_actor_concurrency_groups _raylet.pyx now includes the new file alongside the other includes. The new .pxi declares its own cimports (libcpp, ray.includes.common, ray.includes.function_descriptor, ray.includes.ray_config). ## Related issues Closes #63082 Signed-off-by: Omkar Kabde <omkarkabde@gmail.com>
1 parent dc56cbb commit 51971fa

2 files changed

Lines changed: 169 additions & 149 deletions

File tree

python/ray/_raylet.pyx

Lines changed: 1 addition & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ include "includes/setproctitle.pxi"
198198
include "includes/raylet_client.pxi"
199199
include "includes/gcs_subscriber.pxi"
200200
include "includes/rpc_token_authentication.pxi"
201+
include "includes/task_options_utils.pxi"
201202
# Ray Serve-only: Cython timeseries utilities for autoscaling metrics.
202203
include "includes/timeseries_utils.pxi"
203204

@@ -563,155 +564,6 @@ cdef class Language:
563564
JAVA = Language.from_native(LANGUAGE_JAVA)
564565

565566

566-
cdef int prepare_labels(
567-
dict label_dict,
568-
unordered_map[c_string, c_string] *label_map) except -1:
569-
570-
if label_dict is None:
571-
return 0
572-
573-
label_map[0].reserve(len(label_dict))
574-
for key, value in label_dict.items():
575-
if not isinstance(key, str):
576-
raise ValueError(f"Label key must be string, but got {type(key)}")
577-
if not isinstance(value, str):
578-
raise ValueError(f"Label value must be string, but got {type(value)}")
579-
label_map[0][key.encode("utf-8")] = value.encode("utf-8")
580-
581-
return 0
582-
583-
cdef int prepare_label_selector(
584-
dict label_selector_dict,
585-
CLabelSelector *c_label_selector) except -1:
586-
587-
c_label_selector[0] = CLabelSelector()
588-
589-
if label_selector_dict is None:
590-
return 0
591-
592-
for key, value in label_selector_dict.items():
593-
if not isinstance(key, str):
594-
raise ValueError(f"Label selector key type must be string, but got {type(key)}")
595-
if not isinstance(value, str):
596-
raise ValueError(f"Label selector value must be string, but got {type(value)}")
597-
if key == "":
598-
raise ValueError("Label selector key must be a non-empty string.")
599-
if (value.startswith("in(") and value.endswith(")")) or \
600-
(value.startswith("!in(") and value.endswith(")")):
601-
inner = value[value.index("(")+1:-1].strip()
602-
if not inner:
603-
raise ValueError(f"No values provided for Label Selector '{value[:value.index('(')]}' operator on key '{key}'.")
604-
# Add key-value constraint to the LabelSelector object.
605-
c_label_selector[0].AddConstraint(key.encode("utf-8"), value.encode("utf-8"))
606-
607-
return 0
608-
609-
def node_labels_match_selector(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
610-
"""
611-
Checks if the given node labels satisfy the label selector. This helper function exposes
612-
the C++ logic for determining if a node satisfies a label selector to the Python layer.
613-
"""
614-
cdef:
615-
CNodeResources c_node_resources
616-
CLabelSelector c_label_selector
617-
unordered_map[c_string, c_string] c_labels_map
618-
619-
prepare_labels(node_labels, &c_labels_map)
620-
SetNodeResourcesLabels(c_node_resources, c_labels_map)
621-
prepare_label_selector(selector, &c_label_selector)
622-
623-
# Return whether the node resources satisfy the label constraint.
624-
return c_node_resources.HasRequiredLabels(c_label_selector)
625-
626-
cdef int prepare_fallback_strategy(
627-
list fallback_strategy,
628-
c_vector[CFallbackOption] *fallback_strategy_vector) except -1:
629-
630-
cdef dict label_selector_dict
631-
cdef CLabelSelector c_label_selector
632-
633-
if fallback_strategy is None:
634-
return 0
635-
636-
for strategy_dict in fallback_strategy:
637-
if not isinstance(strategy_dict, dict):
638-
raise ValueError(
639-
"Fallback strategy must be a list of dicts, "
640-
f"but got list containing {type(strategy_dict)}")
641-
642-
label_selector_dict = strategy_dict.get("label_selector")
643-
644-
if label_selector_dict is not None and not isinstance(label_selector_dict, dict):
645-
raise ValueError("Invalid fallback strategy element: invalid 'label_selector'.")
646-
647-
prepare_label_selector(label_selector_dict, &c_label_selector)
648-
649-
fallback_strategy_vector.push_back(
650-
CFallbackOption(c_label_selector)
651-
)
652-
653-
return 0
654-
655-
cdef int prepare_resources(
656-
dict resource_dict,
657-
unordered_map[c_string, double] *resource_map) except -1:
658-
cdef:
659-
list unit_resources
660-
661-
if resource_dict is None:
662-
raise ValueError("Must provide resource map.")
663-
664-
resource_map[0].reserve(len(resource_dict))
665-
for key, value in resource_dict.items():
666-
if not (isinstance(value, int) or isinstance(value, float)):
667-
raise ValueError("Resource quantities may only be ints or floats.")
668-
if value < 0:
669-
raise ValueError("Resource quantities may not be negative.")
670-
if value > 0:
671-
unit_resources = (
672-
f"{RayConfig.instance().predefined_unit_instance_resources()\
673-
.decode('utf-8')},"
674-
f"{RayConfig.instance().custom_unit_instance_resources()\
675-
.decode('utf-8')}"
676-
).split(",")
677-
678-
if (value >= 1 and isinstance(value, float)
679-
and not value.is_integer() and str(key) in unit_resources):
680-
raise ValueError(
681-
f"{key} resource quantities >1 must",
682-
f" be whole numbers. The specified quantity {value} is invalid.")
683-
resource_map[0][key.encode("ascii")] = float(value)
684-
return 0
685-
686-
cdef c_vector[CFunctionDescriptor] prepare_function_descriptors(pyfd_list):
687-
cdef:
688-
c_vector[CFunctionDescriptor] fd_list
689-
690-
fd_list.reserve(len(pyfd_list))
691-
for pyfd in pyfd_list:
692-
fd_list.push_back(CFunctionDescriptorBuilder.BuildPython(
693-
pyfd.module_name, pyfd.class_name, pyfd.function_name, b""))
694-
return fd_list
695-
696-
697-
cdef int prepare_actor_concurrency_groups(
698-
dict concurrency_groups_dict,
699-
c_vector[CConcurrencyGroup] *concurrency_groups):
700-
701-
cdef:
702-
c_vector[CFunctionDescriptor] c_fd_list
703-
704-
if concurrency_groups_dict is None:
705-
raise ValueError("Must provide it...")
706-
707-
concurrency_groups.reserve(len(concurrency_groups_dict))
708-
for key, value in concurrency_groups_dict.items():
709-
c_fd_list = prepare_function_descriptors(value["function_descriptors"])
710-
concurrency_groups.push_back(CConcurrencyGroup(
711-
key.encode("ascii"), value["max_concurrency"], move(c_fd_list)))
712-
return 1
713-
714-
715567
def raise_sys_exit_with_custom_error_message(
716568
ray_terminate_msg: str,
717569
exit_code: int = 0) -> None:
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
from typing import Dict
2+
3+
from libcpp.string cimport string as c_string
4+
from libcpp.unordered_map cimport unordered_map
5+
from libcpp.utility cimport move
6+
from libcpp.vector cimport vector as c_vector
7+
8+
from ray.includes.common cimport (
9+
CConcurrencyGroup,
10+
CFallbackOption,
11+
CLabelSelector,
12+
CNodeResources,
13+
SetNodeResourcesLabels,
14+
)
15+
from ray.includes.function_descriptor cimport (
16+
CFunctionDescriptor,
17+
CFunctionDescriptorBuilder,
18+
)
19+
from ray.includes.ray_config cimport RayConfig
20+
21+
22+
cdef int prepare_labels(
23+
dict label_dict,
24+
unordered_map[c_string, c_string] *label_map) except -1:
25+
26+
if label_dict is None:
27+
return 0
28+
29+
label_map[0].reserve(len(label_dict))
30+
for key, value in label_dict.items():
31+
if not isinstance(key, str):
32+
raise ValueError(f"Label key must be string, but got {type(key)}")
33+
if not isinstance(value, str):
34+
raise ValueError(f"Label value must be string, but got {type(value)}")
35+
label_map[0][key.encode("utf-8")] = value.encode("utf-8")
36+
37+
return 0
38+
39+
cdef int prepare_label_selector(
40+
dict label_selector_dict,
41+
CLabelSelector *c_label_selector) except -1:
42+
43+
c_label_selector[0] = CLabelSelector()
44+
45+
if label_selector_dict is None:
46+
return 0
47+
48+
for key, value in label_selector_dict.items():
49+
if not isinstance(key, str):
50+
raise ValueError(f"Label selector key type must be string, but got {type(key)}")
51+
if not isinstance(value, str):
52+
raise ValueError(f"Label selector value must be string, but got {type(value)}")
53+
if key == "":
54+
raise ValueError("Label selector key must be a non-empty string.")
55+
if (value.startswith("in(") and value.endswith(")")) or \
56+
(value.startswith("!in(") and value.endswith(")")):
57+
inner = value[value.index("(")+1:-1].strip()
58+
if not inner:
59+
raise ValueError(f"No values provided for Label Selector '{value[:value.index('(')]}' operator on key '{key}'.")
60+
# Add key-value constraint to the LabelSelector object.
61+
c_label_selector[0].AddConstraint(key.encode("utf-8"), value.encode("utf-8"))
62+
63+
return 0
64+
65+
def node_labels_match_selector(node_labels: Dict[str, str], selector: Dict[str, str]) -> bool:
66+
"""
67+
Checks if the given node labels satisfy the label selector. This helper function exposes
68+
the C++ logic for determining if a node satisfies a label selector to the Python layer.
69+
"""
70+
cdef:
71+
CNodeResources c_node_resources
72+
CLabelSelector c_label_selector
73+
unordered_map[c_string, c_string] c_labels_map
74+
75+
prepare_labels(node_labels, &c_labels_map)
76+
SetNodeResourcesLabels(c_node_resources, c_labels_map)
77+
prepare_label_selector(selector, &c_label_selector)
78+
79+
# Return whether the node resources satisfy the label constraint.
80+
return c_node_resources.HasRequiredLabels(c_label_selector)
81+
82+
cdef int prepare_fallback_strategy(
83+
list fallback_strategy,
84+
c_vector[CFallbackOption] *fallback_strategy_vector) except -1:
85+
86+
cdef dict label_selector_dict
87+
cdef CLabelSelector c_label_selector
88+
89+
if fallback_strategy is None:
90+
return 0
91+
92+
for strategy_dict in fallback_strategy:
93+
if not isinstance(strategy_dict, dict):
94+
raise ValueError(
95+
"Fallback strategy must be a list of dicts, "
96+
f"but got list containing {type(strategy_dict)}")
97+
98+
label_selector_dict = strategy_dict.get("label_selector")
99+
100+
if label_selector_dict is not None and not isinstance(label_selector_dict, dict):
101+
raise ValueError("Invalid fallback strategy element: invalid 'label_selector'.")
102+
103+
prepare_label_selector(label_selector_dict, &c_label_selector)
104+
105+
fallback_strategy_vector.push_back(
106+
CFallbackOption(c_label_selector)
107+
)
108+
109+
return 0
110+
111+
cdef int prepare_resources(
112+
dict resource_dict,
113+
unordered_map[c_string, double] *resource_map) except -1:
114+
cdef:
115+
list unit_resources
116+
117+
if resource_dict is None:
118+
raise ValueError("Must provide resource map.")
119+
120+
resource_map[0].reserve(len(resource_dict))
121+
for key, value in resource_dict.items():
122+
if not (isinstance(value, int) or isinstance(value, float)):
123+
raise ValueError("Resource quantities may only be ints or floats.")
124+
if value < 0:
125+
raise ValueError("Resource quantities may not be negative.")
126+
if value > 0:
127+
unit_resources = (
128+
f"{RayConfig.instance().predefined_unit_instance_resources()\
129+
.decode('utf-8')},"
130+
f"{RayConfig.instance().custom_unit_instance_resources()\
131+
.decode('utf-8')}"
132+
).split(",")
133+
134+
if (value >= 1 and isinstance(value, float)
135+
and not value.is_integer() and str(key) in unit_resources):
136+
raise ValueError(
137+
f"{key} resource quantities >1 must",
138+
f" be whole numbers. The specified quantity {value} is invalid.")
139+
resource_map[0][key.encode("ascii")] = float(value)
140+
return 0
141+
142+
cdef c_vector[CFunctionDescriptor] prepare_function_descriptors(pyfd_list):
143+
cdef:
144+
c_vector[CFunctionDescriptor] fd_list
145+
146+
fd_list.reserve(len(pyfd_list))
147+
for pyfd in pyfd_list:
148+
fd_list.push_back(CFunctionDescriptorBuilder.BuildPython(
149+
pyfd.module_name, pyfd.class_name, pyfd.function_name, b""))
150+
return fd_list
151+
152+
153+
cdef int prepare_actor_concurrency_groups(
154+
dict concurrency_groups_dict,
155+
c_vector[CConcurrencyGroup] *concurrency_groups):
156+
157+
cdef:
158+
c_vector[CFunctionDescriptor] c_fd_list
159+
160+
if concurrency_groups_dict is None:
161+
raise ValueError("Must provide it...")
162+
163+
concurrency_groups.reserve(len(concurrency_groups_dict))
164+
for key, value in concurrency_groups_dict.items():
165+
c_fd_list = prepare_function_descriptors(value["function_descriptors"])
166+
concurrency_groups.push_back(CConcurrencyGroup(
167+
key.encode("ascii"), value["max_concurrency"], move(c_fd_list)))
168+
return 1

0 commit comments

Comments
 (0)