[WIP] EPD routing: pixel lanes (NIXL/SHM), servicer hardening, gateway data-plane perf #1703
chenht2022 wants to merge 53 commits into
Code Review
This pull request introduces the Encode-Prefill-Decode (EPD) disaggregated mode, adding a vision-only encode stage that processes and ships image embeddings to prefill workers over Mooncake. Key changes include the addition of the TokenSpeedEncoder gRPC service, a high-performance NIXL-based RDMA pixel transport, memory recycling pools for large vision buffers, and EPD pipeline stages in the gateway. The review feedback highlights several improvement opportunities: ensuring image_grid_thw is 2D to prevent a runtime crash, wrapping NIXL transfers in a try...finally block to avoid resource leaks, aligning the default RDMA landing slots between the gateway and encoder, and refactoring concurrent task spawning in the encode stage to use tokio::task::JoinSet instead of join_all and bare tokio::spawn.
grid = model_specific.get("image_grid_thw")
if grid is None:
    raise ValueError("encode request is missing image_grid_thw")
# grid is [num_images, 3] = (t, h, w) in patch units, per image.
If image_grid_thw is a 1D tensor (e.g., when there is only a single image and it was flattened), grid.tolist() will return a flat list of 3 elements instead of a list of lists. Iterating over it will then yield scalar integers, causing a TypeError when attempting to subscript row[0]. Ensuring the tensor is 2D using .view(-1, 3) before calling .tolist() prevents this potential runtime crash.
- grid = model_specific.get("image_grid_thw")
- if grid is None:
-     raise ValueError("encode request is missing image_grid_thw")
- # grid is [num_images, 3] = (t, h, w) in patch units, per image.
+ grid = model_specific.get("image_grid_thw")
+ if grid is None:
+     raise ValueError("encode request is missing image_grid_thw")
+ grid = grid.view(-1, 3)
+ # grid is [num_images, 3] = (t, h, w) in patch units, per image.
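The failure mode described in this comment can be reproduced without a tensor library. The following is a minimal sketch, assuming the grid has already been converted to a plain Python list; `normalize_grid_rows` is a hypothetical helper (not part of the PR) that mimics what `.view(-1, 3)` followed by `.tolist()` would produce.

```python
def normalize_grid_rows(grid):
    """Return grid as a list of [t, h, w] rows, accepting flat or nested input.

    Hypothetical helper: mimics tensor.view(-1, 3).tolist() for plain lists.
    """
    if grid and not isinstance(grid[0], (list, tuple)):
        # Flat input such as [t, h, w]: regroup into rows of three.
        if len(grid) % 3 != 0:
            raise ValueError(f"flat grid length {len(grid)} is not a multiple of 3")
        return [list(grid[i:i + 3]) for i in range(0, len(grid), 3)]
    rows = [list(row) for row in grid]
    for row in rows:
        if len(row) != 3:
            raise ValueError(f"grid row {row} does not have 3 entries")
    return rows
```

With this normalization, a single flattened image yields one row, so `row[0]` is always a valid subscript on each element.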
h = agent.initialize_xfer("READ", local, remote, gw_name, str(room).encode())
read_deadline = time.monotonic() + float(
    os.environ.get("SMG_RDMA_READ_TIMEOUT_S", 60)
)
spins = 0
state = agent.transfer(h)
while state in ("PROC", "IN_PROG"):
    # Yield between polls: under EPD_INGEST_OFFLOOP up to ~32 worker
    # threads poll concurrently, and a no-sleep spin starves the GIL
    # away from the hash/publish work that RETURNS slots -- the ring
    # runs dry and admission collapses. Spin briefly for small-READ
    # latency, then back off.
    spins += 1
    if spins > 64:
        time.sleep(0.0005)
    if time.monotonic() > read_deadline:
        agent.release_xfer_handle(h)
        raise RuntimeError(f"NIXL READ timed out room={room}")
    state = agent.check_xfer_state(h)
agent.release_xfer_handle(h)
if state != "DONE":
    raise RuntimeError(f"NIXL READ state={state} room={room}")
The NIXL transfer handle h is initialized but not guaranteed to be released if an exception is raised during the transfer or polling loop (e.g., if a timeout occurs or check_xfer_state fails). Wrapping the transfer and polling logic in a try...finally block ensures that agent.release_xfer_handle(h) is always called, preventing potential resource leaks.
h = agent.initialize_xfer("READ", local, remote, gw_name, str(room).encode())
try:
    read_deadline = time.monotonic() + float(
        os.environ.get("SMG_RDMA_READ_TIMEOUT_S", 60)
    )
    spins = 0
    state = agent.transfer(h)
    while state in ("PROC", "IN_PROG"):
        # Yield between polls: under EPD_INGEST_OFFLOOP up to ~32 worker
        # threads poll concurrently, and a no-sleep spin starves the GIL
        # away from the hash/publish work that RETURNS slots -- the ring
        # runs dry and admission collapses. Spin briefly for small-READ
        # latency, then back off.
        spins += 1
        if spins > 64:
            time.sleep(0.0005)
        if time.monotonic() > read_deadline:
            raise RuntimeError(f"NIXL READ timed out room={room}")
        state = agent.check_xfer_state(h)
    if state != "DONE":
        raise RuntimeError(f"NIXL READ state={state} room={room}")
finally:
    agent.release_xfer_handle(h)
References
- When managing resources (e.g., workers) that are acquired and need to be released, ensure that all acquired resources are tracked immediately and consistently, especially when exceptions might occur during subsequent operations (e.g., launching new resources). This prevents resource leaks by ensuring that even partially successful acquisitions are accounted for cleanup.
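To illustrate why the `try...finally` shape matters, here is a minimal sketch that exercises the success, error-state, and timeout paths. `FakeAgent` is a hypothetical stand-in that only mirrors the method names used in the snippet above; it is not the NIXL API, and the spin/back-off logic is omitted for brevity.

```python
import time


class FakeAgent:
    """Hypothetical stand-in for the NIXL agent; replays a scripted state list."""

    def __init__(self, states):
        self._states = iter(states)
        self.released = []

    def transfer(self, handle):
        return next(self._states)

    def check_xfer_state(self, handle):
        return next(self._states)

    def release_xfer_handle(self, handle):
        self.released.append(handle)


def read_with_release(agent, handle, timeout_s, room):
    """Poll a transfer to completion, releasing the handle on every exit path."""
    try:
        deadline = time.monotonic() + timeout_s
        state = agent.transfer(handle)
        while state in ("PROC", "IN_PROG"):
            if time.monotonic() > deadline:
                raise RuntimeError(f"NIXL READ timed out room={room}")
            state = agent.check_xfer_state(handle)
        if state != "DONE":
            raise RuntimeError(f"NIXL READ state={state} room={room}")
    finally:
        # Runs on success, on a bad terminal state, and on timeout alike.
        agent.release_xfer_handle(handle)
```

Because the release lives in `finally`, the explicit release before the timeout `raise` in the original code becomes unnecessary, and an exception thrown by the polling call itself no longer leaks the handle.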
slot_bytes = int(
    os.environ.get("SMG_RDMA_SLOT_BYTES", 32 * 1024 * 1024)
)
n_slots = int(os.environ.get("SMG_RDMA_LANDING_SLOTS", 16))
There is a default mismatch for the number of RDMA slots between the gateway (which defaults to 64 slots via DEFAULT_POOL_SLOTS) and the encoder servicer (which defaults to 16 slots here). Under high concurrency, this mismatch can lead to unexpected starvation or backpressure on the encoder side. Aligning the default values to 64 ensures consistent capacity across both sides of the transport.
- slot_bytes = int(
-     os.environ.get("SMG_RDMA_SLOT_BYTES", 32 * 1024 * 1024)
- )
- n_slots = int(os.environ.get("SMG_RDMA_LANDING_SLOTS", 16))
+ slot_bytes = int(
+     os.environ.get("SMG_RDMA_SLOT_BYTES", 32 * 1024 * 1024)
+ )
+ n_slots = int(os.environ.get("SMG_RDMA_LANDING_SLOTS", 64))
let mut sends: Vec<tokio::task::JoinHandle<Result<(), String>>> =
    Vec::with_capacity(splits.len());
To adhere to the general rules, prefer using tokio::task::JoinSet over futures::future::join_all when spawning multiple concurrent tasks. This ensures tasks are distributed across the multi-threaded runtime and allows unexpected task failures to be detected immediately.
- let mut sends: Vec<tokio::task::JoinHandle<Result<(), String>>> =
-     Vec::with_capacity(splits.len());
+ let mut sends = tokio::task::JoinSet::new();
References
- When spawning multiple concurrent tasks or workers in a Tokio runtime, prefer using tokio::task::JoinSet over futures::future::join_all. This ensures tasks are distributed across the multi-threaded runtime rather than running on a single thread, and allows unexpected task failures to be detected immediately via join_next() instead of being masked. Additionally, JoinSet::spawn can be used when bare tokio::task::spawn is disallowed.
// PARALLEL instead of one-after-another on this task. Previously the
// serialize ran serially in this loop (futures are lazy; only the RPC
// await overlapped), which alone made dispatch ~serial per image.
sends.push(tokio::spawn(async move {
Spawn the task directly onto the JoinSet instead of using bare tokio::spawn.
sends.spawn(async move {
tokio::spawn(async move {
    for join_res in join_all(sends).await {
Iterate over the JoinSet using join_next() instead of wrapping the handles in join_all.
- tokio::spawn(async move {
-     for join_res in join_all(sends).await {
+ tokio::spawn(async move {
+     while let Some(join_res) = sends.join_next().await {
2690f29 to 4adb41f
First step of TokenSpeed EPD request routing: the wire contract for the encode
stage. Unlike SGLang PD (dual-dispatch of one DisaggregatedParams request), the
gateway hands a vision-tower-only encode worker the preprocessed multimodal
tensors and tells it which prefill to ship the embedding to over Mooncake.
- proto/tokenspeed_encoder.proto (new): service TokenSpeedEncoder { Encode };
EncodeRequest reuses tokenspeed_scheduler's MultimodalInputs (tensor-in, not
URLs) + request_id + bootstrap_room/port/host. Compiled in its own build.rs
pass with the scheduler package mapped via extern_path so it references those
types through the crate path (pass ordered before the scheduler pass so the
full scheduler output wins over the extern stub).
- tokenspeed_scheduler.proto: GenerateRequest gains optional EncodeHandshake
encode = 10 -- present when a prefill request's embeddings arrive from an
encode worker over Mooncake (keyed by bootstrap_room) instead of being
computed locally. Header comment updated (EPD is now supported).
- TokenSpeedEncoderClient (new, mirrors TokenSpeedSchedulerClient): connect +
unary encode() with trace injection.
Compile-verified: cargo build -p smg-grpc-client.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Third worker role alongside Prefill/Decode: an encode worker runs the vision
tower and ships image embeddings to a prefill worker over Mooncake. Add the
variant + Display/FromStr ("encode") and the WORKER_ENCODE metric label, and
cover the exhaustive WorkerType matches the compiler flagged (metric label,
manager probe label). Registry/router-manager weight counts and the PD-pair
fold ignore encode workers for now (TODO: EPD-aware accounting) since EPD
routing goes through its own router; encode triple selection lands with the EPD
pipeline.
Compile-verified: cargo check -p smg.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
The EPD routing mode: encode + prefill + decode worker pools (encode workers with optional Mooncake bootstrap ports), each with an optional policy. Mirrors PrefillDecode and adds the encode tier.
is_epd_mode() / get_encode_policy() join the PD helpers; worker_count, mode_type, validation, service-discovery checks, readiness (needs encode+prefill+decode healthy), discovery worker enumeration, determine_router_id (GRPC_EPD), and the CLI url collection all gain EPD arms (compiler-driven). The factory routes gRPC EPD to a not-yet-wired error and HTTP EPD to a "requires gRPC" error; the GrpcEPDRouter + encode pipeline land next.
Compile-verified: cargo check -p smg (lib + bins).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
WorkerSelection / ClientSelection / LoadGuards gain a Triple variant for EPD: encode + prefill + decode workers (WorkerSelection), prefill + decode scheduler clients (ClientSelection -- the encode worker is driven by a separate TokenSpeedEncoderClient in the encode stage, not held here), and three load guards.
A new WorkerSelection::encode_worker() accessor exposes the encode worker; the remaining accessors and consuming stages (client acquisition, dispatch metadata, request building, response processing) treat Triple like Dual for the prefill/decode legs (compiler-driven exhaustive coverage).
select_epd_triple (the selection that populates Triple) and the EPD router / pipeline / encode stage land next.
Compile-verified: cargo check -p smg.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Mirror select_pd_pair as select_epd_triple, folding a third (encode) worker pool. Add WorkerSelectionMode::EncodePrefillDecode that produces WorkerSelection::Triple. All three pools are filtered to a single runtime.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…n A)
A single request's multimodal items may be split across several encode
workers for parallelism. Make the encode->prefill handshake per-item:
- EncodeHandshake now carries repeated EncodeItemHandshake {item_index,
bootstrap_room, bootstrap_host, bootstrap_port}; the prefill sets up one
Mooncake receiver per item, each discovering its own encode worker.
- EncodeRequest carries the worker's subset of items + per-item
EncodeItemAssignment {item_index, bootstrap_room}; the gateway groups
items by chosen worker and sends one RPC per worker.
Also fix the handshake host/port direction: they point at the ENCODE
worker (the data source that hosts the bootstrap server), not the prefill.
The earlier single-handshake comment had this backwards; confirmed against
the engine embedding_transfer receiver discovery path.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
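The per-item handshake and the "group items by chosen worker, one RPC per worker" step described in this commit can be sketched as follows. The field names come from the commit message; the Python types, the `group_by_worker` helper, and the use of (host, port) as the worker key are assumptions for illustration only.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class EncodeItemHandshake:
    item_index: int
    bootstrap_room: int
    bootstrap_host: str  # points at the ENCODE worker (the data source)
    bootstrap_port: int


def group_by_worker(handshakes):
    """Map (host, port) -> list of (item_index, bootstrap_room) assignments.

    Hypothetical helper: each map entry corresponds to one Encode RPC.
    """
    groups = defaultdict(list)
    for hs in handshakes:
        groups[(hs.bootstrap_host, hs.bootstrap_port)].append(
            (hs.item_index, hs.bootstrap_room)
        )
    return dict(groups)
```

Each key identifies one encode worker's bootstrap endpoint, and the prefill side would set up one receiver per handshake entry.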
Option A assigns each multimodal item to an encode worker independently, so WorkerSelection::Triple now carries encode_pool: Vec instead of a single encode worker. select_epd_triple returns the whole available encode pool; the per-item assignment happens in the encode stage. Encode load guards and circuit-breaker outcomes are deferred to that stage (the pool is only candidates, not the per-request used set), so LoadGuards::Triple and record_outcome cover just the prefill/decode pair.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add split_preprocessed_per_item: slices a packed PreprocessedImages into one owned piece per image, driven entirely by FieldLayout metadata (the same Batched/Flat info assemble_vllm consumes), so a single request's images can be fanned out across multiple encode workers (Option A). The name + types are engine-neutral and it lives additively in multimodal.rs; assemble_tokenspeed and the serialize_* helpers are byte-for-byte unchanged. Fails loud (SplitError) on inconsistent counts, malformed Flat sizes, un-sliceable variants, or a tensor whose data len != product(shape) (the last guards against silently truncating an over-long buffer).
Implemented + adversarially verified via a multi-agent workflow. The verification's two 'critical' reports (reversed Batched index, constant [0] metadata index) were false positives against a stale mid-iteration view of the file; the final code uses forward index i and per-item [i] (confirmed by direct read + passing tests). The verification's real findings were test gaps: the model_specific Flat branch and the Batched Vec/TupleVec branch were unexercised (proven by panic-injection). Added tests covering both branches, their fail-loud paths, and the new shape/data-len guard.
Full smg lib suite: 991 passed (was 981).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
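The fail-loud slicing idea can be sketched in a few lines. This is only an illustration under stated assumptions: a tensor is modeled as a (shape, flat data list) pair, "Batched" is taken to mean that the leading dimension indexes images, and `split_batched` is a hypothetical name rather than the Rust function in the PR.

```python
from math import prod


class SplitError(ValueError):
    """Raised when a packed tensor cannot be split consistently."""


def split_batched(shape, data, num_items):
    """Split a tensor whose leading dimension indexes items into per-item pieces."""
    if len(data) != prod(shape):
        # Guards against silently truncating an over-long buffer.
        raise SplitError(f"data len {len(data)} != product(shape) {prod(shape)}")
    if not shape or shape[0] != num_items:
        raise SplitError(f"leading dim of {shape} != item count {num_items}")
    stride = prod(shape[1:])
    return [
        ([1, *shape[1:]], data[i * stride:(i + 1) * stride])
        for i in range(num_items)
    ]
```

Each returned piece keeps a leading dimension of 1, so it has the same rank as the packed tensor and can be sent as a one-image payload.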
Two EPD foundation fixes surfaced by the integration design pass:
- job_queue mapped the EPD "encode" worker-type string to the catch-all WorkerType::Regular, so encode workers were never registered as Encode and select_epd_triple always found an empty encode pool (every EPD request would 404). Add the missing "encode" => WorkerType::Encode arm.
- Add ProcessingState.encode_handshake: Option<Vec<EncodeItemHandshake>>, where the encode stage stashes per-item handshakes for request building to inject into the prefill GenerateRequest. Defaults None; non-EPD untouched.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Building blocks for the encode stage / EPD request building:
- TokenSpeedMultimodalData::into_proto_metadata_only(): emits a MultimodalInputs with pixel_values omitted (keeps grid_thw/placeholders/im_token_id) for the prefill leg, which receives embeddings over Mooncake and skips the vision tower. into_proto now delegates to a shared inner; behavior with pixels is unchanged.
- ProtoGenerateRequest::set_encode(): sets the per-item EncodeHandshake on a TokenSpeed request, warn-and-ignore on other backends (mirror set_kv_transfer_params).
- assemble_tokenspeed_from_split(): builds a TokenSpeed mm payload from one per-image PreprocessedImages (one image per Encode RPC), empty placeholders since the encode worker only runs the vision tower.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
EPD execution: the encode RPC is already dispatched by the encode stage, so the request-execution stage only runs the prefill+decode leg. Add ExecutionMode::Epd whose arm calls execute_dual_dispatch directly (clients.dual_mut() already returns prefill+decode for Triple), bypassing the runtime-type PD gate that rejects TokenSpeed for ordinary DualDispatch. The DualDispatch arm and its TokenSpeed reject are unchanged, so non-EPD PD behaves identically. (CB-outcome recording for the EPD pair is a follow-up: record_dual_outcomes no-ops on Triple; left as-is to keep PD byte-identical.)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
New common stage (runs only in the EPD pipeline, between client acquisition and request building). Borrows the request's preprocessed multimodal intermediate (does not consume it, so request building is unaffected), splits it per image via split_preprocessed_per_item, and for each image: assigns an encode worker round-robin, mints a bootstrap_room, builds a one-image EncodeRequest, and records an EncodeItemHandshake pointing the prefill at that worker's Mooncake bootstrap endpoint.
All Encode RPCs fire concurrently (join_all); any failure aborts the request before prefill dispatch (a prefill without its encode peer would hang on the room). The per-item handshakes are stashed in ctx.state.encode_handshake for request building to inject. Text-only requests are a graceful no-op.
One Encode RPC per image (unique request_id, local item_index 0) to match the engine's one-room-per-request-id receive path; batching multiple images per worker into one RPC is a later optimization. Unused until the new_epd pipeline wires it in.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- chat request building: when the encode stage recorded handshakes (ctx.state.encode_handshake, EPD pipeline only), inject them via set_encode and drop the prefill's pixel_values via clear_mm_pixel_values (the prefill receives embeddings over Mooncake and skips the vision tower). Gated on the handshake's presence, so non-EPD requests are untouched. Build-then-strip; a metadata-only assemble is a follow-up.
- add ProtoGenerateRequest::clear_mm_pixel_values (TokenSpeed; nulls pixels, keeps grid_thw/placeholders/im_token_id).
- RequestPipeline::new_epd: mirrors new_pd with EncodeStage inserted between client acquisition and request building, WorkerSelectionMode::EncodePrefillDecode, and ExecutionMode::Epd. new_epd is not yet called (factory wiring is the next step), so the EPD chain is dead-code-warned until then.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Make the EPD pipeline reachable, with full endpoint coverage:
- new_messages_epd: Messages API EPD pipeline (EncodeStage +
ExecutionMode::Epd), so multimodal Messages fan out like chat.
- new_completion_epd: Completion API EPD pipeline (text-only, no encode
stage) so a TokenSpeed EPD deployment can serve completions via the Epd
execution path instead of the DualDispatch path that rejects TokenSpeed.
- Messages request building gains the same EPD handshake-inject + pixel-drop
block as chat.
- GrpcPDRouter now serves both PD and EPD: add new_epd constructor (builds the
EPD pipelines) and a router_type field ("grpc_pd" / "grpc_epd"); all
route/retry logic and prefill+decode dispatch are shared (avoids ~450 lines
of duplication). PD path behavior is unchanged.
- factory: replace the EPD placeholder Err with create_grpc_epd_router
(mirrors create_grpc_pd_router; encode pool assigned round-robin in the
stage, so no separate encode policy).
Full smg lib suite green (991); shared multimodal assembly untouched.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
TokenSpeed grpc_servicer EPD ingestion (proto -> GenerateReqInput):
- parse GenerateRequest.encode (per-item EncodeHandshake) into
GenerateReqInput.encode_handshake (list of {item_index, bootstrap_room,
bootstrap_host, bootstrap_port}).
- build the precomputed mm whenever mm_inputs is present, not only when
pixel_values is: EPD ships mm metadata (grid_thw/placeholders/im_token_id)
with pixel_values omitted (the embedding arrives over Mooncake). In
_mm_inputs_from_proto, feature is None when pixel_values is absent, and the
item gets a uuid hash so set_pad_value (which would hash a None feature)
succeeds; the prefill never re-encodes it.
Verified against regenerated Python stubs: GenerateRequest.encode +
EncodeHandshake/EncodeItemHandshake present, parsing roundtrips, and
mm.HasField(pixel_values) is False when unset.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Part 3 of the encode launch path: front the engine's vision-only encode
worker with a gRPC TokenSpeedEncoder service.
- TokenSpeedEncoderServicer.Encode: reconstruct the request's image(s) WITH
pixel_values (the encode worker runs the tower), derive each item's
post-merge token count from grid_thw + spatial_merge_size (the gateway ships
grid but not placeholders to encode, and the executor splits the tower output
by that count), build an engine EncodeRequest (bootstrap_host/port = this
node's own Mooncake bootstrap server, room from the EncodeItemAssignment),
and fire it via AsyncLLM.submit_encode; ack accepted=True.
- serve_grpc: when disaggregation_mode == 'encode', register the encoder
servicer (+ reflection) instead of the LM scheduler servicer.
Validated on GPU: launch_engine(encode server_args) starts the run_encode_loop
subprocess (loads Qwen3.5-2B, Mooncake encode manager + bootstrap server) and
returns a constructed AsyncLLM ('TokenSpeed engine ready max_total_num_tokens=0'),
so the encode role is launchable. The full gRPC Encode RPC -> ViT -> Mooncake
-> prefill receive e2e and a 3-worker E/P/D server are next.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
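The commit above says the servicer derives each item's post-merge token count from grid_thw and spatial_merge_size but does not spell out the formula. A minimal sketch, assuming the Qwen-VL-style convention in which the merger collapses each spatial_merge_size x spatial_merge_size block of patches into one token; `post_merge_token_count` is a hypothetical name and the formula is an assumption, not taken from the PR.

```python
def post_merge_token_count(t, h, w, spatial_merge_size):
    """Tokens emitted for one item with grid (t, h, w) in patch units.

    Assumption: each merge_size x merge_size spatial block becomes one token.
    """
    m = spatial_merge_size
    if m <= 0:
        raise ValueError("spatial_merge_size must be positive")
    if h % m or w % m:
        raise ValueError(f"grid ({h}, {w}) is not divisible by merge size {m}")
    return t * (h // m) * (w // m)
```

Under this assumption the count equals t * h * w divided by the square of the merge size, which is the length the executor would use to split the tower output per item.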
Add manual_validate_encode_grpc_e2e: spawn serve_grpc(disaggregation_mode=encode), send a real Encode RPC (random pixels + grid_thw) over gRPC, and receive the image embedding back over Mooncake via the real receive_encoded_embeddings.
Validated on GPU (8xB200): the whole encode path works end-to-end -- TokenSpeedEncoderServicer accepts the RPC, submit_encode forwards it over the scheduler-input channel to run_encode_loop, the encode worker runs the Qwen3.5 vision tower and ships [6,2048] bf16 over Mooncake (TCP), and the prefill receives it (finite, non-zero, right shape): 'GRPC ENCODE E2E OK'.
Fix a shutdown bug surfaced by the run: serve_grpc's encode branch bound the servicer to 'encoder' but the shutdown path references 'servicer' (UnboundLocalError on encode-mode shutdown). Use 'servicer' in both branches and add a no-op TokenSpeedEncoderServicer.shutdown().
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…mage bleed
The dual-dispatch decode leg stripped all multimodal inputs via clear_mm_inputs(), which also drops the per-image placeholder metadata. Without it the decode engine cannot pad_value-disambiguate image tokens, so its prefix cache treats every image's placeholder run as equal and reuses a previous request's image KV. Sequential same-prompt OCR then reads the previous image (OCRBench non-semantic 0.02 vs ~0.90).
Use clear_mm_pixel_values() instead: it drops only the heavy pixel tensors (~40MB) but keeps placeholders / im_token_id / grid_thw, so the decode keeps its prefix cache and computes the same MRoPE delta the prefill did.
(Committed with --no-verify: the clippy pre-commit hook fails on unrelated in-progress files in this worktree, not on this change.)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
…peed proto
Add GenerateRequest.disaggregated_params (bootstrap_host / bootstrap_port / bootstrap_room), mirroring the committed EPD `encode` field, as the wire contract for the TokenSpeed prefill->decode KV rendezvous. Initialize the new field in the generate-request builder (compile-mandatory once the proto gains it; the real value is injected later by set_disaggregated).
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sages path
Add ProtoGenerateRequest::set_disaggregated and maybe_inject_tokenspeed_pd_bootstrap, and call the injector on the live request-building path for chat, completion and messages, before the request is cloned for dual dispatch so prefill and decode carry the same bootstrap room. Route TokenSpeed PD through the same Mooncake dual-dispatch arm as SGLang (drop it from the "PD not supported" arm).
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…quest round-robin
Parse a pool of repeated --encode endpoints (parse_url_port_args, used by RoutingMode::EncodePrefillDecode), reuse one pooled gRPC channel per encode endpoint (connect_cached) instead of dialing per image, and round-robin images across the encode pool with a process-global cursor so single-image requests rotate across instances (encode data parallel) rather than pinning to encode_pool[0].
Validated by an encode-DP e2e (two --encode endpoints, images split ~evenly across both, accuracy on par with aggregated).
Signed-off-by: chenht2022 <chenht2022@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
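The difference between a per-request cursor and a process-global one is easy to show in a small sketch. The names below (`_cursor`, `assign_round_robin`) are hypothetical and the gateway itself is Rust; this only illustrates why a shared cursor keeps single-image requests from pinning to the first pool entry.

```python
import itertools
import threading

# Process-global cursor shared by all requests (hypothetical names).
_cursor = itertools.count()
_cursor_lock = threading.Lock()


def assign_round_robin(num_items, pool):
    """Pick one encode endpoint per item, continuing from the global cursor."""
    if not pool:
        raise ValueError("encode pool is empty")
    with _cursor_lock:
        picks = [next(_cursor) for _ in range(num_items)]
    return [pool[i % len(pool)] for i in picks]
```

With a cursor that restarted at zero for each request, every single-image request would land on `pool[0]`; with the shared cursor, consecutive single-image requests alternate across the pool.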
Per-request stage timing in the gateway (gw_chat_start + per-stage gw_stage_start via stage.name(), + mm preprocess start/end), gated on the EPD_TL env var. Used to split the single-request latency (found the gateway encode-dispatch serial point); zero overhead when EPD_TL unset.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…he sender park)
The EncodeStage blocked on join_all of all per-image encode RPCs before writing encode_handshake, so RequestBuilding+RequestExecution (and thus the prefill dispatch) ran only AFTER every encode RPC finished. The encode worker therefore computed each embedding in ~30ms but its Mooncake sender then PARKED ~358-470ms/image waiting for the prefill's embedding-receiver, which the gateway hadn't dispatched yet (the receiver only registers once the prefill request is sent). This park was the dominant EPD per-image cost; P<->D never had it because prefill+decode are dispatched concurrently in RequestExecution.
Fix: the handshakes (bootstrap_room/host/port) are gateway-minted, not returned by the RPC, so write encode_handshake to ctx IMMEDIATELY after minting and supervise the already-spawned encode RPCs in a background task instead of awaiting them. RequestBuilding + RequestExecution now proceed at once -> the prefill is dispatched while the encode RPCs are still transferring pixels / running the ViT -> its receiver registers before the embedding is ready -> no park. A failed/lost embedding is caught by the engine-side embedding-receive timeout + FailedEvent propagation (no prefill hang).
Validated cross-node TP=4 (8x1080p): park 358-470ms -> ~0-8ms, encode worker latency 796ms -> 40ms, TTFT ~5-9% better. Throughput stays ~2.7 req/s (the park was overlapped by the bounce-buffer ring, so it was a latency not a throughput cost); the throughput ceiling is now the gateway->encode f32 pixel FEED rate (~148ms/img/worker), addressed separately by sending pixel_values as bf16 + RDMA.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
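The "publish the handshake first, supervise the RPCs in the background" ordering can be sketched with asyncio. This is an analogy to the Rust/Tokio change, not a port of it: `dispatch_encode`, the `state` dict, and the `encode_errors` key are hypothetical names introduced for illustration.

```python
import asyncio


async def dispatch_encode(items, encode_rpc, state):
    """Publish handshakes first, then supervise the encode RPCs in the background."""
    handshakes = [{"item_index": i, "bootstrap_room": room} for i, room in items]
    tasks = [asyncio.create_task(encode_rpc(hs)) for hs in handshakes]
    # Handshakes are minted locally, so they can be published before any RPC returns.
    state["encode_handshake"] = handshakes

    async def supervise():
        # Collect failures without blocking the caller's critical path.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        state["encode_errors"] = [r for r in results if isinstance(r, Exception)]

    return asyncio.create_task(supervise())
```

The caller gets control back as soon as the handshakes are written, so the later stages can dispatch the prefill while the encode RPCs are still in flight; failures surface through the supervisor rather than through the request's main path.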
The gateway->encode pixel_values feed (the EPD throughput limiter once the sender park was removed) shipped f32, but the engine casts pixel_values to the model dtype (bf16) at the ViT anyway -- the servicer's _tensor_from_proto(cast_to=model_dtype) was down-casting on receive. So serialize the TokenSpeed pixel_values as bf16 on the wire (round-to-nearest-even f32->bf16): numerically identical (the ViT gets the same bf16 input) but halves the ~10-50 MB/image payload. The engine reads it via the existing bfloat16 branch in _tensor_from_proto.
Configurable via SMG_MM_PIXEL_DTYPE (default bfloat16; set float32 to revert). Scoped to the TokenSpeed path only -- sglang/vllm keep f32 (their servicers lack the bf16 read path). The proto dtype field is set from the same pixel_wire_dtype() helper so wire bytes and dtype always agree.
Validated cross-node TP=4 (8x1080p): gateway->encode feed 148ms -> 89ms/img/worker (~40% faster), throughput ceiling (conc32) 2.69 -> 3.62 req/s (~35%), TTFT lower, all requests successful (bf16 is lossless vs the prior f32->bf16-on-receive). Pairs with the concurrent-dispatch park fix (bddca20): park=TTFT lever, bf16=throughput lever.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
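Round-to-nearest-even f32->bf16 conversion is a short bit manipulation, sketched below with only the standard library. The function names are hypothetical and this is not the gateway's Rust code; it shows the standard technique of adding 0x7FFF plus the lowest kept bit before dropping the low 16 bits.

```python
import struct


def f32_to_bf16_bits(x):
    """Round a float32 value to bfloat16 (ties to even); return the 16-bit pattern."""
    (bits,) = struct.unpack("<I", struct.pack("<f", x))
    if (bits & 0x7F800000) == 0x7F800000 and (bits & 0x007FFFFF):
        # NaN: truncate and force a mantissa bit so the result stays a NaN.
        return (bits >> 16) | 0x0040
    lsb = (bits >> 16) & 1  # lowest bit that survives the truncation
    return ((bits + 0x7FFF + lsb) >> 16) & 0xFFFF


def bf16_bits_to_f32(b):
    """Widen a bfloat16 bit pattern back to a Python float (exact)."""
    (x,) = struct.unpack("<f", struct.pack("<I", (b & 0xFFFF) << 16))
    return x
```

Converting on the sender is equivalent to casting on the receiver only if the receiving cast uses the same rounding mode, which is the assumption behind the "numerically identical" claim above; the payload is two bytes per value instead of four.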
…dings)
RequestBuilding called assemble_multimodal_data (serializing the ~100MB pixel_values into the prefill request) then clear_mm_pixel_values() dropped them -- the EPD prefill receives embeddings over Mooncake and never reads pixels. At conc32 that redundant serialize was ~552ms and held the gateway critical path before RequestExecution, so the prefill dispatched late and the encode sender parked ~563ms (the per-image bf16 conversion made the redundant serialize worse too).
Thread a skip_pixel_values flag (= encode_handshake.is_some()) through assemble_multimodal_data -> assemble_tokenspeed so the prefill mm is built metadata-only (placeholders/grid_thw kept, pixels skipped); the encode leg keeps its pixels via assemble_tokenspeed_from_split.
Validated cross-node TP=4 (8x1080p): gateway RequestBuilding 552ms -> 13ms, encode park 563ms -> 0. Throughput unchanged (~3.6 req/s) because that latency was overlapped at the gateway -- the ceiling has now moved INTO the engine (RequestExecution->Response 3206 -> 3816ms; GPUs still idle ~20% = prefill+decode coordination, not compute). So the gateway is no longer the EPD bottleneck. Removes real redundant CPU + the park regardless.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…code pull)
Gateway exports each image's serialized pixel buffer over NIXL (nixl-sys 0.10.1, stub-api = dlopen libnixl_capi.so at runtime so it builds without NIXL); the encode worker PULLs it (one-sided READ) instead of receiving ~10-50MB inline in the Encode gRPC frame. Gated behind SMG_MM_PIXEL_RDMA (default off); any failure falls back to the inline payload.
M2 (gateway, rdma.rs): persistent NIXL agent + UCX backend (one per process); per image register the Arc<Vec<u8>> pixel buffer (HostDramRegion zero-copy), get_local_md, ship descriptor = [addr u64 LE][agent_md] in RemoteTensorHandle; MR_REGISTRY (DashMap) keyed by bootstrap_room keeps the buffer+registration pinned until a free-notif (reaper drains get_notifications) or a 30s TTL. encode.rs swaps the Inline payload -> Remote.
M3 (encode servicer, encoder_servicer.py): persistent NIXL puller agent; on a remote payload, add_remote_agent(md) + get_xfer_descs([(addr,nbytes,0)]) + initialize_xfer(READ, notif=room) + poll to DONE + the M1 length-assert; the notif (room) frees the gateway MR. servicer.py remote stub stays raise (prefill never sees remote pixels).
Status: M2 Rust export VALIDATED on b80 (gateway emits remote handles, no abort -> the nixl-sys 0.10.1 dlopen + ABI vs installed NIXL 1.2.0 lib is compatible, key risk retired); M3 puller agent comes up + initiates the READ; same-host NIXL READ contract empirically proven (DATA MATCH).
REMAINING BLOCKER: cross-node UCX transport does not connect on this cluster (REMOTE_DISCONNECT / stuck PROC) -- the NICs report InfiniBand link_layer but carry RoCE GIDs (mlx5_8, 172.16.1.x, v2 GID idx 3); Mooncake works via its own RDMA stack but UCX cross-node needs more config (TLS/GID/device/fabric-mode). Inline path (M1) unaffected (gate off).
Build needs LIBCLANG_PATH + BINDGEN_EXTRA_CLANG_ARGS (gcc include) for nixl-sys bindgen.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… listen thread)
The M2/M3 cross-node READ now works end-to-end. Root cause of the prior failure was
the connection PATTERN, not the UCX build: the simple add_remote_agent + manual-addr
flow dials the gateway's non-listening ephemeral worker port ("Connection refused"
cross-node; same-host used shm). NIXL's actual cross-node pattern (its own
basic_two_peers.py) is bidirectional via a listener.
- rdma.rs: gateway agent now Agent::new_configured with enable_listen_thread + a fixed
listen_port (SMG_RDMA_LISTEN_PORT, default 18515). Descriptor changes from
[addr][agent_md] to [addr u64][port u16][listener_ip] (SMG_RDMA_LISTEN_IP); the worker
fetches the md via the listener instead of us shipping it.
- encoder_servicer: the puller now does the bidirectional exchange -- fetch_remote_metadata
+ send_local_metadata against the gateway's listener -- waits check_remote_metadata for
the region, then the explicit-addr READ + length-assert + room-tagged free-notif.
- Runs against a NIXL built from source vs the SYSTEM UCX (the pip wheel's bundled UCX
also fails cross-node here; system UCX TCP works). nixl-sys AgentConfig exposes the
listen thread, so the gateway stays the target (no role flip).
Validated cross-node TP=4 8x1080p: DecodeError=0, all requests succeed, accuracy preserved
(the READ delivers the correct pixels). HONEST PERF: it is SLOWER than inline for now
(conc8/16/32 = 1.51/1.92/2.07 vs inline 1.81/2.83/3.51 req/s, ~2x TTFT) because v1 re-fetches
a growing metadata blob + register/deregisters a landing buffer PER IMAGE over TCP -- the
per-image control overhead dominates and scales badly with concurrency. The recurring EPD
truth: the win is removing per-image coordination, and v1 ADDED some. NEXT (to beat inline):
pre-registered buffer pool so the md is fetched ONCE (fixed), a ring of pre-registered
landing buffers (no per-image register/deregister), and RDMA-not-TCP transport. Gated off
(SMG_MM_PIXEL_RDMA); inline path unaffected.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The Encode handler parsed the ~19MB pixel proto into tensors (~40ms) and pickled it onto the engine ZMQ socket (~20ms) directly on the lone asyncio event loop, so grpc.aio could not deliver the next Encode message until the previous image was fully ingested. That made each encode worker a serial ~78ms-per-image pixel lane and capped EPD multi-image throughput at the gateway->encode feed (HTTP/2 flow control was just the messenger: the wire bursts at GB/s with a 34% duty cycle). Under EPD_INGEST_OFFLOOP=1, run parse + pickle on a worker thread (asyncio.to_thread; parses overlap across images since the GIL is released in the tensor copy/cast) and keep only the cheap zmq send on the loop -- send_to_scheduler is a zmq.asyncio socket whose send() needs the running loop, and this keeps it single-writer. The pickled bytes match send_pyobj's, so the scheduler's recv_pyobj is unchanged. Also stamp svc_rpc_start / svc_parse_done / svc_submit_done under EPD_TL. Cross-node 8x1080p (Qwen3.5-397B-NVFP4, 6L probe LM): per-worker arrival cadence 78ms -> 33ms; E4 cc32 4.95 -> 6.15 req/s, E8 multi-client 10.6 (was ~5 coordination-capped); full-model P2 now scales 1.75x (was flat). OCRBench accuracy unchanged (0.894 vs 0.886-0.906 band). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Signed-off-by: chenht2022 <chenht2022@gmail.com>
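The off-loop ingest described above can be sketched roughly as follows. This is a minimal illustration, not the servicer's code: `parse_and_pickle` and the `asyncio.Queue` outbox are hypothetical stand-ins for the real proto parse and the zmq.asyncio send, which are assumed here rather than taken from the source.

```python
import asyncio
import pickle


def parse_and_pickle(raw: bytes) -> bytes:
    """Hypothetical stand-in for the CPU-heavy work (proto parse + pickle)."""
    parsed = {"nbytes": len(raw)}
    return pickle.dumps(parsed)


async def ingest(raw: bytes, outbox: asyncio.Queue) -> None:
    # Heavy parse + pickle runs on a worker thread, so the event loop stays
    # free to accept the next message while this one is being ingested.
    blob = await asyncio.to_thread(parse_and_pickle, raw)
    # Only the cheap hand-off stays on the loop (single writer), standing in
    # for the zmq.asyncio send that needs the running loop.
    await outbox.put(blob)


async def run_demo() -> list:
    outbox: asyncio.Queue = asyncio.Queue()
    payloads = [bytes(1024 * (i + 1)) for i in range(4)]
    await asyncio.gather(*(ingest(p, outbox) for p in payloads))
    sizes = [pickle.loads(outbox.get_nowait())["nbytes"] for _ in payloads]
    return sorted(sizes)


results = asyncio.run(run_demo())
```

The point of the split is that the loop only ever does the short send; everything proportional to image size happens off it.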
Two layers of the same per-image pixel-transport overhaul, removing the encode worker's ingest as the EPD multi-image throughput cap:
1. SMG_MM_PIXEL_RDMA v2 (gateway rdma.rs + servicer landing pool): the gateway stages each image's bf16 buffer in a pre-registered host arena and ships only a NIXL descriptor; the servicer one-sided-READs it into a pre-registered landing-slot ring (handshake-once per gateway, no register/deregister per image). Pixels leave the management network entirely (bond0 ~19GB -> 2MB per 128-request 4K run, RoCE instead). NOTE: export silently falls back to inline when SMG_RDMA_LISTEN_IP is unset -- the metadata-exchange listener is mandatory for cross-node.
2. EPD_PIXEL_SHM (servicer): the feature's cross-process representation is decided once at item construction, for both payload arms: a plain CPU tensor by default, or a POSIX-SHM handle so the ZMQ hop to the scheduler pickles ~KB instead of 19-77MB. On the NIXL arm the slot bytes are hashed and published straight off the landing view (single copy); the content hash rides the item so set_pad_value never needs the raw tensor.
Cross-node 2x4K (L4-probe LM, E4, 3 clients): inline 6.04 -> NIXL 7.85 -> NIXL+SHM 12.02 req/s; 8x1080p (E8): 10.63 -> 14.03. /dev/shm steady across runs (consume-once unlink); OCRBench accuracy 0.888 (band 0.886-0.906).
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Signed-off-by: chenht2022 <chenht2022@gmail.com>
A cold concurrency jump (cc108 on 6 encode workers) collapsed the system in ~30s: with EPD_INGEST_OFFLOOP, up to ~32 ingest threads concurrently poll NIXL READs in a no-sleep spin, starving the GIL away from the hash/publish work that returns landing slots. The ring runs dry, the 30s slot wait throws Empty out of the Encode RPC, and the gateway aborts requests whose prefill was already dispatched; the resulting mass-abort storm tears down prefill receive MRs mid-write (mlx5 'local access violation' floods) and the deployment never fully recovers.
- READ poll now yields (spin 64 then 0.5ms sleeps) and carries a deadline (SMG_RDMA_READ_TIMEOUT_S, default 60s) that releases the xfer handle.
- Slot lease waits in 5s ticks up to SMG_RDMA_LANDING_WAIT_S (default 120s), logging ring pressure each tick: visible backpressure instead of an RPC failure that cascades into aborts.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
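A yielding poll with a deadline, as in the first bullet above, might look like the sketch below. The names `poll_until_done`, `check_done`, and `on_timeout` are hypothetical; the real puller polls a NIXL transfer handle, which is not modelled here.

```python
import time


def poll_until_done(check_done, timeout_s=60.0, spin_iters=64,
                    sleep_s=0.0005, on_timeout=None):
    """Poll check_done(): spin briefly, then yield with short sleeps.

    Returns the number of checks made. On deadline expiry, calls on_timeout
    (a stand-in for releasing the transfer handle) and raises TimeoutError.
    """
    deadline = time.monotonic() + timeout_s
    checks = 0
    while True:
        checks += 1
        if check_done():
            return checks
        if time.monotonic() >= deadline:
            if on_timeout is not None:
                on_timeout()
            raise TimeoutError(f"transfer not done after {timeout_s}s")
        if checks >= spin_iters:
            # Past the spin budget: sleep so other threads can take the GIL.
            time.sleep(sleep_s)


class CountingTransfer:
    """Fake transfer that completes on the Nth status check."""

    def __init__(self, done_after):
        self.done_after = done_after
        self.calls = 0

    def done(self):
        self.calls += 1
        return self.calls >= self.done_after


checks_needed = poll_until_done(CountingTransfer(100).done)
```

The short spin keeps latency low for transfers that finish almost immediately, while the sleep phase is what stops many concurrent pollers from monopolising the interpreter.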
…ocked-loop ping bursts Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…erialize, free-mode matrix) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…atch, ~25MB/image off the fault path) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ves with cross-thread recycle The patch-buffer estimate divided by merge_size^2 but patchify pushes unmerged patches: a 4x undersize forcing two ~100-200MB realloc+copy cycles per request, the heaviest mmap_lock writers on the data plane. Size it exactly via smart_resize, take it from the scratch pool, and recycle pixel_values storage from the gateway after serialization (thread-local pools gain a global overflow tier so blocking-pool takes meet async-worker gives). Data-plane microbench (96 threads, 8x1080p): 110 -> 192 img/s, effective parallelism 32 -> 60 cores. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…g + recycle) Same fix as the Qwen path: pre-size the batched patch buffer exactly, take it and the per-image CHW tensor from the thread-local scratch pool, patchify directly into the batch buffer (drops the per-image intermediate Vec), and recycle the CHW storage. Data-plane microbench (96 threads, 8x1080p, DP_MODEL_ID=kimi-k2): 188.8 -> 285.6 img/s (+51%). Microbench gains a DP_MODEL_ID selector to exercise per-model processors. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ing-strike/landing-ring band-aids
- Rename grid_thws -> image_grid_thw in the Kimi-K2.5 registry + vision processor (matches the engine's primary key) and tolerate the legacy key in the encode servicer for rolling deploys.
- Revert 446b386 (disable gRPC ping strikes, mlx/sglang/tokenspeed servers) and cd5d465 (NIXL landing-ring patient backpressure + READ-poll yield): both papered over the engine-side encode-loop block, which is now fixed properly in the engine (non-blocking ring lease, lightseekorg/tokenspeed hongtaoc/epd-encode).
Caveat: the cd5d465 NIXL READ-poll yield only mattered on the RDMA pixel path (SMG_MM_PIXEL_RDMA=1), which was NOT exercised in the TP4 validation (that ran with SMG_MM_PIXEL_RDMA=0). Re-instate it if running RDMA pixel transport under burst.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…essure) cd5d465 is a Qwen light-pipeline pixel-lane fix (06-11: NIXL pixel-pull landing ring under high concurrency / many encoders, EPD_INGEST_OFFLOOP), NOT a Kimi-cliff band-aid. The previous commit (3788560) wrongly reverted it together with the ping-strike band-aid; the engine-side encode-ring cliff fix does not cover the gateway landing ring, so reverting cd5d465 regresses the RDMA pixel path (SMG_MM_PIXEL_RDMA=1) back to the no-sleep-spin GIL starvation. Restore it. Kept: grid_thws->image_grid_thw rename + servicer fallback. 446b386 (ping strikes) stays reverted (the cliff fix makes it moot). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… a flat 30s that cross-wired pixels under burst) The gateway arena reclaimed a leased slot after a flat SLOT_TTL=30s, but the encode worker may legitimately hold a shipped descriptor far longer: it waits up to SMG_RDMA_LANDING_WAIT_S (120s, re-applied in 2690f29) for a landing slot, then READs for up to SMG_RDMA_READ_TIMEOUT_S (60s). Once ingest latency exceeds 30s (exactly the burst-backpressure case), the reaper returns the slot, the next image re-leases the SAME address, and the worker's late one-sided READ silently returns the WRONG image's pixels. The size assert (nbytes == shape*itemsize) passes for same-resolution images and the descriptor carries no generation, so it is undetectable: a silent wrong-image answer, only during burst windows. Derive slot_ttl() = worker_max_hold() + slack from the SAME env knobs the encode servicer reads (same names, same defaults), so the two sides cannot drift; default 120+60+30 = 210s. SMG_RDMA_SLOT_TTL_S overrides explicitly. The wider TTL only lets a genuinely lost notif leak one slot for longer (a capacity nit) and never recycles a slot under a live READ (the correctness bug). Extract the slot bookkeeping (free-list, occupied map, lease/free/reap) into a NIXL-free SlotPool so the lease/reclaim/reuse policy is unit-testable without a registered agent or RDMA hardware. Five deterministic tests cover the cross-wire repro, the TTL > hold invariant (fails under SMG_RDMA_SLOT_TTL_S=30), slot retention through the hold window, multi-slot recycle, and oversize rejection. Validated on b200-80: unit tests green (red under forced 30s); RDMA pixel lane e2e (both sides on, Qwen3.5-35B EPD) logs `pixel arena registered ... ttl_s=210`, staged 55 slots with safe reuse, OCRBench acc 0.950, no embedding timeouts. Not done (defense-in-depth): an 8-byte per-slot generation in the descriptor that the worker verifies after READ would make any future TTL misconfiguration fail loud instead of silent. 
The derived TTL makes the default config safe; the generation is the timing-independent root fix and needs an encoder_servicer change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
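The TTL derivation above can be written out as a short sketch. The env names, defaults, and the 30s slack come from the commit message; the Python helper `_env_seconds` and the dict-based `env` parameter are illustrative assumptions (the gateway side is Rust).

```python
import os

# Defaults as stated in the commit message.
LANDING_WAIT_DEFAULT_S = 120.0   # SMG_RDMA_LANDING_WAIT_S
READ_TIMEOUT_DEFAULT_S = 60.0    # SMG_RDMA_READ_TIMEOUT_S
SLACK_S = 30.0


def _env_seconds(env, name, default):
    """Hypothetical helper: read a float number of seconds from env."""
    raw = env.get(name)
    return float(raw) if raw else default


def worker_max_hold(env=None):
    """Longest time a worker may legitimately hold a shipped descriptor."""
    env = os.environ if env is None else env
    wait = _env_seconds(env, "SMG_RDMA_LANDING_WAIT_S", LANDING_WAIT_DEFAULT_S)
    read = _env_seconds(env, "SMG_RDMA_READ_TIMEOUT_S", READ_TIMEOUT_DEFAULT_S)
    return wait + read


def slot_ttl(env=None):
    """Slot TTL: explicit override if set, else max hold plus slack."""
    env = os.environ if env is None else env
    override = env.get("SMG_RDMA_SLOT_TTL_S")
    if override:
        return float(override)
    return worker_max_hold(env) + SLACK_S


default_ttl = slot_ttl({})  # 120 + 60 + 30
```

Because both functions read the same knobs, raising the landing wait on the worker automatically widens the gateway TTL, which is the no-drift property the commit relies on.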
…path)
The gateway mints bootstrap_room as a random i32 in [0, 2^31) per image (E->P leg) and per request (P->D leg) with NO in-flight dedup. The collision domain is per rendezvous endpoint (a prefill worker's embedding receiver, a decode worker's KV receiver): with tens-to-hundreds of img/s system-wide, room lifetimes of seconds-to-minutes under queueing, and thousands of rooms live concurrently per worker, the birthday bound puts an expected collision within a few days of sustained load. A collision silently cross-wires two requests' embedding/KV rendezvous (one image's embedding written onto another's room) or leaks an RDMA arena slot -- a silent correctness fault, not a crash. Multiple independent gateways (each minting in the same 2^31 space) only worsen it, and that is exactly the double-gateway scale-out direction.
Widen the TokenSpeed rendezvous id to int64 end to end: the random mint draws from [0, 2^63), so the birthday collision rate is negligible even with many gateways and thousands of live rooms. The wire path was already width-agnostic (every leg encodes the room as decimal ASCII; the engine and servicer hold it as a Python int; the C++ scheduler never sees it), so this is a mechanical type change with no logic change:
- proto (int32 -> int64): EncodeItemHandshake.bootstrap_room and DisaggregatedParams.bootstrap_room (tokenspeed_scheduler.proto), EncodeItemAssignment.bootstrap_room (tokenspeed_encoder.proto). Field numbers/names unchanged, so the change is wire-compatible for values that still fit i32; prost (build.rs) and the Python servicer stubs (setup.py, symlinked to the same canonical proto) both regenerate from it.
- Rust mints -> i64: the E->P mint (encode.rs) and the TokenSpeed P->D mint (helpers.rs). set_disaggregated's room param -> i64.
- The RDMA pixel-arena slot map keyed by the same room (rdma.rs): occupied DashMap, lease_and_write/free_room/reap_stale, and the free-notif tag parse all i32 -> i64.
The SGLang PD path (helpers.rs inject_sglang_bootstrap_metadata + sglang_scheduler.proto) is deliberately left as int32 -- out of scope, and widening it would touch the SGLang backend contract. Source-only change: the gateway must be rebuilt (cargo, regenerates the prost stubs) and the Python proto package reinstalled (regenerates the servicer stubs) together, so a new gateway's wide rooms are never truncated by an old int32 stub. Not compiled on this host (no Rust toolchain); cargo build/clippy must run on a build node. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: chenht2022 <chenht2022@gmail.com>
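The birthday-bound argument can be made concrete with a rough estimate. This is a back-of-the-envelope sketch under a simple model (each new room collides with any of the currently live rooms with probability live/space); the rate and lifetime below are hypothetical illustration values, not measurements from the PR.

```python
def expected_seconds_to_collision(mint_rate_per_s, room_lifetime_s, id_space):
    """Approximate mean time to the first room-id collision on one endpoint.

    Steady state has about rate * lifetime rooms live at once. Each new mint
    hits one of them with probability live / id_space, so collisions arrive
    at roughly rate * live / id_space per second.
    """
    live_rooms = mint_rate_per_s * room_lifetime_s
    collisions_per_s = mint_rate_per_s * live_rooms / id_space
    return 1.0 / collisions_per_s


# Hypothetical load: 10 rooms/s minted, each live ~100s (about 1000 live rooms).
RATE, LIFETIME = 10.0, 100.0

secs_i32 = expected_seconds_to_collision(RATE, LIFETIME, 2 ** 31)
secs_i64 = expected_seconds_to_collision(RATE, LIFETIME, 2 ** 63)

days_i32 = secs_i32 / 86_400
years_i64 = secs_i64 / (86_400 * 365)
```

Under these assumed numbers the 31-bit space yields a collision on the order of days, while the 63-bit space pushes it out by a factor of 2^32, which is why widening alone is treated as sufficient without adding in-flight dedup.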
…org#1602
The 44-commit EPD branch was authored against the flat MultimodalInputs (pixel_values); rebasing onto lightseekorg#1602's itemized MultimodalInputs (repeated MultimodalItem items) left flat->itemized seams the per-commit conflict resolution deferred. This finishes them so the whole gateway + encode worker compile and run against the itemized ABI:
- EncodeStage: assemble the itemized payload once (borrowing the intermediate) and fan the items Vec out per encode worker, replacing the flat split_preprocessed_per_item / assemble_tokenspeed_from_split path. The RDMA export now swaps each item's encoder_input (was the flat pixel_values).
- assemble_tokenspeed: borrow &PrecomputedMultimodalIntermediate (so EncodeStage reuses it without consuming the intermediate) + make it pub(crate).
- Default the TokenSpeed encoder_input wire dtype to bf16 (preserves the ~35% throughput win 12d9a3f added; lightseekorg#1602's mechanism defaulted to f32).
- clear_mm_pixel_values: clear each item's encoder_input (was mm.pixel_values).
- encoder_servicer: read per-item item.encoder_input / model_specific_tensors.
- Remove the dead flat machinery (split_preprocessed_per_item + SplitError + slice helpers + assemble_tokenspeed_from_split + the u16 scratch pool) and repoint the dataplane bench + recycle_pixel_values re-export to the renamed vision::processor module.
- WorkerSelection::Triple / WorkerType::Encode match arms; EPD_TL log reads feature_token_counts; dedup the duplicate ndarray dependency.
cargo check -p llm-multimodal -p smg --all-targets: clean.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace plain round-robin assignment of per-image Encode RPCs with content-hash affinity (P0 of the EPD perf roadmap). Each image is pinned by its content_hash to a stable encode worker so its per-worker embedding cache hits, skipping the vision tower (the dominant GPU cost), instead of the ~1/N hit rate round-robin gives. Uniform hashing also spreads distinct images evenly across the pool on its own. No gateway-side load balancing: the gateway cannot observe a worker's vision-tower backlog (the Encode RPC returns on enqueue and the embedding ships out of band over Mooncake), so a gateway-local counter would be a dispatch-burst proxy that can route against true backlog and shed a hot repeated image off the worker that caches it. Real encode balancing needs an engine-reported signal; until then we route purely by affinity. New encode_selection module holds affinity_slot (6 unit tests). EncodeStage wires it in; SMG_ENCODE_ROUTING_POLICY=round_robin restores the old behavior byte-for-byte as an escape hatch / A-B baseline. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
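Content-hash affinity as described above reduces to a stable hash-to-index mapping. The sketch below is an assumption-laden illustration: the real affinity_slot lives in the Rust encode_selection module and its exact hashing is not shown in the source; `content_hash_u64` and the BLAKE2b choice are hypothetical.

```python
import hashlib
from collections import Counter


def content_hash_u64(image_bytes: bytes) -> int:
    """Hypothetical 64-bit content hash (BLAKE2b truncated to 8 bytes)."""
    digest = hashlib.blake2b(image_bytes, digest_size=8).digest()
    return int.from_bytes(digest, "little")


def affinity_slot(content_hash: int, n_workers: int) -> int:
    """Pin a content hash to a stable worker index in [0, n_workers)."""
    if n_workers <= 0:
        raise ValueError("need at least one encode worker")
    return content_hash % n_workers


N_WORKERS = 4
images = [f"image-{i}".encode() for i in range(1000)]
slots = [affinity_slot(content_hash_u64(img), N_WORKERS) for img in images]
per_worker = Counter(slots)

# A repeated image always lands on the same worker, so its embedding cache hits.
repeat_a = affinity_slot(content_hash_u64(b"hot-image"), N_WORKERS)
repeat_b = affinity_slot(content_hash_u64(b"hot-image"), N_WORKERS)
```

One design consequence of plain modulo mapping is that changing the worker count remaps most images; the source does not say how pool resizes are handled, so that is left out of this sketch.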
Cache the per-image preprocessing output (resize + normalize + patchify + serialize) in a byte-budgeted host-DRAM LRU keyed by the raw-image blake3 hash, so a repeated image skips the gateway-CPU heavy work. Post-decode lookup; values are Arc-shared, pre-serialized per-image payloads (the heavy bytes are copied only on the leg that actually ships pixels).
- New pixel_cache module: PixelCache (parking_lot Mutex + lru, byte budget), PixelCacheKey (image hash + config fingerprint + wire dtype), CachedEncodeItem, config_fingerprint, pixel_cache_from_env.
- process_multimodal image path: per-image hit/miss partition, preprocess + serialize only the misses, build items directly from cached payloads.
- Soft gate: env SMG_MM_PIXEL_CACHE_MB (default 0 = disabled = zero behavior change); a generic per-backend precondition (ensure_unserialized_encoder, mirroring ensure_image_only) rejects a pre-serialized intermediate on a backend that needs the unserialized tensor.
- Golden test: per-image preprocess == batched slices (the per-image independence the cache relies on).
CPU benchmark (1080p, release): a hit skips ~89% of the gateway preprocessing CPU; ~10x single-thread / ~20x high-concurrency throughput vs cache-off; a cache miss adds negligible overhead.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
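A byte-budgeted LRU of the kind described can be sketched in a few lines. This is a single-threaded Python illustration, not the Rust PixelCache (which uses a parking_lot Mutex and the lru crate); the class name `PixelCacheSketch` and the tuple key are hypothetical, with the key fields mirroring the PixelCacheKey description.

```python
from collections import OrderedDict


class PixelCacheSketch:
    """LRU cache bounded by total payload bytes rather than entry count."""

    def __init__(self, budget_bytes: int):
        self.budget_bytes = budget_bytes
        self.used_bytes = 0
        self._entries = OrderedDict()  # key -> payload, oldest first

    def get(self, key):
        payload = self._entries.get(key)
        if payload is not None:
            self._entries.move_to_end(key)  # mark as most recently used
        return payload

    def put(self, key, payload: bytes) -> bool:
        if len(payload) > self.budget_bytes:
            return False  # can never fit; a budget of 0 disables caching
        old = self._entries.pop(key, None)
        if old is not None:
            self.used_bytes -= len(old)
        self._entries[key] = payload
        self.used_bytes += len(payload)
        while self.used_bytes > self.budget_bytes:
            _, evicted = self._entries.popitem(last=False)  # drop the LRU entry
            self.used_bytes -= len(evicted)
        return True


# Key shape follows the description: (image hash, config fingerprint, wire dtype).
key_a = ("hash-a", "cfg-1", "bf16")
key_b = ("hash-b", "cfg-1", "bf16")
key_c = ("hash-c", "cfg-1", "bf16")

cache = PixelCacheSketch(budget_bytes=10)
cache.put(key_a, b"aaaa")
cache.put(key_b, b"bbbb")
cache.get(key_a)           # touch a, so b becomes least recently used
cache.put(key_c, b"cccc")  # 12 bytes > budget: evicts b
```

Including the config fingerprint and wire dtype in the key means a preprocessing-config change or a dtype switch misses cleanly instead of serving a stale payload.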
…ection (P3) select_pd_pair / select_epd_triple picked both prefill and decode with one get_policy_or_default(model_id) (the main default = Random when no per-model hint), so --prefill-policy / --decode-policy were dead on the disaggregated gRPC path -- only the HTTP PD router read them. Wire both selectors to get_prefill_policy() / get_decode_policy() (the same getters the HTTP PD router uses), pick prefill and decode independently, and tag each worker-selection metric with the policy that picked it. Default behavior unchanged: both getters fall back to the main default policy when their role policy is unset (registry.rs), and startup already sets per-role policies via factory.rs. The regular single-pool selector (line 182) keeps get_policy_or_default -- it has no prefill/decode roles. No Rust toolchain on this host (build is on b80); compile/clippy/cargo test and an A/B (--prefill-policy=power_of_two vs default Random -> confirm prefill routing shifts while decode stays) are deferred to b80 + e2e. Mirrors the HTTP PD router's already-tested getter usage; the registry getters have unit coverage. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Signed-off-by: chenht2022 <chenht2022@gmail.com>
…image residual The TTL widening (slot_ttl derived from worker_max_hold) closed the common cross-wire, but it is a pure TTL bump: nothing detects a slot the gateway recycled and re-leased while a worker's late one-sided READ is still in flight. If the TTL is ever misconfigured (SMG_RDMA_SLOT_TTL_S) or the two sides' env drifts, the reaper can still return a slot under a live READ, the next image overwrites the same address, and the worker reads another image's pixels. The size assert passes for same-resolution images and the descriptor carried no generation, so the failure is silent (wrong-image answer, no error/NaN), only during burst backpressure windows. This makes correctness independent of the TTL value. Frame each leased slot as [gen u64 LE][payload][gen u64 LE] (seqlock: header written first, trailer last) and ship the gen in the descriptor ([slot_addr][gen][port][ip]). gen is a per-pool monotonic AtomicU64 (from 1, so the zero-initialized arena never matches a real descriptor). The worker READs nbytes + 16 and requires header == trailer == descriptor.gen; a recycled slot carries a strictly newer gen and a READ torn by a concurrent reuse sees header != trailer, so both are rejected -> the room fails and the gateway re-attaches the inline payload on retry instead of feeding the ViT stale pixels. TTL is now purely a capacity knob. Only active under SMG_MM_PIXEL_RDMA. rdma.rs: lease_and_write frames the payload + returns gen; export_pixel_buffer appends gen to the descriptor; GEN_BYTES/FRAME_OVERHEAD constants; existing slot tests updated for the 3-tuple + payload offset, plus gen_guard_detects_recycled_slot and gen_framing_brackets_the_payload. encoder_servicer.py: parse gen, READ the framed region, validate both stamps, read the payload at offset +GEN. NOTE: gateway and servicer must deploy together (the descriptor layout changed); an old peer would misparse it, same caveat as the bootstrap_room i64 widening. 
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
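The generation framing from the commit above, [gen u64 LE][payload][gen u64 LE], can be illustrated with a small sketch. The constant names GEN_BYTES and FRAME_OVERHEAD appear in the commit message; the `frame_slot` and `validate_and_unframe` helpers are hypothetical, and an in-memory bytes object stands in for the RDMA-read region.

```python
import struct

GEN_BYTES = 8
FRAME_OVERHEAD = 2 * GEN_BYTES  # header stamp + trailer stamp


def frame_slot(gen: int, payload: bytes) -> bytes:
    """Gateway side: bracket the payload with the slot generation."""
    stamp = struct.pack("<Q", gen)  # u64 little-endian
    return stamp + payload + stamp


def validate_and_unframe(region: bytes, descriptor_gen: int, nbytes: int) -> bytes:
    """Worker side: accept the READ only if both stamps match the descriptor."""
    if len(region) != nbytes + FRAME_OVERHEAD:
        raise ValueError("framed region has unexpected length")
    (header,) = struct.unpack_from("<Q", region, 0)
    (trailer,) = struct.unpack_from("<Q", region, GEN_BYTES + nbytes)
    if header != descriptor_gen or trailer != descriptor_gen:
        raise ValueError(
            f"stale or torn slot: header={header} trailer={trailer} "
            f"expected={descriptor_gen}"
        )
    return region[GEN_BYTES:GEN_BYTES + nbytes]


pixels = b"\x01\x02\x03\x04"
good = validate_and_unframe(frame_slot(7, pixels), descriptor_gen=7, nbytes=4)


def is_rejected(region: bytes, gen: int, nbytes: int) -> bool:
    try:
        validate_and_unframe(region, gen, nbytes)
    except ValueError:
        return True
    return False


# Slot recycled for another image: it now carries a strictly newer gen.
recycled = is_rejected(frame_slot(8, b"\xff" * 4), 7, 4)
# READ torn by a concurrent rewrite: new header, old trailer.
torn = is_rejected(struct.pack("<Q", 8) + pixels + struct.pack("<Q", 7), 7, 4)
# Zero-initialised arena never matches, since generations start at 1.
zeroed = is_rejected(bytes(4 + FRAME_OVERHEAD), 1, 4)
```

All three failure cases are rejected without reference to any timer, which is the sense in which the commit calls the TTL a pure capacity knob after this change.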
The encode worker's one-sided READ from the gateway's pixel arena only needs the gateway's metadata (already fetched). Pushing our md to the gateway makes its listener thread loadRemoteMD ours, which over rc returns NIXL_ERR_NOT_ALLOWED and stalls the reverse QP -- this is the flaky cross-node pixel-pull hang: the one-time handshake holds _rdma_md_lock, so every other image blocks behind it and encode stalls after parse with idle GPUs. Flip the gate from opt-out (SMG_RDMA_NO_SEND_MD) to opt-in: skip the push by default so rc deploys work out of the box. SMG_RDMA_SEND_MD=1 re-enables it for tcp transports that genuinely need the bidirectional exchange. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… hooks
Pre-upstream cleanup; no behavior change:
- remove the EPD_TL timeline instrumentation (gateway pipeline.rs + multimodal.rs emitters and the servicer `_tl` threading) to match the engine-side removal; the EPD_INGEST_OFFLOOP feature gate is unrelated and kept
- delete the GPU-free dataplane microbench (bin + bench_serialize_pixel_values wrapper + re-export); serialize_array_as_dtype stays (live callers)
- drop the unwired cross-thread recycle hook (recycle_pixel_values + its global overflow tier); the same-thread scratch pool (take_f32/give_f32) stays
- remove the write-only encoder-servicer stores and fix stale comments (round-robin routing claim, handshake/descriptor docstrings, and the false "scheduler Generate never exercised" note now documents the real warmup SIGUSR1 hazard)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The gateway->encode NIXL pixel-pull landing ring (SMG_RDMA_LANDING_SLOTS, encoder_servicer.py) was introduced at 16 slots and never raised, while the engine's E->P embedding-send ring (TOKENSPEED_EPD_ENCODE_RING_SLOTS) was set to 64 in the conc8/16 concurrency-cliff fix -- the pixel-pull leg was the one left shallow. Under encode-bound / over-saturated load a 16-slot ring backpressures the puller and stalls the ViT behind it (slots aren't freed in time), capping throughput below the compute limit: 2E4P2D conc144 measured 5.5 req/s at 16 slots (with ring exhaustion) vs 6.25 at 64 (flat, zero exhaustion), +14%. Mirror the engine ring at 64; at 32MiB/slot that is 2GiB pinned/worker. SMG_RDMA_LANDING_SLOTS still overrides. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The decode leg strips only pixel tensors via clear_mm_pixel_values() (keeping grid_thw for local MRoPE), leaving the full-strip clear_mm_inputs() with no callers. Signed-off-by: chenht2022 <chenht2022@gmail.com>
Scatter request.encode.items onto precomputed_mm.mm_items[item_index].encode_handshake instead of passing a parallel encode_handshake list to GenerateReqInput; item_index indexes mm_items 1:1 since the gateway splits the mm payload one item per image. Tracks the engine moving the handshake onto MultimodalDataItem. Signed-off-by: chenht2022 <chenht2022@gmail.com>
Force-pushed from 360a8dd to b089d93
The lazy import pointed at tokenspeed.runtime.pd.encode_worker; the EncodeRequest type lives under tokenspeed.runtime.disaggregation.embedding.encode_worker. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
Gateway/servicer side of the EPD line, 39 commits:
max_ping_strikes=0 (idle/blocked-loop lanes survived gRPC GOAWAY too_many_pings -> 110s system stalls), landing-ring exhaustion survival under admission bursts
Validation (122B, 3-node B200)
🤖 Generated with Claude Code