Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,52 @@ def get_op_usage(
def _get_downstream_ineligible_ops_usage(
self, op: PhysicalOperator
) -> ExecutionResources:
return reduce(
"""Return resource usage for downstream ineligible ops.

For ineligible ops (e.g., LimitOperator with throttling_disabled=True), we need
to account for all downstream object store memory, not just the ineligible ops
themselves, because memory is held across the entire downstream chain.

Returns:
ExecutionResources where CPU/GPU are summed over ineligible ops only, and
object_store_memory is the sum of ineligible ops plus all their downstream
ops.

Args:
op: The operator whose downstream ineligible usage is computed.
"""
ineligible_ops = list(self._get_downstream_ineligible_ops(op))
if not ineligible_ops:
return ExecutionResources.zero()

usage = reduce(
lambda x, y: x.add(y),
[self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)],
[self._op_usages[op] for op in ineligible_ops],
ExecutionResources.zero(),
)
Comment on lines +336 to 340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and performance, consider replacing reduce with sum over generator expressions for each resource type. This approach is generally more Pythonic and easier to understand than using reduce with a lambda function.

Suggested change
usage = reduce(
lambda x, y: x.add(y),
[self.get_op_usage(op) for op in self._get_downstream_ineligible_ops(op)],
[self._op_usages[op] for op in ineligible_ops],
ExecutionResources.zero(),
)
usage = ExecutionResources(
cpu=sum(self._op_usages[op].cpu for op in ineligible_ops),
gpu=sum(self._op_usages[op].gpu for op in ineligible_ops),
object_store_memory=sum(
self._op_usages[op].object_store_memory for op in ineligible_ops
),
)

# For ineligible ops, include all downstream object store usage to avoid
# undercounting memory held beyond the ineligible chain.
ineligible_set = set(ineligible_ops)
additional_object_store_usage = sum(
self._op_usages[op].object_store_memory
for op in self._collect_downstream_ops(ineligible_ops)
if op not in ineligible_set
)
object_store_usage = usage.object_store_memory + additional_object_store_usage
return usage.copy(object_store_memory=object_store_usage)

def _collect_downstream_ops(
self, ops: Iterable[PhysicalOperator]
) -> Iterable[PhysicalOperator]:
visited = set()
stack = list(ops)
while stack:
cur = stack.pop()
if cur in visited:
continue
visited.add(cur)
stack.extend(cur.output_dependencies)
return visited

def get_mem_op_internal(self, op: PhysicalOperator) -> int:
"""Return the memory usage of pending task outputs for the given operator."""
Expand Down
161 changes: 161 additions & 0 deletions python/ray/data/tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,23 @@ def mock_all_to_all_op(input_op, name="MockShuffle"):
return op


def mock_op_metrics(topo, op, pending_output, internal_outqueue, external_outqueue):
op.update_resource_usage = MagicMock()
op.current_processor_usage = MagicMock(return_value=ExecutionResources.zero())
op.running_processor_usage = MagicMock(return_value=ExecutionResources.zero())
op.pending_processor_usage = MagicMock(return_value=ExecutionResources.zero())
op.extra_resource_usage = MagicMock(return_value=ExecutionResources.zero())
op._metrics = MagicMock(
obj_store_mem_pending_task_outputs=pending_output,
obj_store_mem_internal_outqueue=internal_outqueue,
obj_store_mem_internal_inqueue=0,
obj_store_mem_pending_task_inputs=0,
)
if external_outqueue:
ref_bundle = MagicMock(size_bytes=MagicMock(return_value=external_outqueue))
topo[op].add_output(ref_bundle)


class TestResourceManager:
"""Unit tests for ResourceManager."""

Expand Down Expand Up @@ -318,6 +335,150 @@ def test_update_usage(self):
global_cpu, 0, global_mem
)

def test_ineligible_ops_include_downstream_object_store_usage(self):
o1 = InputDataBuffer(DataContext.get_current(), [])
o2 = mock_map_op(o1, name="Map1")
o3 = LimitOperator(1, o2, DataContext.get_current())
o4 = mock_map_op(o3, name="Map2")

topo = build_streaming_topology(o4, ExecutionOptions())

op_outputs = {
o1: 0,
o2: 0,
o3: 20,
o4: 70,
}
op_pending_outputs = {
o1: 0,
o2: 0,
o3: 10,
o4: 50,
}
op_internal_outqueue = {
o1: 0,
o2: 0,
o3: 30,
o4: 60,
}

for op in [o1, o2, o3, o4]:
mock_op_metrics(
topo,
op,
op_pending_outputs[op],
op_internal_outqueue[op],
op_outputs[op],
)

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager._op_resource_allocator = None
resource_manager.update_usages()

ineligible_usage = (
op_pending_outputs[o3] + op_internal_outqueue[o3] + op_outputs[o3]
)
downstream_usage = (
op_pending_outputs[o4] + op_internal_outqueue[o4] + op_outputs[o4]
)
assert (
resource_manager.get_op_usage(
o2, include_ineligible_downstream=True
).object_store_memory
== ineligible_usage + downstream_usage
)

def test_ineligible_ops_multi_layer_chain(self):
o1 = InputDataBuffer(DataContext.get_current(), [])
o2 = mock_map_op(o1, name="Map1")
limit1 = LimitOperator(1, o2, DataContext.get_current())
limit2 = LimitOperator(1, limit1, DataContext.get_current())
o3 = mock_map_op(limit2, name="Map2")

topo = build_streaming_topology(o3, ExecutionOptions())

mock_op_metrics(topo, o1, 0, 0, 0)
mock_op_metrics(topo, o2, 0, 0, 0)
mock_op_metrics(topo, limit1, 10, 30, 20)
mock_op_metrics(topo, limit2, 5, 15, 25)
mock_op_metrics(topo, o3, 50, 60, 70)

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager._op_resource_allocator = None
resource_manager.update_usages()

limit1_usage = 10 + 30 + 20
limit2_usage = 5 + 15 + 25
downstream_usage = 50 + 60 + 70
assert (
resource_manager.get_op_usage(
o2, include_ineligible_downstream=True
).object_store_memory
== limit1_usage + limit2_usage + downstream_usage
)

def test_ineligible_ops_branching_downstream(self):
o1 = InputDataBuffer(DataContext.get_current(), [])
o2 = mock_map_op(o1, name="Map1")
limit = LimitOperator(1, o2, DataContext.get_current())
o3 = mock_map_op(limit, name="Map2")
o4 = mock_map_op(limit, name="Map3")
o5 = mock_union_op([o3, o4])

topo = build_streaming_topology(o5, ExecutionOptions())

mock_op_metrics(topo, o1, 0, 0, 0)
mock_op_metrics(topo, o2, 0, 0, 0)
mock_op_metrics(topo, limit, 10, 20, 30)
mock_op_metrics(topo, o3, 40, 50, 60)
mock_op_metrics(topo, o4, 70, 80, 90)
mock_op_metrics(topo, o5, 0, 0, 0)

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager._op_resource_allocator = None
resource_manager.update_usages()

limit_usage = 10 + 20 + 30
o3_usage = 40 + 50 + 60
o4_usage = 70 + 80 + 90
assert (
resource_manager.get_op_usage(
o2, include_ineligible_downstream=True
).object_store_memory
== limit_usage + o3_usage + o4_usage
)

def test_ineligible_ops_without_downstream(self):
o1 = InputDataBuffer(DataContext.get_current(), [])
o2 = mock_map_op(o1, name="Map1")
limit = LimitOperator(1, o2, DataContext.get_current())

topo = build_streaming_topology(limit, ExecutionOptions())

mock_op_metrics(topo, o1, 0, 0, 0)
mock_op_metrics(topo, o2, 0, 0, 0)
mock_op_metrics(topo, limit, 5, 15, 25)

resource_manager = ResourceManager(
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
)
resource_manager._op_resource_allocator = None
resource_manager.update_usages()

limit_usage = 5 + 15 + 25
assert (
resource_manager.get_op_usage(
o2, include_ineligible_downstream=True
).object_store_memory
== limit_usage
)

def test_object_store_usage(self, restore_data_context):
input = make_ref_bundles([[x] for x in range(1)])[0]
input.size_bytes = MagicMock(return_value=1)
Expand Down