[PD] MORI-IO: Add state transfer, async I/O workers, and high-concurrency fixes#22665
Draft
maning00 wants to merge 5 commits intosgl-project:mainfrom
Draft
[PD] MORI-IO: Add state transfer, async I/O workers, and high-concurrency fixes#22665maning00 wants to merge 5 commits intosgl-project:mainfrom
maning00 wants to merge 5 commits intosgl-project:mainfrom
Conversation
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Three fixes for the Worker-Exclusive I/O model (Transfer Queue): 1. AUX data via RDMA instead of ZMQ TCP: send_aux() now uses engine.batch_write() with registered aux_mem_descs, eliminating 10 ZMQ messages per room that flooded the decode PULL socket and blocked status notifications. 2. Stale metadata guard relaxed: _handle_transfer_message() now accepts metadata when current status is None (room not yet created by scheduler), since decode can send metadata before prefill creates MoriKVSender. Only rejects active/terminal states. 3. ZMQ socket improvements: _connect_threadsafe() shares a single zmq.Context across worker threads, sets SNDHWM=0 and LINGER=0 to prevent message loss under load.
a9ef95a to
fcb1aa3
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Follow-up to #14626 which introduced MORI-IO as the RDMA-based KV transfer backend for PD disaggregation on AMD hardware. This PR addresses the known limitation (no state transfer) and resolves several performance bottlenecks and correctness issues discovered under high-concurrency workloads:
prefill_tp_size > decode_tp_sizewith GQA/MQA.Modifications
All changes are confined to
python/sglang/srt/disaggregation/mori/conn.py.1. State Transfer Support (Mamba, SWA, NSA)
send_state()method onMoriKVManagerthat dispatches to_send_mamba_state()or_send_swa_nsa_state()based onstate_type._send_mamba_state(): Single-index Mamba SSM state transfer with TP-mismatch slice support (computes per-dimension offsets when prefill TP != decode TP)._send_swa_nsa_state(): Multi-token SWA/NSA state transfer usinggroup_concurrent_contiguous()and batched RDMA writes.TransferInfowithdst_state_indicesandKVArgsRegisterInfowithdst_state_item_lens/dst_state_dim_per_tensor.2. Async Worker-Exclusive I/O Model (FastQueue)
Replaced the synchronous inline transfer model with an asynchronous worker-thread architecture:
TransferKVChunk: New dataclass encapsulating a single chunk of transfer work.SGLANG_MORI_TRANSFER_QUEUE_SIZE, default: 4), each blocking on its ownFastQueue._process_transfer_chunk(): Full lifecycle management per chunk — KV transfer, state transfer, aux transfer, RDMA status polling, and decode notification.poll()to a pure status reader; all transfer work is offloaded to worker threads.3. AUX Data via RDMA (Replacing ZMQ TCP)
send_aux()to useengine.batch_write()with registered aux memory descriptors instead of ZMQsend_multipart. This eliminates ZMQ message flooding that caused decode-side hangs under high concurrency._connect_threadsafe()with thread-local ZMQ sockets for remaining ZMQ usage.4. Batched Multi-Layer RDMA Transfers
_batched_layer_transfers(): Issues a singleengine.batch_write()across all layers, reducing RDMA calls from O(layers) to O(1)._batched_tp_slice_transfers(): Same batching for TP-mismatch slice transfers with vectorized NumPy offset computation.5. Bug Fixes
prefill_tp_size > decode_tp_sizewith GQA/MQA. Introducedsrc_replicationandunique_head_idxfor correct replicated head mapping._handle_transfer_message()now accepts metadata when room status isNone(room not yet created by scheduler), preventing hangs.update_status()state machine guard:Failedis terminal and never overwritten._compute_prefill_unique_rank()now correctly encodes TP/PP/CP ranks.6. Default Parallelism Tuning
SGLANG_MORI_QP_PER_TRANSFERSGLANG_MORI_NUM_WORKERSSGLANG_MORI_TRANSFER_QUEUE_SIZEBenchmarking
Hardware Configuration
ionic_0~ionic_7)Benchmark Commands
Prefill instance:
export MORI_RDMA_TC=104 python3 -m sglang.launch_server --model-path DeepSeek-R1 \ --disaggregation-mode prefill --host 0.0.0.0 --port 30002 \ --tp-size 8 --kv-cache-dtype fp8_e4m3 \ --disaggregation-transfer-backend mori \ --disaggregation-ib-device ionic_0,ionic_1,ionic_2,ionic_3,ionic_4,ionic_5,ionic_6,ionic_7 \ --disable-radix-cache --trust-remote-codeDecode instance:
export MORI_RDMA_TC=104 python3 -m sglang.launch_server --model-path DeepSeek-R1 \ --disaggregation-mode decode --host 0.0.0.0 --port 30003 \ --tp-size 8 --kv-cache-dtype fp8_e4m3 \ --disaggregation-transfer-backend mori \ --disaggregation-ib-device ionic_0,ionic_1,ionic_2,ionic_3,ionic_4,ionic_5,ionic_6,ionic_7 \ --disable-radix-cache --trust-remote-codeRouter:
python -m sglang_router.launch_router --pd-disaggregation \ --port 30000 --policy random \ --prefill-policy random --decode-policy random \ --prefill http://<prefill_host>:30002 \ --decode http://<decode_host>:30003Benchmark client:
python3 -m sglang.bench_serving \ --backend sglang --host 0.0.0.0 --port 30000 \ --dataset-name random --num-prompts 2048 --max-concurrency 2048 \ --random-input-len 8192 --random-output-len 1024 \ --model DeepSeek-R1Performance Results
FP8 KV Cache (
--kv-cache-dtype fp8_e4m3,--num-prompts 1,--random-output 16, each run 2 times averaged):BF16 KV Cache (
--num-prompts 1,--random-output 16, each run 2 times averaged):Accuracy Test
python3 -m sglang.test.few_shot_gsm8k \ --host http://127.0.0.1 --port 30000 \ --num-questions 200 --parallel 128 --num-shots 5Known Limitations
cc @Duyi-Wang @ZhaiFeiyue