1717from loguru import logger
1818
1919from guidellm .config import settings
20- from guidellm .request .session import RequestSession
2120from guidellm .request .types import (
2221 RequestT ,
2322 ResponseT ,
2726 SchedulerRequestResult ,
2827 SchedulerResult ,
2928 SchedulerRunInfo ,
30- WorkerProcessRequestTime ,
29+ WorkerProcessRequest ,
3130 WorkerProcessResult ,
3231)
3332from guidellm .scheduler .strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
127126 ) as executor ,
128127 ):
129128 requests_iter : Optional [Iterator [Any ]] = None
129+ # TODO: Configurable delay and move somewhere more appropriate
130+ scheduling_strategy .start_time = (
131+ time .time ()
132+ ) # Add a small delay to allow processes to start
130133 futures , queues , stop_event = await self ._start_processes (
131134 manager , executor , scheduling_strategy
132135 )
133- run_info , requests_iter , times_iter = self ._run_setup (
136+ run_info , requests_iter = self ._run_setup (
134137 futures , scheduling_strategy , max_number , max_duration
135138 )
136139 yield SchedulerResult (
@@ -147,17 +150,16 @@ async def run(
147150
148151 if (
149152 requests_iter is None
150- and run_info .completed_requests >= run_info .created_requests
153+ # FIXME: Need new way to handle max requests
154+ # and run_info.completed_requests >= run_info.created_requests
151155 ):
152156 # we've exhausted all requests we've wanted to run
153157 # and yielded all responses
154158 break
155159
156160 requests_iter = self ._add_requests (
157161 requests_iter ,
158- times_iter ,
159162 queues .requests ,
160- queues .times ,
161163 run_info ,
162164 )
163165 await asyncio .sleep (0 ) # enable requests to start
@@ -196,7 +198,6 @@ async def _start_processes(
196198 requests = manager .Queue (
197199 maxsize = scheduling_strategy .processing_requests_limit
198200 ),
199- times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
200201 responses = manager .Queue (),
201202 )
202203 stop_event = manager .Event ()
@@ -229,10 +230,12 @@ async def _start_processes(
229230 executor ,
230231 self .worker .process_loop_asynchronous ,
231232 queues ,
233+ scheduling_strategy ,
232234 stop_event ,
233235 False , # TODO: Make configurable
234236 requests_limit ,
235237 id_ ,
238+ num_processes ,
236239 )
237240 )
238241
@@ -246,11 +249,9 @@ def _run_setup(
246249 scheduling_strategy : SchedulingStrategy ,
247250 max_number : Optional [int ],
248251 max_duration : Optional [float ],
249- ) -> tuple [SchedulerRunInfo , Iterator [Any ], Iterator [ float ] ]:
252+ ) -> tuple [SchedulerRunInfo , Iterator [Any ]]:
250253 requests_iter = iter (self .request_loader )
251- start_time = time .time ()
252- times_iter = iter (scheduling_strategy .request_times ())
253- end_time = time .time () + (max_duration or math .inf )
254+ end_time = scheduling_strategy .start_time + (max_duration or math .inf )
254255 end_number = max_number or math .inf
255256
256257 try :
@@ -268,21 +269,19 @@ def _run_setup(
268269 )
269270
270271 info = SchedulerRunInfo (
271- start_time = start_time ,
272+ start_time = scheduling_strategy . start_time ,
272273 end_time = end_time ,
273274 end_number = end_number ,
274275 processes = len (processes ),
275276 strategy = scheduling_strategy ,
276277 )
277278
278- return info , requests_iter , times_iter
279+ return info , requests_iter
279280
280281 def _add_requests (
281282 self ,
282283 requests_iter : Optional [Iterator [Any ]],
283- times_iter : Iterator [float ],
284- requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
285- times_queue : Queue [WorkerProcessRequestTime ],
284+ requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
286285 run_info : SchedulerRunInfo ,
287286 ) -> Optional [Iterator [Any ]]:
288287 if requests_iter is not None :
@@ -296,24 +295,20 @@ def _add_requests(
296295 if run_info .created_requests >= run_info .end_number :
297296 raise StopIteration
298297
298+ if time .time () >= run_info .end_time :
299+ raise StopIteration
300+
299301 session = next (requests_iter )
300- requests_queue .put (session )
301- for _ in range (len (session )):
302- if (
303- request_time := next (times_iter )
304- ) >= run_info .end_time or time .time () >= run_info .end_time :
305- raise StopIteration
306-
307- work_req = WorkerProcessRequestTime (
308- start_time = request_time ,
309- timeout_time = run_info .end_time ,
310- queued_time = time .time (),
311- )
312- times_queue .put (work_req )
313-
314- run_info .created_requests += 1
315- run_info .queued_requests += 1
316- added_count += 1
302+ work_req = WorkerProcessRequest (
303+ session = session ,
304+ timeout_time = run_info .end_time ,
305+ queued_time = time .time (),
306+ )
307+ requests_queue .put (work_req )
308+
309+ run_info .created_requests += len (session )
310+ run_info .queued_requests += len (session )
311+ added_count += len (session )
317312 except StopIteration :
318313 # we've reached the limit number, limit time, or exhausted the requests
319314 # set to None to stop adding more and tell the loop no more requests
0 commit comments