1
+ import torch .distributed as dist
2
+ from unittest import TestCase
3
+ from torchft .torchft import Lighthouse
4
+ from torchft import Manager , ProcessGroupGloo
5
+ import time
6
+
7
+ class TestLighthouse (TestCase ):
8
+ def test_join_timeout_behavior (self ) -> None :
9
+ """Test that join_timeout_ms affects joining behavior"""
10
+ # To test, we create a lighthouse with 100ms and 400ms join timeouts
11
+ # and measure the time taken to validate the quorum.
12
+ lighthouse = Lighthouse (
13
+ bind = "[::]:0" ,
14
+ min_replicas = 1 ,
15
+ join_timeout_ms = 100 ,
16
+ )
17
+
18
+ # Create a manager that tries to join
19
+ try :
20
+ store = dist .TCPStore (
21
+ host_name = "localhost" ,
22
+ port = 0 ,
23
+ is_master = True ,
24
+ wait_for_workers = False ,
25
+ )
26
+ pg = ProcessGroupGloo ()
27
+ manager = Manager (
28
+ pg = pg ,
29
+ min_replica_size = 1 ,
30
+ load_state_dict = lambda x : None ,
31
+ state_dict = lambda : None ,
32
+ replica_id = f"lighthouse_test" ,
33
+ store_addr = "localhost" ,
34
+ store_port = store .port ,
35
+ rank = 0 ,
36
+ world_size = 1 ,
37
+ use_async_quorum = False ,
38
+ lighthouse_addr = lighthouse .address (),
39
+ )
40
+
41
+ start_time = time .time ()
42
+ manager .start_quorum ()
43
+ time_taken = time .time () - start_time
44
+ assert time_taken < 0.4 , f"Time taken to join: { time_taken } > 0.4s"
45
+
46
+ finally :
47
+ # Cleanup
48
+ lighthouse .shutdown ()
49
+ if 'manager' in locals ():
50
+ manager .shutdown ()
51
+
52
+ lighthouse = Lighthouse (
53
+ bind = "[::]:0" ,
54
+ min_replicas = 1 ,
55
+ join_timeout_ms = 400 ,
56
+ )
57
+
58
+ # Create a manager that tries to join
59
+ try :
60
+ store = dist .TCPStore (
61
+ host_name = "localhost" ,
62
+ port = 0 ,
63
+ is_master = True ,
64
+ wait_for_workers = False ,
65
+ )
66
+ pg = ProcessGroupGloo ()
67
+ manager = Manager (
68
+ pg = pg ,
69
+ min_replica_size = 1 ,
70
+ load_state_dict = lambda x : None ,
71
+ state_dict = lambda : None ,
72
+ replica_id = f"lighthouse_test" ,
73
+ store_addr = "localhost" ,
74
+ store_port = store .port ,
75
+ rank = 0 ,
76
+ world_size = 1 ,
77
+ use_async_quorum = False ,
78
+ lighthouse_addr = lighthouse .address (),
79
+ )
80
+
81
+ start_time = time .time ()
82
+ manager .start_quorum ()
83
+ time_taken = time .time () - start_time
84
+ assert time_taken > 0.4 , f"Time taken to join: { time_taken } < 0.4s"
85
+
86
+ finally :
87
+ # Cleanup
88
+ lighthouse .shutdown ()
89
+ if 'manager' in locals ():
90
+ manager .shutdown ()
91
+
0 commit comments