Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def __init__(
self._in_task_output_backpressure = False
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_finished = False
self._is_execution_marked_finished = False
# The LogicalOperator(s) which were translated to create this PhysicalOperator.
# Set via `PhysicalOperator.set_logical_operators()`.
self._logical_operators: List[LogicalOperator] = []
Expand Down Expand Up @@ -401,48 +401,51 @@ def override_target_max_block_size(self, target_max_block_size: Optional[int]):

def mark_execution_finished(self):
"""Manually mark that this operator has finished execution."""
self._execution_finished = True
self._is_execution_marked_finished = True

def execution_finished(self) -> bool:
def has_execution_finished(self) -> bool:
"""Return True when this operator has finished execution.

The outputs may or may not have been taken.
"""
return self._execution_finished
from ..operators.base_physical_operator import InternalQueueOperatorMixin

internal_input_queue_num_blocks = 0
if isinstance(self, InternalQueueOperatorMixin):
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()

# NOTE: Execution is considered finished if
# - The operator was explicitly marked finished OR
# - The following auto-completion conditions are met
# - All input blocks have been ingested
# - Internal queue is empty
# - There are no active or pending tasks

return self._is_execution_marked_finished or (
self._inputs_complete
and self.num_active_tasks() == 0
and internal_input_queue_num_blocks == 0
)

def completed(self) -> bool:
"""Returns whether this operator has been fully completed.

An operator is completed iff:
* The operator has finished execution (i.e., `execution_finished()` is True).
* The operator has finished execution (i.e., `has_execution_finished()` is True).
* All outputs have been taken (i.e., `has_next()` is False) from it.
"""
from ..operators.base_physical_operator import InternalQueueOperatorMixin

internal_input_queue_num_blocks = 0
internal_output_queue_num_blocks = 0
if isinstance(self, InternalQueueOperatorMixin):
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
internal_output_queue_num_blocks = self.internal_output_queue_num_blocks()

if not self._execution_finished:
if (
self._inputs_complete
and internal_input_queue_num_blocks == 0
and self.num_active_tasks() == 0
):
# NOTE: Operator is considered completed iff
# - All input blocks have been ingested
# - Internal queue is empty
# - There are no active or pending tasks
self._execution_finished = True

# NOTE: We check for (internal_output_queue_size == 0) and
# (not self.has_next()) because _OrderedOutputQueue can
# return False for self.has_next(), but have a non-empty queue size.
# Draining the internal output queue is important to free object refs.
return (
self._execution_finished
self.has_execution_finished()
and not self.has_next()
and internal_output_queue_num_blocks == 0
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def get_stats(self) -> StatsDict:
def num_outputs_total(self) -> Optional[int]:
# Before execution is completed, we don't know how many output
# bundles we will have. We estimate based off the consumption so far.
if self._execution_finished:
if self._is_execution_marked_finished:
return self._cur_output_bundles
return self._estimated_num_output_bundles

Expand Down
8 changes: 4 additions & 4 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def is_op_eligible(self, op: PhysicalOperator) -> bool:
not op.throttling_disabled()
# As long as the op has finished execution, even if there are still
# non-taken outputs, we don't need to allocate resources for it.
and not op.execution_finished()
and not op.has_execution_finished()
)

def get_eligible_ops(self) -> List[PhysicalOperator]:
Expand Down Expand Up @@ -553,7 +553,7 @@ def _is_op_eligible(op: PhysicalOperator) -> bool:
not op.throttling_disabled()
# As long as the op has finished execution, even if there are still
# non-taken outputs, we don't need to allocate resources for it.
and not op.execution_finished()
and not op.has_execution_finished()
)

def _get_downstream_eligible_ops(
Expand Down Expand Up @@ -674,9 +674,9 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]:
ops_to_exclude_from_reservation = []
# Traverse operator tree collecting all operators that have already finished
for op in self._topology:
if not op.execution_finished():
if not op.has_execution_finished():
for dep in op.input_dependencies:
if dep.execution_finished():
if dep.has_execution_finished():
last_completed_ops.append(dep)

# In addition to completed operators,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ def _get_state_dict(self, state):
"total_rows": op.num_output_rows_total(),
"queued_blocks": op_state.total_enqueued_input_blocks(),
"state": DatasetState.FINISHED.name
if op.execution_finished()
if op.has_execution_finished()
else state,
}
for i, (op, op_state) in enumerate(self._topology.items())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ def update_operator_states(topology: Topology) -> None:
# Drain external input queue if current operator is execution finished.
# This is needed when the limit is reached, and `mark_execution_finished`
# is called manually.
if op.execution_finished():
if op.has_execution_finished():
for input_queue in op_state.input_queues:
# Drain input queue
input_queue.clear()
Expand Down
6 changes: 3 additions & 3 deletions python/ray/data/tests/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ def test_limit_operator(ray_start_regular_shared):
while input_op.has_next() and not limit_op._limit_reached():
loop_count += 1
assert not limit_op.completed(), limit
assert not limit_op._execution_finished, limit
assert not limit_op._is_execution_marked_finished, limit
limit_op.add_input(input_op.get_next(), 0)
while limit_op.has_next():
# Drain the outputs. So the limit operator
Expand All @@ -1066,12 +1066,12 @@ def test_limit_operator(ray_start_regular_shared):
assert limit_op.mark_execution_finished.call_count == 1, limit
assert limit_op.completed(), limit
assert limit_op._limit_reached(), limit
assert limit_op._execution_finished, limit
assert limit_op._is_execution_marked_finished, limit
else:
assert limit_op.mark_execution_finished.call_count == 0, limit
assert not limit_op.completed(), limit
assert not limit_op._limit_reached(), limit
assert not limit_op._execution_finished, limit
assert not limit_op._is_execution_marked_finished, limit
limit_op.mark_execution_finished()
# After inputs done, the number of output bundles
# should be the same as the number of `add_input`s.
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def test_update_operator_states_drains_upstream(ray_start_regular_shared):

# Manually mark o2 as execution finished (simulating limit operator behavior)
o2.mark_execution_finished()
assert o2.execution_finished(), "o2 should be execution finished"
assert o2.has_execution_finished(), "o2 should be execution finished"

# Call update_operator_states - this should drain o1's output queue
update_operator_states(topo)
Expand Down