diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt index 683e71488691..4eb5bed0b370 100644 --- a/ci/lint/pydoclint-baseline.txt +++ b/ci/lint/pydoclint-baseline.txt @@ -2281,7 +2281,6 @@ python/ray/util/client/worker.py -------------------- python/ray/util/collective/collective.py DOC101: Function `init_collective_group`: Docstring contains fewer arguments than in function signature. - DOC107: Function `init_collective_group`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints DOC103: Function `init_collective_group`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [gloo_timeout: int]. DOC202: Function `init_collective_group` has a return section in docstring, but there are no return statements or annotations DOC101: Function `create_collective_group`: Docstring contains fewer arguments than in function signature. diff --git a/python/ray/util/collective/__init__.py b/python/ray/util/collective/__init__.py index 09423ad37c11..68bf95031031 100644 --- a/python/ray/util/collective/__init__.py +++ b/python/ray/util/collective/__init__.py @@ -1,3 +1,7 @@ +from ray.util.collective.backend_registry import ( + get_backend_registry, + register_collective_backend, +) from ray.util.collective.collective import ( allgather, allgather_multigpu, @@ -50,4 +54,6 @@ "recv", "recv_multigpu", "get_group_handle", + "get_backend_registry", + "register_collective_backend", ] diff --git a/python/ray/util/collective/backend_registry.py b/python/ray/util/collective/backend_registry.py new file mode 100644 index 000000000000..3e11f3f1d2a6 --- /dev/null +++ b/python/ray/util/collective/backend_registry.py @@ -0,0 +1,48 @@ +from typing import Dict, Type + +from .collective_group.base_collective_group import BaseGroup + + +class BackendRegistry: + _instance = None + _map: Dict[str, Type[BaseGroup]] + + def __new__(cls): + if cls._instance is None: + cls._instance = super(BackendRegistry, cls).__new__(cls) + cls._instance._map = {} + return cls._instance + + def put(self, name: str, group_cls: Type[BaseGroup]) -> None: + if not issubclass(group_cls, BaseGroup): + raise TypeError(f"{group_cls} is not a subclass of BaseGroup") + if name.upper() in self._map: + raise ValueError(f"Backend {name} already registered") + self._map[name.upper()] = group_cls + + def get(self, name: str) -> Type[BaseGroup]: + name = name.upper() + if name not in self._map: + raise ValueError(f"Backend {name} not registered") + return self._map[name] + + def check(self, name: str) -> bool: + try: + cls = self.get(name) + return cls.check_backend_availability() + except (ValueError, AttributeError): + return False + + def list_backends(self) -> list: + return list(self._map.keys()) + + +_global_registry = BackendRegistry() + + +def register_collective_backend(name: str, group_cls: Type[BaseGroup]) -> None: + _global_registry.put(name, group_cls) + + +def get_backend_registry() -> BackendRegistry: + return _global_registry diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index 8803da0219eb..f6545b692773 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -13,6 +13,10 @@ import ray.experimental.internal_kv as _internal_kv from . 
import types from ray._common.network_utils import find_free_port, is_ipv6 +from ray.util.collective.backend_registry import ( + get_backend_registry, + register_collective_backend, +) from ray.util.collective.collective_group.torch_gloo_collective_group import ( get_master_address_metadata_key as _get_master_addr_key, ) @@ -38,6 +42,11 @@ except ImportError: _TORCH_DISTRIBUTED_AVAILABLE = False +if _NCCL_AVAILABLE: + register_collective_backend("NCCL", NCCLGroup) +if _TORCH_DISTRIBUTED_AVAILABLE: + register_collective_backend("GLOO", TorchGLOOGroup) + def nccl_available(): global _LOG_NCCL_WARNING @@ -57,10 +66,6 @@ def gloo_available(): return _TORCH_DISTRIBUTED_AVAILABLE -def torch_distributed_available(): - return _TORCH_DISTRIBUTED_AVAILABLE - - def get_address_and_port() -> Tuple[str, int]: """Returns the IP address and a free port on this node.""" addr = ray.util.get_node_ip_address() @@ -78,18 +83,25 @@ class GroupManager(object): def __init__(self): self._name_group_map = {} + self._registry = get_backend_registry() def create_collective_group( - self, backend, world_size, rank, group_name, gloo_timeout + self, backend, world_size, rank, group_name, gloo_timeout=None ): """The entry to create new collective groups in the manager. Put the registration and the group information into the manager metadata as well. """ - backend = types.Backend(backend) - if backend == types.Backend.GLOO: - # Rendezvous: ensure a MASTER_ADDR:MASTER_PORT is published in internal_kv. + backend = backend.upper() + backend_cls = self._registry.get(backend) + + if not backend_cls.check_backend_availability(): + raise RuntimeError( + f"Backend {backend} is not available. Please check the installation." + ) + + if backend == "GLOO": metadata_key = _get_master_addr_key(group_name) if rank == 0: addr, port = get_address_and_port() @@ -112,13 +124,9 @@ def create_collective_group( logger.debug( "Creating torch.distributed GLOO group: '{}'...".format(group_name) ) - g = TorchGLOOGroup(world_size, rank, group_name, gloo_timeout) - elif backend == types.Backend.NCCL: - _check_backend_availability(backend) - logger.debug("Creating NCCL group: '{}'...".format(group_name)) - g = NCCLGroup(world_size, rank, group_name) + g = backend_cls(world_size, rank, group_name, gloo_timeout) else: - raise RuntimeError(f"Unexpected backend: {backend}") + g = backend_cls(world_size, rank, group_name) self._name_group_map[group_name] = g return self._name_group_map[group_name] @@ -171,7 +179,7 @@ def is_group_initialized(group_name): def init_collective_group( world_size: int, rank: int, - backend=types.Backend.NCCL, + backend: str = "NCCL", group_name: str = "default", gloo_timeout: int = 30000, ): @@ -187,11 +195,13 @@ def init_collective_group( None """ _check_inside_actor() - backend = types.Backend(backend) - _check_backend_availability(backend) + global _group_mgr global _group_mgr_lock + backend_cls = _group_mgr._registry.get(backend) + if not backend_cls.check_backend_availability(): + raise RuntimeError("Backend '{}' is not available.".format(backend)) # TODO(Hao): implement a group auto-counter. 
if not group_name: raise ValueError("group_name '{}' needs to be a string.".format(group_name)) @@ -212,7 +222,7 @@ def create_collective_group( actors, world_size: int, ranks: List[int], - backend=types.Backend.NCCL, + backend: str = "NCCL", group_name: str = "default", gloo_timeout: int = 30000, ): @@ -230,8 +240,9 @@ def create_collective_group( Returns: None """ - backend = types.Backend(backend) - _check_backend_availability(backend) + backend_cls = _group_mgr._registry.get(backend) + if not backend_cls.check_backend_availability(): + raise RuntimeError("Backend '{}' is not available.".format(backend)) name = "info_" + group_name try: @@ -805,17 +816,6 @@ def _check_single_tensor_input(tensor): ) -def _check_backend_availability(backend: types.Backend): - """Check whether the backend is available.""" - if backend == types.Backend.GLOO: - # Now we have deprecated pygloo, and use torch_gloo in all cases. - if not torch_distributed_available(): - raise RuntimeError("torch.distributed is not available.") - elif backend == types.Backend.NCCL: - if not nccl_available(): - raise RuntimeError("NCCL is not available.") - - def _check_inside_actor(): """Check if currently it is inside a Ray actor/task.""" worker = ray._private.worker.global_worker diff --git a/python/ray/util/collective/collective_group/base_collective_group.py b/python/ray/util/collective/collective_group/base_collective_group.py index eff07fb16c67..0ce3911efbb1 100644 --- a/python/ray/util/collective/collective_group/base_collective_group.py +++ b/python/ray/util/collective/collective_group/base_collective_group.py @@ -50,6 +50,12 @@ def backend(cls): """The backend of this collective group.""" raise NotImplementedError() + @classmethod + @abstractmethod + def check_backend_availability(cls) -> bool: + """Check if the backend is available.""" + raise NotImplementedError() + @abstractmethod def allreduce(self, tensor, allreduce_options=AllReduceOptions()): raise NotImplementedError() diff --git a/python/ray/util/collective/collective_group/nccl_collective_group.py b/python/ray/util/collective/collective_group/nccl_collective_group.py index 07e3da29686a..40432c0bd2cf 100644 --- a/python/ray/util/collective/collective_group/nccl_collective_group.py +++ b/python/ray/util/collective/collective_group/nccl_collective_group.py @@ -2,9 +2,6 @@ import logging import time -import cupy -import torch - import ray from ray.util.collective.collective_group import nccl_util from ray.util.collective.collective_group.base_collective_group import BaseGroup @@ -13,7 +10,6 @@ from ray.util.collective.types import ( AllGatherOptions, AllReduceOptions, - Backend, BarrierOptions, BroadcastOptions, RecvOptions, @@ -25,6 +21,18 @@ logger = logging.getLogger(__name__) +global _LOG_NCCL_WARNING, _NCCL_AVAILABLE + +try: + import cupy + import torch + + _NCCL_AVAILABLE = True + _LOG_NCCL_WARNING = False +except ImportError: + _NCCL_AVAILABLE = False + _LOG_NCCL_WARNING = True + class Rendezvous: """A rendezvous class for different actor/task processes to meet. @@ -163,7 +171,19 @@ def destroy_group(self): @classmethod def backend(cls): - return Backend.NCCL + return "NCCL" + + @classmethod + def check_backend_availability(cls) -> bool: + global _LOG_NCCL_WARNING, _NCCL_AVAILABLE + if ray.get_gpu_ids() and _LOG_NCCL_WARNING: + logger.warning( + "NCCL seems unavailable. Please install Cupy " + "following the guide at: " + "https://docs.cupy.dev/en/stable/install.html." 
+ ) + _LOG_NCCL_WARNING = False + return _NCCL_AVAILABLE def allreduce(self, tensors, allreduce_options=AllReduceOptions()): """AllReduce tensors across the collective group following options. diff --git a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py index cf06728739c3..2b337fdf007b 100644 --- a/python/ray/util/collective/collective_group/torch_gloo_collective_group.py +++ b/python/ray/util/collective/collective_group/torch_gloo_collective_group.py @@ -3,14 +3,12 @@ import numpy as np import torch -import torch.distributed as dist import ray.experimental.internal_kv as internal_kv from ray.util.collective.collective_group.base_collective_group import BaseGroup from ray.util.collective.types import ( AllGatherOptions, AllReduceOptions, - Backend, BarrierOptions, BroadcastOptions, RecvOptions, @@ -23,13 +21,19 @@ if TYPE_CHECKING: import torch +try: + import torch.distributed as dist -TORCH_REDUCE_OP_MAP = { - ReduceOp.SUM: dist.ReduceOp.SUM, - ReduceOp.PRODUCT: dist.ReduceOp.PRODUCT, - ReduceOp.MIN: dist.ReduceOp.MIN, - ReduceOp.MAX: dist.ReduceOp.MAX, -} + _TORCH_DISTRIBUTED_AVAILABLE = True + TORCH_REDUCE_OP_MAP = { + ReduceOp.SUM: dist.ReduceOp.SUM, + ReduceOp.PRODUCT: dist.ReduceOp.PRODUCT, + ReduceOp.MIN: dist.ReduceOp.MIN, + ReduceOp.MAX: dist.ReduceOp.MAX, + } +except ImportError: + _TORCH_DISTRIBUTED_AVAILABLE = False + TORCH_REDUCE_OP_MAP = None def get_master_address_metadata_key(group_name: str): @@ -106,7 +110,11 @@ def destroy_group(self): @classmethod def backend(cls): """The backend of this collective group.""" - return Backend.GLOO + return "GLOO" + + @classmethod + def check_backend_availability(cls) -> bool: + return _TORCH_DISTRIBUTED_AVAILABLE def _check_tensor_input(self, tensor: List["torch.Tensor"]) -> "torch.Tensor": """ray.util.collective wraps tensor arguments in a list. 
diff --git a/python/ray/util/collective/examples/gloo_allreduce_register_example.py b/python/ray/util/collective/examples/gloo_allreduce_register_example.py
new file mode 100644
index 000000000000..f019ebcbd8de
--- /dev/null
+++ b/python/ray/util/collective/examples/gloo_allreduce_register_example.py
@@ -0,0 +1,59 @@
+import torch
+
+import ray
+from ray.util.collective import (
+    allreduce,
+    create_collective_group,
+    init_collective_group,
+)
+from ray.util.collective.backend_registry import get_backend_registry
+from ray.util.collective.types import ReduceOp
+
+
+def test_gloo_via_registry():
+    ray.init()
+
+    registry = get_backend_registry()
+    assert "GLOO" in registry.list_backends()
+    assert registry.check("GLOO")
+
+    @ray.remote
+    class Worker:
+        def __init__(self, rank):
+            self.rank = rank
+            self.tensor = None
+
+        def setup(self, world_size):
+            init_collective_group(
+                world_size=world_size,
+                rank=self.rank,
+                backend="GLOO",
+                group_name="default",
+                gloo_timeout=30000,
+            )
+
+        def compute(self):
+            self.tensor = torch.tensor([self.rank + 1], dtype=torch.float32)
+            allreduce(self.tensor, op=ReduceOp.SUM)
+            return self.tensor.item()
+
+    actors = [Worker.remote(rank=i) for i in range(2)]
+    create_collective_group(
+        actors=actors,
+        world_size=2,
+        ranks=[0, 1],
+        backend="GLOO",
+        group_name="default",
+        gloo_timeout=30000,
+    )
+
+    ray.get([a.setup.remote(2) for a in actors])
+    results = ray.get([a.compute.remote() for a in actors])
+
+    assert results == [3.0, 3.0], f"Expected [3.0, 3.0], got {results}"
+
+    ray.shutdown()
+
+
+if __name__ == "__main__":
+    test_gloo_via_registry()
diff --git a/python/ray/util/collective/examples/nccl_allreduce_register_example.py b/python/ray/util/collective/examples/nccl_allreduce_register_example.py
new file mode 100644
index 000000000000..514799b88ddb
--- /dev/null
+++ b/python/ray/util/collective/examples/nccl_allreduce_register_example.py
@@ -0,0 +1,58 @@
+import torch
+
+import ray
+from ray.util.collective import (
+    allreduce,
+    create_collective_group,
+    init_collective_group,
+)
+from ray.util.collective.backend_registry import get_backend_registry
+from ray.util.collective.types import ReduceOp
+
+
+def test_nccl_via_registry():
+    ray.init(num_gpus=8)
+
+    registry = get_backend_registry()
+    assert "NCCL" in registry.list_backends()
+    assert registry.check("NCCL")
+
+    @ray.remote(num_gpus=1)
+    class Worker:
+        def __init__(self, rank):
+            self.rank = rank
+            self.tensor = None
+
+        def setup(self, world_size):
+            init_collective_group(
+                world_size=world_size,
+                rank=self.rank,
+                backend="NCCL",
+                group_name="default",
+            )
+
+        def compute(self):
+            device = torch.cuda.current_device()
+            self.tensor = torch.tensor([float(self.rank + 1)], device=device)
+            allreduce(self.tensor, op=ReduceOp.SUM, group_name="default")
+            return self.tensor.cpu().item()
+
+    actors = [Worker.remote(rank=i) for i in range(2)]
+    create_collective_group(
+        actors=actors,
+        world_size=2,
+        ranks=[0, 1],
+        backend="NCCL",
+        group_name="default",
+    )
+
+    ray.get([a.setup.remote(2) for a in actors])
+    results = ray.get([a.compute.remote() for a in actors])
+
+    assert results == [3.0, 3.0], f"Expected [3.0, 3.0], got {results}"
+
+    ray.shutdown()
+
+
+if __name__ == "__main__":
+    test_nccl_via_registry()
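A minimal sketch of how a backend defined outside Ray could plug into the new registry API. `MyCommGroup` and the "MYCOMM" name are hypothetical placeholders and are not part of this diff; a real backend would also have to implement the collective primitives that `BaseGroup` declares abstract and accept the `(world_size, rank, group_name)` constructor that `GroupManager.create_collective_group` uses for non-GLOO backends.

# Sketch only: registering a hypothetical third-party backend.
from ray.util.collective import get_backend_registry, register_collective_backend
from ray.util.collective.collective_group.base_collective_group import BaseGroup


class MyCommGroup(BaseGroup):
    """Hypothetical backend; only the hooks the registry relies on are shown.

    A usable backend must additionally implement allreduce, broadcast,
    send/recv, etc., which remain abstract on BaseGroup.
    """

    @classmethod
    def backend(cls):
        return "MYCOMM"

    @classmethod
    def check_backend_availability(cls) -> bool:
        # A real implementation would probe for its native library here.
        return True


# Names are stored upper-cased; registering the same name twice raises ValueError.
register_collective_backend("mycomm", MyCommGroup)

registry = get_backend_registry()
assert "MYCOMM" in registry.list_backends()
assert registry.check("MYCOMM")
assert registry.get("mycomm") is MyCommGroup

# Once registered (and concrete), the backend is reachable through the string
# backend argument, e.g. init_collective_group(..., backend="mycomm").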