[Data] Fix ineligible ops object store memory accounting#60815
[Data] Fix ineligible ops object store memory accounting#60815slfan1989 wants to merge 2 commits intoray-project:masterfrom
Conversation
This commit fixes the TODO at resource_manager.py:178 where ineligible operators' downstream object store memory was not included in resource usage calculations. Key changes: - Enhanced _get_downstream_ineligible_ops_usage() to collect all downstream ops' object store memory - Added _collect_downstream_ops() helper for DFS traversal - Added test case for ineligible ops with downstream memory Signed-off-by: slfan1989 <slfan1989@apache.org>
There was a problem hiding this comment.
Code Review
This pull request effectively addresses a memory accounting issue for ineligible operators by correctly including the object store memory of all downstream operators. The logic is sound, and the new _collect_downstream_ops helper provides a clean way to traverse the operator graph. The changes are well-supported by a comprehensive set of new tests covering various topological scenarios. I have one suggestion to improve code readability.
| usage = reduce( | ||
| lambda x, y: x.add(y), | ||
| [self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)], | ||
| [self._op_usages[op] for op in ineligible_ops], | ||
| ExecutionResources.zero(), | ||
| ) |
There was a problem hiding this comment.
For better readability and performance, consider replacing reduce with sum over generator expressions for each resource type. This approach is generally more Pythonic and easier to understand than using reduce with a lambda function.
| usage = reduce( | |
| lambda x, y: x.add(y), | |
| [self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)], | |
| [self._op_usages[op] for op in ineligible_ops], | |
| ExecutionResources.zero(), | |
| ) | |
| usage = ExecutionResources( | |
| cpu=sum(self._op_usages[op].cpu for op in ineligible_ops), | |
| gpu=sum(self._op_usages[op].gpu for op in ineligible_ops), | |
| object_store_memory=sum( | |
| self._op_usages[op].object_store_memory for op in ineligible_ops | |
| ), | |
| ) |
Description
This PR fixes the TODO at
resource_manager.py:178where object store memory usage for ineligible operators was not correctly accounted for in resource management calculations.Problem: Ineligible operators (e.g.,
LimitOperatorwiththrottling_disabled=True) don't participate in resource throttling, but they hold memory and cause downstream memory accumulation. The previous implementation only counted the ineligible ops' own object store memory, missing all downstream ops' memory usage. This could lead to memory pressure and OOM issues when ineligible ops have long downstream chains.Solution: Enhanced
_get_downstream_ineligible_ops_usage()to include all downstream operators' object store memory when computing resource usage for ineligible operators. This correctly attributes downstream memory pressure to upstream operators.Related issues
Related to TODO at
python/ray/data/_internal/execution/resource_manager.py:178Additional information
Implementation Details
Changes to resource_manager.py:
Added _collect_downstream_ops() helper