
Commit 7eb1a12

bveeramani and iamjustinhsu authored and committed
[Data] Add PhysicalOperator.min_max_resource_usage_bounds (#52502)
## Why are these changes needed?

This change is necessary to ensure we don't over-reserve resources for operators. For example:

* If an actor pool only uses GPU resources, we don't need to reserve CPU resources for it.
* If a task pool has finished receiving inputs and launching tasks, we don't need to reserve more resources than required for the currently active tasks.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: jhsu <[email protected]>
1 parent c23e436 · commit 7eb1a12

13 files changed: +239 −125 lines changed
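The core idea of the change, as a minimal sketch in plain Python (hypothetical scheduler and numbers, not Ray code): the resource manager clamps each operator's reservation into that operator's [min, max] range per resource dimension, so it reserves enough for the operator to make progress without provisioning resources the operator can never use. The real logic lives in `resource_manager.py` (see that diff below).

```python
# Hypothetical, minimal sketch of clamping a reservation into per-operator
# [min, max] bounds. Dimension names and numbers are made up for illustration.
def clamp(value: float, low: float, high: float) -> float:
    return min(max(value, low), high)


# A GPU-only operator: its minimum bound asks for 8 GPUs and no CPUs, and its
# (hypothetical) maximum bound caps CPU at 0, so no CPU budget gets reserved.
default_budget = {"cpu": 4.0, "gpu": 0.0}
min_bound = {"cpu": 0.0, "gpu": 8.0}
max_bound = {"cpu": 0.0, "gpu": float("inf")}

reservation = {
    dim: clamp(default_budget[dim], min_bound[dim], max_bound[dim])
    for dim in default_budget
}
print(reservation)  # {'cpu': 0.0, 'gpu': 8.0}
```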

python/ray/data/BUILD

Lines changed: 14 additions & 0 deletions

@@ -590,6 +590,20 @@ py_test(
     ],
 )
 
+py_test(
+    name = "test_task_pool_map_operator",
+    size = "small",
+    srcs = ["tests/test_task_pool_map_operator.py"],
+    tags = [
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
+
 py_test(
     name = "test_tensor",
     size = "small",

python/ray/data/_internal/execution/interfaces/execution_options.py

Lines changed: 7 additions & 1 deletion

@@ -1,10 +1,11 @@
 import os
 from typing import Dict, List, Optional, Union
 
-from .common import NodeIdStr
 from ray.data._internal.execution.util import memory_string
 from ray.util.annotations import DeveloperAPI
 
+from .common import NodeIdStr
+
 
 class ExecutionResources:
     """Specifies resources usage or resource limits for execution.
@@ -136,6 +137,11 @@ def zero(cls) -> "ExecutionResources":
         """Returns an ExecutionResources object with zero resources."""
         return ExecutionResources(0.0, 0.0, 0.0, 0.0)
 
+    @classmethod
+    def inf(cls) -> "ExecutionResources":
+        """Returns an ExecutionResources object with infinite resources."""
+        return ExecutionResources.for_limits()
+
     def is_zero(self) -> bool:
         """Returns True if all resources are zero."""
         return (
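A small usage sketch of the two classmethods (the import path matches the one used elsewhere in this diff): `zero()` already existed and is the "reserve nothing" lower bound, while the new `inf()` is the "no upper bound" value that the base `PhysicalOperator.min_max_resource_requirements()` returns by default.

```python
from ray.data._internal.execution.interfaces import ExecutionResources

no_reservation = ExecutionResources.zero()  # all resource fields are 0.0
no_upper_bound = ExecutionResources.inf()   # alias for ExecutionResources.for_limits()

# The default bounds returned by the base PhysicalOperator in this PR.
default_bounds = (no_reservation, no_upper_bound)
print(default_bounds)
```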

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 12 additions & 7 deletions

@@ -1,10 +1,9 @@
 import logging
+import uuid
 from abc import ABC, abstractmethod
 from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
-import uuid
 
 import ray
-from .ref_bundle import RefBundle
 from ray._raylet import ObjectRefGenerator
 from ray.data._internal.execution.autoscaler.autoscaling_actor_pool import (
     AutoscalingActorPool,
@@ -15,10 +14,11 @@
 )
 from ray.data._internal.execution.interfaces.op_runtime_metrics import OpRuntimeMetrics
 from ray.data._internal.logical.interfaces import LogicalOperator, Operator
+from ray.data._internal.output_buffer import OutputBlockSizeOption
 from ray.data._internal.stats import StatsDict, Timer
 from ray.data.context import DataContext
-from ray.data._internal.output_buffer import OutputBlockSizeOption
 
+from .ref_bundle import RefBundle
 
 logger = logging.getLogger(__name__)
 
@@ -542,13 +542,18 @@ def pending_processor_usage(self) -> ExecutionResources:
         """
         return ExecutionResources(0, 0, 0)
 
-    def base_resource_usage(self) -> ExecutionResources:
-        """Returns the minimum amount of resources required for execution.
+    def min_max_resource_requirements(
+        self,
+    ) -> Tuple[ExecutionResources, ExecutionResources]:
+        """Returns the min and max resources to start the operator and make progress.
 
         For example, an operator that creates an actor pool requiring 8 GPUs could
-        return ExecutionResources(gpu=8) as its base usage.
+        return ExecutionResources(gpu=8) as its minimum usage.
+
+        This method is used by the resource manager to reserve minimum resources and to
+        ensure that it doesn't over-provision resources.
+        """
-        return ExecutionResources()
+        return ExecutionResources.zero(), ExecutionResources.inf()
 
     def incremental_resource_usage(self) -> ExecutionResources:
         """Returns the incremental resources required for processing another input.

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 20 additions & 5 deletions

@@ -302,13 +302,28 @@ def progress_str(self) -> str:
         )
         return "[locality off]"
 
-    def base_resource_usage(self) -> ExecutionResources:
-        min_workers = self._actor_pool.min_size()
-        return ExecutionResources(
-            cpu=self._ray_remote_args.get("num_cpus", 0) * min_workers,
-            gpu=self._ray_remote_args.get("num_gpus", 0) * min_workers,
+    def min_max_resource_requirements(
+        self,
+    ) -> Tuple[ExecutionResources, ExecutionResources]:
+        min_actors = self._actor_pool.min_size()
+        assert min_actors is not None, min_actors
+
+        num_cpus_per_actor = self._ray_remote_args.get("num_cpus", 0)
+        num_gpus_per_actor = self._ray_remote_args.get("num_gpus", 0)
+        memory_per_actor = self._ray_remote_args.get("memory", 0)
+
+        min_resource_usage = ExecutionResources(
+            cpu=num_cpus_per_actor * min_actors,
+            gpu=num_gpus_per_actor * min_actors,
+            memory=memory_per_actor * min_actors,
+            # To ensure that all actors are utilized, reserve enough resource budget
+            # to launch one task for each worker.
+            object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task
+            * min_actors,
         )
 
+        return min_resource_usage, ExecutionResources.for_limits()
+
     def current_processor_usage(self) -> ExecutionResources:
         # Both pending and running actors count towards our current resource usage.
         num_active_workers = self._actor_pool.current_size()
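A short worked example of the minimum bound above, with hypothetical numbers (min_size=2 actors, one GPU per actor, and a 128 MiB per-task pending-output estimate from the metrics):

```python
# Hypothetical inputs mirroring the fields read in the method above.
min_actors = 2
num_cpus_per_actor = 0
num_gpus_per_actor = 1
obj_store_mem_max_pending_output_per_task = 128 * 1024 * 1024  # 128 MiB

min_cpu = num_cpus_per_actor * min_actors                                   # 0 CPUs
min_gpu = num_gpus_per_actor * min_actors                                   # 2 GPUs
min_object_store = obj_store_mem_max_pending_output_per_task * min_actors   # 256 MiB
# The maximum bound stays unbounded: ExecutionResources.for_limits().
```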

python/ray/data/_internal/execution/operators/map_operator.py

Lines changed: 8 additions & 4 deletions

@@ -44,14 +44,14 @@
     ApplyAdditionalSplitToOutputBlocks,
     MapTransformer,
 )
-from ray.data._internal.util import MemoryProfiler
 from ray.data._internal.execution.util import memory_string
 from ray.data._internal.stats import StatsDict
+from ray.data._internal.util import MemoryProfiler
 from ray.data.block import (
     Block,
     BlockAccessor,
-    BlockMetadata,
     BlockExecStats,
+    BlockMetadata,
     BlockStats,
     to_stats,
 )
@@ -489,8 +489,10 @@ def pending_processor_usage(self) -> ExecutionResources:
         raise NotImplementedError
 
     @abstractmethod
-    def base_resource_usage(self) -> ExecutionResources:
-        raise NotImplementedError
+    def min_max_resource_requirements(
+        self,
+    ) -> Tuple[ExecutionResources, ExecutionResources]:
+        ...
 
     @abstractmethod
     def incremental_resource_usage(self) -> ExecutionResources:
@@ -739,6 +741,8 @@ def _canonicalize_ray_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str,
     """
     ray_remote_args = ray_remote_args.copy()
 
+    # TODO: Might be better to log this warning at composition-time rather than at
+    # execution. Validating inputs early is a good practice.
     if ray_remote_args.get("num_cpus") and ray_remote_args.get("num_gpus"):
         logger.warning(
             "Specifying both num_cpus and num_gpus for map tasks is experimental, "

python/ray/data/_internal/execution/operators/task_pool_map_operator.py

Lines changed: 6 additions & 3 deletions

@@ -1,4 +1,4 @@
-from typing import Any, Callable, Dict, Optional
+from typing import Any, Callable, Dict, Optional, Tuple
 
 from ray.data._internal.execution.interfaces import (
     ExecutionResources,
@@ -110,8 +110,10 @@ def _add_bundled_input(self, bundle: RefBundle):
     def progress_str(self) -> str:
         return ""
 
-    def base_resource_usage(self) -> ExecutionResources:
-        return ExecutionResources()
+    def min_max_resource_requirements(
+        self,
+    ) -> Tuple[ExecutionResources, ExecutionResources]:
+        return self.incremental_resource_usage(), ExecutionResources.for_limits()
 
     def current_processor_usage(self) -> ExecutionResources:
         num_active_workers = self.num_active_tasks()
@@ -127,6 +129,7 @@ def incremental_resource_usage(self) -> ExecutionResources:
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0),
             gpu=self._ray_remote_args.get("num_gpus", 0),
+            memory=self._ray_remote_args.get("memory", 0),
             object_store_memory=self._metrics.obj_store_mem_max_pending_output_per_task
             or 0,
         )
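For the task pool, the minimum bound is just the cost of a single task (its incremental usage, now including `memory`), and the maximum stays unbounded. A small sketch with hypothetical remote args:

```python
from ray.data._internal.execution.interfaces import ExecutionResources

# Hypothetical values for ray_remote_args={"num_cpus": 2, "memory": 1 GiB} plus
# a 128 MiB per-task pending-output estimate from the metrics.
one_task = ExecutionResources(
    cpu=2,
    gpu=0,
    memory=1 * 1024**3,
    object_store_memory=128 * 1024 * 1024,
)
min_bound, max_bound = one_task, ExecutionResources.for_limits()
```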

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 22 additions & 49 deletions

@@ -20,8 +20,7 @@
 from ray.data.context import DataContext
 
 if TYPE_CHECKING:
-    from ray.data._internal.execution.streaming_executor_state import OpState
-    from ray.data._internal.execution.streaming_executor_state import Topology
+    from ray.data._internal.execution.streaming_executor_state import OpState, Topology
 
 
 logger = logging.getLogger(__name__)
@@ -419,9 +418,6 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
         # See `test_no_deadlock_on_small_cluster_resources` as an example.
         self._reserved_min_resources: Dict[PhysicalOperator, bool] = {}
 
-        self._cached_global_limits = ExecutionResources.zero()
-        self._cached_num_eligible_ops = 0
-
         self._idle_detector = self.IdleDetector()
 
     def _is_op_eligible(self, op: PhysicalOperator) -> bool:
@@ -442,14 +438,6 @@ def _update_reservation(self):
         global_limits = self._resource_manager.get_global_limits()
         eligible_ops = self._get_eligible_ops()
 
-        if (
-            global_limits == self._cached_global_limits
-            and len(eligible_ops) == self._cached_num_eligible_ops
-        ):
-            return
-        self._cached_global_limits = global_limits
-        self._cached_num_eligible_ops = len(eligible_ops)
-
         self._op_reserved.clear()
         self._reserved_for_op_outputs.clear()
         self._reserved_min_resources.clear()
@@ -467,38 +455,23 @@ def _update_reservation(self):
             # Reserve at least half of the default reserved resources for the outputs.
             # This makes sure that we will have enough budget to pull blocks from the
            # op.
-            self._reserved_for_op_outputs[op] = max(
-                default_reserved.object_store_memory / 2, 1.0
+            reserved_for_outputs = ExecutionResources(
+                0, 0, max(default_reserved.object_store_memory / 2, 1)
             )
-            # Calculate the minimum amount of resources to reserve.
-            # 1. Make sure the reserved resources are at least to allow one task.
-            min_reserved = op.incremental_resource_usage().copy()
-            # 2. To ensure that all GPUs are utilized, reserve enough resource budget
-            # to launch one task for each worker.
-            if op.base_resource_usage().gpu > 0:
-                min_workers = sum(
-                    pool.min_size() for pool in op.get_autoscaling_actor_pools()
-                )
-                min_reserved.object_store_memory *= min_workers
-            # Also include `reserved_for_op_outputs`.
-            min_reserved.object_store_memory += self._reserved_for_op_outputs[op]
-            # Total resources we want to reserve for this operator.
-            op_total_reserved = default_reserved.max(min_reserved)
-
-            # Check if the remaining resources are enough for op_total_reserved.
-            # Note, we only consider CPU and GPU, but not object_store_memory,
-            # because object_store_memory can be oversubscribed, but CPU/GPU cannot.
-            if op_total_reserved.satisfies_limit(
+
+            min_resource_usage, max_resource_usage = op.min_max_resource_requirements()
+            reserved_for_tasks = default_reserved.subtract(reserved_for_outputs)
+            reserved_for_tasks = reserved_for_tasks.max(min_resource_usage)
+            reserved_for_tasks = reserved_for_tasks.min(max_resource_usage)
+
+            # Check if the remaining resources are enough for both reserved_for_tasks
+            # and reserved_for_outputs. Note, we only consider CPU and GPU, but not
+            # object_store_memory, because object_store_memory can be oversubscribed,
+            # but CPU/GPU cannot.
+            if reserved_for_tasks.add(reserved_for_outputs).satisfies_limit(
                 remaining, ignore_object_store_memory=True
             ):
-                # If the remaining resources are enough to reserve `op_total_reserved`,
-                # subtract it from the remaining and reserve it for this op.
                 self._reserved_min_resources[op] = True
-                remaining = remaining.subtract(op_total_reserved)
-                self._op_reserved[op] = op_total_reserved
-                self._op_reserved[
-                    op
-                ].object_store_memory -= self._reserved_for_op_outputs[op]
             else:
                 # If the remaining resources are not enough to reserve the minimum
                 # resources for this operator, we'll only reserve the minimum object
@@ -508,14 +481,8 @@ def _update_reservation(self):
                 # ops. It's fine that downstream ops don't get the minimum reservation,
                 # because they can wait for upstream ops to finish and release resources.
                 self._reserved_min_resources[op] = False
-                self._op_reserved[op] = ExecutionResources(
-                    0,
-                    0,
-                    min_reserved.object_store_memory
-                    - self._reserved_for_op_outputs[op],
-                )
-                remaining = remaining.subtract(
-                    ExecutionResources(0, 0, min_reserved.object_store_memory)
+                reserved_for_tasks = ExecutionResources(
+                    0, 0, min_resource_usage.object_store_memory
                 )
                 if index == 0:
                     # Log a warning if even the first operator cannot reserve
@@ -525,7 +492,13 @@ def _update_reservation(self):
                         " The job may hang forever unless the cluster scales up."
                     )
 
+            self._op_reserved[op] = reserved_for_tasks
+            self._reserved_for_op_outputs[op] = reserved_for_outputs.object_store_memory
+
+            op_total_reserved = reserved_for_tasks.add(reserved_for_outputs)
+            remaining = remaining.subtract(op_total_reserved)
             remaining = remaining.max(ExecutionResources.zero())
+
         self._total_shared = remaining
 
     def can_submit_new_task(self, op: PhysicalOperator) -> bool:
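A minimal sketch of the new reservation sequence, using plain floats for the object-store-memory dimension only (numbers are hypothetical): half of the default per-op budget is set aside for the op's outputs, and the remainder is clamped into the operator's [min, max] bounds before being reserved for tasks.

```python
# Hypothetical per-operator budget and bounds, object_store_memory only.
default_reserved = 1000.0
min_bound, max_bound = 700.0, float("inf")  # from op.min_max_resource_requirements()

reserved_for_outputs = max(default_reserved / 2, 1.0)          # 500.0
reserved_for_tasks = default_reserved - reserved_for_outputs   # 500.0
reserved_for_tasks = min(max(reserved_for_tasks, min_bound), max_bound)  # raised to 700.0

op_total_reserved = reserved_for_tasks + reserved_for_outputs  # 1200.0
print(reserved_for_outputs, reserved_for_tasks, op_total_reserved)
```

The min bound can push the reservation above the default split, which is the "reserve at least enough to make progress" behavior, while the max bound keeps the reservation from exceeding what the operator can actually use.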

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 4 additions & 3 deletions

@@ -31,10 +31,10 @@
     register_dataset_logger,
     unregister_dataset_logger,
 )
+from ray.data._internal.metadata_exporter import Topology as TopologyMetadata
 from ray.data._internal.progress_bar import ProgressBar
-from ray.data._internal.stats import DatasetStats, StatsManager, DatasetState, Timer
+from ray.data._internal.stats import DatasetState, DatasetStats, StatsManager, Timer
 from ray.data.context import OK_PREFIX, WARN_PREFIX, DataContext
-from ray.data._internal.metadata_exporter import Topology as TopologyMetadata
 
 logger = logging.getLogger(__name__)
 
@@ -491,7 +491,8 @@ def walk(op):
 
     base_usage = ExecutionResources(cpu=1)
     for op in walk(dag):
-        base_usage = base_usage.add(op.base_resource_usage())
+        min_resource_usage, _ = op.min_max_resource_requirements()
+        base_usage = base_usage.add(min_resource_usage)
 
     if not base_usage.satisfies_limit(limits):
         error_message = (
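The executor-level check above sums every operator's minimum bound (plus one CPU) and verifies the total fits within the configured limits. A hedged sketch with stand-in values (the operator bounds below are hypothetical, and the keyword form of `for_limits` is an assumption; `add` and `satisfies_limit` are the methods used in this diff):

```python
from ray.data._internal.execution.interfaces import ExecutionResources

# Hypothetical minimum bounds for a two-operator DAG.
op_min_bounds = [ExecutionResources(gpu=8), ExecutionResources(cpu=2)]

base_usage = ExecutionResources(cpu=1)
for min_usage in op_min_bounds:
    base_usage = base_usage.add(min_usage)

limits = ExecutionResources.for_limits(cpu=4, gpu=8)
print(base_usage.satisfies_limit(limits))  # True: 3 CPUs and 8 GPUs fit within the limits
```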

python/ray/data/tests/test_actor_pool_map_operator.py

Lines changed: 35 additions & 2 deletions

@@ -3,6 +3,7 @@
 import threading
 import unittest
 from typing import Any, Optional, Tuple
+from unittest.mock import MagicMock
 
 import pytest
 
@@ -11,7 +12,11 @@
 from ray.actor import ActorHandle
 from ray.data._internal.compute import ActorPoolStrategy
 from ray.data._internal.execution.interfaces import ExecutionResources
-from ray.data._internal.execution.operators.actor_pool_map_operator import _ActorPool
+from ray.data._internal.execution.operators.actor_pool_map_operator import (
+    ActorPoolMapOperator,
+    _ActorPool,
+)
+from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
 from ray.data._internal.execution.util import make_ref_bundles
 from ray.tests.conftest import *  # noqa
 from ray.types import ObjectRef
@@ -450,7 +455,33 @@ def test_locality_manager_busyness_ranking(self):
         assert res3 is None
 
 
-def test_start_actor_timeout(ray_start_regular, restore_data_context):
+def test_min_max_resource_requirements(restore_data_context):
+    data_context = ray.data.DataContext.get_current()
+    op = ActorPoolMapOperator(
+        map_transformer=MagicMock(),
+        input_op=InputDataBuffer(data_context, input_data=MagicMock()),
+        data_context=data_context,
+        target_max_block_size=None,
+        compute_strategy=ray.data.ActorPoolStrategy(
+            min_size=1,
+            max_size=2,
+        ),
+        ray_remote_args={"num_cpus": 1},
+    )
+    op._metrics = MagicMock(obj_store_mem_max_pending_output_per_task=3)
+
+    (
+        min_resource_usage_bound,
+        max_resource_usage_bound,
+    ) = op.min_max_resource_requirements()
+
+    assert (
+        min_resource_usage_bound == ExecutionResources(cpu=1, object_store_memory=3)
+        and max_resource_usage_bound == ExecutionResources.for_limits()
+    )
+
+
+def test_start_actor_timeout(ray_start_regular_shared, restore_data_context):
     """Tests that ActorPoolMapOperator raises an exception on
     timeout while waiting for actors."""
 
@@ -482,6 +513,8 @@ def __call__(self, x):
 def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context):
     """Test that a dataset with actor pools can finish, when
     all nodes in the cluster are removed and added back."""
+    ray.shutdown()
+
     cluster = ray_start_cluster
     cluster.add_node(num_cpus=0)
     ray.init()