From 8fd4696fbb710cf75e184664ba815aa5f3b1a0bb Mon Sep 17 00:00:00 2001
From: Haichuan Hu
Date: Tue, 3 Feb 2026 22:21:05 +0800
Subject: [PATCH] [Data] Add descriptive error when using local:// paths with
 a zero-resource head node

Signed-off-by: Haichuan Hu
---
 python/ray/data/_internal/util.py         | 88 +++++++++++++++++++++++
 python/ray/data/dataset.py                |  6 ++
 python/ray/data/read_api.py               |  7 ++
 python/ray/data/tests/test_consumption.py | 36 ++++++++++
 4 files changed, 137 insertions(+)

diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py
index 1b6ffa0a3855..85b09a849153 100644
--- a/python/ray/data/_internal/util.py
+++ b/python/ray/data/_internal/util.py
@@ -368,6 +368,94 @@ def _is_local_scheme(paths: Union[str, List[str]]) -> bool:
     return num == len(paths)
 
 
+def _validate_head_node_resources_for_local_scheduling(
+    ray_remote_args: Dict[str, Any],
+    *,
+    op_name: str,
+    required_num_cpus: int = 1,
+    required_num_gpus: int = 0,
+    required_memory: int = 0,
+) -> None:
+    """Ensure the head node has enough resources before pinning work there.
+
+    Local paths (``local://``) and other driver-local I/O force tasks onto the
+    head node via ``NodeAffinitySchedulingStrategy``. If the head node was
+    intentionally started with zero logical resources (a common practice to
+    avoid OOMs), those tasks become unschedulable. Detect this upfront and
+    raise a clear error with remediation steps.
+    """
+
+    # Ray defaults to reserving 1 CPU per task when num_cpus isn't provided.
+    num_cpus = ray_remote_args.get("num_cpus", required_num_cpus)
+    num_gpus = ray_remote_args.get("num_gpus", required_num_gpus)
+    memory = ray_remote_args.get("memory", required_memory)
+
+    required_resources: Dict[str, float] = {}
+    if num_cpus > 0:
+        required_resources["CPU"] = float(num_cpus)
+    if num_gpus > 0:
+        required_resources["GPU"] = float(num_gpus)
+    if memory > 0:
+        required_resources["memory"] = float(memory)
+
+    # Include any additional custom resources requested.
+    custom_resources = ray_remote_args.get("resources", {})
+    for name, amount in custom_resources.items():
+        if amount is None:
+            continue
+        try:
+            amount = float(amount)
+        except (TypeError, ValueError) as err:
+            raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err
+        if amount > 0:
+            required_resources[name] = amount
+
+    # If there are no positive resource requirements, the task can run on a
+    # zero-resource head node (e.g., num_cpus=0 opt-out), so nothing to check.
+    if not required_resources:
+        return
+
+    head_node = next(
+        (
+            node
+            for node in ray.nodes()
+            if node.get("Alive")
+            and "node:__internal_head__" in node.get("Resources", {})
+        ),
+        None,
+    )
+    if not head_node:
+        # The head node metadata is unavailable (e.g., during shutdown). Fall back
+        # to the default behavior and let Ray surface its own error.
+        return
+
+    # Build a map of required vs available resources on the head node.
+    head_resources: Dict[str, float] = head_node.get("Resources", {})
+    # Map: resource name -> (required, available).
+    insufficient: Dict[str, Tuple[float, float]] = {}
+    for name, req in required_resources.items():
+        avail = head_resources.get(name, 0.0)
+        if avail < req:
+            insufficient[name] = (req, avail)
+
+    # If nothing is below the required amount, we are good to proceed.
+    if not insufficient:
+        return
+
+    details = "; ".join(
+        f"{name} required {req:g} but head has {avail:g}"
+        for name, (req, avail) in insufficient.items()
+    )
+
+    raise ValueError(
+        f"{op_name} must run on the head node (e.g., for local:// paths), "
+        f"but the head node doesn't have enough resources: {details}. "
+        "Add resources to the head node, switch to a shared filesystem instead "
+        "of local://, or set the resource requests on this operation to 0 "
+        "(for example, num_cpus=0) so it can run without head resources."
+    )
+
+
 def _truncated_repr(obj: Any) -> str:
     """Utility to return a truncated object representation for error messages."""
     msg = str(obj)
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 485fdad4f8bb..1536571f719f 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -89,6 +89,7 @@
 from ray.data._internal.util import (
     AllToAllAPI,
     ConsumptionAPI,
+    _validate_head_node_resources_for_local_scheduling,
     _validate_rows_per_file_args,
     get_compute_strategy,
     merge_resources_to_ray_remote_args,
@@ -5346,6 +5347,11 @@ def write_datasink(
                 soft=False,
             )
 
+            _validate_head_node_resources_for_local_scheduling(
+                ray_remote_args,
+                op_name="Writing to a local:// path",
+            )
+
         plan = self._plan.copy()
         write_op = Write(
             self._logical_plan.dag,
diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py
index 0a636e2254c6..db00178aef4a 100644
--- a/python/ray/data/read_api.py
+++ b/python/ray/data/read_api.py
@@ -76,6 +76,7 @@
 from ray.data._internal.tensor_extensions.utils import _create_possibly_ragged_ndarray
 from ray.data._internal.util import (
     _autodetect_parallelism,
+    _validate_head_node_resources_for_local_scheduling,
     get_table_block_metadata_schema,
     merge_resources_to_ray_remote_args,
     ndarray_to_block,
@@ -440,6 +441,12 @@ def read_datasource(
         ray_remote_args,
     )
 
+    if not datasource.supports_distributed_reads:
+        _validate_head_node_resources_for_local_scheduling(
+            ray_remote_args,
+            op_name="Reading from a local:// path",
+        )
+
     datasource_or_legacy_reader = _get_datasource_or_legacy_reader(
         datasource,
         ctx,
diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py
index b34d59625d48..d069a930d9da 100644
--- a/python/ray/data/tests/test_consumption.py
+++ b/python/ray/data/tests/test_consumption.py
@@ -743,6 +743,42 @@ def check_dataset_is_local(ds):
         ).materialize()
 
 
+def test_read_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=0)
+    cluster.wait_for_nodes()
+
+    ray.shutdown()
+    ray.init(address=cluster.address)
+
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    csv_path = tmp_path / "data.csv"
+    pd.DataFrame({"a": [1, 2]}).to_csv(csv_path, index=False)
+
+    with pytest.raises(ValueError, match=r"local://.*head node"):
+        ray.data.read_csv(f"local://{csv_path}").materialize()
+
+
+def test_write_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=0)
+    cluster.wait_for_nodes()
+
+    ray.shutdown()
+    ray.init(address=cluster.address)
+
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+
+    output_dir = tmp_path / "local_out"
+    ds = ray.data.range(10)
+
+    with pytest.raises(ValueError, match=r"local://.*head node"):
+        ds.write_parquet(f"local://{output_dir}")
+
+
 class FlakyCSVDatasource(CSVDatasource):
     def __init__(self, paths, **csv_datasource_kwargs):
        super().__init__(paths, **csv_datasource_kwargs)
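
Repro sketch for reviewers (not part of the patch): a minimal illustration of the
failure mode this change targets, assuming a single-node cluster whose head node
is started with zero logical CPUs. The file path below is illustrative only.

    import pandas as pd
    import ray

    # Start a head node that advertises zero CPUs -- the configuration this
    # patch guards against.
    ray.init(num_cpus=0)

    # Create a small CSV for the local:// read to target.
    pd.DataFrame({"a": [1, 2]}).to_csv("/tmp/demo.csv", index=False)

    try:
        # local:// pins the read tasks to the head node. With the patch
        # applied, the zero-CPU head should trigger the descriptive
        # ValueError up front instead of hanging on an unschedulable task.
        ray.data.read_csv("local:///tmp/demo.csv").materialize()
    except ValueError as err:
        print(err)

Passing num_cpus=0 via ray_remote_args (the opt-out named in the new error
message) should let the same read proceed on the zero-CPU head, since the
validation returns early when no positive resources are requested.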