Skip to content

Commit 48a6e7f

Browse files
[Data] Fix resource reservation by excluding completed operators' usages (#56319)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Problem The `ReservationOpResourceAllocator` was incorrectly accounting for resource usage when calculating available resources for reservation. Specifically, it wasn't properly handling completed operators who have blocks in the output queue. The `ReadFiles` operator below consumes 50 GB of object store memory and should be excluded from reservation, but it is currently not. <img width="1628" height="281" alt="image" src="https://github.com/user-attachments/assets/8a80902d-7f88-4263-bc97-a3dee519b401" /> ## Solution Added logic to identify and subtract resource usage specifically from completed physical operators: ## Testing results Before the fix <img width="958" height="653" alt="image" src="https://github.com/user-attachments/assets/432dc94e-bbe1-4ecb-b1c3-a6e201da724a" /> After the fix <img width="1567" height="700" alt="image" src="https://github.com/user-attachments/assets/2b780d68-a208-4c6b-a250-dee0823e9083" /> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: xgui <xgui@anyscale.com> Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Co-authored-by: Alexey Kudinkin <alexey.kudinkin@gmail.com>
1 parent f4dc12e commit 48a6e7f

File tree

2 files changed

+393
-2
lines changed

2 files changed

+393
-2
lines changed

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -493,18 +493,56 @@ def _get_eligible_ops(self) -> List[PhysicalOperator]:
493493
op for op in self._resource_manager._topology if self._is_op_eligible(op)
494494
]
495495

496+
def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
497+
"""
498+
Resource reservation is based on the number of eligible operators.
499+
However, there might be completed operators that still have blocks in their output queue, which we need to exclude them from the reservation.
500+
And we also need to exclude the downstream ineligible operators.
501+
502+
E.g., for the following pipeline:
503+
```
504+
map1 (completed, but still has blocks in its output queue) -> limit1 (ineligible, not completed) -> map2 (not completed) -> limit2 -> map3
505+
```
506+
507+
The reservation is based on the number of eligible operators (map2 and map3), but we need to exclude map1 and limit1 from the reservation.
508+
"""
509+
last_completed_ops = []
510+
ops_to_exclude_from_reservation = []
511+
# Traverse operator tree collecting all operators that have already finished
512+
for op in self._resource_manager._topology:
513+
if not op.execution_finished():
514+
for dep in op.input_dependencies:
515+
if dep.execution_finished():
516+
last_completed_ops.append(dep)
517+
518+
# In addition to completed operators,
519+
# filter out downstream ineligible operators since they are omitted from reservation calculations.
520+
for op in last_completed_ops:
521+
ops_to_exclude_from_reservation.extend(
522+
list(self._get_downstream_ineligible_ops(op))
523+
)
524+
ops_to_exclude_from_reservation.append(op)
525+
return list(set(ops_to_exclude_from_reservation))
526+
496527
def _update_reservation(self):
497-
global_limits = self._resource_manager.get_global_limits()
528+
global_limits = self._resource_manager.get_global_limits().copy()
498529
eligible_ops = self._get_eligible_ops()
499530

500531
self._op_reserved.clear()
501532
self._reserved_for_op_outputs.clear()
502533
self._reserved_min_resources.clear()
503-
remaining = global_limits.copy()
504534

505535
if len(eligible_ops) == 0:
506536
return
507537

538+
op_to_exclude_from_reservation = self._get_ineligible_ops_with_usage()
539+
for completed_op in op_to_exclude_from_reservation:
540+
global_limits = global_limits.subtract(
541+
self._resource_manager.get_op_usage(completed_op)
542+
)
543+
global_limits = global_limits.max(ExecutionResources.zero())
544+
remaining = global_limits.copy()
545+
508546
# Reserve `reservation_ratio * global_limits / num_ops` resources for each
509547
# operator.
510548
default_reserved = global_limits.scale(

0 commit comments

Comments
 (0)