1 change: 0 additions & 1 deletion ci/lint/pydoclint-baseline.txt
@@ -2281,7 +2281,6 @@ python/ray/util/client/worker.py
--------------------
python/ray/util/collective/collective.py
DOC101: Function `init_collective_group`: Docstring contains fewer arguments than in function signature.
DOC107: Function `init_collective_group`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC103: Function `init_collective_group`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [gloo_timeout: int].
DOC202: Function `init_collective_group` has a return section in docstring, but there are no return statements or annotations
DOC101: Function `create_collective_group`: Docstring contains fewer arguments than in function signature.
6 changes: 6 additions & 0 deletions python/ray/util/collective/__init__.py
@@ -1,3 +1,7 @@
from ray.util.collective.backend_registry import (
get_backend_registry,
register_collective_backend,
)
from ray.util.collective.collective import (
allgather,
allgather_multigpu,
@@ -50,4 +54,6 @@
"recv",
"recv_multigpu",
"get_group_handle",
"get_backend_registry",
"register_collective_backend",
]
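With these two names added to __all__, the registry entry points become importable straight from the package root. A minimal import sketch (nothing here beyond what the diff exports):

from ray.util.collective import get_backend_registry, register_collective_backend

# Shows which backends have been registered so far in this process.
print(get_backend_registry().list_backends())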
48 changes: 48 additions & 0 deletions python/ray/util/collective/backend_registry.py
@@ -0,0 +1,48 @@
from typing import Dict, Type

from .collective_group.base_collective_group import BaseGroup


class BackendRegistry:
    _instance = None
    _map: Dict[str, Type[BaseGroup]]

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(BackendRegistry, cls).__new__(cls)
            cls._instance._map = {}
        return cls._instance

    def put(self, name: str, group_cls: Type[BaseGroup]) -> None:
        if not issubclass(group_cls, BaseGroup):
            raise TypeError(f"{group_cls} is not a subclass of BaseGroup")
        if name.upper() in self._map:
            raise ValueError(f"Backend {name} already registered")
        self._map[name.upper()] = group_cls

    def get(self, name: str) -> Type[BaseGroup]:
        name = name.upper()
        if name not in self._map:
            raise ValueError(f"Backend {name} not registered")
        return self._map[name]

    def check(self, name: str) -> bool:
        try:
            cls = self.get(name)
            return cls.check_backend_availability()
        except (ValueError, AttributeError):
            return False

    def list_backends(self) -> list:
        return list(self._map.keys())


_global_registry = BackendRegistry()


def register_collective_backend(name: str, group_cls: Type[BaseGroup]) -> None:
    _global_registry.put(name, group_cls)


def get_backend_registry() -> BackendRegistry:
    return _global_registry
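A short usage sketch based on the implementation above (not part of the PR itself): the registry is a process-wide singleton, keys are stored upper-cased, and check() returns False instead of raising for unknown names.

from ray.util.collective.backend_registry import (
    BackendRegistry,
    get_backend_registry,
)

# Every construction returns the same instance, so the module-level helpers
# and direct instantiation all see the same backend map.
assert BackendRegistry() is get_backend_registry()

registry = get_backend_registry()
# get() raises ValueError for unknown names, but check() swallows it.
assert registry.check("does_not_exist") is False
# Reflects whatever has been registered so far in this process
# (e.g. "NCCL" and "GLOO" once ray.util.collective.collective is imported).
print(registry.list_backends())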
62 changes: 31 additions & 31 deletions python/ray/util/collective/collective.py
@@ -13,6 +13,10 @@
import ray.experimental.internal_kv as _internal_kv
from . import types
from ray._common.network_utils import find_free_port, is_ipv6
from ray.util.collective.backend_registry import (
get_backend_registry,
register_collective_backend,
)
from ray.util.collective.collective_group.torch_gloo_collective_group import (
get_master_address_metadata_key as _get_master_addr_key,
)
@@ -38,6 +42,11 @@
except ImportError:
_TORCH_DISTRIBUTED_AVAILABLE = False

if _NCCL_AVAILABLE:
register_collective_backend("NCCL", NCCLGroup)
if _TORCH_DISTRIBUTED_AVAILABLE:
register_collective_backend("GLOO", TorchGLOOGroup)
Comment on lines +45 to +48 (Contributor, medium):

The conditional registration of backends is based on the _NCCL_AVAILABLE and _TORCH_DISTRIBUTED_AVAILABLE flags. Due to the changes in this PR (guarded imports within the group classes), these flags are no longer reliable here and will likely always be True. The registration should be unconditional, since the actual availability of the backend is checked dynamically when a collective group is created.

Suggested change:
-if _NCCL_AVAILABLE:
-    register_collective_backend("NCCL", NCCLGroup)
-if _TORCH_DISTRIBUTED_AVAILABLE:
-    register_collective_backend("GLOO", TorchGLOOGroup)
+register_collective_backend("NCCL", NCCLGroup)
+register_collective_backend("GLOO", TorchGLOOGroup)



def nccl_available():
global _LOG_NCCL_WARNING
@@ -57,10 +66,6 @@ def gloo_available():
return _TORCH_DISTRIBUTED_AVAILABLE


def torch_distributed_available():
return _TORCH_DISTRIBUTED_AVAILABLE


def get_address_and_port() -> Tuple[str, int]:
"""Returns the IP address and a free port on this node."""
addr = ray.util.get_node_ip_address()
@@ -78,18 +83,25 @@ class GroupManager(object):

def __init__(self):
self._name_group_map = {}
self._registry = get_backend_registry()

def create_collective_group(
self, backend, world_size, rank, group_name, gloo_timeout
self, backend, world_size, rank, group_name, gloo_timeout=None
):
"""The entry to create new collective groups in the manager.

Put the registration and the group information into the manager
metadata as well.
"""
backend = types.Backend(backend)
if backend == types.Backend.GLOO:
# Rendezvous: ensure a MASTER_ADDR:MASTER_PORT is published in internal_kv.
backend = backend.upper()
backend_cls = self._registry.get(backend)

if not backend_cls.check_backend_availability():
raise RuntimeError(
f"Backend {backend} is not available. Please check the installation."
)

if backend == "GLOO":
metadata_key = _get_master_addr_key(group_name)
if rank == 0:
addr, port = get_address_and_port()
Expand All @@ -112,13 +124,9 @@ def create_collective_group(
logger.debug(
"Creating torch.distributed GLOO group: '{}'...".format(group_name)
)
g = TorchGLOOGroup(world_size, rank, group_name, gloo_timeout)
elif backend == types.Backend.NCCL:
_check_backend_availability(backend)
logger.debug("Creating NCCL group: '{}'...".format(group_name))
g = NCCLGroup(world_size, rank, group_name)
g = backend_cls(world_size, rank, group_name, gloo_timeout)
Review comment (medium severity): Hardcoded GLOO check causes crashes with custom backends

The hardcoded string check if backend == "GLOO": at line 104 assumes any backend named "GLOO" follows TorchGLOOGroup's signature and needs rendezvous setup. If torch.distributed is unavailable, the built-in GLOO backend isn't registered, allowing users to register a custom backend named "GLOO". When such a backend is used, the code executes GLOO-specific rendezvous logic and passes gloo_timeout as a fourth constructor parameter, which will crash if the custom backend doesn't accept that parameter.

else:
raise RuntimeError(f"Unexpected backend: {backend}")
g = backend_cls(world_size, rank, group_name)

self._name_group_map[group_name] = g
return self._name_group_map[group_name]
@@ -171,7 +179,7 @@ def is_group_initialized(group_name):
def init_collective_group(
world_size: int,
rank: int,
backend=types.Backend.NCCL,
backend: str = "NCCL",
group_name: str = "default",
gloo_timeout: int = 30000,
):
@@ -187,11 +195,13 @@ def init_collective_group(
None
"""
_check_inside_actor()
backend = types.Backend(backend)
_check_backend_availability(backend)

global _group_mgr
global _group_mgr_lock

backend_cls = _group_mgr._registry.get(backend)
if not backend_cls.check_backend_availability():
raise RuntimeError("Backend '{}' is not available.".format(backend))
Review comment (low severity): Redundant backend availability checks

Backend availability is checked three times for the same operation. Both init_collective_group and create_collective_group check backend_cls.check_backend_availability() before calling _group_mgr.create_collective_group(), which performs the same check again. The checks in the public functions are redundant and add unnecessary overhead. (Two additional locations were flagged.)

# TODO(Hao): implement a group auto-counter.
if not group_name:
raise ValueError("group_name '{}' needs to be a string.".format(group_name))
@@ -212,7 +222,7 @@ def create_collective_group(
actors,
world_size: int,
ranks: List[int],
backend=types.Backend.NCCL,
backend: str = "NCCL",
group_name: str = "default",
gloo_timeout: int = 30000,
):
@@ -230,8 +240,9 @@
Returns:
None
"""
backend = types.Backend(backend)
_check_backend_availability(backend)
backend_cls = _group_mgr._registry.get(backend)
if not backend_cls.check_backend_availability():
raise RuntimeError("Backend '{}' is not available.".format(backend))

name = "info_" + group_name
try:
@@ -805,17 +816,6 @@ def _check_single_tensor_input(tensor):
)


def _check_backend_availability(backend: types.Backend):
Review comment (high severity): Type mismatch in gloo_timeout environment variable

The os.getenv("collective_gloo_timeout", 30000) call returns a string when the environment variable is set, but an integer is expected. This causes a TypeError when the code attempts division at line 112 (gloo_timeout / 1000.0). The value needs to be converted to an integer using int(os.getenv(...)).

"""Check whether the backend is available."""
if backend == types.Backend.GLOO:
# Now we have deprecated pygloo, and use torch_gloo in all cases.
if not torch_distributed_available():
raise RuntimeError("torch.distributed is not available.")
elif backend == types.Backend.NCCL:
if not nccl_available():
raise RuntimeError("NCCL is not available.")


def _check_inside_actor():
"""Check if currently it is inside a Ray actor/task."""
worker = ray._private.worker.global_worker
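On the gloo_timeout environment variable flagged in the high-severity comment above: the call site is not part of this hunk, so the variable name and default below come from the comment rather than from the diff; this is only a sketch of the conversion it asks for.

import os

# os.getenv returns a string when the variable is set (and the int default
# when it is not), so convert before arithmetic such as dividing by 1000.
gloo_timeout = int(os.getenv("collective_gloo_timeout", 30000))
timeout_seconds = gloo_timeout / 1000.0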
python/ray/util/collective/collective_group/base_collective_group.py
@@ -50,6 +50,12 @@ def backend(cls):
"""The backend of this collective group."""
raise NotImplementedError()

@classmethod
@abstractmethod
def check_backend_availability(cls) -> bool:
"""Check if the backend is available."""
raise NotImplementedError()

@abstractmethod
def allreduce(self, tensor, allreduce_options=AllReduceOptions()):
raise NotImplementedError()
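With check_backend_availability now an abstract classmethod, each group class carries its own availability probe. A rough skeleton of what a third-party backend could look like under that contract; FooGroup and foo_comm are hypothetical and only illustrate the shape:

from ray.util.collective.backend_registry import register_collective_backend
from ray.util.collective.collective_group.base_collective_group import BaseGroup


class FooGroup(BaseGroup):
    """Hypothetical backend, shown only to illustrate the new contract."""

    @classmethod
    def backend(cls):
        return "FOO"

    @classmethod
    def check_backend_availability(cls) -> bool:
        # Probe optional runtime dependencies lazily, not at import time.
        try:
            import foo_comm  # noqa: F401  (hypothetical dependency)
        except ImportError:
            return False
        return True

    # A usable backend would also implement the remaining abstract collective
    # ops (allreduce, allgather, broadcast, send, recv, ...) before the group
    # manager can instantiate it.


# Registration only checks issubclass(FooGroup, BaseGroup), so the skeleton
# above is enough to make "FOO" discoverable by name.
register_collective_backend("FOO", FooGroup)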
python/ray/util/collective/collective_group/nccl_collective_group.py
@@ -2,9 +2,6 @@
import logging
import time

import cupy
import torch

import ray
from ray.util.collective.collective_group import nccl_util
from ray.util.collective.collective_group.base_collective_group import BaseGroup
@@ -13,7 +10,6 @@
from ray.util.collective.types import (
AllGatherOptions,
AllReduceOptions,
Backend,
BarrierOptions,
BroadcastOptions,
RecvOptions,
@@ -25,6 +21,18 @@

logger = logging.getLogger(__name__)

global _LOG_NCCL_WARNING, _NCCL_AVAILABLE
Review comment (low severity): Unnecessary global statement at module level

The global _LOG_NCCL_WARNING, _NCCL_AVAILABLE statement appears at module scope where it has no effect. The global keyword is only meaningful inside functions to indicate that a variable refers to a module-level name. At module level, variables are already in the global scope, making this statement a no-op that may confuse developers.


try:
import cupy
import torch

_NCCL_AVAILABLE = True
_LOG_NCCL_WARNING = False
except ImportError:
_NCCL_AVAILABLE = False
_LOG_NCCL_WARNING = True


class Rendezvous:
"""A rendezvous class for different actor/task processes to meet.
@@ -163,7 +171,19 @@ def destroy_group(self):

@classmethod
def backend(cls):
return Backend.NCCL
return "NCCL"

@classmethod
def check_backend_availability(cls) -> bool:
global _LOG_NCCL_WARNING, _NCCL_AVAILABLE
if ray.get_gpu_ids() and _LOG_NCCL_WARNING:
logger.warning(
"NCCL seems unavailable. Please install Cupy "
"following the guide at: "
"https://docs.cupy.dev/en/stable/install.html."
)
_LOG_NCCL_WARNING = False
return _NCCL_AVAILABLE

def allreduce(self, tensors, allreduce_options=AllReduceOptions()):
"""AllReduce tensors across the collective group following options.
python/ray/util/collective/collective_group/torch_gloo_collective_group.py
@@ -3,14 +3,12 @@

import numpy as np
import torch
import torch.distributed as dist

import ray.experimental.internal_kv as internal_kv
from ray.util.collective.collective_group.base_collective_group import BaseGroup
from ray.util.collective.types import (
AllGatherOptions,
AllReduceOptions,
Backend,
BarrierOptions,
BroadcastOptions,
RecvOptions,
@@ -23,13 +21,19 @@
if TYPE_CHECKING:
import torch

try:
import torch.distributed as dist

TORCH_REDUCE_OP_MAP = {
ReduceOp.SUM: dist.ReduceOp.SUM,
ReduceOp.PRODUCT: dist.ReduceOp.PRODUCT,
ReduceOp.MIN: dist.ReduceOp.MIN,
ReduceOp.MAX: dist.ReduceOp.MAX,
}
_TORCH_DISTRIBUTED_AVAILABLE = True
TORCH_REDUCE_OP_MAP = {
ReduceOp.SUM: dist.ReduceOp.SUM,
ReduceOp.PRODUCT: dist.ReduceOp.PRODUCT,
ReduceOp.MIN: dist.ReduceOp.MIN,
ReduceOp.MAX: dist.ReduceOp.MAX,
}
except ImportError:
_TORCH_DISTRIBUTED_AVAILABLE = False
TORCH_REDUCE_OP_MAP = None


def get_master_address_metadata_key(group_name: str):
@@ -106,7 +110,11 @@ def destroy_group(self):
@classmethod
def backend(cls):
"""The backend of this collective group."""
return Backend.GLOO
return "GLOO"

@classmethod
def check_backend_availability(cls) -> bool:
return _TORCH_DISTRIBUTED_AVAILABLE

def _check_tensor_input(self, tensor: List["torch.Tensor"]) -> "torch.Tensor":
"""ray.util.collective wraps tensor arguments in a list.
@@ -0,0 +1,55 @@
import torch

import ray
from ray.util.collective import (
    allreduce,
    create_collective_group,
    init_collective_group,
)
from ray.util.collective.backend_registry import get_backend_registry
from ray.util.collective.types import ReduceOp


def test_gloo_via_registry():
    ray.init()

    registry = get_backend_registry()
    assert "GLOO" in registry.list_backends()
    assert registry.check("GLOO")

    @ray.remote
    class Worker:
        def __init__(self, rank):
            self.rank = rank
            self.tensor = None

        def setup(self, world_size):
            init_collective_group(
                world_size=world_size,
                rank=self.rank,
                backend="GLOO",
                group_name="default",
                gloo_timeout=30000,
            )

        def compute(self):
            self.tensor = torch.tensor([self.rank + 1], dtype=torch.float32)
            allreduce(self.tensor, op=ReduceOp.SUM)
            return self.tensor.item()

    actors = [Worker.remote(rank=i) for i in range(2)]
    create_collective_group(
        actors=actors,
        world_size=2,
        ranks=[0, 1],
        backend="GLOO",
        group_name="default",
        gloo_timeout=30000,
    )

    ray.get([a.setup.remote(2) for a in actors])
    results = ray.get([a.compute.remote() for a in actors])

    assert results == [3.0, 3.0], f"Expected [3.0, 3.0], got {results}"

    ray.shutdown()
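One possible pattern, not part of this PR, for gating backend-specific tests on runtime availability is to reuse registry.check() with pytest's skipif:

import pytest

from ray.util.collective.backend_registry import get_backend_registry

# Skip NCCL-only tests when the backend is not registered or usable in this
# environment; registry.check() returns False rather than raising.
requires_nccl = pytest.mark.skipif(
    not get_backend_registry().check("NCCL"),
    reason="NCCL collective backend is not available",
)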