44
55from lattica import Lattica
66
7+ from backend .server .constants import NODE_STATUS_AVAILABLE , NODE_STATUS_WAITING
78from backend .server .rpc_connection_handler import RPCConnectionHandler
8- from backend .server .static_config import get_model_info
9+ from backend .server .static_config import get_model_info , get_node_join_command
910from parallax_utils .logging_config import get_logger
1011from scheduling .node import RequestSignal
1112from scheduling .scheduler import Scheduler
1415
1516
1617class SchedulerManage :
17- """Coordinates the in-process scheduler and the P2P RPC layer.
18+ """
19+ Coordinates the in-process scheduler and the P2P RPC layer.
1820
1921 This manager owns the `Scheduler` instance and the Lattica P2P node,
2022 wiring RPC calls from workers to scheduler events.
@@ -35,28 +37,81 @@ def __init__(
3537 self .host_maddrs = host_maddrs
3638 self .announce_maddrs = announce_maddrs
3739
40+ self .model_name = None
41+ self .init_nodes_num = None
3842 self .scheduler = None
3943 self .node_id = f"{ dht_prefix } _announce"
4044 self .lattica = None
4145 self .stubs = {}
46+ self .is_local_network = False
4247
def run(self, model_name, init_nodes_num, is_local_network=False):
    """
    Bring up the scheduler and then the Lattica P2P service.

    Args:
        model_name: Forwarded to ``_start_scheduler``.
        init_nodes_num: Forwarded to ``_start_scheduler``.
        is_local_network: Stored on ``self.is_local_network`` before the
            scheduler is started. Defaults to ``False``.
    """
    startup_message = (
        f"SchedulerManage starting: model_name={ model_name } , init_nodes_num={ init_nodes_num } "
    )
    logger.info(startup_message)

    # Record the network flag first, then start the scheduler, then the
    # P2P layer -- same order as before.
    self.is_local_network = is_local_network
    self._start_scheduler(model_name, init_nodes_num)
    self._start_lattica()
5058
def is_running(self):
    """
    Tell whether a scheduler instance has been created.

    Returns:
        ``False`` while ``self.scheduler`` is ``None``, ``True`` otherwise.
    """
    if self.scheduler is None:
        return False
    return True
64+
def get_model_name(self):
    """
    Return the value currently stored in ``self.model_name``.
    """
    current_model_name = self.model_name
    return current_model_name
67+
def get_init_nodes_num(self):
    """
    Return the value currently stored in ``self.init_nodes_num``.
    """
    current_init_nodes_num = self.init_nodes_num
    return current_init_nodes_num
70+
def get_is_local_network(self):
    """
    Return the value currently stored in ``self.is_local_network``.
    """
    local_network_flag = self.is_local_network
    return local_network_flag
73+
def get_cluster_status(self):
    """
    Assemble the ``cluster_status`` message.

    Returns:
        A dict with ``"type": "cluster_status"`` and a ``"data"`` dict
        holding the schedule status, the stored model name and initial node
        count, the node join command and the node list.
    """
    # Evaluate the pieces in the same order as the dict literal did.
    schedule_status = self.get_schedule_status()
    join_command = get_node_join_command(
        self.model_name, "${scheduler_addr}", self.is_local_network
    )
    nodes = self.get_node_list()

    data = {
        "status": schedule_status,
        "model_name": self.model_name,
        "init_nodes_num": self.init_nodes_num,
        "node_join_command": join_command,
        "node_list": nodes,
    }
    return {"type": "cluster_status", "data": data}
87+
def get_node_list(self):
    """
    Build the info dict for every node held by the scheduler.

    Returns:
        An empty list while ``self.scheduler`` is ``None``; otherwise one
        ``build_node_info`` result per entry of ``self.scheduler.nodes``,
        in iteration order.
    """
    node_infos = []
    if self.scheduler is not None:
        for node in self.scheduler.nodes:
            node_infos.append(self.build_node_info(node))
    return node_infos
93+
def build_node_info(self, node):
    """
    Convert a scheduler node into a plain dict.

    Args:
        node: Object exposing ``node_id``, ``is_active`` and a ``hardware``
            attribute with ``gpu_name`` and ``memory_gb``.

    Returns:
        A dict with the keys ``node_id``, ``status``, ``gpu_name`` and
        ``gpu_memory``. ``status`` is ``NODE_STATUS_AVAILABLE`` when
        ``node.is_active`` is truthy and ``NODE_STATUS_WAITING`` otherwise.
    """
    info = {"node_id": node.node_id}

    if node.is_active:
        info["status"] = NODE_STATUS_AVAILABLE
    else:
        info["status"] = NODE_STATUS_WAITING

    hardware = node.hardware
    info["gpu_name"] = hardware.gpu_name
    info["gpu_memory"] = hardware.memory_gb
    return info
101+
51102 def _start_scheduler (self , model_name , init_nodes_num ):
52- """Create the scheduler and start its background run loop if needed."""
103+ """
104+ Create the scheduler and start its background run loop if needed.
105+ """
53106 if self .scheduler is not None :
54107 logger .info ("Scheduler already started; skipping re-initialization" )
55108 return
56109
57- mode_info = get_model_info (model_name )
58- # 初始化 scheduler
59- self .scheduler = Scheduler (mode_info , [], min_nodes_bootstrapping = init_nodes_num )
110+ self .model_name = model_name
111+ self .init_nodes_num = init_nodes_num
112+
113+ model_info = get_model_info (model_name )
114+ self .scheduler = Scheduler (model_info , [], min_nodes_bootstrapping = init_nodes_num )
60115
61116 # Run the scheduler's event/dispatch loops in background so the process
62117 # can continue to serve RPCs and HTTP traffic.
@@ -69,7 +124,9 @@ def _start_scheduler(self, model_name, init_nodes_num):
69124 logger .info ("Scheduler background thread started (poll_interval=0.05)" )
70125
71126 def _start_lattica (self ):
72- """Initialize and start the Lattica P2P node used for RPCs."""
127+ """
128+ Initialize and start the Lattica P2P node used for RPCs.
129+ """
73130 logger .info (
74131 f"Starting Lattica with host_maddrs={ self .host_maddrs } , mdns=False, dht_prefix={ self .dht_prefix } "
75132 )
@@ -113,12 +170,12 @@ def get_routing_table(self, request_id, received_ts):
113170 request = RequestSignal (request_id , received_ts )
114171 self .scheduler .receive_request (request )
115172
116- # 等待最长 5s, 但如果路由表已被设置(包括空列表),则立即返回
173+ # Wait up to 5 seconds, but return immediately if the routing table is set (including an empty list)
117174 start_time = time .time ()
118175 while request .routing_table is None and (time .time () - start_time ) < 5.0 :
119176 time .sleep (0.05 )
120177
121- # 返回routing_table
178+ # Return the routing_table
122179 if request .routing_table is None :
123180 logger .info (
124181 f"Routing table not ready after { (time .time () - start_time ):.2f} s for request_id={ request_id } "
@@ -130,17 +187,26 @@ def get_routing_table(self, request_id, received_ts):
130187 return request .routing_table
131188
def get_schedule_status(self):
    """
    Report whether the scheduler has allocated a full pipeline.

    Returns:
        ``NODE_STATUS_WAITING`` when no scheduler exists yet or when
        ``layer_allocator.has_full_pipeline()`` is falsy;
        ``NODE_STATUS_AVAILABLE`` when it is truthy.
    """
    if self.scheduler is None:
        logger.info("SchedulerManage status queried: waiting (scheduler not initialized)")
        return NODE_STATUS_WAITING

    # todo rebalance status
    if self.scheduler.layer_allocator.has_full_pipeline():
        status = NODE_STATUS_AVAILABLE
    else:
        status = NODE_STATUS_WAITING

    logger.info(f"SchedulerManage status queried: { status } ")
    return status
141205
def get_call_url_by_node_id(self, node_id):
    """
    Resolve the call URL for ``node_id`` through the connection handler.

    Args:
        node_id: Identifier passed unchanged to
            ``self.connection_handler.get_call_url_by_node_id``.

    Returns:
        Whatever the connection handler returns for ``node_id``; the value
        is logged before being handed back.
    """
    handler = self.connection_handler
    url = handler.get_call_url_by_node_id(node_id)
    logger.info(f"Lookup call_url for node_id={ node_id } -> { url } ")
    return url
0 commit comments