44import asyncio
55import os
66import pickle
7+ import time
78from collections import deque
89from copy import deepcopy
910from typing import Any , List
3435 ServerArgs ,
3536 set_global_server_args ,
3637)
37- from sglang .multimodal_gen .runtime .utils .common import get_zmq_socket
38+ from sglang .multimodal_gen .runtime .utils .common import (
39+ get_diffusion_metrics_collector ,
40+ get_zmq_socket ,
41+ )
3842from sglang .multimodal_gen .runtime .utils .distributed import broadcast_pyobj
3943from sglang .multimodal_gen .runtime .utils .logging_utils import GREEN , RESET , init_logger
4044
@@ -101,6 +105,8 @@ def __init__(
101105
102106 # FIFO, new reqs are appended
103107 self .waiting_queue : deque [tuple [bytes , Req ]] = deque ()
108+ self ._generation_waiting_count = 0
109+ self ._generation_enqueue_timestamps : dict [int , float ] = {}
104110
105111 # whether we've send the necessary warmup reqs
106112 self .warmed_up = False
@@ -110,6 +116,15 @@ def __init__(
110116
111117 self .prepare_server_warmup_reqs ()
112118
119+ self .metrics_collector = (
120+ get_diffusion_metrics_collector (server_args )
121+ if server_args .enable_metrics and gpu_id == 0
122+ else None
123+ )
124+ if self .metrics_collector is not None :
125+ self .metrics_collector .set_queue_depth (self ._generation_waiting_count )
126+ self .metrics_collector .set_running_reqs (0 )
127+
113128 # Maximum consecutive errors before terminating the event loop
114129 self ._max_consecutive_errors = 3
115130 self ._consecutive_error_count = 0
@@ -187,9 +202,32 @@ def get_next_batch_to_run(self) -> list[tuple[bytes, Req]] | None:
187202
188203 # pop the first (earliest)
189204 item = self .waiting_queue .popleft ()
205+ self ._on_req_dequeued (item [1 ])
206+ if self .metrics_collector is not None :
207+ self .metrics_collector .set_queue_depth (self ._generation_waiting_count )
190208
191209 return [item ]
192210
def _on_req_enqueued(self, req: Any) -> None:
    """Record that a generation request entered the waiting queue.

    Non-``Req`` items (e.g. control/warmup payloads of other types) are
    ignored. For real generation requests we bump the waiting counter and
    stamp an enqueue time so the queue latency can be observed on dequeue.
    """
    if isinstance(req, Req):
        self._generation_waiting_count += 1
        # NOTE(review): keyed by id(req) — assumes the Req object stays
        # alive until it is dequeued; otherwise the id could be reused.
        self._generation_enqueue_timestamps[id(req)] = time.monotonic()
216+
def _on_req_dequeued(self, req: Any) -> None:
    """Account for a generation request leaving the waiting queue.

    Decrements the waiting counter (never below zero) and, when a metrics
    collector is attached, reports how long the request sat in the queue.
    Non-``Req`` items are ignored, mirroring ``_on_req_enqueued``.
    """
    if not isinstance(req, Req):
        return
    if self._generation_waiting_count > 0:
        self._generation_waiting_count -= 1
    started = self._generation_enqueue_timestamps.pop(id(req), None)
    if started is None or self.metrics_collector is None:
        # Either the request was never stamped (e.g. enqueued before the
        # collector existed) or metrics are disabled — nothing to report.
        return
    self.metrics_collector.observe_queue_time(time.monotonic() - started)
225+
def _enqueue_received_reqs(self, new_reqs: list[tuple[bytes, Any]]) -> None:
    """Append newly received (identity, request) pairs to the waiting queue.

    Each pair is appended in arrival order and the queue-tracking
    bookkeeping (waiting count, enqueue timestamp) is updated per request.
    """
    for pair in new_reqs:
        self.waiting_queue.append(pair)
        self._on_req_enqueued(pair[1])
230+
193231 def prepare_server_warmup_reqs (self ):
194232 if (
195233 self .server_args .warmup
@@ -235,6 +273,7 @@ def prepare_server_warmup_reqs(self):
235273 )
236274 req .set_as_warmup ()
237275 self .waiting_queue .append ((None , req ))
276+ self ._on_req_enqueued (req )
238277 # if server is warmed-up, set this flag to avoid req-based warmup
239278 self .warmed_up = True
240279
@@ -334,7 +373,11 @@ def event_loop(self) -> None:
334373 try :
335374 new_reqs = self .recv_reqs ()
336375 new_reqs = self .process_received_reqs_with_req_based_warmup (new_reqs )
337- self .waiting_queue .extend (new_reqs )
376+ self ._enqueue_received_reqs (new_reqs )
377+ if self .metrics_collector is not None :
378+ self .metrics_collector .set_queue_depth (
379+ self ._generation_waiting_count
380+ )
338381 # Reset error count on success
339382 self ._consecutive_error_count = 0
340383 except Exception as e :
@@ -362,60 +405,72 @@ def event_loop(self) -> None:
362405
363406 identities = [item [0 ] for item in items ]
364407 reqs = [item [1 ] for item in items ]
408+ generation_running_reqs = sum (1 for req in reqs if isinstance (req , Req ))
409+ if self .metrics_collector is not None :
410+ self .metrics_collector .set_running_reqs (generation_running_reqs )
365411
366412 try :
367- processed_req = reqs [0 ]
368- handler = self .request_handlers .get (type (processed_req ))
369- if handler :
370- output_batch = handler (reqs )
371- else :
372- output_batch = OutputBatch (
373- error = f"Unknown request type: { type (processed_req )} "
413+ try :
414+ processed_req = reqs [0 ]
415+ handler = self .request_handlers .get (type (processed_req ))
416+ if handler :
417+ output_batch = handler (reqs )
418+ else :
419+ output_batch = OutputBatch (
420+ error = f"Unknown request type: { type (processed_req )} "
421+ )
422+ except Exception as e :
423+ logger .error (
424+ f"Error executing request in scheduler event loop: { e } " ,
425+ exc_info = True ,
426+ )
427+ # Determine appropriate error response format
428+ output_batch = (
429+ OutputBatch (error = str (e ))
430+ if reqs and isinstance (reqs [0 ], Req )
431+ else OutputBatch (error = str (e ))
374432 )
375- except Exception as e :
376- logger .error (
377- f"Error executing request in scheduler event loop: { e } " ,
378- exc_info = True ,
379- )
380- # Determine appropriate error response format
381- output_batch = (
382- OutputBatch (error = str (e ))
383- if reqs and isinstance (reqs [0 ], Req )
384- else OutputBatch (error = str (e ))
385- )
386433
387- # 3. return results
388- try :
389- # log warmup info
390- is_warmup = (
391- processed_req .is_warmup if isinstance (processed_req , Req ) else False
392- )
393- if is_warmup :
394- if output_batch .error is None :
395- if self ._warmup_total > 0 :
396- logger .info (
397- f"Warmup req ({ self ._warmup_processed } /{ self ._warmup_total } ) processed in { GREEN } %.2f{ RESET } seconds" ,
398- output_batch .metrics .total_duration_s ,
399- )
400- else :
401- logger .info (
402- f"Warmup req processed in { GREEN } %.2f{ RESET } seconds" ,
403- output_batch .metrics .total_duration_s ,
404- )
405- else :
406- if self ._warmup_total > 0 :
407- logger .info (
408- f"Warmup req ({ self ._warmup_processed } /{ self ._warmup_total } ) processing failed"
409- )
434+ # 3. return results
435+ try :
436+ # log warmup info
437+ is_warmup = (
438+ processed_req .is_warmup
439+ if isinstance (processed_req , Req )
440+ else False
441+ )
442+ if is_warmup :
443+ if output_batch .error is None :
444+ if self ._warmup_total > 0 :
445+ logger .info (
446+ f"Warmup req ({ self ._warmup_processed } /{ self ._warmup_total } ) processed in { GREEN } %.2f{ RESET } seconds" ,
447+ output_batch .metrics .total_duration_s ,
448+ )
449+ else :
450+ logger .info (
451+ f"Warmup req processed in { GREEN } %.2f{ RESET } seconds" ,
452+ output_batch .metrics .total_duration_s ,
453+ )
410454 else :
411- logger .info (f"Warmup req processing failed" )
412-
413- # TODO: Support sending back to multiple identities if batched
414- self .return_result (output_batch , identities [0 ], is_warmup = is_warmup )
415- except zmq .ZMQError as e :
416- # Reply failed; log and keep loop alive to accept future requests
417- logger .error (f"ZMQ error sending reply: { e } " )
418- continue
455+ if self ._warmup_total > 0 :
456+ logger .info (
457+ f"Warmup req ({ self ._warmup_processed } /{ self ._warmup_total } ) processing failed"
458+ )
459+ else :
460+ logger .info (f"Warmup req processing failed" )
461+
462+ # TODO: Support sending back to multiple identities if batched
463+ self .return_result (output_batch , identities [0 ], is_warmup = is_warmup )
464+ except zmq .ZMQError as e :
465+ # Reply failed; log and keep loop alive to accept future requests
466+ logger .error (f"ZMQ error sending reply: { e } " )
467+ continue
468+ finally :
469+ if self .metrics_collector is not None :
470+ self .metrics_collector .set_running_reqs (0 )
471+ self .metrics_collector .set_queue_depth (
472+ self ._generation_waiting_count
473+ )
419474
420475 if self .receiver is not None :
421476 self .receiver .close ()
0 commit comments