Skip to content

Commit 73b54e1

Browse files
Merge pull request #190 from hexinw-nvidia/no_libuv
Use non-libuv for ft_rendezvous
2 parents 24690e4 + 9addb5b commit 73b54e1

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
Monkey patch for PyTorch's c10d_rendezvous_backend to add use_libuv support.
18+
19+
This patch modifies the _create_tcp_store function to accept and use the use_libuv
20+
parameter from RendezvousParameters, allowing users to control whether to use
21+
the libuv backend or the traditional socket backend for TCPStore.
22+
23+
Usage:
24+
from nvidia_resiliency_ext.fault_tolerance.c10d_monkey_patch import apply_c10d_patch
25+
apply_c10d_patch()
26+
"""
27+
28+
import logging
29+
30+
from nvidia_resiliency_ext.shared_utils.log_manager import LogConfig
31+
32+
logger = logging.getLogger(LogConfig.name)
33+
34+
35+
def _patched_create_tcp_store(params: "RendezvousParameters") -> "TCPStore":  # noqa: F821
    """
    Create the TCPStore for the C10d rendezvous backend, forwarding ``use_libuv``.

    Follows the same flow as the upstream ``_create_tcp_store``; the one addition
    is that the ``use_libuv`` rendezvous config is read and passed to ``TCPStore``.

    Rendezvous configs read from ``params``:
        is_host: optional bool; when set, it decides whether this process hosts the store.
        read_timeout: store timeout in seconds (default 60); must be positive.
        use_libuv: bool handed to ``TCPStore`` (default ``True``).

    Returns:
        The ``TCPStore`` instance that was successfully created.

    Raises:
        ValueError: if ``read_timeout`` is zero or negative.
        RendezvousConnectionError: if the store could not be instantiated.
    """
    import os
    from datetime import timedelta
    from typing import cast

    from torch.distributed import TCPStore
    from torch.distributed.elastic.events import NodeState, construct_and_record_rdzv_event
    from torch.distributed.elastic.rendezvous.api import RendezvousConnectionError
    from torch.distributed.elastic.rendezvous.c10d_rendezvous_backend import (
        _matches_machine_hostname,
        parse_rendezvous_endpoint,
    )

    # Default TCP store port (29400), kept local for PyTorch 2.3.1 compatibility.
    DEFAULT_PORT = 29400
    host, port = parse_rendezvous_endpoint(params.endpoint, default_port=DEFAULT_PORT)

    # An explicit "is_host" setting wins; without one, fall back to checking
    # whether the endpoint host matches this machine's hostname / IP address.
    explicit_is_host = params.get_as_bool("is_host")
    host_was_inferred = explicit_is_host is None
    is_host = _matches_machine_hostname(host) if host_was_inferred else explicit_is_host

    timeout_seconds = cast(int, params.get_as_int("read_timeout", 60))
    if timeout_seconds <= 0:
        raise ValueError("The read timeout must be a positive integer.")
    store_timeout = timedelta(seconds=timeout_seconds)

    # New relative to upstream: let the rendezvous config pick the TCPStore backend.
    use_libuv = params.get_as_bool("use_libuv", True)

    # Up to two attempts: first with the resolved host role, then as a plain client.
    for as_server in (is_host, False):
        try:
            store = TCPStore(
                host,
                port,
                is_master=as_server,
                multi_tenant=True,
                timeout=store_timeout,
                use_libuv=use_libuv,
            )

            if as_server:
                msg = f"Process {os.getpid()} hosts the TCP store for the C10d rendezvous backend."
                construct_and_record_rdzv_event(
                    run_id=params.run_id, message=msg, node_state=NodeState.INIT
                )
                logger.info(msg)

            break
        except (ValueError, RuntimeError, TimeoutError) as exc:
            # Only a failed *server* attempt whose host role was inferred heuristically
            # gets a second chance as a client: several processes of the same rendezvous
            # may share this machine, and just one of them ends up hosting the store.
            if as_server and host_was_inferred:
                continue
            raise RendezvousConnectionError(
                "The connection to the C10d store has failed. See inner exception for details."
            ) from exc

    return store  # type: ignore[possibly-undefined]
110+
111+
112+
def apply_c10d_patch():
    """
    Replace ``_create_tcp_store`` in PyTorch's ``c10d_rendezvous_backend`` module.

    The module attribute is rebound to ``_patched_create_tcp_store``, which reads
    the ``use_libuv`` rendezvous parameter and passes it to ``TCPStore``.

    Raises:
        ImportError: if ``c10d_rendezvous_backend`` cannot be imported (logged, re-raised).
        Exception: any other failure while patching (logged, re-raised).
    """
    try:
        from torch.distributed.elastic.rendezvous import (
            c10d_rendezvous_backend as backend_module,
        )

        # Rebind the module-level factory to the patched implementation.
        backend_module._create_tcp_store = _patched_create_tcp_store

        logger.info(
            "Successfully applied c10d_rendezvous_backend monkey patch for use_libuv support"
        )
    except ImportError as import_err:
        logger.error(f"Failed to import c10d_rendezvous_backend: {import_err}")
        raise
    except Exception as patch_err:
        logger.error(f"Failed to apply c10d monkey patch: {patch_err}")
        raise

src/nvidia_resiliency_ext/fault_tolerance/launcher.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ def _register_ft_rdzv_handler():
101101
from torch.distributed.elastic.rendezvous.c10d_rendezvous_backend import create_backend
102102

103103
from ._ft_rendezvous import FtRendezvousHandler, create_handler
104+
from .c10d_monkey_patch import apply_c10d_patch
105+
106+
# Apply monkey patch to add use_libuv support to c10d backend
107+
apply_c10d_patch()
104108

105109
def _create_ft_rdzv_handler(params: RendezvousParameters) -> FtRendezvousHandler:
106110
backend, store = create_backend(params)
@@ -1987,6 +1991,10 @@ def config_from_args(args) -> Tuple[LaunchConfig, Union[Callable, str], List[str
19871991

19881992
rdzv_configs = _parse_rendezvous_config(args.rdzv_conf)
19891993

1994+
# Add use_libuv=False for c10d backend
1995+
if args.rdzv_backend == 'c10d':
1996+
rdzv_configs['use_libuv'] = False
1997+
19901998
if args.rdzv_backend == "static":
19911999
rdzv_configs["rank"] = args.node_rank
19922000

0 commit comments

Comments
 (0)