# Default data parallel size to use; overridable via the DP_SIZE env var.
DP_SIZE = int(os.getenv("DP_SIZE", "2"))
# Default tensor parallel size to use
TP_SIZE = int(os.getenv("TP_SIZE", "1"))
# Make sure CCL worker count is set for data parallelism
os.environ["CCL_WORKER_COUNT"] = str(DP_SIZE)

import socket
28+
def is_port_available(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if *port* can be bound on *host*.

    Binding is more reliable than trying to connect: a connect test cannot
    distinguish "nothing listening" from "bound but not accepting".
    NOTE(review): this check is inherently racy — another process may grab
    the port between this probe and the caller's own bind.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
        return True
39+
def get_unique_port(start_port=8000):
    """Return the first bindable TCP port at or above *start_port*.

    Probes ports sequentially via ``is_port_available``; scans at most
    100 ports past *start_port* before giving up.

    Raises:
        RuntimeError: if no port in (start_port, start_port + 100] is free.
    """
    port = start_port
    while not is_port_available(port):
        port += 1  # Increment until an available port is found
        if port > start_port + 100:  # Limit the search range
            raise RuntimeError("No available ports")
    return port
2348
2449
2550class ExternalLBServerManager :
@@ -44,6 +69,14 @@ def __init__(
4469
4570 def __enter__ (self ) -> list [tuple [RemoteOpenAIServer , list [str ]]]:
4671 """Start all server instances for external LB mode."""
72+
73+ allocated_ports = []
74+ last_port = 7999
75+ for _ in range (self .dp_size ):
76+ port = get_unique_port (start_port = last_port + 1 )
77+ allocated_ports .append (port )
78+ last_port = port
79+
4780 for rank in range (self .dp_size ):
4881 # Create server args for this specific rank
4982 server_args = self .base_server_args .copy ()
@@ -60,7 +93,7 @@ def __enter__(self) -> list[tuple[RemoteOpenAIServer, list[str]]]:
6093 "--tensor-parallel-size" ,
6194 str (self .tp_size ),
6295 "--port" ,
63- str (8000 + rank ), # Different port for each rank
96+ str (allocated_ports [ rank ] ), # Different port for each rank
6497 "--api-server-count" ,
6598 str (self .api_server_count ),
6699 ]
0 commit comments