<img alt="TTFP Performance Data Comparison" src="https://raw.githubusercontent.com/vllm-project/vllm-omni/refs/heads/main/docs/source/performance/qwen3-omni_ttfp_performance.png" width=100%>
</picture>
</p>
- `OmniGenerationScheduler`: For generation stages
- Both schedulers use **OmniChunkManager** and **before/after** hooks around `super().schedule()`: chunk queues are processed before the call, and queues are restored and chunk data merged after it

4. **OmniChunkManager**: Owns the full chunk lifecycle when async_chunk is enabled (WIP)
- **Chunk ID and key construction**: Builds keys like `{req_id}_{stage_id}_{chunk_id}` for put/get
- **Assembling chunked business data**: Merges thinker embeddings (stage 0), accumulates `code_predictor_codes` and builds code2wav payloads (stage 1), etc.; uses connector-backed state (e.g. `put_requests`, `get_requests`, `request_prompt_token_ids`, `code_prompt_token_ids`) where needed
- **Async get**: `get_chunk(request)` enqueues the request for loading; a background **recv_loop** thread polls the connector and, when data is available, updates the request (e.g. `additional_information`, `prompt_token_ids`) and marks it in `_finished_load_reqs`; the scheduler calls `get_finished()` to learn which requests have chunks ready
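
As a minimal sketch of the key scheme and per-request counters described above (the class and attribute names here are illustrative, not the actual vllm-omni API):

```python
from collections import defaultdict

class ChunkKeyBuilder:
    """Hypothetical helper mirroring the {req_id}_{stage_id}_{chunk_id} key scheme."""

    def __init__(self, stage_id: int):
        self.stage_id = stage_id
        # Per-request chunk counter, analogous to put_requests above.
        self.put_requests = defaultdict(int)

    def next_put_key(self, req_id: str) -> str:
        chunk_id = self.put_requests[req_id]
        self.put_requests[req_id] += 1
        return f"{req_id}_{self.stage_id}_{chunk_id}"
```

Each call advances the per-request counter, so successive chunks of `req-42` at stage 0 get keys `req-42_0_0`, `req-42_0_1`, and so on.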
Runner2->>Client: audio output
```

### Example Chunk Flow For Qwen3-Omni

1. **Thinker Stage (Stage 0)**:
- Processes input and generates text tokens incrementally
- Uses `chunked_decode_streaming()` for streaming audio generation
- **Connector**: Performs only data transport (`put`/`get`). It does not track per-request chunk counters or payload metadata; those live on connector-backed state used by OmniChunkManager (or on the connector implementation if still required for key construction).
- **OmniChunkManager**: When async_chunk is enabled, it manages chunk keys, async get (enqueue → recv_loop → get_finished), async put (build payload in main thread → enqueue → save_loop), and uses per-request counters and accumulated data (e.g. put_requests, get_requests, request_prompt_token_ids, code_prompt_token_ids, finished_requests) where needed for key construction and payload assembly.
### Stage Configuration
### Chunk Processing Functions in the Stage Input Processor
- `_process_chunk_queue(waiting, waiting_for_chunk_waiting_requests, WAITING)` (and the same for running), for each request:
  - If not WAITING_FOR_CHUNK, not in `requests_with_ready_chunks`, and not in the connector's `finished_requests`: call `chunk_manager.get_chunk(request)` and set status to WAITING_FOR_CHUNK
  - If already WAITING_FOR_CHUNK and in `finished_load_chunk_reqs`: set status back to WAITING/RUNNING and add to `requests_with_ready_chunks`
  - Move requests that are now WAITING_FOR_CHUNK from the main queue into the `waiting_for_chunk_*` deques
- Reduce `max_num_running_reqs` by the number of requests in `waiting_for_chunk_running_requests`
- **In `finally`**: Restore requests from `waiting_for_chunk_*` back into `waiting` and `running`; clear `finished_load_chunk_reqs`
- **After**: Merge chunk data into scheduler_output (e.g. for `scheduled_cached_reqs`, set `additional_information[req_id]` from `self.requests[req_id].additional_information`); if `chunk_manager`, call `_clear_chunk_ready(scheduler_output)` to remove consumed reqs from `requests_with_ready_chunks`
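
The queue pass described above can be sketched roughly as follows (the statuses, field names, and fake manager are illustrative assumptions, not the vllm-omni implementation):

```python
from collections import deque
from enum import Enum, auto

class Status(Enum):
    WAITING = auto()
    RUNNING = auto()
    WAITING_FOR_CHUNK = auto()

def process_chunk_queue(queue, waiting_for_chunk_queue, base_status,
                        chunk_manager, requests_with_ready_chunks,
                        connector_finished_requests, finished_load_chunk_reqs):
    """Sketch of the per-queue pass: request chunks, park waiters, wake finished."""
    kept = deque()
    for req in queue:
        if (req.status is not Status.WAITING_FOR_CHUNK
                and req.req_id not in requests_with_ready_chunks
                and req.req_id not in connector_finished_requests):
            chunk_manager.get_chunk(req)           # enqueue load for recv_loop
            req.status = Status.WAITING_FOR_CHUNK
        elif (req.status is Status.WAITING_FOR_CHUNK
                and req.req_id in finished_load_chunk_reqs):
            req.status = base_status               # chunk arrived: schedulable again
            requests_with_ready_chunks.add(req.req_id)
        if req.status is Status.WAITING_FOR_CHUNK:
            waiting_for_chunk_queue.append(req)    # hide from the base scheduler
        else:
            kept.append(req)
    queue.clear()
    queue.extend(kept)
```

Requests parked in the side deque are invisible to the base scheduler until the restore step puts them back.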
#### `OmniARScheduler.update_from_output()`
- When chunk_manager is set, calls `chunk_manager.put_chunk(pooler_output, request, custom_process_next_stage_input_func)` (async: payload built here, send in save_loop)
#### `OmniGenerationScheduler.schedule()`
- **Before** the main scheduling loop (when `chunk_manager` is set): the same pattern as the AR scheduler (`get_finished()`, then `_process_chunk_queue` for waiting and running, then reduce `max_num_running_reqs`)
- Chunk retrieval for generation is driven by OmniChunkManager (recv_loop updates request `prompt_token_ids` and status); no blocking `get_chunk_for_generation()` in the hot path
- **After** building scheduler_output (or after `super().schedule()` in fallback): `_restore_chunk_requests()` and `_clear_chunk_ready(scheduler_output)`
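
Put together, the before/after hook shape around `super().schedule()` looks roughly like this. This is a sketch with stand-in classes and hook bodies reduced to trace entries; the real schedulers subclass the base vLLM scheduler:

```python
class BaseScheduler:
    """Stand-in for the base vLLM scheduler (illustrative only)."""
    def schedule(self):
        return {"scheduled": True}

class OmniGenerationSchedulerSketch(BaseScheduler):
    """Hypothetical sketch of the before/after hooks around super().schedule()."""

    def __init__(self, chunk_manager=None):
        self.chunk_manager = chunk_manager
        self.trace = []  # records hook ordering, for illustration only

    def _process_chunk_queues(self):
        self.trace.append("before")    # get_finished() + _process_chunk_queue passes

    def _restore_chunk_requests(self):
        self.trace.append("restore")   # move parked requests back to waiting/running

    def _clear_chunk_ready(self, scheduler_output):
        self.trace.append("clear")     # drop consumed requests_with_ready_chunks entries

    def schedule(self):
        if self.chunk_manager is not None:
            self._process_chunk_queues()           # before super().schedule()
        try:
            scheduler_output = super().schedule()  # base vLLM scheduling
        finally:
            if self.chunk_manager is not None:
                self._restore_chunk_requests()     # always restore, even on error
        if self.chunk_manager is not None:
            self._clear_chunk_ready(scheduler_output)
        return scheduler_output
```

Running the restore step in `finally` keeps the parked queues consistent even when the base scheduler raises.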
### Async Scheduling for Chunk IO Overlap
The async scheduling feature overlaps chunk IO operations with compute to improve throughput:
#### Request State Transitions

1. **WAITING/RUNNING → WAITING_FOR_CHUNK**:
   - In `_process_chunk_queue`, when a request needs a chunk, `chunk_manager.get_chunk(request)` is called (enqueues for recv_loop)
   - Request status is set to WAITING_FOR_CHUNK and the request is moved from the main waiting/running queues into `waiting_for_chunk_waiting_requests` / `waiting_for_chunk_running_requests`
   - The base vLLM scheduler does not see these requests, so it does not schedule them
   - This prevents blocking while chunk retrieval happens in the background

2. **WAITING_FOR_CHUNK → WAITING/RUNNING**:
   - When recv_loop has loaded the chunk, the request appears in `chunk_manager.get_finished()`
   - The scheduler restores those requests to WAITING or RUNNING and adds them back to `waiting` / `running` in the `finally` block after `super().schedule()`
   - On the next schedule cycle they are eligible again; `_clear_chunk_ready` removes them from `requests_with_ready_chunks` once they have been scheduled and consumed

#### OmniChunkManager

- **recv_loop**: Background thread; iterates over `_pending_load_reqs`, calls `connector.get(...)` (non-blocking/timeout); on success, updates request and connector state, moves the request to `_finished_load_reqs` and removes it from `_pending_load_reqs`
- **save_loop**: Background thread; dequeues tasks from `_pending_save_reqs` and calls `connector.put(...)`; on success, marks the request in `_finished_save_reqs`
- **get_chunk(request)**: Enqueues the request for load (adds to `_pending_load_reqs`)
- **get_finished()**: Returns and clears the set of request IDs that have finished loading a chunk
- **put_chunk(...)**: Builds the payload (including optional merging/accumulation) in the main thread, increments `put_requests`, and enqueues a save task into `_pending_save_reqs`
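
The put side of this lifecycle can be sketched as a queue plus a background thread. All names and the connector interface below are assumptions for illustration; the real manager also runs a recv_loop for loads:

```python
import queue
import threading

class ChunkSaveWorker:
    """Sketch of the async put path: payload built in the main thread,
    connector.put() performed by a background save_loop."""

    def __init__(self, connector):
        self.connector = connector                 # assumed to expose put(key, payload)
        self._pending_save_reqs = queue.Queue()
        self._finished_save_reqs = set()
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._save_loop, daemon=True)
        self._thread.start()

    def put_chunk(self, key, payload):
        self._pending_save_reqs.put((key, payload))  # enqueue; send happens async

    def _save_loop(self):
        while not self._stop.is_set():
            try:
                key, payload = self._pending_save_reqs.get(timeout=0.1)
            except queue.Empty:
                continue
            self.connector.put(key, payload)         # the actual IO
            with self._lock:
                self._finished_save_reqs.add(key)

    def get_finished_saves(self):
        with self._lock:                             # return-and-clear, like get_finished()
            done, self._finished_save_reqs = self._finished_save_reqs, set()
        return done

    def stop(self):
        self._stop.set()
        self._thread.join()
```

The lock only guards the finished-set handoff; the queue itself is already thread-safe.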
#### Benefits

- **Non-blocking**: Other requests can continue processing while chunks are being fetched
- **Better GPU Utilization**: Reduces idle time while waiting on chunk IO
- **Improved Throughput**: Overlaps IO operations with compute
- **Lower Latency**: Requests don't block the entire scheduler while waiting for chunks

### Model Runner Modifications

#### `OmniGPUModelRunner._preprocess()`

- When `async_chunk` is enabled, uses `_get_additional_information()` to retrieve chunk data from the scheduler
- Falls back to request state in non-async_chunk mode
- Handles per-request additional information for prefill and decode
- Codes are reshaped: `[num_quantizers, seq_len] → [seq_len * num_quantizers]`
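
For instance, with 2 quantizers and a sequence length of 3, the reshape above can be read as transpose-then-flatten; the exact interleaving order is an assumption here, and the values are made up:

```python
num_quantizers, seq_len = 2, 3
codes = [[10, 11, 12],   # quantizer 0, one code per position
         [20, 21, 22]]   # quantizer 1
# [num_quantizers, seq_len] -> [seq_len * num_quantizers]:
# transpose so each position's codes are adjacent, then flatten.
flat = [c for position in zip(*codes) for c in position]
print(flat)  # [10, 20, 11, 21, 12, 22]
```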
## Request Lifecycle

1. **Request Initiation**:
   - Request submitted to Stage 0 (Thinker)
   - Connector/chunk-manager state for the request (e.g. `put_requests`, `get_requests`) is initialized on first use

2. **Chunk Generation (Stage 0)**:
   - After each decode step, `chunk_manager.put_chunk()` is called
   - Payload is built in the main thread; chunk key: `f"{req_id}_{stage_id}_{chunk_id}"`
   - Save task enqueued; save_loop performs `connector.put()` asynchronously
   - `put_requests` is incremented when committing to send

3. **Chunk Consumption (Stage 1+, with async scheduling)**:
   - At schedule start, the scheduler calls `chunk_manager.get_finished()` and runs `_process_chunk_queue`
   - Requests that need a chunk: `chunk_manager.get_chunk(request)` enqueues them; status → WAITING_FOR_CHUNK; moved to the `waiting_for_chunk_*` queues
   - recv_loop polls the connector in the background; when a chunk arrives, it updates the request (e.g. `additional_information`, `prompt_token_ids`) and adds it to `get_finished()`
   - After `super().schedule()`, the scheduler restores `waiting_for_chunk_*` requests back to waiting/running
   - Chunk data is already on the request (e.g. `additional_information`); `scheduled_cached_reqs` in scheduler_output get `additional_information` from the requests; `_clear_chunk_ready` clears consumed entries from `requests_with_ready_chunks`

4. **Chunk Processing**:
   - Model runner processes the chunk in `_preprocess()`
   - Model forward pass generates output
   - Output chunk sent to the next stage via `chunk_manager.put_chunk()` (async: enqueue, save_loop sends)

5. **Request Completion**:
   - When `finished=True` in a chunk, the request is marked as finished
   - Final chunks are processed
   - Resources are cleaned up
   - Request removed from chunk-manager tracking

## Synchronization and Ordering

- **Chunk Ordering**: Chunks are sequenced via per-request chunk_id counters (e.g. `put_requests`/`get_requests`)
- **Request Isolation**: Each request has independent chunk counters and queues
- **Completion Detection**: The `finished` flag in a chunk signals completion
- **State Synchronization**: The OmniChunkManager recv_loop/save_loop use locks; the scheduler only reads `get_finished()` and enqueues get_chunk/put_chunk work
- **Queue Coordination**: Temporary queues (`waiting_for_chunk_waiting_requests`, `waiting_for_chunk_running_requests`) keep requests out of the base scheduler until their chunk is ready, then restore them

- **Thinker → Talker**: Per decode step (typically 1 token)
```