[serve.llm] Add MoRIIO KV-connector backend for prefill/decode #63951
kouroshHakha wants to merge 6 commits
Code Review
This pull request introduces the MoRIIOConnector backend for Ray Serve LLM disaggregated prefill/decode, along with a static replica metadata hook (record_replica_metadata) to publish coordination data. The orchestrator is updated to delegate request shaping and handoff discipline to the resolved connector backend. Feedback focuses on preserving other fields of RawRequestInfo when canonicalizing headers, adding defensive checks for missing ZMQ addresses to prevent invalid request IDs, and populating the request ID early to avoid a fragile fallback.
    headers = dict(raw_request_info.headers) if raw_request_info is not None else {}
    headers = {k: v for k, v in headers.items() if k.lower() != "x-request-id"}
    headers["x-request-id"] = rid
    return RawRequestInfo(headers=headers)
Recreating RawRequestInfo with only headers discards any other fields (such as cookies, query parameters, or client IP) that might be present in raw_request_info now or in the future. Since RawRequestInfo is a dataclass, we should use dataclasses.replace to safely update only the headers field while preserving all other attributes.
Suggested change:

    - headers = dict(raw_request_info.headers) if raw_request_info is not None else {}
    - headers = {k: v for k, v in headers.items() if k.lower() != "x-request-id"}
    - headers["x-request-id"] = rid
    - return RawRequestInfo(headers=headers)
    + headers = dict(raw_request_info.headers) if raw_request_info is not None else {}
    + headers = {k: v for k, v in headers.items() if k.lower() != "x-request-id"}
    + headers["x-request-id"] = rid
    + import dataclasses
    + if raw_request_info is not None and dataclasses.is_dataclass(raw_request_info):
    +     return dataclasses.replace(raw_request_info, headers=headers)
    + return RawRequestInfo(headers=headers)
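To illustrate why `dataclasses.replace` matters here, the following is a minimal sketch. The `RawRequestInfo` class below is a hypothetical stand-in (its `client_ip` field is an assumption made only for this example), and `with_request_id` is an illustrative helper name, not a function from the PR.

```python
import dataclasses
from typing import Dict, Optional


@dataclasses.dataclass
class RawRequestInfo:
    # Hypothetical stand-in: the real class may define different fields.
    headers: Dict[str, str] = dataclasses.field(default_factory=dict)
    client_ip: Optional[str] = None  # assumed extra field, for illustration only


def with_request_id(info: Optional[RawRequestInfo], rid: str) -> RawRequestInfo:
    """Return a copy of `info` whose only x-request-id header is `rid`."""
    headers = dict(info.headers) if info is not None else {}
    # Drop any existing variant of the header, whatever its casing.
    headers = {k: v for k, v in headers.items() if k.lower() != "x-request-id"}
    headers["x-request-id"] = rid
    if info is None:
        return RawRequestInfo(headers=headers)
    # replace() copies every other field unchanged, including future ones.
    return dataclasses.replace(info, headers=headers)


original = RawRequestInfo(
    headers={"X-Request-Id": "old", "accept": "*/*"},
    client_ip="10.0.0.7",
)
updated = with_request_id(original, "new-id")
```

In this sketch `updated.client_ip` is still `"10.0.0.7"`, whereas constructing `RawRequestInfo(headers=headers)` directly would have reset it to `None`.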
    prefill_zmq = self._peer_prefill_zmq(peer)
    decode_zmq = self._own_zmq_address()
    seed = self._request_seed(request)
    uid = derive_uid(seed)
    request_id = build_pd_request_id(prefill_zmq, decode_zmq, uid)
If prefill_zmq or decode_zmq is None or empty, build_pd_request_id will silently construct an invalid request ID containing "None" (e.g., "___prefill_addr_None___decode_addr_..."), which will fail later with cryptic regex mismatch or connection errors. Adding explicit defensive checks here prevents silent failures and provides clear debugging information.
Suggested change:

    prefill_zmq = self._peer_prefill_zmq(peer)
    if not prefill_zmq:
        raise ValueError(
            "Prefill peer MoRIIO ZMQ address is missing from peer metadata. "
            "Ensure the prefill replica has completed setup and published its metadata."
        )
    decode_zmq = self._own_zmq_address()
    if not decode_zmq:
        raise RuntimeError(
            "Local MoRIIO ZMQ address is not initialized. "
            "Ensure setup() has been called on the local connector backend."
        )
    seed = self._request_seed(request)
    uid = derive_uid(seed)
    request_id = build_pd_request_id(prefill_zmq, decode_zmq, uid)
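The failure mode described in this comment can be reproduced with a small sketch. The ID layout below is hypothetical, loosely modeled on the `___prefill_addr_None___decode_addr_...` example in the comment; the real `build_pd_request_id` format is not shown in this PR page, and both function names here are illustrative.

```python
from typing import Optional


def build_pd_request_id_naive(
    prefill_zmq: Optional[str], decode_zmq: Optional[str], uid: str
) -> str:
    # Hypothetical layout. An f-string formats None as the text "None",
    # so a missing address is embedded silently instead of failing.
    return f"___prefill_addr_{prefill_zmq}___decode_addr_{decode_zmq}_{uid}"


def build_pd_request_id_checked(
    prefill_zmq: Optional[str], decode_zmq: Optional[str], uid: str
) -> str:
    # Fail at the point where the address is known to be missing.
    if not prefill_zmq:
        raise ValueError("Prefill peer ZMQ address is missing from peer metadata.")
    if not decode_zmq:
        raise RuntimeError("Local ZMQ address is not initialized.")
    return build_pd_request_id_naive(prefill_zmq, decode_zmq, uid)


bad_id = build_pd_request_id_naive(None, "10.0.0.2:6000", "abc")
```

Here `bad_id` contains the literal text `prefill_addr_None`, while the checked variant raises `ValueError` for the same arguments.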
    # 1. Remote prefill
    prefill_request = self._prepare_prefill_request(request)
    be = self._get_connector_backend()
By ensuring that request.request_id is populated early in _pd_handle_request using the authoritative Serve request ID, we can completely avoid relying on the fragile id(request) fallback in the MoRIIO connector backend. This guarantees a consistent and stable request ID across the entire disaggregated prefill/decode pipeline.
Suggested change:

    - be = self._get_connector_backend()
    + if hasattr(self, "_maybe_add_request_id_to_request"):
    +     await self._maybe_add_request_id_to_request(request)
    + be = self._get_connector_backend()
2a6d06b to c1df389
Adds an optional `record_replica_metadata` deployment hook, captured once when a replica becomes ready and surfaced as `ReplicaSelection.replica_metadata`. Distinct from `record_routing_stats` (dynamic/polled): this is static identity/coordination data captured once and immutable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>

_maybe_add_request_id_to_request no longer clobbers an explicitly-set request_id, and the vLLM engine canonicalizes the X-Request-Id header from request.request_id, so a P/D orchestrator / KV-connector can control the engine request id by setting request.request_id alone.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>

Refactors PDOrchestratorMixin to delegate request shaping, peer addressing, and handoff discipline to BaseConnectorBackend (requires_peer_binding, concurrent_handoff, prepare_prefill_request, prepare_decode_request). The defaults reproduce the existing NIXL/default flow exactly; connectors that need pre-dispatch peer binding (e.g. request-id-addressed transfers) can opt into choose_replica + concurrent handoff without new orchestrator concepts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>

…he/Multi

- Make BaseConnectorBackend.prepare_prefill/decode_request abstract; add a shared DefaultPDProtocolMixin with the standard (no-peer, sequential) policy.
- NIXL, LMCache, and Multi connector backends now use the interface via the mixin.
- Guard prepare_decode_request against a None prefill_response (concurrent mode).
- Cancel the background prefill task if local decode fails/cancels (no leak).
- Keyword-only, typed prepare_* signatures; Optional[BaseConnectorBackend] return type.
- Connector-agnostic protocol docs; keep requires_peer_binding/concurrent_handoff flags (independent: standard=F,F; push-addressed=T,T; pull-addressed=T,F).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>

…ng, helper

- MultiConnectorBackend delegates prepare_*/flags to its top-most sub-connector (rather than inheriting the default mixin), so a sub-connector's policy governs.
- Cache the resolved connector backend on the server (no per-request factory call).
- Extract the concurrent prefill+decode handoff into a _concurrent_decode helper (dedupes the two paths; cancels the background prefill if decode doesn't finish).
- Inline peer=None in the default path; drop the git-history comment.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>

Implements MoRIIOConnectorBackend against the abstract connector interface: pre-dispatch peer binding (reads the selected prefill's replica_metadata), a stateless deterministic dual-address request_id + transfer_id, WRITE (concurrent) and READ (sequential) handoff via read_mode, and publishes its zmq address through the replica-metadata hook. Registered in KVConnectorBackendFactory.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
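
The commits above mention running prefill in the background while decode proceeds locally, and cancelling the background prefill if decode does not finish. A minimal `asyncio` sketch of that pattern follows. The name `concurrent_decode` and its signature are illustrative, not the PR's actual `_concurrent_decode` helper, and awaiting the prefill task after a successful decode is an assumption made for this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def concurrent_decode(
    prefill: Callable[[], Awaitable[Any]],
    decode: Callable[[], Awaitable[Any]],
) -> Any:
    """Run prefill in the background and decode in the foreground."""
    prefill_task = asyncio.create_task(prefill())
    decode_finished = False
    try:
        result = await decode()
        decode_finished = True
        # Assumption: surface any prefill error once decode has succeeded.
        await prefill_task
        return result
    finally:
        if not decode_finished and not prefill_task.done():
            # Decode failed or was cancelled: do not leak the prefill task.
            prefill_task.cancel()
            await asyncio.gather(prefill_task, return_exceptions=True)


async def demo() -> List[str]:
    events: List[str] = []

    async def slow_prefill() -> None:
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            events.append("prefill cancelled")
            raise

    async def failing_decode() -> None:
        await asyncio.sleep(0)  # yield once so the prefill task starts
        raise RuntimeError("decode failed")

    try:
        await concurrent_decode(slow_prefill, failing_decode)
    except RuntimeError:
        events.append("decode error propagated")
    return events
```

Running `asyncio.run(demo())` exercises the failure path: the decode error reaches the caller only after the background prefill has been cancelled and awaited.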
c1df389 to 32d4f14
Why
Add the MoRIIO KV-transfer connector to Ray Serve LLM prefill/decode — as a connector backend on
the abstract P/D protocol (#63950), with no new orchestrator concepts.
What
`MoRIIOConnectorBackend` implements the connector P/D protocol:
- `requires_peer_binding = True`: the orchestrator selects the prefill replica first and passes its published `replica_metadata` (the MoRIIO zmq address) as `peer`.
- `concurrent_handoff = not read_mode`: WRITE mode overlaps the prefill push with local decode; READ mode is sequential (decode pulls using the block ids the prefill returns).
- `prepare_prefill_request` / `prepare_decode_request` stamp a stateless, deterministic dual-address `request_id` + `transfer_id` (derived via `uuid5` from the incoming request id, so the two calls agree without per-request state) and the appropriate `kv_transfer_params`.
- Publishes its zmq address through the replica-metadata hook (`record_replica_metadata` on the prefill server → `replica_metadata()` on the backend).
- Registered in `KVConnectorBackendFactory`; the builder shifts decode's MoRIIO port bases (mirrors NIXL).
Stack
Builds on the series: per-replica metadata hook (#63948), `request.request_id` authoritative (#63949), and the abstract PD connector protocol (#63950). This PR's diff currently includes those until they merge.
Cross-node correctness additionally needs a worker host-IP shim (advertise the node-internal IP inside each vLLM worker, since `VLLM_HOST_IP` is excluded from the driver→worker env copy); that is left to a follow-up PR.
Testing
`test_moriio_connector.py` (CPU/mock): backend setup (ports/zmq/read_mode); `requires_peer_binding`; `concurrent_handoff` write/read; `replica_metadata()`; deterministic request_id identical across `prepare_prefill`/`prepare_decode`, vLLM-regex compatible, round-trips to the right peer zmq; factory registration. GPU-validated end-to-end (WRITE+READ, 1P1D/3P1D, DeepSeek-R1 TP8) in the out-of-tree prototype.
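
The stateless `uuid5` derivation described under What can be sketched as follows. The namespace value and the `derive_uid` body are assumptions for illustration; the PR names `derive_uid` but this page does not show its implementation. The `prefill_side_uid` and `decode_side_uid` helpers are hypothetical stand-ins for the two `prepare_*` calls.

```python
import uuid

# Hypothetical namespace: the real backend may use a different one.
_PD_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "pd-request-id.example")


def derive_uid(seed: str) -> str:
    """Map a request seed to a stable hex uid without storing any state."""
    # uuid5 is a SHA-1 based, name-derived UUID: same inputs, same output.
    return uuid.uuid5(_PD_NAMESPACE, seed).hex


def prefill_side_uid(incoming_request_id: str) -> str:
    # Stand-in for the uid computed while preparing the prefill request.
    return derive_uid(incoming_request_id)


def decode_side_uid(incoming_request_id: str) -> str:
    # Stand-in for the uid computed independently for the decode request.
    return derive_uid(incoming_request_id)
```

Because both sides hash the same incoming request id, they arrive at the same uid without sharing a lookup table, which is the property the test list above checks across `prepare_prefill`/`prepare_decode`.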
🤖 Generated with Claude Code