Conversation

@bveeramani (Member) commented Apr 21, 2025

Why are these changes needed?

This change is necessary to ensure we don't over-reserve resources for operators. For example:

  • If an actor-pool only uses GPU resources, we don't need to reserve CPU resources for it.
  • If a task-pool has finished receiving inputs and launching tasks, we don't need to reserve more resources than required for the currently active tasks.
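In other words, each operator reports a pair of bounds and the scheduler only reserves within them. A minimal sketch of the idea (the class and method names here are illustrative stand-ins, not Ray Data's actual API):

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    """Illustrative stand-in for Ray Data's ExecutionResources."""
    cpu: float = 0.0
    gpu: float = 0.0


class Operator:
    def min_max_resource_usage_bounds(self):
        # Default: the operator doesn't know its requirements, so the
        # bounds are [0, inf) and don't constrain scheduling at all.
        return Resources(), Resources(cpu=math.inf, gpu=math.inf)


class GpuOnlyActorPool(Operator):
    def min_max_resource_usage_bounds(self):
        # A GPU-only pool reserves no CPUs at all.
        return Resources(gpu=1.0), Resources(gpu=4.0)


lo, hi = GpuOnlyActorPool().min_max_resource_usage_bounds()
print(lo.cpu, hi.gpu)  # 0.0 4.0
```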

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@bveeramani bveeramani requested a review from a team as a code owner April 21, 2025 17:18
@bveeramani bveeramani marked this pull request as draft April 21, 2025 17:18
@bveeramani bveeramani changed the title from [Data] Add PhysicalOperator.max_resource_usage to [Data] Add PhysicalOperator.min_max_resource_usage_bounds Apr 22, 2025
@bveeramani bveeramani marked this pull request as ready for review April 22, 2025 03:02
@bveeramani bveeramani added the go add ONLY when ready to merge, run all tests label Apr 22, 2025
    def min_max_resource_usage_bounds(
        self,
    ) -> Tuple[ExecutionResources, ExecutionResources]:
        """Returns the min and max resources to start the operator and make progress.
Contributor:

Let's expand this docstring to note that these bounds are derived from the operator's concurrency configuration multiplied by the per-task/per-actor resource requirements.
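Concretely, that suggestion amounts to something like the following sketch, where the bounds come from multiplying the pool's min/max concurrency by the per-actor resource request (the helper names here are hypothetical, not the actual implementation):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    """Illustrative stand-in for ExecutionResources."""
    cpu: float = 0.0
    gpu: float = 0.0

    def scaled(self, n: int) -> "Resources":
        return Resources(cpu=self.cpu * n, gpu=self.gpu * n)


def bounds_from_concurrency(per_actor: Resources, min_actors: int, max_actors: int):
    # Min bound: enough to start the smallest allowed pool.
    # Max bound: the largest pool the concurrency config permits.
    return per_actor.scaled(min_actors), per_actor.scaled(max_actors)


lo, hi = bounds_from_concurrency(Resources(cpu=1.0), min_actors=2, max_actors=8)
print(lo.cpu, hi.cpu)  # 2.0 8.0
```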


    def base_resource_usage(self) -> ExecutionResources:
        """Returns the minimum amount of resources required for execution.
    def min_max_resource_usage_bounds(
Contributor:

Sorry for the back and forth, but on second thought I've realized that min_max_resource_requirements might be a better fit here (I'll defer to you to decide).

Contributor:

min_max_resource_requirements sounds slightly better to me as well

"""Returns the minimum amount of resources required for execution.
def min_max_resource_usage_bounds(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
Contributor:

Should be optional

Member Author:

How would we handle None in the resource manager? Would it be equivalent to the default implementation?

Contributor:

Yeah, it'd default to not knowing resource reqs

Contributor:

OK, I see that you're defaulting to [0, inf); that's fine too.

Comment on lines 492 to 495
    def min_max_resource_usage_bounds(
        self,
    ) -> Tuple[ExecutionResources, ExecutionResources]:
        raise NotImplementedError
Contributor:

Let's make sure there's a default implementation (so that it doesn't break any custom operators)

Contributor:

This is the map op. There will only be two subclasses: the task-pool and actor-pool map ops.

Contributor:

See my other comment:

#52502 (comment)

Member Author:

Responded in the thread for #52502 (comment)

Comment on lines 118 to 148
    def _min_resource_usage(self) -> ExecutionResources:
        # Make sure the reserved resources are at least enough to allow one task.
        return self.incremental_resource_usage()

    def _max_resource_usage(self) -> ExecutionResources:
        if self._inputs_complete:
            # If the operator has already received all input data, we know it
            # won't launch more tasks. So, we only need to reserve resources
            # for the tasks that are currently running.
            num_cpus_per_task = self._ray_remote_args.get("num_cpus", 0)
            num_gpus_per_task = self._ray_remote_args.get("num_gpus", 0)
            object_store_memory_per_task = (
                self._metrics.obj_store_mem_max_pending_output_per_task or 0
            )
            resources = ExecutionResources.for_limits(
                cpu=num_cpus_per_task * self.num_active_tasks(),
                gpu=num_gpus_per_task * self.num_active_tasks(),
                object_store_memory=object_store_memory_per_task
                * self.num_active_tasks(),
            )
        else:
            resources = ExecutionResources.for_limits()

        return resources
Contributor:

Let's generalize this to be a base method in PhysicalOperator.

Member Author:

I don't think this is a sensible default for PhysicalOperator. The implementation makes sense for TaskPoolOperator, but I don't think we can assume it makes sense for all PhysicalOperator subclasses (e.g., the current all-to-all operator implementation).
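Standalone, the task-pool capping logic quoted above behaves roughly like this (a self-contained mock; the real method reads these values from `_ray_remote_args` and the operator's metrics rather than taking them as parameters):

```python
import math


def max_resource_usage(inputs_complete: bool,
                       num_cpus_per_task: float,
                       num_gpus_per_task: float,
                       num_active_tasks: int) -> tuple:
    # Once all inputs have arrived, no new tasks will launch, so the cap
    # is the footprint of the currently running tasks. Until then, the
    # cap stays unbounded.
    if inputs_complete:
        return (num_cpus_per_task * num_active_tasks,
                num_gpus_per_task * num_active_tasks)
    return (math.inf, math.inf)


print(max_resource_usage(True, 1.0, 0.0, 3))   # (3.0, 0.0)
print(max_resource_usage(False, 1.0, 0.0, 3))  # (inf, inf)
```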

Comment on lines 473 to 465
min_resource_usage, max_resource_usage = op.min_max_resource_usage_bounds()
reserved_for_tasks = default_reserved.subtract(reserved_for_outputs)
reserved_for_tasks = reserved_for_tasks.max(min_resource_usage)
reserved_for_tasks = reserved_for_tasks.min(max_resource_usage)
Contributor:

This is much, much cleaner!
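Those four quoted lines implement a clamp: start from the default reservation minus what's set aside for outputs, then clip into [min, max]. Reduced to scalars (with a hypothetical helper name), the behavior is:

```python
def clamp_reservation(default_reserved: float,
                      reserved_for_outputs: float,
                      min_usage: float,
                      max_usage: float) -> float:
    # Equivalent of subtract -> max(min_bound) -> min(max_bound).
    reserved_for_tasks = default_reserved - reserved_for_outputs
    reserved_for_tasks = max(reserved_for_tasks, min_usage)
    reserved_for_tasks = min(reserved_for_tasks, max_usage)
    return reserved_for_tasks


print(clamp_reservation(10.0, 2.0, 1.0, 6.0))  # 6.0 (capped by max bound)
print(clamp_reservation(10.0, 9.5, 1.0, 6.0))  # 1.0 (raised to min bound)
```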


@bveeramani bveeramani force-pushed the max-resource-usage2 branch from 0d4b23c to 8edacd4 Compare April 29, 2025 03:30
Signed-off-by: Balaji Veeramani <[email protected]>

Add limit

Signed-off-by: Balaji Veeramani <[email protected]>

Update files

Signed-off-by: Balaji Veeramani <[email protected]>

Appease lint

Signed-off-by: Balaji Veeramani <[email protected]>

Fix test

Signed-off-by: Balaji Veeramani <[email protected]>

Address review comments

Signed-off-by: Balaji Veeramani <[email protected]>

Address review comments

Signed-off-by: Balaji Veeramani <[email protected]>

Update stuff

Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani force-pushed the max-resource-usage2 branch from 6e9754f to 0babe2d Compare April 29, 2025 04:07
            * min_actors,
        )

        return min_resource_usage, ExecutionResources.for_limits()
Contributor:

Suggested change:
-        return min_resource_usage, ExecutionResources.for_limits()
+        return min_resource_usage, ExecutionResources.inf()

"""Returns the minimum amount of resources required for execution.
def min_max_resource_usage_bounds(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it'd default to not knowing resource reqs

"""Returns the minimum amount of resources required for execution.
def min_max_resource_usage_bounds(
self,
) -> Tuple[ExecutionResources, ExecutionResources]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, i see that you're defaulting to [0, inf) that's fine too

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani enabled auto-merge (squash) May 1, 2025 14:35
Signed-off-by: Balaji Veeramani <[email protected]>
@github-actions github-actions bot disabled auto-merge May 1, 2025 15:50
@bveeramani bveeramani enabled auto-merge (squash) May 1, 2025 16:32
@bveeramani bveeramani merged commit c664886 into master May 1, 2025
6 checks passed
@bveeramani bveeramani deleted the max-resource-usage2 branch May 1, 2025 17:03
iamjustinhsu pushed a commit that referenced this pull request May 3, 2025

## Why are these changes needed?


This change is necessary to ensure we don't over-reserve resources for
operators. For example:
* If an actor-pool only uses GPU resources, we don't need to reserve CPU
resources for it.
* If a task-pool has finished receiving inputs and launching tasks, we
don't need to reserve more resources than required for the currently
active tasks.

## Related issue number


## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: jhsu <[email protected]>
raulchen added a commit that referenced this pull request May 16, 2025
#52502 removed the cache in
`update_reservation`.
Now the low-resource warning spams the console when cluster
resources are low.
Add a `log_once` check to fix this.

---------

Signed-off-by: Hao Chen <[email protected]>