Skip to content

Commit a915fe0

Browse files
authored
[Data] Delay 'cluster resources not enough' warning until operator is persistently starved (#63969)
## Description A dataset's resource allocator depends on the `AutoscalingCoordinator` server to get its share of allocated resources. To improve reliability, #62725 made calls to the server non-blocking. One consequence of this change is that the dataset gets zero resources at the very start of execution while it waits for the first response from the autoscaling coordinator. As a result, we'd consistently emit spurious warnings like this at the start of execution: ``` Cluster resources are not enough to run any task from TaskPoolMapOperator[ReadRange]. The job may hang forever unless the cluster scales up. ``` To avoid this confusion, I've made it so that we only emit the warning after the first eligible operator has been starved for a minute. ## Related issues ## Additional information --------- Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
1 parent 269995b commit a915fe0

1 file changed

Lines changed: 37 additions & 10 deletions

File tree

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@
3939
"RAY_DATA_DEBUG_RESOURCE_MANAGER", None
4040
)
4141

42+
# Only warn that the cluster can't run any task once the operator has been starved of
43+
# its minimum resources for this long. This avoids spurious warnings while the cluster
44+
# is still scaling up or waiting for a response from the autoscaling coordinator.
45+
#
46+
# I arbitrarily chose the default delay.
47+
STARVATION_WARNING_DELAY_S = env_float("RAY_DATA_STARVATION_WARNING_DELAY_S", 60)
48+
4249

4350
# Following list is a list of *blocking* materializing operators, that prevent
4451
# operators downstream from them from starting execution until these operators
@@ -705,6 +712,9 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float):
705712
# enough to run one task of each op.
706713
# See `test_no_deadlock_on_small_cluster_resources` as an example.
707714
self._reserved_min_resources: Dict[PhysicalOperator, bool] = {}
715+
# `time.monotonic()` timestamp at which each operator most recently became
716+
# starved of its minimum resources, or None if it currently has them.
717+
self._op_starved_since: Dict[PhysicalOperator, Optional[float]] = {}
708718

709719
def _update_reservation(self, limits: ExecutionResources):
710720
eligible_ops = self._resource_manager.get_eligible_ops()
@@ -746,27 +756,27 @@ def _update_reservation(self, limits: ExecutionResources):
746756
remaining, ignore_object_store_memory=True
747757
):
748758
self._reserved_min_resources[op] = True
759+
self._op_starved_since[op] = None
749760
else:
761+
self._reserved_min_resources[op] = False
762+
if self._op_starved_since.get(op) is None:
763+
self._op_starved_since[op] = time.monotonic()
764+
750765
# If the remaining resources are not enough to reserve the minimum
751766
# resources for this operator, we'll only reserve the minimum object
752767
# store memory, but not the CPU and GPU resources.
753768
# Because Ray Core doesn't allow CPU/GPU resources to be oversubscribed.
754769
# NOTE: we prioritize upstream operators for minimum resource reservation.
755770
# It's fine that downstream ops don't get the minimum reservation,
756771
# because they can wait for upstream ops to finish and release resources.
757-
self._reserved_min_resources[op] = False
758772
reserved_for_tasks = ExecutionResources(
759773
0, 0, min_resource_usage.object_store_memory
760774
)
761-
# Add `id(self)` to the log_once key so that it will be logged once
762-
# per execution.
763-
if index == 0 and log_once(f"low_resource_warning_{id(self)}"):
764-
# Log a warning if even the first operator cannot reserve
765-
# the minimum resources.
766-
logger.warning(
767-
f"Cluster resources are not enough to run any task from {op}."
768-
" The job may hang forever unless the cluster scales up."
769-
)
775+
776+
# Log a warning if even the first operator cannot reserve the minimum
777+
# resources.
778+
if index == 0:
779+
self._warn_if_op_starved_too_long(op)
770780

771781
self._op_reserved[op] = reserved_for_tasks
772782
self._reserved_for_op_outputs[op] = reserved_for_outputs.object_store_memory
@@ -777,6 +787,23 @@ def _update_reservation(self, limits: ExecutionResources):
777787

778788
self._total_shared = remaining
779789

790+
def _warn_if_op_starved_too_long(self, op: PhysicalOperator) -> None:
791+
# The operator isn't starved. Return early.
792+
if self._op_starved_since.get(op) is None:
793+
return
794+
795+
op_starved_duration = time.monotonic() - self._op_starved_since[op]
796+
if (
797+
op_starved_duration >= STARVATION_WARNING_DELAY_S
798+
# Add `id(self)` to the log_once key so that it will be logged once per
799+
# execution.
800+
and log_once(f"starvation_warning_{id(self)}")
801+
):
802+
logger.warning(
803+
f"Cluster resources are not enough to run any task from {op}."
804+
" The job may hang forever unless the cluster scales up."
805+
)
806+
780807
def can_submit_new_task(self, op: PhysicalOperator) -> bool:
781808
"""Return whether the given operator can submit a new task based on budget."""
782809
budget = self.get_budget(op)

0 commit comments

Comments
 (0)