88 changes: 88 additions & 0 deletions python/ray/data/_internal/util.py
@@ -368,6 +368,94 @@ def _is_local_scheme(paths: Union[str, List[str]]) -> bool:
    return num == len(paths)


def _validate_head_node_resources_for_local_scheduling(
    ray_remote_args: Dict[str, Any],
    *,
    op_name: str,
    required_num_cpus: int = 1,
    required_num_gpus: int = 0,
    required_memory: int = 0,
) -> None:
    """Ensure the head node has enough resources before pinning work there.

    Local paths (``local://``) and other driver-local I/O force tasks onto the
    head node via ``NodeAffinitySchedulingStrategy``. If the head node was
    intentionally started with zero logical resources (a common practice to
    avoid OOMs), those tasks become unschedulable. Detect this upfront and
    raise a clear error with remediation steps.
    """

    # Ray defaults to reserving 1 CPU per task when num_cpus isn't provided.
    num_cpus = ray_remote_args.get("num_cpus", required_num_cpus)
    num_gpus = ray_remote_args.get("num_gpus", required_num_gpus)
    memory = ray_remote_args.get("memory", required_memory)

    required_resources: Dict[str, float] = {}
    if num_cpus > 0:
        required_resources["CPU"] = float(num_cpus)
    if num_gpus > 0:
        required_resources["GPU"] = float(num_gpus)
    if memory > 0:
        required_resources["memory"] = float(memory)
Review comment (Medium Severity): Missing None handling for standard resource arguments

The standard resources (num_cpus, num_gpus, memory) use .get() with a default value, but the default only applies when the key is absent. If ray_remote_args contains an explicit None value (e.g., {"num_cpus": None}), .get() returns None and the subsequent comparison num_cpus > 0 raises a TypeError. This is inconsistent with the custom-resources handling at lines 404-405, which explicitly checks for and skips None values.

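One possible fix, sketched under the assumption that an explicit None should fall back to the same default as an absent key:

    raw_num_cpus = ray_remote_args.get("num_cpus")
    num_cpus = required_num_cpus if raw_num_cpus is None else raw_num_cpus
    # ... and likewise for num_gpus and memory.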


    # Include any additional custom resources requested.
    custom_resources = ray_remote_args.get("resources", {})
    for name, amount in custom_resources.items():
Review comment (Medium Severity): Missing None handling for resources dict causes AttributeError

Similar to the standard resource fields, if ray_remote_args contains {"resources": None}, the .get("resources", {}) call returns None (since the key exists), and then custom_resources.items() raises AttributeError: 'NoneType' object has no attribute 'items'. The code handles None for individual resource amounts within the dict (lines 404-405), but not the case where the entire resources value is None.

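A sketch of the analogous fix, falling back to an empty mapping when the key is present but explicitly None:

    custom_resources = ray_remote_args.get("resources") or {}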

        if amount is None:
            continue
        try:
            amount = float(amount)
        except (TypeError, ValueError) as err:
            raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err
        if amount > 0:
            required_resources[name] = amount

    # If there are no positive resource requirements, the task can run on a
    # zero-resource head node (e.g., num_cpus=0 opt-out), so nothing to check.
    if not required_resources:
        return

    head_node = next(
        (
            node
            for node in ray.nodes()
            if node.get("Alive")
            and "node:__internal_head__" in node.get("Resources", {})
        ),
        None,
    )
Review comment (Medium Severity): Validation checks head node but tasks scheduled elsewhere

The validation function explicitly searches for the head node using node:__internal_head__ in resources, but the actual NodeAffinitySchedulingStrategy is set using ray.get_runtime_context().get_node_id(), which returns the current node (driver's node). If the driver is running on a non-head node (e.g., in a multi-node cluster where a script runs from a worker node), the validation checks resources on the wrong node. This could cause false positives (blocking operations that would succeed) or false negatives (allowing operations that will fail).

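A sketch of one remediation, targeting the driver's node so the check matches the scheduling strategy (this assumes, as in current Ray, that the NodeID field in ray.nodes() entries matches ray.get_runtime_context().get_node_id()):

    driver_node_id = ray.get_runtime_context().get_node_id()
    driver_node = next(
        (
            node
            for node in ray.nodes()
            if node.get("Alive") and node.get("NodeID") == driver_node_id
        ),
        None,
    )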

    if not head_node:
        # The head node metadata is unavailable (e.g., during shutdown). Fall back
        # to the default behavior and let Ray surface its own error.
        return
Comment on lines +427 to +430
Contributor:

For my understanding, do you have a script showing when this can occur? (head_node is None, but next(...) doesn't throw a StopIteration exception?)

Contributor Author:

Thanks for the question. next(..., None) is intentional, so it returns None (no StopIteration) when no matching head node is visible. That can happen during shutdown/teardown or before head resources are fully registered, so we fall back and let Ray surface its own error.
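
For reference, this is standard two-argument next() behavior:

    next(iter([]), None)  # returns the default (None); no StopIteration
    next(iter([]))        # raises StopIteration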


    # Build a map of required vs available resources on the head node.
    head_resources: Dict[str, float] = head_node.get("Resources", {})
    # Map: resource name -> (required, available).
    insufficient: Dict[str, Tuple[float, float]] = {}
    for name, req in required_resources.items():
        avail = head_resources.get(name, 0.0)
        if avail < req:
            insufficient[name] = (req, avail)

    # If nothing is below the required amount, we are good to proceed.
    if not insufficient:
        return

    details = "; ".join(
        f"{name} required {req:g} but head has {avail:g}"
        for name, (req, avail) in insufficient.items()
    )

    raise ValueError(
        f"{op_name} must run on the head node (e.g., for local:// paths), "
        f"but the head node doesn't have enough resources: {details}. "
        "Add resources to the head node, switch to a shared filesystem instead "
        "of local://, or set the resource requests on this operation to 0 "
        "(for example, num_cpus=0) so it can run without head resources."
    )


def _truncated_repr(obj: Any) -> str:
    """Utility to return a truncated object representation for error messages."""
    msg = str(obj)
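What this guards against, as a hypothetical repro (the path and cluster setup are illustrative, not from this PR):

    import ray

    # Assume the head node was started with --num-cpus=0 to keep work off it.
    ray.init()

    # local:// pins the read to the driver's (head) node; with this change the
    # read fails fast with the ValueError above instead of sitting unscheduled.
    ds = ray.data.read_csv("local:///tmp/data.csv")

    # Opt-out named in the error message: request zero resources so the task
    # can still run on the zero-resource head node.
    ds = ray.data.read_csv("local:///tmp/data.csv", ray_remote_args={"num_cpus": 0})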
6 changes: 6 additions & 0 deletions python/ray/data/dataset.py
@@ -89,6 +89,7 @@
from ray.data._internal.util import (
    AllToAllAPI,
    ConsumptionAPI,
    _validate_head_node_resources_for_local_scheduling,
    _validate_rows_per_file_args,
    get_compute_strategy,
    merge_resources_to_ray_remote_args,
@@ -5346,6 +5347,11 @@ def write_datasink(
                soft=False,
            )

            _validate_head_node_resources_for_local_scheduling(
                ray_remote_args,
                op_name="Writing to a local:// path",
            )

        plan = self._plan.copy()
        write_op = Write(
            self._logical_plan.dag,
7 changes: 7 additions & 0 deletions python/ray/data/read_api.py
@@ -76,6 +76,7 @@
from ray.data._internal.tensor_extensions.utils import _create_possibly_ragged_ndarray
from ray.data._internal.util import (
    _autodetect_parallelism,
    _validate_head_node_resources_for_local_scheduling,
    get_table_block_metadata_schema,
    merge_resources_to_ray_remote_args,
    ndarray_to_block,
@@ -440,6 +441,12 @@ def read_datasource(
        ray_remote_args,
    )

    if not datasource.supports_distributed_reads:
        _validate_head_node_resources_for_local_scheduling(
            ray_remote_args,
            op_name="Reading from a local:// path",
        )

    datasource_or_legacy_reader = _get_datasource_or_legacy_reader(
        datasource,
        ctx,
36 changes: 36 additions & 0 deletions python/ray/data/tests/test_consumption.py
@@ -743,6 +743,42 @@ def check_dataset_is_local(ds):
    ).materialize()


def test_read_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=0)
    cluster.wait_for_nodes()

    ray.shutdown()
    ray.init(address=cluster.address)

    cluster.add_node(num_cpus=1)
    cluster.wait_for_nodes()

    csv_path = tmp_path / "data.csv"
    pd.DataFrame({"a": [1, 2]}).to_csv(csv_path, index=False)

    with pytest.raises(ValueError, match=r"local://.*head node"):
        ray.data.read_csv(f"local://{csv_path}").materialize()


def test_write_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=0)
    cluster.wait_for_nodes()

    ray.shutdown()
    ray.init(address=cluster.address)

    cluster.add_node(num_cpus=1)
    cluster.wait_for_nodes()

    output_dir = tmp_path / "local_out"
    ds = ray.data.range(10)

    with pytest.raises(ValueError, match=r"local://.*head node"):
        ds.write_parquet(f"local://{output_dir}")


class FlakyCSVDatasource(CSVDatasource):
    def __init__(self, paths, **csv_datasource_kwargs):
        super().__init__(paths, **csv_datasource_kwargs)