Skip to content

[Data] Fix ineligible ops object store memory accounting#60815

Open
slfan1989 wants to merge 2 commits intoray-project:masterfrom
slfan1989:data/fix-ineligible-ops-object-store-memory
Open

[Data] Fix ineligible ops object store memory accounting#60815
slfan1989 wants to merge 2 commits intoray-project:masterfrom
slfan1989:data/fix-ineligible-ops-object-store-memory

Conversation

@slfan1989
Copy link
Contributor

@slfan1989 slfan1989 commented Feb 7, 2026

Description

This PR fixes the TODO at resource_manager.py:178 where object store memory usage for ineligible operators was not correctly accounted for in resource management calculations.

  • Problem: Ineligible operators (e.g., LimitOperator with throttling_disabled=True) don't participate in resource throttling, but they hold memory and cause downstream memory accumulation. The previous implementation only counted the ineligible ops' own object store memory, missing all downstream ops' memory usage. This could lead to memory pressure and OOM issues when ineligible ops have long downstream chains.

  • Solution: Enhanced _get_downstream_ineligible_ops_usage() to include all downstream operators' object store memory when computing resource usage for ineligible operators. This correctly attributes downstream memory pressure to upstream operators.

Related issues

Related to TODO at python/ray/data/_internal/execution/resource_manager.py:178

Additional information

Implementation Details

Changes to resource_manager.py:

  • Enhanced _get_downstream_ineligible_ops_usage()
  • Collects full resource usage (CPU/GPU/memory) from ineligible ops
  • Additionally collects object store memory from ALL downstream ops (both eligible and ineligible)
  • Avoids double-counting by excluding ineligible ops from the additional object store memory sum
  • Returns ExecutionResources where CPU/GPU reflect only ineligible ops, but object_store_memory includes the entire downstream chain

Added _collect_downstream_ops() helper

  • Performs DFS traversal to collect all reachable downstream operators
  • Uses visited set to handle potential cycles or shared dependencies
  • Time complexity: O(V+E), Space complexity: O(V)

This commit fixes the TODO at resource_manager.py:178 where ineligible
operators' downstream object store memory was not included in resource
usage calculations.

Key changes:
- Enhanced _get_downstream_ineligible_ops_usage() to collect all
  downstream ops' object store memory
- Added _collect_downstream_ops() helper for DFS traversal
- Added test case for ineligible ops with downstream memory

Signed-off-by: slfan1989 <slfan1989@apache.org>
@slfan1989 slfan1989 requested a review from a team as a code owner February 7, 2026 00:38
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a memory accounting issue for ineligible operators by correctly including the object store memory of all downstream operators. The logic is sound, and the new _collect_downstream_ops helper provides a clean way to traverse the operator graph. The changes are well-supported by a comprehensive set of new tests covering various topological scenarios. I have one suggestion to improve code readability.

Comment on lines +336 to 340
usage = reduce(
lambda x, y: x.add(y),
[self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)],
[self._op_usages[op] for op in ineligible_ops],
ExecutionResources.zero(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and performance, consider replacing reduce with sum over generator expressions for each resource type. This approach is generally more Pythonic and easier to understand than using reduce with a lambda function.

Suggested change
usage = reduce(
lambda x, y: x.add(y),
[self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)],
[self._op_usages[op] for op in ineligible_ops],
ExecutionResources.zero(),
)
usage = ExecutionResources(
cpu=sum(self._op_usages[op].cpu for op in ineligible_ops),
gpu=sum(self._op_usages[op].gpu for op in ineligible_ops),
object_store_memory=sum(
self._op_usages[op].object_store_memory for op in ineligible_ops
),
)

@ray-gardener ray-gardener bot added the community-contribution Contributed by the community label Feb 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant