Skip to content

Commit 5d2c4e7

Browse files
[data] Support multiple datasets in a cluster (2/2): partition cluster resources by subcluster label (#63375)
The end goal is to support two Ray Data datasets in one cluster using subcluster label scheduling. In such a setup, both datasets share the same AutoscalingCoordinator. The previous PR in this stack (#63331) made sure that each dataset's tasks ended up in the correct subcluster. This PR ensures that all requesters, whether they are trainers or datasets, only request and receive resources in their own subcluster. --------- Signed-off-by: Timothy Seah <tseah@anyscale.com> Co-authored-by: Justin Yu <justin.v.yu@gmail.com>
1 parent 7b18437 commit 5d2c4e7

7 files changed

Lines changed: 617 additions & 50 deletions

File tree

python/ray/data/_internal/cluster_autoscaler/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def create_cluster_autoscaler(
4040
execution_id: str,
4141
) -> ClusterAutoscaler:
4242
resource_limits = data_context.execution_options.resource_limits
43+
label_selector = data_context.execution_options.label_selector
4344
cluster_autoscaler_version = os.environ.get(
4445
CLUSTER_AUTOSCALER_ENV_KEY, DEFAULT_CLUSTER_AUTOSCALER_VERSION
4546
)
@@ -50,6 +51,7 @@ def create_cluster_autoscaler(
5051
resource_manager,
5152
execution_id=execution_id,
5253
resource_limits=resource_limits,
54+
label_selector=label_selector,
5355
)
5456

5557
elif cluster_autoscaler_version == ClusterAutoscalerVersion.V1:

python/ray/data/_internal/cluster_autoscaler/default_autoscaling_coordinator.py

Lines changed: 119 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919
logger = logging.getLogger(__name__)
2020

2121
HEAD_NODE_RESOURCE_LABEL = "node:__internal_head__"
22+
# Label key the cluster autoscaler uses to bucket nodes by subcluster.
23+
# Hardcoded so all components agree without per-Dataset configuration.
24+
SUBCLUSTER_LABEL_KEY = "__subcluster__"
25+
# Sentinel for "no subcluster" — used as both a node-label fallback and
26+
# the bucket key for unlabeled nodes in ``_cluster_node_resources``.
27+
DEFAULT_SUBCLUSTER: Optional[str] = None
28+
2229

2330
RAY_DATA_AUTOSCALING_COORDINATOR_LOG_TRACEBACK = env_bool(
2431
"RAY_DATA_AUTOSCALING_COORDINATOR_LOG_TRACEBACK", True
@@ -71,8 +78,12 @@ def __init__(
7178
self,
7279
requester_id: str,
7380
autoscaling_coordinator_actor=None, # For testing only: injects an actor instead of using the shared named singleton.
81+
subcluster_selector: Optional[Dict[str, str]] = None,
7482
):
7583
self._requester_id = requester_id
84+
# Label selector keyed by ``SUBCLUSTER_LABEL_KEY`` pinning this
85+
# requester to a single subcluster.
86+
self._subcluster_selector = subcluster_selector
7687
self._cached_allocated_resources: List[ResourceDict] = []
7788
# In-flight get_allocated_resources ref, or None if no request is pending.
7889
self._pending_allocated_resources: Optional[ray.ObjectRef] = None
@@ -83,7 +94,7 @@ def __init__(
8394

8495
@functools.cached_property
8596
def _autoscaling_coordinator(self):
86-
# Create the coordinator actor lazily rather than eagerly in the constructor.
97+
# Lazy: avoids creating the actor in __init__.
8798
return get_or_create_autoscaling_coordinator()
8899

89100
def request_resources(
@@ -105,6 +116,7 @@ def request_resources(
105116
request_remaining=request_remaining,
106117
priority=priority,
107118
label_selectors=label_selectors,
119+
subcluster_selector=self._subcluster_selector,
108120
)
109121

110122
def cancel_request(self) -> None:
@@ -190,7 +202,11 @@ def __init__(
190202
self._get_cluster_nodes = get_cluster_nodes
191203

192204
self._ongoing_reqs: Dict[str, OngoingRequest] = {}
193-
self._cluster_node_resources: List[ResourceDict] = []
205+
# Map from requester id to its subcluster selector.
206+
self._subcluster_selectors: Dict[str, Optional[Dict[str, str]]] = {}
207+
# Node resources bucketed by their ``SUBCLUSTER_LABEL_KEY`` value.
208+
# Nodes without the key fall under ``DEFAULT_SUBCLUSTER``.
209+
self._cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = {}
194210
# Lock for thread-safe access to shared state from the background thread.
195211
self._lock = threading.Lock()
196212
self._update_cluster_node_resources()
@@ -223,12 +239,15 @@ def request_resources(
223239
request_remaining: bool = False,
224240
priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
225241
label_selectors: Optional[List[Dict[str, str]]] = None,
242+
subcluster_selector: Optional[Dict[str, str]] = None,
226243
) -> None:
227244
logger.debug(
228-
"Received request from %s: %s (label_selectors=%s).",
245+
"Received request from %s: %s "
246+
"(label_selectors=%s, subcluster_selector=%s).",
229247
requester_id,
230248
resources,
231249
label_selectors,
250+
subcluster_selector,
232251
)
233252
if label_selectors is None:
234253
label_selectors = [{} for _ in resources]
@@ -237,6 +256,20 @@ def request_resources(
237256
f"label_selectors length ({len(label_selectors)}) must match "
238257
f"resources length ({len(resources)})."
239258
)
259+
if subcluster_selector and label_selectors:
260+
req_subcluster = subcluster_selector.get(SUBCLUSTER_LABEL_KEY)
261+
for i, sel in enumerate(label_selectors):
262+
bundle_subcluster = sel.get(SUBCLUSTER_LABEL_KEY)
263+
if (
264+
bundle_subcluster is not None
265+
and bundle_subcluster != req_subcluster
266+
):
267+
raise ValueError(
268+
f"Bundle {i} label_selector targets subcluster "
269+
f"{bundle_subcluster!r}, but requester is registered to "
270+
f"{req_subcluster!r}. Per-bundle cross-subcluster "
271+
f"allocation is not supported."
272+
)
240273
with self._lock:
241274
now = self._get_current_time()
242275
request_updated = False
@@ -248,6 +281,15 @@ def request_resources(
248281
)
249282
if priority.value != old_req.priority:
250283
raise ValueError("Cannot change priority of an ongoing request.")
284+
if (
285+
requester_id in self._subcluster_selectors
286+
and self._subcluster_selectors[requester_id] != subcluster_selector
287+
):
288+
raise ValueError(
289+
"Cannot change subcluster_selector of an ongoing request "
290+
f"from {self._subcluster_selectors[requester_id]!r} to "
291+
f"{subcluster_selector!r}."
292+
)
251293

252294
request_updated = (
253295
resources != old_req.requested_resources
@@ -267,6 +309,9 @@ def request_resources(
267309
expiration_time=now + expire_after_s,
268310
allocated_resources=[],
269311
)
312+
# Write subcluster after all validations so a rejected call
313+
# never leaves the registry on a new subcluster.
314+
self._subcluster_selectors[requester_id] = subcluster_selector
270315
if request_updated:
271316
# If the request has updated, immediately send
272317
# a new request and reallocate resources.
@@ -282,25 +327,38 @@ def cancel_request(
282327
if requester_id not in self._ongoing_reqs:
283328
return
284329
del self._ongoing_reqs[requester_id]
330+
self._subcluster_selectors.pop(requester_id, None)
285331
self._merge_and_send_requests()
286332
self._reallocate_resources()
287333

288334
def _purge_expired_requests(self):
289335
now = self._get_current_time()
290-
self._ongoing_reqs = {
336+
live = {
291337
requester_id: req
292338
for requester_id, req in self._ongoing_reqs.items()
293339
if req.expiration_time > now
294340
}
341+
for expired_id in self._ongoing_reqs.keys() - live.keys():
342+
self._subcluster_selectors.pop(expired_id, None)
343+
self._ongoing_reqs = live
295344

296345
def _merge_and_send_requests(self):
297-
"""Merge requests and send them to Ray Autoscaler."""
346+
"""Merge requests and send them to Ray Autoscaler.
347+
348+
Each bundle's forwarded selector is the union of its per-bundle
349+
``requested_label_selectors`` entry and the requester's
350+
``subcluster_selector``. The subcluster pin wins on key conflict,
351+
so the autoscaler always sees the correct subcluster regardless
352+
of what the per-bundle selectors contain.
353+
"""
298354
self._purge_expired_requests()
299355
merged_req: List[ResourceDict] = []
300356
merged_selectors: List[Dict[str, str]] = []
301-
for req in self._ongoing_reqs.values():
357+
for requester_id, req in self._ongoing_reqs.items():
302358
merged_req.extend(req.requested_resources)
303-
merged_selectors.extend(req.requested_label_selectors)
359+
subcluster_selector = self._subcluster_selectors.get(requester_id) or {}
360+
for per_bundle in req.requested_label_selectors:
361+
merged_selectors.append({**per_bundle, **subcluster_selector})
304362
if any(merged_selectors):
305363
self._send_resources_request(merged_req, label_selectors=merged_selectors)
306364
else:
@@ -324,7 +382,7 @@ def _maybe_subtract_resources(self, res1: ResourceDict, res2: ResourceDict) -> b
324382
return True
325383

326384
def _update_cluster_node_resources(self) -> bool:
327-
"""Update cluster's total resources. Return True if changed."""
385+
"""Update cluster resources bucketed by subcluster. Return True if changed."""
328386

329387
def _is_node_eligible(node):
330388
# Exclude dead nodes.
@@ -341,47 +399,69 @@ def _is_node_eligible(node):
341399

342400
nodes = list(filter(_is_node_eligible, self._get_cluster_nodes()))
343401
nodes = sorted(nodes, key=lambda node: node.get("NodeID", ""))
344-
cluster_node_resources = [node["Resources"] for node in nodes]
402+
cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = {}
403+
for node in nodes:
404+
# Safeguard against the case where the value of ``Labels`` is None.
405+
labels = node.get("Labels") or {}
406+
subcluster = labels.get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)
407+
cluster_node_resources.setdefault(subcluster, []).append(node["Resources"])
345408
if cluster_node_resources == self._cluster_node_resources:
346409
return False
347-
else:
348-
logger.debug("Cluster resources updated: %s.", cluster_node_resources)
349-
self._cluster_node_resources = cluster_node_resources
350-
return True
410+
logger.debug("Cluster resources updated: %s.", cluster_node_resources)
411+
self._cluster_node_resources = cluster_node_resources
412+
return True
351413

352414
def _reallocate_resources(self):
353-
"""Reallocate cluster resources."""
415+
"""Reallocate cluster resources.
416+
417+
Each requester's subcluster comes from its ``subcluster_selector``.
418+
A requester without one is eligible only for the ``None`` bucket.
419+
"""
354420
now = self._get_current_time()
355-
cluster_node_resources = copy.deepcopy(self._cluster_node_resources)
356-
ongoing_reqs = sorted(
357-
[req for req in self._ongoing_reqs.values() if req.expiration_time >= now]
421+
cluster_node_resources: Dict[Optional[str], List[ResourceDict]] = copy.deepcopy(
422+
self._cluster_node_resources
358423
)
359-
# Allocate resources to ongoing requests.
424+
live_items = [
425+
(req_id, req)
426+
for req_id, req in self._ongoing_reqs.items()
427+
if req.expiration_time >= now
428+
]
429+
live_items.sort(key=lambda item: item[1])
430+
431+
def _subcluster_of(requester_id: str) -> Optional[str]:
432+
selector = self._subcluster_selectors.get(requester_id)
433+
return (selector or {}).get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)
434+
360435
# TODO(hchen): Optimize the following triple loop.
361-
for ongoing_req in ongoing_reqs:
436+
for requester_id, ongoing_req in live_items:
362437
ongoing_req.allocated_resources = []
363-
for req in ongoing_req.requested_resources:
364-
for node_resource in cluster_node_resources:
365-
if self._maybe_subtract_resources(node_resource, req):
366-
ongoing_req.allocated_resources.append(req)
438+
subcluster = _subcluster_of(requester_id)
439+
for bundle in ongoing_req.requested_resources:
440+
for node_resource in cluster_node_resources.get(subcluster, []):
441+
if self._maybe_subtract_resources(node_resource, bundle):
442+
ongoing_req.allocated_resources.append(bundle)
367443
break
368-
# Allocate remaining resources.
369-
# NOTE: to handle the case where multiple datasets are running concurrently,
370-
# we divide remaining resources equally to all requesters with `request_remaining=True`.
371-
remaining_resource_requesters = [
372-
req for req in ongoing_reqs if req.request_remaining
444+
445+
# Allocate remaining resources. Multiple concurrent requesters in
446+
# the same subcluster split that subcluster's leftovers equally.
447+
remaining_items = [
448+
(req_id, req) for req_id, req in live_items if req.request_remaining
373449
]
374-
num_remaining_requesters = len(remaining_resource_requesters)
375-
if num_remaining_requesters > 0:
376-
for node_resource in cluster_node_resources:
377-
# Divide remaining resources equally among requesters.
378-
# NOTE: Integer division may leave some resources unallocated.
379-
divided_resource = {
380-
k: v // num_remaining_requesters for k, v in node_resource.items()
381-
}
382-
for ongoing_req in remaining_resource_requesters:
383-
if any(v > 0 for v in divided_resource.values()):
384-
ongoing_req.allocated_resources.append(divided_resource)
450+
for subcluster, node_resources in cluster_node_resources.items():
451+
eligible = [
452+
req
453+
for req_id, req in remaining_items
454+
if _subcluster_of(req_id) == subcluster
455+
]
456+
if not eligible:
457+
continue
458+
for node_resource in node_resources:
459+
# Integer division may leave some resources unallocated.
460+
divided = {k: v // len(eligible) for k, v in node_resource.items()}
461+
if not any(v > 0 for v in divided.values()):
462+
continue
463+
for r in eligible:
464+
r.allocated_resources.append(divided)
385465

386466
if logger.isEnabledFor(logging.DEBUG):
387467
msg = "Allocated resources:\n"

python/ray/data/_internal/cluster_autoscaler/default_cluster_autoscaler_v2.py

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import ray
99
from .base_autoscaling_coordinator import AutoscalingCoordinator, ResourceDict
1010
from .default_autoscaling_coordinator import (
11+
DEFAULT_SUBCLUSTER,
12+
SUBCLUSTER_LABEL_KEY,
1113
DefaultAutoscalingCoordinator,
1214
)
1315
from .resource_utilization_gauge import (
@@ -69,8 +71,27 @@ def to_bundle(self):
6971
return {"CPU": self.cpu, "GPU": self.gpu, "memory": self.mem}
7072

7173

72-
def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
73-
"""Get the unique node resource specs and their count in the cluster."""
74+
def _get_node_resource_spec_and_count(
75+
subcluster: Optional[str] = DEFAULT_SUBCLUSTER,
76+
) -> Dict[_NodeResourceSpec, int]:
77+
"""Get the unique node resource specs and their count in the cluster,
78+
scoped to a single subcluster.
79+
80+
``subcluster`` is the value at ``SUBCLUSTER_LABEL_KEY`` to match
81+
against. The default ``DEFAULT_SUBCLUSTER`` (None) selects nodes with
82+
no subcluster label.
83+
84+
Quirk: the returned dict also contains a ``node_type: 0`` entry (e.g. ``"m5.xlarge": 0``) for every
85+
node type registered in ``cluster_config.node_group_configs`` that
86+
isn't included in this subcluster. ``get_cluster_config()``
87+
reports node types but not labels, so the only way to know a
88+
shape's subcluster is to inspect live nodes. Harmless: for example,
89+
if m5.xlarge nodes only exist in the training subcluster, the validation
90+
dataset will emit pending-bundle scale-up demand for m5.xlarge nodes
91+
stamped with the validation label, which the autoscaler can never
92+
satisfy.
93+
TODO: get labels from cluster config so the catalog can be filtered.
94+
"""
7495
nodes_resource_spec_count = defaultdict(int)
7596

7697
cluster_config = ray._private.state.state.get_cluster_config()
@@ -84,11 +105,16 @@ def _get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
84105
)
85106
nodes_resource_spec_count[node_resource_spec] = 0
86107

87-
# Filter out the head node.
108+
# Filter out the head node and nodes outside the requester's subcluster.
88109
node_resources = [
89110
node["Resources"]
90111
for node in ray.nodes()
91-
if node["Alive"] and "node:__internal_head__" not in node["Resources"]
112+
if (
113+
node["Alive"]
114+
and "node:__internal_head__" not in node["Resources"]
115+
and (node.get("Labels") or {}).get(SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER)
116+
== subcluster
117+
)
92118
]
93119

94120
for r in node_resources:
@@ -162,10 +188,9 @@ def __init__(
162188
min_gap_between_autoscaling_requests_s: float = MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS, # noqa: E501
163189
low_util_request_release_delay_s: float = DEFAULT_LOW_UTIL_REQUEST_RELEASE_DELAY_S, # noqa: E501
164190
autoscaling_coordinator: Optional[AutoscalingCoordinator] = None,
165-
get_node_counts: Callable[[], Dict[_NodeResourceSpec, int]] = (
166-
_get_node_resource_spec_and_count
167-
),
191+
get_node_counts: Optional[Callable[[], Dict[_NodeResourceSpec, int]]] = None,
168192
get_time: Callable[[], float] = time.time,
193+
label_selector: Optional[Dict[str, str]] = None,
169194
):
170195
assert cluster_scaling_up_delta > 0
171196
assert cluster_util_avg_window_s > 0
@@ -180,6 +205,7 @@ def __init__(
180205
)
181206

182207
self._resource_limits = resource_limits
208+
self._label_selector = label_selector or {}
183209
self._resource_utilization_calculator = resource_utilization_calculator
184210
# Threshold of cluster utilization to trigger scaling up.
185211
self._cluster_scaling_up_util_threshold = cluster_scaling_up_util_threshold
@@ -200,9 +226,20 @@ def __init__(
200226
self._requester_id = f"data-{execution_id}"
201227
if autoscaling_coordinator is None:
202228
autoscaling_coordinator = DefaultAutoscalingCoordinator(
203-
requester_id=self._requester_id
229+
requester_id=self._requester_id,
230+
subcluster_selector=label_selector,
204231
)
205232
self._autoscaling_coordinator = autoscaling_coordinator
233+
if get_node_counts is None:
234+
# Scope node-shape/count discovery to this requester's subcluster
235+
# so try_trigger_scaling doesn't pull node shapes / counts from
236+
# other subclusters into ``active_bundles`` / ``pending_bundles``.
237+
subcluster = self._label_selector.get(
238+
SUBCLUSTER_LABEL_KEY, DEFAULT_SUBCLUSTER
239+
)
240+
get_node_counts = lambda: _get_node_resource_spec_and_count( # noqa: E731
241+
subcluster=subcluster
242+
)
206243
self._get_node_counts = get_node_counts
207244
self._get_time = get_time
208245
self._autoscaling_enabled = is_autoscaling_enabled()

python/ray/data/_internal/cluster_autoscaler/fake_autoscaling_coordinator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def request_resources(
5151
request_remaining: bool = False,
5252
priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
5353
label_selectors: Optional[List[Dict[str, str]]] = None,
54+
subcluster_selector: Optional[Dict[str, str]] = None,
5455
) -> None:
5556
if priority != ResourceRequestPriority.MEDIUM:
5657
raise NotImplementedError(

0 commit comments

Comments
 (0)