PD streaming: batch notify + SSE fast path#22658
Open

inkcherry wants to merge 1 commit into sgl-project:main from
Conversation
- P1 (Batch Notify): Batch `event.set()` calls in groups of 16 with `asyncio.sleep(0)` yield points to reduce asyncio wakeup storms under high-concurrency PD disagg streaming.
- A1 (SSE Fast Path): Replace Pydantic `model_dump_json()` with direct dict construction + `orjson.dumps()` in the SSE streaming hot path, eliminating per-chunk Pydantic overhead.
Contributor
Warning: You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!
Collaborator
/tag-and-rerun-ci
Collaborator
@hnyls2002 please help review.
Motivation
cc @ZhaiFeiyue @Duyi-Wang
Under high-concurrency PD disaggregation streaming (e.g., 2048 concurrent requests), the decode-side `tokenizer_manager` becomes a CPU bottleneck due to two issues:

1. `event.set()` in `_handle_batch_output` immediately wakes an asyncio coroutine. With hundreds of requests per decode batch, this causes excessive context switching.
2. The SSE streaming path builds Pydantic models per chunk (`DeltaMessage`, `ChatCompletionResponseStreamChoice`, `ChatCompletionStreamResponse`) and calls `model_dump_json()`, which involves schema validation, field traversal, and recursive serialization, all unnecessary for a fixed-structure streaming chunk.

Both optimizations target the post-decode CPU path only (tokenizer manager + API entrypoint), improving throughput and TPOT without increasing ITL.
Changes

1. Batch Notify (`tokenizer_manager.py`)

- Convert `_handle_batch_output` from sync to `async`.
- Instead of calling `state.event.set()` per request, collect pending notifications and flush them in groups of 16 with `await asyncio.sleep(0)` yield points.

2. SSE Fast Path (`serving_chat.py`)

- Add a `_fast_sse_content()` helper that constructs plain Python dicts and uses `orjson.dumps()` instead of Pydantic `model_dump_json()`.
Note on IPC serialization

During profiling we also identified pickle serialization on the detokenizer→tokenizer IPC path as a major bottleneck. We noticed that #21643 is already in progress to migrate this from pickle to msgpack/msgspec, which we believe will provide significant additional throughput gains for high-concurrency PD disagg workloads.
Test Setup
Hardware: 2-node PD disagg (1× prefill MI355, 1× decode MI355)
Model: DeepSeek-R1-0528-MXFP4
Benchmark: 10,240 prompts, random input/output len 1024 (range ratio 0.8), max-concurrency 2048, 2,048 warmup requests
Prefill server (node 1):
```shell
python3 -m sglang.launch_server \
  --model-path DeepSeek-R1-0528-MXFP4 \
  --disaggregation-mode prefill \
  --tp-size 4 --ep-size 4 --dp-size 4 \
  --max-running-requests 256 \
  --chunked-prefill-size 49152 \
  --kv-cache-dtype fp8_e4m3 \
  --attention-backend aiter \
  --enable-dp-attention \
  --disable-radix-cache
```

Decode server (node 2):
```shell
python3 -m sglang.launch_server \
  --model-path DeepSeek-R1-0528-MXFP4 \
  --disaggregation-mode decode \
  --tp-size 8 --ep-size 8 --dp-size 8 \
  --max-running-requests 2048 \
  --kv-cache-dtype fp8_e4m3 \
  --attention-backend aiter \
  --enable-dp-attention \
  --cuda-graph-bs 1..560
```

Router:
```shell
python3 -m sglang_router.launch_router \
  --pd-disaggregation --port 30000 \
  --policy random --prefill-policy random --decode-policy random \
  --prefill http://<prefill_ip>:8000 --decode http://<decode_ip>:8000
```

Benchmark:
```shell
python3 benchmark_serving.py \
  --backend openai --base-url http://0.0.0.0:30000 \
  --model DeepSeek-R1-0528-MXFP4 \
  --dataset-name random --random-input-len 1024 --random-output-len 1024 --random-range-ratio 0.8 \
  --num-prompts 10240 --max-concurrency 2048 --request-rate inf \
  --ignore-eos --num-warmups 2048
```

Results
Interleaved validation (baseline and this PR alternated across rounds to control for thermal drift):
- Output throughput: 11,488 → 13,802 tok/s (+20.1%)
- Mean TPOT: 110.74 → 92.62 ms (-16.4%)
- Mean ITL: 110.61 → 92.53 ms (-16.4%)
Checklist
Review and Merge Process
`/tag-and-rerun-ci`, `/tag-run-ci-label`, `/rerun-failed-ci`