[Core] Add register_collective_backend API for customized collective backends#60701
[Core] Add register_collective_backend API for customized collective backends#60701Evelynn-V wants to merge 5 commits intoray-project:masterfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a valuable refactoring to allow custom collective communication backends through a dynamic registry. The core concept is well-implemented and improves extensibility. However, I've identified a few issues that should be addressed. There is a critical bug that prevents the use of custom backends due to legacy code paths. Additionally, some backend-specific logic remains hardcoded, which undermines the goal of a truly generic system. I've also provided suggestions to improve code clarity, remove redundancy, and fix a minor issue in the singleton implementation. Addressing these points will make the new backend registry more robust and fully realize the goals of this PR.
| @@ -188,10 +196,15 @@ def init_collective_group( | |||
| """ | |||
| _check_inside_actor() | |||
| backend = types.Backend(backend) | |||
There was a problem hiding this comment.
Using types.Backend(backend) will raise a ValueError for any custom backend not explicitly defined in types.Backend (i.e., not "NCCL" or "GLOO"). This breaks the core functionality of this PR, which is to allow custom backends. This line should be removed, as the backend argument is already a string.
There was a problem hiding this comment.
This looks like a valid issue. It might be better to remove the types.Backend(backend) call and accept the backend as a string to ensure custom backends aren't blocked by the enum validation.
| @@ -231,7 +244,11 @@ def create_collective_group( | |||
| None | |||
| """ | |||
| backend = types.Backend(backend) | |||
There was a problem hiding this comment.
Using types.Backend(backend) will raise a ValueError for any custom backend not explicitly defined in types.Backend (i.e., not "NCCL" or "GLOO"). This breaks the core functionality of this PR, which is to allow custom backends. This line should be removed, as the backend argument is already a string.
| class BackendRegistry: | ||
| _instance = None | ||
| _map: Dict[str, Type[BaseGroup]] = {} | ||
|
|
||
| def __new__(cls): | ||
| if cls._instance is None: | ||
| cls._instance = super(BackendRegistry, cls).__new__(cls) | ||
| return cls._instance |
There was a problem hiding this comment.
The current singleton implementation with _map as a class attribute is not robust against subclassing. A subclass would share the same _map dictionary, which is likely unintended. To make this more robust, _map should be an instance attribute, initialized only once within __new__.
| class BackendRegistry: | |
| _instance = None | |
| _map: Dict[str, Type[BaseGroup]] = {} | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super(BackendRegistry, cls).__new__(cls) | |
| return cls._instance | |
| class BackendRegistry: | |
| _instance = None | |
| _map: Dict[str, Type[BaseGroup]] | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super(BackendRegistry, cls).__new__(cls) | |
| cls._instance._map = {} | |
| return cls._instance |
There was a problem hiding this comment.
That’s a very solid point.
| if _NCCL_AVAILABLE: | ||
| register_collective_backend("NCCL", NCCLGroup) | ||
| if _TORCH_DISTRIBUTED_AVAILABLE: | ||
| register_collective_backend("GLOO", TorchGLOOGroup) |
There was a problem hiding this comment.
The conditional registration of backends is based on _NCCL_AVAILABLE and _TORCH_DISTRIBUTED_AVAILABLE flags. Due to the changes in this PR (guarded imports within group classes), these flags are no longer reliable here and will likely always be True. The registration should be unconditional, as the actual availability of the backend is checked dynamically when a collective group is created.
| if _NCCL_AVAILABLE: | |
| register_collective_backend("NCCL", NCCLGroup) | |
| if _TORCH_DISTRIBUTED_AVAILABLE: | |
| register_collective_backend("GLOO", TorchGLOOGroup) | |
| register_collective_backend("NCCL", NCCLGroup) | |
| register_collective_backend("GLOO", TorchGLOOGroup) |
| if backend_cls is None: | ||
| raise ValueError("Backend '{}' is not supported.".format(backend)) |
There was a problem hiding this comment.
This check is redundant and can be removed.
| if backend_cls is None: | ||
| raise ValueError("Backend '{}' is not supported.".format(backend)) |
| init_collective_group( | ||
| world_size=world_size, | ||
| rank=self.rank, | ||
| backend=Backend.GLOO, |
There was a problem hiding this comment.
| actors=actors, | ||
| world_size=2, | ||
| ranks=[0, 1], | ||
| backend=Backend.GLOO, |
There was a problem hiding this comment.
python/ray/util/collective/collective_group/torch_gloo_collective_group.py
Show resolved
Hide resolved
python/ray/util/collective/collective_group/torch_gloo_collective_group.py
Show resolved
Hide resolved
| ) | ||
|
|
||
|
|
||
| def _check_backend_availability(backend: types.Backend): |
There was a problem hiding this comment.
Type mismatch in gloo_timeout environment variable
High Severity
The os.getenv("collective_gloo_timeout", 30000) call returns a string when the environment variable is set, but an integer is expected. This causes a TypeError when the code attempts division at line 112 (gloo_timeout / 1000.0). The value needs to be converted to an integer using int(os.getenv(...)).
|
|
||
| backend_cls = _group_mgr._registry.get(backend) | ||
| if not backend_cls.check_backend_availability(): | ||
| raise RuntimeError("Backend '{}' is not available.".format(backend)) |
There was a problem hiding this comment.
Redundant backend availability checks
Low Severity
Backend availability is checked three times for the same operation. Both init_collective_group and create_collective_group check backend_cls.check_backend_availability() before calling _group_mgr.create_collective_group(), which performs the same check again. The checks in the public functions are redundant and add unnecessary overhead.
Additional Locations (2)
| _check_backend_availability(backend) | ||
| logger.debug("Creating NCCL group: '{}'...".format(group_name)) | ||
| g = NCCLGroup(world_size, rank, group_name) | ||
| g = backend_cls(world_size, rank, group_name, gloo_timeout) |
There was a problem hiding this comment.
Hardcoded GLOO check causes crashes with custom backends
Medium Severity
The hardcoded string check if backend == "GLOO": at line 104 assumes any backend named "GLOO" follows TorchGLOOGroup's signature and needs rendezvous setup. If torch.distributed is unavailable, the built-in GLOO backend isn't registered, allowing users to register a custom backend named "GLOO". When such a backend is used, the code executes GLOO-specific rendezvous logic and passes gloo_timeout as a fourth constructor parameter, which will crash if the custom backend doesn't accept that parameter.
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| global _LOG_NCCL_WARNING, _NCCL_AVAILABLE |
There was a problem hiding this comment.
Unnecessary global statement at module level
Low Severity
The global _LOG_NCCL_WARNING, _NCCL_AVAILABLE statement appears at module scope where it has no effect. The global keyword is only meaningful inside functions to indicate that a variable refers to a module-level name. At module level, variables are already in the global scope, making this statement a no-op that may confuse developers.
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>
Signed-off-by: Evelynn-V <liwenlin0223l@gmail.com>


Description
This PR implements a dynamic registry system for Ray Collective communication backends, enabling users to register and integrate custom collective communication libraries (e.g., HCCL for Ascend NPU, XCCL for other vendors) without modifying Ray's core codebase.
Key accomplishments
ray.util.collective.backend_registry.register_collective_backend(name: str, group_cls: Type[BaseGroup])for runtime backend registrationRelated issues
Related to #60603 No.1 and No.3 to No.6
Related to PR 60693
Usage Examples
For usage instructions, please refer to
util/collective/examples/gloo_allreduce_example.pyTODO