Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def __init__(
self._in_task_output_backpressure = False
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_finished = False
self._is_execution_marked_finished = False
# The LogicalOperator(s) which were translated to create this PhysicalOperator.
# Set via `PhysicalOperator.set_logical_operators()`.
self._logical_operators: List[LogicalOperator] = []
Expand Down Expand Up @@ -401,48 +401,51 @@ def override_target_max_block_size(self, target_max_block_size: Optional[int]):

def mark_execution_finished(self):
"""Manually mark that this operator has finished execution."""
self._execution_finished = True
self._is_execution_marked_finished = True

def execution_finished(self) -> bool:
def has_execution_finished(self) -> bool:
"""Return True when this operator has finished execution.

The outputs may or may not have been taken.
"""
return self._execution_finished
from ..operators.base_physical_operator import InternalQueueOperatorMixin

internal_input_queue_num_blocks = 0
if isinstance(self, InternalQueueOperatorMixin):
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()

# NOTE: Execution is considered finished if either
# (a) the operator was explicitly marked finished, OR
# (b) ALL of the following auto-completion conditions are met:
#     - All input blocks have been ingested
#     - The internal input queue is empty
#     - There are no active or pending tasks

return self._is_execution_marked_finished or (
self._inputs_complete
and self.num_active_tasks() == 0
and internal_input_queue_num_blocks == 0
)

def completed(self) -> bool:
"""Returns whether this operator has been fully completed.

An operator is completed iff:
* The operator has finished execution (i.e., `execution_finished()` is True).
* The operator has finished execution (i.e., `has_execution_finished()` is True).
* All outputs have been taken (i.e., `has_next()` is False) from it.
"""
from ..operators.base_physical_operator import InternalQueueOperatorMixin

internal_input_queue_num_blocks = 0
internal_output_queue_num_blocks = 0
if isinstance(self, InternalQueueOperatorMixin):
internal_input_queue_num_blocks = self.internal_input_queue_num_blocks()
internal_output_queue_num_blocks = self.internal_output_queue_num_blocks()

if not self._execution_finished:
if (
self._inputs_complete
and internal_input_queue_num_blocks == 0
and self.num_active_tasks() == 0
):
# NOTE: Operator is considered completed iff
# - All input blocks have been ingested
# - Internal queue is empty
# - There are no active or pending tasks
self._execution_finished = True

# NOTE: We check for both (internal_output_queue_num_blocks == 0) and
# (not self.has_next()) because _OrderedOutputQueue can return False
# for self.has_next() while its queue is still non-empty.
# Draining the internal output queue is important to free object refs.
return (
self._execution_finished
self.has_execution_finished()
and not self.has_next()
and internal_output_queue_num_blocks == 0
)
Expand Down