
[Data] Include logical memory in resource manager scheduling decisions #60744

@bveeramani

Description

Rename current_processor_usage to current_logical_usage across physical operators, and update the method to include logical memory (memory) alongside CPU and GPU. This ensures the resource manager accounts for memory when making scheduling decisions.

Background

Ray Data's resource manager (resource_manager.py) tracks resource usage to prevent over-subscription and coordinate task scheduling. Currently, the current_processor_usage() method on physical operators returns only CPU and GPU usage, so logical memory is ignored during scheduling decisions.

The ExecutionResources class already supports a memory field representing logical memory (distinct from object_store_memory). Per-task resource allocation methods, such as per_task_resource_allocation() in TaskPoolMapOperator, already track memory. However, current_processor_usage() and its callers in the resource manager do not aggregate it.
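The gap between per-task allocation and aggregated usage can be illustrated with a small, self-contained sketch. `Resources` is a hypothetical stand-in for ExecutionResources (not Ray's actual class), and `processor_usage_today` and `logical_usage_proposed` are hypothetical helpers: the first mimics the CPU/GPU-only aggregation described above, the second keeps the memory field that the per-task allocation already carries.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Resources:
    """Hypothetical stand-in for ExecutionResources: CPU, GPU, and logical memory."""

    cpu: float = 0.0
    gpu: float = 0.0
    memory: float = 0.0

    def scale(self, n: int) -> "Resources":
        # Per-task request multiplied by the number of active tasks.
        return Resources(self.cpu * n, self.gpu * n, self.memory * n)


def processor_usage_today(per_task: Resources, num_active: int) -> Resources:
    # Mimics the current behavior: memory is dropped during aggregation.
    scaled = per_task.scale(num_active)
    return Resources(cpu=scaled.cpu, gpu=scaled.gpu)


def logical_usage_proposed(per_task: Resources, num_active: int) -> Resources:
    # Proposed behavior: keep every field of the per-task allocation.
    return per_task.scale(num_active)
```

With four active tasks that each request 1 CPU and 10GiB, the first helper reports zero memory in use while the second reports 40GiB, which is the information the resource manager currently never sees.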

Key files:

  • _internal/execution/interfaces/physical_operator.py – defines current_processor_usage() (line 682)
  • _internal/execution/operators/task_pool_map_operator.py – implements current_processor_usage() (line 139)
  • _internal/execution/operators/actor_pool_map_operator.py – implements current_processor_usage() (line 484)
  • _internal/execution/resource_manager.py – calls current_processor_usage() (line 217)

Motivation

Without accounting for logical memory, the resource manager can over-schedule tasks. For example:

  • A cluster has 40GB memory and 10 CPUs
  • An actor-based operation requests 10GB memory per actor
  • The scheduler only sees CPU constraints and scales to 10 actors (100GB of memory requested against 40GB available)
  • Downstream task-based operations hang because no memory budget remains
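The arithmetic in this scenario can be checked with a short sketch. It assumes each actor requests 1 CPU (the issue does not state the per-actor CPU request), and `max_actors` is a hypothetical helper, not part of Ray.

```python
from typing import Dict, List


def max_actors(cluster: Dict[str, float], per_actor: Dict[str, float], considered: List[str]) -> int:
    """Largest actor count that fits on every resource in `considered`."""
    # Only resources the actor actually requests can constrain the count.
    limits = [cluster[r] // per_actor[r] for r in considered if per_actor.get(r, 0) > 0]
    if not limits:
        raise ValueError("no constrained resource among those considered")
    return int(min(limits))
```

Counting only CPUs (`["cpu"]`) permits 10 actors, whereas also counting memory (`["cpu", "memory_gb"]`) caps the operator at 4, because 40GB / 10GB = 4.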

This causes pipeline deadlocks that are difficult to debug. See ray-project/ray#60290 for a user-reported case.

Implementation Boundaries & Constraints

  1. Rename the method: Change current_processor_usage to current_logical_usage in:

    • Base class PhysicalOperator
    • TaskPoolMapOperator
    • ActorPoolMapOperator
    • HashShuffleOperator (if applicable)
    • Any other operator overrides
  2. Include memory in the return value: Update each implementation to include the memory field from _ray_remote_args.get("memory", 0). For example, in TaskPoolMapOperator:

    def current_logical_usage(self) -> ExecutionResources:
        num_active_workers = self.num_active_tasks()
        # Each active task holds its requested CPU, GPU, and logical memory.
        return ExecutionResources(
            cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
            gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
            memory=self._ray_remote_args.get("memory", 0) * num_active_workers,
        )
  3. Update callers: The resource manager and any other code that calls current_processor_usage() should be updated to use the new name. The running_processor_usage() method in PhysicalOperator (line 703) also needs updating.

  4. Update related methods: Consider whether pending_processor_usage() and running_processor_usage() should also be renamed and include memory (likely yes, for consistency).

  5. Tests: Update tests in tests/test_executor_resource_management.py and tests/test_resource_manager.py that mock or assert on current_processor_usage().

  6. Backwards compatibility: This is an internal API change. No deprecation warning is needed since current_processor_usage is not part of the public API.
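The renamed method and its use in a resource-manager-style check can be sketched end to end. Every class and helper here (`Resources`, `OperatorSketch`, `TaskPoolOperatorSketch`, `total_logical_usage`, `satisfies_limit`) is a hypothetical, simplified stand-in for the Ray Data types named in this issue, not their real implementation; memory values follow Ray's convention of bytes.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class Resources:
    """Hypothetical stand-in for ExecutionResources (CPU, GPU, logical memory)."""

    cpu: float = 0.0
    gpu: float = 0.0
    memory: float = 0.0

    def add(self, other: "Resources") -> "Resources":
        return Resources(self.cpu + other.cpu, self.gpu + other.gpu, self.memory + other.memory)

    def satisfies_limit(self, limit: "Resources") -> bool:
        # Every dimension, including logical memory, must stay within the limit.
        return self.cpu <= limit.cpu and self.gpu <= limit.gpu and self.memory <= limit.memory


class OperatorSketch:
    """Hypothetical base class: reports no usage unless a subclass overrides it."""

    def current_logical_usage(self) -> Resources:
        return Resources()


class TaskPoolOperatorSketch(OperatorSketch):
    """Hypothetical task-pool operator that scales per-task requests by active tasks."""

    def __init__(self, ray_remote_args: Dict[str, Any], num_active_tasks: int) -> None:
        self._ray_remote_args = ray_remote_args
        self._num_active_tasks = num_active_tasks

    def num_active_tasks(self) -> int:
        return self._num_active_tasks

    def current_logical_usage(self) -> Resources:
        n = self.num_active_tasks()
        return Resources(
            cpu=self._ray_remote_args.get("num_cpus", 0) * n,
            gpu=self._ray_remote_args.get("num_gpus", 0) * n,
            memory=self._ray_remote_args.get("memory", 0) * n,
        )


def total_logical_usage(ops: List[OperatorSketch]) -> Resources:
    # Hypothetical resource-manager aggregation over all operators.
    total = Resources()
    for op in ops:
        total = total.add(op.current_logical_usage())
    return total
```

With five active tasks that each request 1 CPU and 10GB against a 10-CPU, 40GB limit, a CPU-only view (5 of 10 CPUs) looks schedulable, while the memory-aware total (50GB of 40GB) reports over-subscription. This is the kind of assertion the updated tests in item 5 could make.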

Metadata

Labels

P1 (Issue that should be fixed within a few weeks), data (Ray Data-related issues)
