11import base64
22import concurrent .futures
3+ import datetime
34import json
45import os
5- from typing import Any
6- import datetime
7- import time
6+ import queue
87import sys
9- import requests
8+ import threading
9+ import time
10+ from typing import Any
1011
12+ import requests
1113from openai import OpenAI
1214from vllm .assets .audio import AudioAsset
1315from vllm .utils .argparse_utils import FlexibleArgumentParser
1618openai_api_key = "EMPTY"
1719openai_api_base = "http://localhost:18091/v1"
1820
21+
22+ # Modify OpenAI's API key and API base to use vLLM's API server.
23+
24+
1925client = OpenAI (
2026 # defaults to os.environ.get("OPENAI_API_KEY")
2127 api_key = openai_api_key ,
@@ -344,68 +350,146 @@ def run_one_request(req_id: int):
344350 )
345351
346352 with concurrent .futures .ThreadPoolExecutor (max_workers = num_concurrent_requests ) as executor :
347- futures = [
348- executor .submit (run_one_request , req_id )
349- for req_id in range (num_concurrent_requests )
350- ]
351- # Collect by req_id so chat_completions[i] = response for request i
352- results_by_req = [None ] * num_concurrent_requests
353- for future in concurrent .futures .as_completed (futures ):
354- req_id , completion = future .result ()
355- results_by_req [req_id ] = completion
356- chat_completions = results_by_req
357-
358- assert len (chat_completions ) == num_concurrent_requests
359- count = 0
360- if not args .stream :
361- for req_id , chat_completion in enumerate (chat_completions ):
362- chat_completion_id = getattr (chat_completion , "id" , "" )
363- for choice in chat_completion .choices :
364- if choice .message .audio :
365- audio_data = base64 .b64decode (choice .message .audio .data )
366- audio_file_path = f"{ output_audio_path } /{ args .query_type } /audio_{ count } .wav"
367- os .makedirs (os .path .dirname (audio_file_path ), exist_ok = True )
368- with open (audio_file_path , "wb" ) as f :
369- f .write (audio_data )
370- print (f"[req { req_id } _{ chat_completion_id } ] Audio saved to { audio_file_path } " )
371- count += 1
372- elif choice .message .content :
373- print (f"[req { req_id } _{ chat_completion_id } ] Chat completion output from text:" , choice .message .content )
374- else :
375- for req_id , chat_completion in enumerate (chat_completions ):
376- printed_text_prefix = False
377- chat_completion_id = None
378- for chunk in chat_completion :
379- chat_completion_id = getattr (chunk , "id" , "" )
380- for choice in chunk .choices :
381- if hasattr (choice , "delta" ):
382- content = getattr (choice .delta , "content" , None )
383- else :
384- content = None
385-
386- if getattr (chunk , "modality" , None ) == "audio" and content :
387- audio_data = base64 .b64decode (content )
388- audio_file_path = f"{ output_audio_path } /{ args .query_type } /{ num_concurrent_requests } /{ chat_completion_id } /audio_{ count } .wav"
353+ futures = [executor .submit (run_one_request , req_id ) for req_id in range (num_concurrent_requests )]
354+
355+ if not args .stream :
356+ # Collect by req_id so chat_completions[i] = response for request i
357+ results_by_req = [None ] * num_concurrent_requests
358+ for future in concurrent .futures .as_completed (futures ):
359+ req_id , completion = future .result ()
360+ results_by_req [req_id ] = completion
361+ chat_completions = results_by_req
362+
363+ assert len (chat_completions ) == num_concurrent_requests
364+ count = 0
365+ for req_id , chat_completion in enumerate (chat_completions ):
366+ chat_completion_id = getattr (chat_completion , "id" , "" )
367+ for choice in chat_completion .choices :
368+ if choice .message .audio :
369+ audio_data = base64 .b64decode (choice .message .audio .data )
370+ audio_file_path = f"{ output_audio_path } /{ args .query_type } /audio_{ count } .wav"
389371 os .makedirs (os .path .dirname (audio_file_path ), exist_ok = True )
390372 with open (audio_file_path , "wb" ) as f :
391373 f .write (audio_data )
392- print (f"\n [req { req_id } _{ chat_completion_id } ] Audio saved to { audio_file_path } " , file = sys . stderr , flush = True )
374+ print (f"[req { req_id } _{ chat_completion_id } ] Audio saved to { audio_file_path } " )
393375 count += 1
376+ elif choice .message .content :
377+ print (
378+ f"[req { req_id } _{ chat_completion_id } ] Chat completion output from text:" ,
379+ choice .message .content ,
380+ )
381+ else :
382+ # Stream mode: process chunks from all requests in real-time,
383+ # displaying them in the order they arrive.
384+ chunk_queue = queue .Queue ()
385+ audio_counters = {}
386+ chat_completion_id_by_req = {}
387+ chat_completion_id_lock = threading .Lock ()
388+ last_text_req_id = None
389+
390+ def _stream_reader (req_id : int , stream ):
391+ """Read one stream and relay every chunk to the shared queue."""
392+ try :
393+ for chunk in stream :
394+ chunk_queue .put ((req_id , time .perf_counter (), chunk ))
395+ chunk_queue .put ((req_id , time .perf_counter (), None ))
396+ except Exception as e :
397+ print (f"\n [Request { req_id } ] stream error: { e } " , file = sys .stderr , flush = True )
398+ chunk_queue .put ((req_id , time .time (), None ))
399+
400+ # Kick off a reader thread per request
401+ reader_threads = []
402+ for req_id , future in enumerate (futures ):
403+ req_id_from_future , stream = future .result ()
404+ assert req_id == req_id_from_future , f"Request ID mismatch: { req_id } != { req_id_from_future } "
405+ audio_counters [req_id ] = 0
406+ t = threading .Thread (target = _stream_reader , args = (req_id , stream ), daemon = True )
407+ t .start ()
408+ reader_threads .append (t )
409+
410+ # Main loop – consume chunks in arrival order
411+ active_streams = set (range (num_concurrent_requests ))
412+ while active_streams :
413+ try :
414+ request_id , ts , chunk = chunk_queue .get (timeout = 2.0 )
415+ except queue .Empty :
416+ if all (not t .is_alive () for t in reader_threads ):
417+ break
418+ continue
419+
420+ if chunk is None :
421+ elapsed = ts - start_time
422+ print (f" ({ elapsed :.2f} s)" , flush = True )
423+ with chat_completion_id_lock :
424+ chat_completion_id = chat_completion_id_by_req .get (request_id , "" )
425+ print (
426+ f" [req { request_id } _{ chat_completion_id } ] Time finished for streaming: " ,
427+ datetime .datetime .now (),
428+ file = sys .stderr ,
429+ flush = True ,
430+ )
431+ active_streams .discard (request_id )
432+ continue
433+
434+ with chat_completion_id_lock :
435+ if request_id not in chat_completion_id_by_req :
436+ chat_completion_id_by_req [request_id ] = getattr (chunk , "id" , "" )
437+ chat_completion_id = chat_completion_id_by_req [request_id ]
438+
439+ modality = getattr (chunk , "modality" , None )
440+ elapsed = ts - start_time
441+ for choice in chunk .choices :
442+ content = getattr (choice .delta , "content" , None ) if hasattr (choice , "delta" ) else None
394443
395- elif getattr (chunk , "modality" , None ) == "text" and content :
396- if not printed_text_prefix :
397- printed_text_prefix = True
398- print (f"\n [req { req_id } _{ chat_completion_id } ] content:" , end = "" , flush = True )
399- print (content , end = "" , flush = True )
400- print (f" [req { req_id } _{ chat_completion_id } ] Time finished for streaming: " , datetime .datetime .now (), file = sys .stderr , flush = True )
401-
402- elapsed = time .perf_counter () - start_time
403- print (f"num_concurrent_requests_{ num_concurrent_requests } >>>>>>>>>Time finished for streaming<<<<<<<<: " , elapsed , file = sys .stderr , flush = True )
404- timing_audio_folder = f"{ output_audio_path } /{ args .query_type } /{ num_concurrent_requests } "
405- os .makedirs (timing_audio_folder , exist_ok = True )
406- timing_file = os .path .join (timing_audio_folder , "streaming_finish_time.txt" )
407- with open (timing_file , "w" ) as f :
408- f .write (f"num_concurrent_requests_{ num_concurrent_requests } elapsed_seconds: { elapsed } \n " )
444+ if modality == "audio" and content :
445+ audio_data = base64 .b64decode (content )
446+ audio_dir = (
447+ f"{ output_audio_path } /{ args .query_type } /{ num_concurrent_requests } /{ chat_completion_id } "
448+ )
449+ os .makedirs (audio_dir , exist_ok = True )
450+ audio_file_path = f"{ audio_dir } /audio_{ audio_counters [request_id ]} .wav"
451+ with open (audio_file_path , "wb" ) as f :
452+ f .write (audio_data )
453+ print (
454+ f"\n [{ elapsed :7.2f} s][req { request_id } _{ chat_completion_id } ] Audio saved to { audio_file_path } " ,
455+ file = sys .stderr ,
456+ flush = True ,
457+ )
458+ audio_counters [request_id ] += 1
459+
460+ elif modality == "text" and content :
461+ if last_text_req_id != request_id :
462+ if last_text_req_id is not None :
463+ print (flush = True )
464+ print (
465+ f"\n [{ elapsed :7.2f} s][req { request_id } _{ chat_completion_id } ] content:" ,
466+ end = "" ,
467+ flush = True ,
468+ )
469+ last_text_req_id = request_id
470+ print (
471+ f"\n [{ elapsed :7.2f} s][req { request_id } _{ chat_completion_id } ] { content } " , end = "" , flush = True
472+ )
473+
474+ # Final newline if the last output was streaming text
475+ if last_text_req_id is not None :
476+ print (flush = True )
477+
478+ for t in reader_threads :
479+ t .join (timeout = 1.0 )
480+
481+ elapsed = time .perf_counter () - start_time
482+ print (
483+ f"num_concurrent_requests_{ num_concurrent_requests } >>>>>>>>>Time finished for streaming<<<<<<<<: " ,
484+ elapsed ,
485+ file = sys .stderr ,
486+ flush = True ,
487+ )
488+ timing_audio_folder = f"{ output_audio_path } /{ args .query_type } /{ num_concurrent_requests } "
489+ os .makedirs (timing_audio_folder , exist_ok = True )
490+ timing_file = os .path .join (timing_audio_folder , "streaming_finish_time.txt" )
491+ with open (timing_file , "w" ) as f :
492+ f .write (f"num_concurrent_requests_{ num_concurrent_requests } elapsed_seconds: { elapsed } \n " )
409493
410494
411495def parse_args ():
@@ -429,7 +513,8 @@ def parse_args():
429513 "--message-json" ,
430514 "-m" ,
431515 type = str ,
432- default = "../../offline_inference/mimo_audio/message_base64_wav.json" ,
516+ # default="../../offline_inference/mimo_audio/message_base64_wav.json",
517+ default = "../../offline_inference/mimo_audio/message_base64_wav_tts.json" ,
433518 help = "Path to message.json file containing conversation history. When provided, "
434519 "system prompt and multi_audios query will be loaded from this file." ,
435520 )
0 commit comments