@@ -368,6 +368,94 @@ def _is_local_scheme(paths: Union[str, List[str]]) -> bool:
368368 return num == len (paths )
369369
370370
371+ def _validate_head_node_resources_for_local_scheduling (
372+ ray_remote_args : Dict [str , Any ],
373+ * ,
374+ op_name : str ,
375+ required_num_cpus : int = 1 ,
376+ required_num_gpus : int = 0 ,
377+ required_memory : int = 0 ,
378+ ) -> None :
379+ """Ensure the head node has enough resources before pinning work there.
380+
381+ Local paths (``local://``) and other driver-local I/O force tasks onto the
382+ head node via ``NodeAffinitySchedulingStrategy``. If the head node was
383+ intentionally started with zero logical resources (a common practice to
384+ avoid OOMs), those tasks become unschedulable. Detect this upfront and
385+ raise a clear error with remediation steps.
386+ """
387+
388+ # Ray defaults to reserving 1 CPU per task when num_cpus isn't provided.
389+ num_cpus = ray_remote_args .get ("num_cpus" , required_num_cpus )
390+ num_gpus = ray_remote_args .get ("num_gpus" , required_num_gpus )
391+ memory = ray_remote_args .get ("memory" , required_memory )
392+
393+ required_resources : Dict [str , float ] = {}
394+ if num_cpus > 0 :
395+ required_resources ["CPU" ] = float (num_cpus )
396+ if num_gpus > 0 :
397+ required_resources ["GPU" ] = float (num_gpus )
398+ if memory > 0 :
399+ required_resources ["memory" ] = float (memory )
400+
401+ # Include any additional custom resources requested.
402+ custom_resources = ray_remote_args .get ("resources" , {})
403+ for name , amount in custom_resources .items ():
404+ if amount is None :
405+ continue
406+ try :
407+ amount = float (amount )
408+ except (TypeError , ValueError ) as err :
409+ raise ValueError (f"Invalid resource amount for '{ name } ': { amount } " ) from err
410+ if amount > 0 :
411+ required_resources [name ] = amount
412+
413+ # If there are no positive resource requirements, the task can run on a
414+ # zero-resource head node (e.g., num_cpus=0 opt-out), so nothing to check.
415+ if not required_resources :
416+ return
417+
418+ head_node = next (
419+ (
420+ node
421+ for node in ray .nodes ()
422+ if node .get ("Alive" )
423+ and "node:__internal_head__" in node .get ("Resources" , {})
424+ ),
425+ None ,
426+ )
427+ if not head_node :
428+ # The head node metadata is unavailable (e.g., during shutdown). Fall back
429+ # to the default behavior and let Ray surface its own error.
430+ return
431+
432+ # Build a map of required vs available resources on the head node.
433+ head_resources : Dict [str , float ] = head_node .get ("Resources" , {})
434+ # Map: resource name -> (required, available).
435+ insufficient : Dict [str , Tuple [float , float ]] = {}
436+ for name , req in required_resources .items ():
437+ avail = head_resources .get (name , 0.0 )
438+ if avail < req :
439+ insufficient [name ] = (req , avail )
440+
441+ # If nothing is below the required amount, we are good to proceed.
442+ if not insufficient :
443+ return
444+
445+ details = "; " .join (
446+ f"{ name } required { req :g} but head has { avail :g} "
447+ for name , (req , avail ) in insufficient .items ()
448+ )
449+
450+ raise ValueError (
451+ f"{ op_name } must run on the head node (e.g., for local:// paths), "
452+ f"but the head node doesn't have enough resources: { details } . "
453+ "Add resources to the head node, switch to a shared filesystem instead "
454+ "of local://, or set the resource requests on this operation to 0 "
455+ "(for example, num_cpus=0) so it can run without head resources."
456+ )
457+
458+
371459def _truncated_repr (obj : Any ) -> str :
372460 """Utility to return a truncated object representation for error messages."""
373461 msg = str (obj )
0 commit comments