
[Feat]: Specdec Streaming: RDMA + Multinode#1611

Merged: h-guo18 merged 12 commits into main from haoguo/multinode-streaming on Jun 11, 2026

@h-guo18 (Contributor) commented Jun 2, 2026

What does this PR do?

Type of change: New feature

Multi-node streaming training for speculative decoding (EAGLE3 / DFlash):
a live vllm serve captures the target model's hidden states and moves them
straight to the trainer over NIXL RDMA — no disk round-trip. The streaming
dataset is map-style — each rank fetches only its own DistributedSampler shard
(concurrency from dataloader_num_workers), round-robins across multiple serve
replicas (server_urls), and scales to multi-node DDP. Serve-side tensor
parallelism (TP>1) is supported: hidden states are replicated across TP ranks, so
rank 0 alone owns the pool + transfer.
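
As a rough illustration of "each rank fetches only its own shard", the sketch below reproduces in plain Python the index assignment that PyTorch's `DistributedSampler` performs with `shuffle=False` and `drop_last=False`. The function name `shard_indices` is hypothetical and is not part of this PR; it assumes the dataset has at least as many samples as there are ranks.

```python
import math


def shard_indices(dataset_len: int, rank: int, world_size: int) -> list:
    """Indices one rank reads, mimicking DistributedSampler(shuffle=False, drop_last=False).

    Assumes dataset_len >= world_size (hypothetical helper, for illustration only).
    """
    # Pad the index list so that every rank receives the same number of samples.
    per_rank = math.ceil(dataset_len / world_size)
    total = per_rank * world_size
    indices = list(range(dataset_len))
    indices += indices[: total - dataset_len]
    # Strided slice: rank r takes r, r + world_size, r + 2 * world_size, ...
    return indices[rank:total:world_size]


# Ten samples split across four ranks; the last two ranks wrap around to pad.
shards = [shard_indices(10, r, 4) for r in range(4)]
for r, shard in enumerate(shards):
    print(f"rank {r}: {shard}")
```

Because the dataset is map-style, each rank only ever calls `__getitem__` on the indices in its own shard, so no rank requests hidden states for another rank's samples.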

How

  • RdmaHiddenStatesConnector — out-of-tree vLLM connector (no vLLM source edits):
    one pre-registered pinned NIXL pool per serve, a ring slot per request, and a
    small HTTP sidecar serving transfer metadata. The trainer RDMA-READs the slot
    into a per-worker buffer. RDMA is the only transport (the earlier
    disk/safetensors path is removed).
  • Map-style dataset + multi-node accelerate launch (--machine_rank, optional
    Slurm --segment to keep nodes in one NVLink domain).
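
To make the "ring slot per request" idea concrete, here is a minimal sketch of a ring allocator over one fixed-size buffer. `RingSlotPool` and its fields are hypothetical names chosen for illustration; this is not the connector's implementation, and it omits the NIXL registration and any check that a slot has been read before it is reused.

```python
from dataclasses import dataclass, field


@dataclass
class RingSlotPool:
    """Hypothetical ring allocator over one buffer of num_slots * slot_bytes bytes."""

    num_slots: int
    slot_bytes: int
    _next: int = field(default=0, init=False, repr=False)

    def acquire(self) -> tuple:
        """Return (slot_id, byte_offset) for the next request, wrapping around."""
        slot_id = self._next
        self._next = (self._next + 1) % self.num_slots
        return slot_id, slot_id * self.slot_bytes


# Four 2 MB slots; the fifth request wraps back to slot 0.
pool = RingSlotPool(num_slots=4, slot_bytes=2 * 1024 * 1024)
offsets = [pool.acquire() for _ in range(5)]
print(offsets)
```

In a design like this, the sidecar would only need to publish the slot offset and length for a request, and the reader would pull that byte range from the pre-registered pool.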

Usage

data:
  mode: streaming
  streaming_server_url: "http://node0:8000,http://node1:8000"  # round-robin
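
A comma-separated value like the one above has to be split into a list before requests can be spread across replicas. The sketch below shows one way that normalization might look; `normalize_server_urls` is a hypothetical helper, and the actual logic in hf_streaming_dataset.py may differ.

```python
from typing import List, Sequence, Union


def normalize_server_urls(value: Union[str, Sequence[str]]) -> List[str]:
    """Split a comma-separated string (or copy a list) into clean base URLs."""
    parts = value.split(",") if isinstance(value, str) else list(value)
    # Drop surrounding whitespace, empty entries, and trailing slashes.
    urls = [p.strip().rstrip("/") for p in parts if p.strip()]
    if not urls:
        raise ValueError("at least one server URL is required")
    return urls


urls = normalize_server_urls("http://node0:8000, http://node1:8000/")
print(urls)
```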

Validation (Qwen3-8B, oci-nrt H100)

sandbox CI: https://gitlab-master.nvidia.com/omniml/integration/nmm-sandbox/-/jobs/337489812

1. End-to-end convergence — EAGLE3 & DFlash, 5000 steps. Both algorithms
converge and export a deployable draft; the DFlash drafts also serve under vLLM
speculative decoding (8/8 smoke prompts pass).

| algorithm | topology (nodes) | train loss (step 0 → 5000) | vLLM draft acc-len |
|---|---|---|---|
| EAGLE3 | 2 serve TP=2 + 2 trainer DDP (4) | 37.1 → 8.20 | |
| DFlash | 1 serve TP=1 + 1 trainer (2) | 11.7 → 5.56 | 1.11 |
| DFlash | 2 serve TP=2 + 2 trainer DDP (4) | 10.9 → 5.26 | 1.19 |

2. Scalability — 1 → 12 nodes (EAGLE3, 200 steps). Throughput scales ~23× across the
sweep below. The step-time growth is cross-node DDP all-reduce, not the streaming path —
RDMA (~0.33 ms/req @ 2 MB, ~47 GB/s host-pinned READ) is never the bottleneck. Scale
serve + trainer nodes together for near-linear speedup.

| serve / trainer | nodes | step time | samples / step | samples / sec (global) | acc @ step 200 |
|---|---|---|---|---|---|
| 1 serve / 1 rank (co-located, 1 node 2 GPU) | 1 | 0.23 s | 1 | 4.4 | [0.141, 0.094, 0.072] |
| 1 serve / 1 rank (cross-node) | 2 | 0.23 s | 1 | 4.3 | [0.137, 0.105, 0.074] |
| 2 serve / 8 ranks | 3 | 0.26 s | 8 | 31.1 | [0.215, 0.126, 0.097] |
| 4 serve / 16 ranks (2 trainer nodes) | 6 | 0.28 s | 16 | 56.5 | [0.217, 0.148, 0.110] |
| 8 serve / 32 ranks (4 trainer nodes) | 12 | 0.31 s | 32 | 101.9 | [0.235, 0.165, 0.137] |
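
The ~23× figure can be checked directly from the table above: global throughput is roughly samples per step divided by step time, and the speedup is the ratio of the last row to the first. The short script below redoes that arithmetic with the reported numbers; the small gaps between estimated and reported throughput are presumably due to rounding in the reported step times.

```python
# (label, samples per step, step time in seconds, reported samples/sec) from the table.
rows = [
    ("1 serve / 1 rank (co-located)", 1, 0.23, 4.4),
    ("1 serve / 1 rank (cross-node)", 1, 0.23, 4.3),
    ("2 serve / 8 ranks", 8, 0.26, 31.1),
    ("4 serve / 16 ranks", 16, 0.28, 56.5),
    ("8 serve / 32 ranks", 32, 0.31, 101.9),
]

for label, samples, step_s, reported in rows:
    estimate = samples / step_s
    print(f"{label}: estimated {estimate:.1f} vs reported {reported} samples/sec")

# Ratio of reported throughput, largest configuration over smallest.
speedup = rows[-1][3] / rows[0][3]
print(f"speedup over the sweep: {speedup:.1f}x")
```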

3. Serve-side TP correctness. TP=1 vs TP=2 draft top-1 accuracy track
step-for-step (hidden states are replicated across TP ranks).

[Figure: serve-tp-acc]

Before your PR is "Ready for review"

  • Backward compatible?: ❌. Streaming is now RDMA-only; server_url is replaced by server_urls;
    the disk transport (HS_TRANSPORT, streaming_shared_storage_path) is removed.
  • New tests?: ✅ tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py
    (map-style dataset + mocked RDMA fetch).
  • Updated Changelog?: ❌
  • Claude approval?: ❌

@copy-pr-bot

copy-pr-bot Bot commented Jun 2, 2026


Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.

Contributors can view more details about this message here.

@coderabbitai

coderabbitai Bot commented Jun 2, 2026

Contributor


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Converts streaming Dataset to synchronous map-style getitem, replaces server_url with server_urls and adds safetensors retry, removes async resume/prefetch/seed plumbing, rewrites multi-node serve/trainer orchestration and launcher CLI/wiring, and updates tests and launcher YAMLs.

Changes

Streaming Dataset and Multi-Node Training

| Layer | File(s) | Summary |
|---|---|---|
| Config exports and DFlash mode derivation | modelopt/recipe/config.py, modelopt/torch/speculative/config.py, modelopt/torch/speculative/plugins/hf_training_args.py | Add explicit __all__ exports; derive dflash_offline from data.mode != "online"; remove DataArguments.streaming_prefetch from training args schema. |
| Streaming Dataset Map-Style Refactor | modelopt/torch/speculative/plugins/hf_streaming_dataset.py | Convert StreamingDataset to map-style (__len__/__getitem__), make _fetch synchronous, remove prefetch/seed, validate fetch_payload_cls keys, use per-process httpx.Client, round-robin across server_urls, add safetensors read-retry, and remove StreamingResumeCallback. |
| Unit and E2E Tests for Map-Style Streaming | tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py, tests/examples/speculative_decoding/test_eagle_streaming.py | Refactor tests to exercise __getitem__ contract, add coverage for empty corpus/len/resample/circuit-breaker/fetch exhaustion, server_urls normalization/round-robin, safetensors retry, and update example test overrides to remove data.streaming_prefetch. |
| Training Entrypoints and CLI Wiring | examples/speculative_decoding/main.py, examples/speculative_decoding/launch_train.sh, examples/speculative_decoding/eagle_utils.py, examples/specdec_bench/specdec_bench/utils.py | Stop passing seed into data module and remove StreamingResumeCallback registration; switch streaming config to server_urls; add --machine_rank CLI parsing and forward into accelerate launch argv array; make _is_sensitive_key safe for non-string keys. |
| Serve/Trainer Multi-Node Orchestration | tools/launcher/common/eagle3/train_eagle_streaming.sh | Introduce SERVE_NODES-driven serve/trainer split, publish per-node serve address files, trainers gather serve URLs and poll readiness, add resolve_routable_ip(), gate export to head trainer, and use STREAMING_NUM_WORKERS for DataLoader concurrency. |
| Launcher Core & Slurm Segment | tools/launcher/core.py, tools/launcher/slurm_config.py | Add explicit __all__ exports, add optional SlurmConfig.segment and slurm_factory param, and forward segment into run.SlurmExecutor via optional kwargs when provided. |
| Launcher Examples and Bench Configs | tools/launcher/examples/... | Add/update multiple launcher YAMLs including Qwen/Kimi multi-node streaming and specdec benchmark configs; remove data.streaming_prefetch and add STREAMING_NUM_WORKERS/segment tuning fields where applicable. |

Sequence Diagram(s)

sequenceDiagram
  participant DistributedSampler
  participant StreamingDataset
  participant EagleVllmStreamingDataset
  participant vLLMEndpoint
  DistributedSampler->>StreamingDataset: provide sharded index -> __getitem__(i)
  StreamingDataset->>EagleVllmStreamingDataset: validate/tokenize sample, call _fetch(sample)
  EagleVllmStreamingDataset->>vLLMEndpoint: synchronous HTTP POST prompt
  vLLMEndpoint-->>EagleVllmStreamingDataset: return hidden-state safetensor path
  EagleVllmStreamingDataset->>StreamingDataset: return validated batch payload
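
The flow in the diagram, together with the resample and circuit-breaker behavior mentioned in the change summary, can be sketched as a small map-style dataset. The class `ResamplingDataset`, the injected `fetch` callable, and the "try the next index" policy are assumptions made for illustration; only the names `_consecutive_fail` and `fail_after_consecutive_skips` come from this review, and the real StreamingDataset may behave differently.

```python
from typing import Any, Callable, Sequence


class ResamplingDataset:
    """Hypothetical map-style dataset: on a transient fetch miss, try the next index."""

    def __init__(self, samples: Sequence[Any], fetch: Callable[[Any], Any],
                 fail_after_consecutive_skips: int = 3) -> None:
        self.samples = samples
        self.fetch = fetch
        self.fail_after = fail_after_consecutive_skips
        self._consecutive_fail = 0

    def __len__(self) -> int:
        return len(self.samples)

    def __getitem__(self, i: int) -> Any:
        for attempt in range(len(self.samples)):
            sample = self.samples[(i + attempt) % len(self.samples)]
            try:
                payload = self.fetch(sample)
            except (ConnectionError, TimeoutError) as exc:
                # Only transient transport errors count as a miss; other errors propagate.
                self._consecutive_fail += 1
                if self._consecutive_fail >= self.fail_after:
                    raise RuntimeError("too many consecutive fetch failures") from exc
                continue
            self._consecutive_fail = 0
            return payload
        raise RuntimeError("no sample could be fetched")


def flaky_fetch(sample: str) -> str:
    """Stand-in for the synchronous request to a serve replica."""
    if sample == "b":
        raise ConnectionError("replica unavailable")
    return sample.upper()


ds = ResamplingDataset(["a", "b", "c"], flaky_fetch)
result = [ds[0], ds[1], ds[2]]
print(result)
```

Keeping `__getitem__` synchronous means concurrency comes entirely from the DataLoader workers, each of which runs its own copy of this loop.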

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • NVIDIA/Model-Optimizer#1509: Related streaming hidden-states dataset work informing removal of StreamingResumeCallback and changes to Eagle streaming config.

Suggested reviewers

  • kevalmorabia97
  • ChenhanYu
  • shengliangxu
  • yeyu-nvidia
🚥 Pre-merge checks | ✅ 5 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 51.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (5 passed)

| Check name | Status | Explanation |
|---|---|---|
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Security Anti-Patterns | ✅ Passed | No unsafe torch.load, numpy.load, eval/exec, or hardcoded trust_remote_code found. trust_remote_code defaults to False and is user-configurable via recipe configuration. |
| Title check | ✅ Passed | The title '[Feat]: Specdec Streaming: RDMA + Multinode' is partially related to the changeset. It highlights multi-node and RDMA aspects, but the PR summary emphasizes the map-style dataset refactor and DataLoader-driven concurrency as primary changes. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch haoguo/multinode-streaming

Comment @coderabbitai help to get the list of available commands and usage tips.

@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from aae3d2d to 11571c0 Compare June 2, 2026 22:46
@github-actions

github-actions Bot commented Jun 2, 2026

Contributor
PR Preview Action v1.8.1
Preview removed because the pull request was closed.
2026-06-11 01:35 UTC

@codecov

codecov Bot commented Jun 2, 2026


Codecov Report

❌ Patch coverage is 43.23671% with 235 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.52%. Comparing base (06af68c) to head (24abee0).
⚠️ Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...peculative/plugins/rdma_hidden_states_connector.py | 0.00% | 214 Missing ⚠️ |
| .../torch/speculative/plugins/hf_streaming_dataset.py | 85.71% | 19 Missing ⚠️ |
| modelopt/torch/speculative/eagle/utils.py | 71.42% | 2 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1611      +/-   ##
==========================================
- Coverage   77.36%   76.52%   -0.85%     
==========================================
  Files         509      511       +2     
  Lines       55927    56169     +242     
==========================================
- Hits        43269    42981     -288     
- Misses      12658    13188     +530     

| Flag | Coverage Δ |
|---|---|
| examples | 41.83% <6.03%> (-0.65%) ⬇️ |
| gpu | 57.70% <5.79%> (-0.82%) ⬇️ |
| regression | 14.67% <8.69%> (+0.03%) ⬆️ |
| unit | 54.34% <42.51%> (-0.19%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.

@h-guo18 h-guo18 self-assigned this Jun 3, 2026
@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from 9a73971 to 0265624 Compare June 3, 2026 18:28
@h-guo18 h-guo18 marked this pull request as ready for review June 3, 2026 18:28
@h-guo18 h-guo18 requested a review from a team as a code owner June 3, 2026 18:28
@h-guo18 h-guo18 requested review from ChenhanYu and yeyu-nvidia June 3, 2026 18:28

@coderabbitai coderabbitai Bot left a comment

Warning

CodeRabbit couldn't request changes on this pull request because it doesn't have sufficient GitHub permissions.

Please grant CodeRabbit Pull requests: Read and write permission and re-run the review.

👉 Steps to fix this

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
modelopt/torch/speculative/plugins/hf_training_args.py (1)

30-35: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add __all__ to keep the schema API explicit.

This module exports public Pydantic models, so leaving the symbol list implicit works against the repo's Python API convention. Please declare __all__ near the top.

As per coding guidelines, "**/*.py: Define the public API with __all__ at the top of each Python module ... to keep the public API explicit and make star-imports safe".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@modelopt/torch/speculative/plugins/hf_training_args.py` around lines 30 - 35,
Add an explicit __all__ declaration near the top of hf_training_args.py
(immediately after the imports) that lists the module's public API: include the
names of the Pydantic models and any helper functions or constants you intend to
export (i.e., the public class names that subclass BaseModel and any functions
that should be visible to consumers), so star-imports are safe and the module
follows the repo convention.
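
For readers unfamiliar with the convention, the sketch below shows what `__all__` changes in practice: a star-import picks up only the listed names. The module body is a placeholder built in memory (`demo_training_args` and the empty classes are hypothetical, not the real schema in hf_training_args.py).

```python
import sys
import types

# Hypothetical module source; the class bodies are placeholders, not the real schema.
SOURCE = '''
__all__ = ["DataArguments"]

class DataArguments:
    pass

class InternalHelper:
    pass

def unexported_helper():
    pass
'''

# Build the module in memory and register it so it can be imported by name.
module = types.ModuleType("demo_training_args")
exec(SOURCE, module.__dict__)
sys.modules["demo_training_args"] = module

# A star-import honors __all__, so only the listed name is bound.
namespace = {}
exec("from demo_training_args import *", namespace)
exported = sorted(name for name in namespace if not name.startswith("__"))
print(exported)
```

Without the `__all__` line, the same star-import would also bind `InternalHelper` and `unexported_helper`, since neither name starts with an underscore.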
examples/speculative_decoding/launch_train.sh (1)

80-92: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Stop using sh -c to run the assembled accelerate launch command.

sh -c "… ${EXTRA_ARGS[*]}" re-parses CLI overrides with shell syntax, so argument boundaries break (e.g. training.output_dir=/tmp/has space becomes two separate args) and command substitutions inside overrides execute (e.g. note=$(...) becomes note=...). This same interpolation also embeds unquoted multi-node variables like $MACHINE_RANK / $HEAD_NODE_IP into the sh -c string.

Build an argv array and invoke it directly (no sh -c):

Suggested fix
- set -x
- start_time=$(date +%s)
- sh -c "accelerate launch --mixed_precision bf16 $MULTI_NODE_ARGS ${SCRIPT_DIR}/main.py --config $CONFIG_FILE ${EXTRA_ARGS[*]}"
+ CMD=(accelerate launch --mixed_precision bf16)
+ if [[ "$NUM_NODES" != "1" ]]; then
+   CMD+=(
+     --multi_gpu
+     --num_processes "$TOTAL_GPU"
+     --num_machines "$NUM_NODES"
+     --machine_rank "${MACHINE_RANK:-$SLURM_PROCID}"
+     --main_process_ip "$HEAD_NODE_IP"
+     --main_process_port 29500
+   )
+ fi
+ CMD+=("${SCRIPT_DIR}/main.py" --config "$CONFIG_FILE" "${EXTRA_ARGS[@]}")
+ set -x
+ start_time=$(date +%s)
+ "${CMD[@]}"
  echo "Total time: $(( $(date +%s) - $start_time )) seconds"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/speculative_decoding/launch_train.sh` around lines 80 - 92, The
script currently uses sh -c "accelerate launch ... $MULTI_NODE_ARGS
${SCRIPT_DIR}/main.py --config $CONFIG_FILE ${EXTRA_ARGS[*]}", which causes
word-splitting and unintended shell interpolation (affecting MULTI_NODE_ARGS,
MACHINE_RANK, HEAD_NODE_IP and EXTRA_ARGS). Fix by constructing an argv array
for the command (e.g., base args: "accelerate" "launch" "--mixed_precision"
"bf16" plus expanded MULTI_NODE_ARGS tokens, "${SCRIPT_DIR}/main.py", "--config"
"$CONFIG_FILE" and each element of EXTRA_ARGS) and then invoke it directly (exec
or run the array) instead of using sh -c so argument boundaries and literal
values are preserved; ensure you expand EXTRA_ARGS as separate array elements
rather than via ${EXTRA_ARGS[*]} or unquoted expansions.
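
The boundary problem described here is easy to reproduce outside the script. The sketch below uses Python's `shlex.split` as an approximation of POSIX word splitting to show how one override turns into two arguments once everything is joined into a single string, and how an argv list avoids that. The file names and override values are illustrative only; note that `shlex` does not perform command substitution, whereas a real `sh -c` would also execute `$(hostname)`.

```python
import shlex

# Overrides as the caller passed them: two arguments, one containing a space.
extra_args = ["training.output_dir=/tmp/has space", "note=$(hostname)"]

# Joining into one string (the effect of "${EXTRA_ARGS[*]}" inside sh -c) loses boundaries.
joined = "main.py --config cfg.yaml " + " ".join(extra_args)
reparsed = shlex.split(joined)

# An argv list keeps each override as exactly one argument, with no shell evaluation.
argv = ["accelerate", "launch", "--mixed_precision", "bf16",
        "main.py", "--config", "cfg.yaml", *extra_args]

print(reparsed)
print(argv)
```

Passing a list such as `argv` to `subprocess.run` (without `shell=True`) is the Python counterpart of invoking `"${CMD[@]}"` in bash: no second round of parsing takes place.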
tools/launcher/common/eagle3/train_eagle_streaming.sh (1)

299-337: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject SERVE_NODES >= SLURM_NNODES up front.

In the new dispatch, that configuration turns every node into a serve node, so no trainer ever publishes the rendezvous address or creates DONE_FILE. The job just waits forever.

Suggested fix
 NNODES="${SLURM_NNODES:-1}"
 NODEID="${SLURM_NODEID:-0}"
+
+if [ "$NNODES" -gt 1 ] && [ "$SERVE_NODES" -ge "$NNODES" ]; then
+    echo "ERROR: SERVE_NODES must be smaller than SLURM_NNODES." >&2
+    exit 1
+fi
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/launcher/common/eagle3/train_eagle_streaming.sh` around lines 299 -
337, Add an explicit sanity check that fails fast when SERVE_NODES is >= NNODES
(SLURM_NNODES) so we don't turn every node into a serve replica and deadlock
waiting for a trainer; locate the topology dispatch logic that uses NNODES,
NODEID and SERVE_NODES and before branching (or at start of that block) validate
that if SERVE_NODES is set it is strictly less than NNODES, otherwise print an
error and exit (include variables SERVE_NODES and NNODES in the message) so the
job is rejected up front instead of hanging waiting for DONE_FILE or a trainer
rendezvous.
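
The same fail-fast rule can be expressed as a small function, which may be easier to reason about than the shell conditional. `validate_topology` is a hypothetical helper that mirrors the suggested check; treating the single-node case as one co-located trainer is an assumption based on the co-located row in the scalability table.

```python
def validate_topology(serve_nodes: int, total_nodes: int) -> int:
    """Return the number of trainer nodes, or raise if no trainer node would remain."""
    if total_nodes > 1 and serve_nodes >= total_nodes:
        raise ValueError(
            f"SERVE_NODES ({serve_nodes}) must be smaller than SLURM_NNODES ({total_nodes})"
        )
    # Assumption: on a single node, serve and trainer are co-located.
    return total_nodes - serve_nodes if total_nodes > 1 else 1


# 12 nodes with 8 serve replicas leaves 4 trainer nodes.
trainer_nodes = validate_topology(serve_nodes=8, total_nodes=12)
print(trainer_nodes)
```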
🧹 Nitpick comments (5)
tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py (1)

128-143: ⚡ Quick win

Add one behavioral test for round-robin dispatch across multiple server_urls.

This refactor introduces multi-server fan-out as core behavior, but current coverage only checks URL normalization and a single-endpoint fetch path. A two-endpoint mock that asserts alternating destinations would catch regressions that silently pin all traffic to the first replica.

Also applies to: 176-214

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py` around
lines 128 - 143, Add a new unit test alongside test_server_urls_normalization
that verifies round-robin dispatch across multiple server_urls: construct an
EagleVllmStreamingConfig with server_urls set to two distinct endpoints, patch
or mock the network/send/dispatch call used by the streaming plugin (replace the
HTTP client or the function that actually sends requests) to record the target
URL for each invocation, then invoke the dataset/request dispatching method
twice (or more) and assert that recorded destinations alternate between the two
endpoints (e.g., calls[0] == endpoint_a, calls[1] == endpoint_b, calls[2] ==
endpoint_a). Ensure the test uses pytest/monkeypatch to avoid real network I/O
and name the test to reflect round_robin behavior so regressions that pin
traffic to a single replica are caught.
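
A test along these lines might look like the sketch below. `RoundRobinDispatcher` is a hypothetical stand-in for the dataset's endpoint selection, and `fake_send` replaces the HTTP call; a real test would construct the actual config and dataset and patch its client instead.

```python
from itertools import cycle


class RoundRobinDispatcher:
    """Hypothetical stand-in for the dataset's endpoint selection."""

    def __init__(self, server_urls, send):
        self._urls = cycle(server_urls)
        self._send = send

    def fetch(self, prompt):
        # Each request goes to the next URL in the cycle.
        return self._send(next(self._urls), prompt)


def test_round_robin_alternates_between_replicas():
    calls = []

    def fake_send(url, prompt):
        # Record the destination instead of doing network I/O.
        calls.append(url)
        return {"url": url, "prompt": prompt}

    dispatcher = RoundRobinDispatcher(["http://a:8000", "http://b:8000"], fake_send)
    for prompt in ("p0", "p1", "p2"):
        dispatcher.fetch(prompt)

    assert calls == ["http://a:8000", "http://b:8000", "http://a:8000"]


test_round_robin_alternates_between_replicas()
```

An implementation that always used the first URL would fail the final assertion, which is the regression this nitpick is concerned with.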
tools/launcher/examples/moonshotai/Kimi-K2.5/hf_dflash_dryrun.yaml (1)

50-56: ⚡ Quick win

Set data.mode=offline explicitly in the dry-run config.

This smoke test currently depends on offline_data_path side effects to flip the recipe into offline mode. Since the DFlash path now keys off data.mode, making that override explicit will keep this config aligned with the new contract and avoid brittle coupling to mode inference.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/launcher/examples/moonshotai/Kimi-K2.5/hf_dflash_dryrun.yaml` around
lines 50 - 56, Add an explicit data.mode=offline entry to the dry-run config so
the recipe uses offline mode deterministically; update the HF dry-run YAML that
currently lists --dry_run, --config ..., model.* and data.offline_data_path to
also include data.mode=offline (rather than relying on the presence of
data.offline_data_path to infer mode).
tools/launcher/slurm_config.py (1)

16-26: ⚡ Quick win

Add __all__ to make the factory/config API explicit.

This module now exposes the launcher-facing SlurmConfig/slurm_factory pair, but the public surface is still implicit.

As per coding guidelines, **/*.py: Define the public API with __all__ at the top of each Python module and re-export submodules in __init__.py files using from .module import * to keep the public API explicit and make star-imports safe.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/launcher/slurm_config.py` around lines 16 - 26, This module should
explicitly declare its public API: add an __all__ list near the top that exports
the launcher-facing symbols, e.g. include "SlurmConfig" and "slurm_factory"
(matching the actual class/function names in this file) so star-imports are
safe; also update any package __init__.py to re-export with from .slurm_config
import * if you want the same public surface at package level.
tools/launcher/examples/moonshotai/Kimi-K2.5/specdec_bench.yaml (1)

60-61: ⚡ Quick win

Replace the draft checkpoint placeholder with a real config input.

The checked-in example still requires hand-editing --draft_model_dir, so it is easy to run the file exactly as documented and fail immediately. Promoting this to global_vars or defaulting to the standard export path would make the example self-contained.

If you want, I can wire this through global_vars so the example remains override-friendly without the inline TODO.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/launcher/examples/moonshotai/Kimi-K2.5/specdec_bench.yaml` around lines
60 - 61, The example hardcodes a placeholder for the DFLASH draft checkpoint
(--draft_model_dir) which forces manual edits; update specdec_bench.yaml so the
draft checkpoint is provided from a reusable config variable instead: add a
global_vars entry (e.g., draft_model_dir) with a sensible default path (the
standard HF export location) and replace the inline "- --draft_model_dir
/hf-local/nvidia/Kimi-K2.5-DFlash" with a reference to that global var, or
alternatively set the draft path to the project’s standard export path by
default so the example runs out-of-the-box; ensure the variable name matches any
existing templating scheme used in this file.
tools/launcher/core.py (1)

16-30: ⚡ Quick win

Add __all__ for the launcher module's public surface.

This module exports the launcher entry points and dataclasses, but it still leaves the public API implicit.

As per coding guidelines, **/*.py: Define the public API with __all__ at the top of each Python module and re-export submodules in __init__.py files using from .module import * to keep the public API explicit and make star-imports safe.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tools/launcher/core.py` around lines 16 - 30, Add an explicit __all__ list at
the top of this module (immediately after the module docstring) that enumerates
the public API symbols exported by tools.launcher.core — specifically the
launcher entry points, dataclasses and any executor-builder and job-run-loop
function/class names defined in this file; be careful not to accidentally export
imported modules like the dataclasses module, nemo_run (imported as run), or
yaml unless they are intentionally part of the public surface. Also update the
package __init__.py to re-export the core module via from .core import * if you
want core's public names to be available at package level.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@examples/speculative_decoding/main.py`:
- Around line 280-286: The code unconditionally sets
training_args.ignore_data_skip = True when recipe.data.mode == "streaming";
change this to only enable ignore_data_skip when the run is effectively
single-epoch or when the user explicitly opts into restart-on-resume: check
training_args.num_train_epochs <= 1 before setting
training_args.ignore_data_skip, and otherwise either leave it False and emit a
warning or require an explicit flag (e.g., resume_without_fast_forward) to opt
in; also consider detecting a resume scenario (checkpoint present in output_dir)
to apply the gating only on resume. Ensure references: recipe.data.mode,
training_args.ignore_data_skip, training_args.num_train_epochs, and
output_dir/checkpoint are used to implement the guard and warning.

In `@modelopt/recipe/config.py`:
- Around line 179-185: Add an explicit __all__ at the top of the module that
lists the module's public recipe types and loader mapping so star-imports are
safe; include the exported class names such as ModelOptDFlashRecipe and
ModelOptEagleRecipe plus the recipe loader mapping name (e.g., RECIPE_LOADERS)
and any public helper functions used to load or validate recipes, then remove
reliance on implicit globals—place the __all__ declaration near the module
imports/header so it clearly documents the public API.

In `@modelopt/torch/speculative/config.py`:
- Around line 68-74: Add an explicit __all__ declaration at the top of this
public config module listing the exported names (e.g. "dflash_offline" and any
other public symbols such as ModeloptField, ModelOptDFlashRecipe or other config
types/constants defined in this file) so the module no longer relies on implicit
exports; place __all__ = ["dflash_offline", ...] near the top of the file and
ensure it includes every identifier meant to be part of the public API so
star-imports remain safe and explicit.

In `@modelopt/torch/speculative/plugins/hf_streaming_dataset.py`:
- Around line 202-214: The current broad except in the loop around
self._fetch(sample) swallows deterministic bugs; narrow it so only
transient/IO/transport errors are treated as fetch misses: catch specific
transport-related exceptions (e.g., network/IO exceptions used by your stack)
and handle them by logging via warn_rank_0 and incrementing
self._consecutive_fail as before, but immediately re-raise critical exceptions
such as RuntimeError and ValueError from _fetch to avoid masking contract
violations; ensure the RuntimeError/ValueError paths bypass the
resample/circuit-breaker logic that references self._consecutive_fail and
config.fail_after_consecutive_skips so real misconfigurations surface.
- Around line 48-65: Add an explicit module public API by defining __all__ near
the top (after imports) that lists the public symbols exported from this file —
include the dataset/class names defined in this module (e.g., the main dataset
class such as HFStreamingDataset or any other public classes/functions you
defined below) and constants like IGNORE_TOKEN_ID; place a single __all__ =
["HFStreamingDataset", "OtherPublicClass", "some_public_function",
"IGNORE_TOKEN_ID"] (adjust names to match the actual symbols in this file) so
star-imports and re-exports are stable.

In `@tools/launcher/common/eagle3/train_eagle_streaming.sh`:
- Around line 145-151: The SERVE_ADDR_FILE and DONE_FILE are global and must be
namespaced per run; modify the assignment of SERVE_ADDR_FILE and DONE_FILE so
they include a stable per-job identifier (e.g., JOB_ID, SLURM_JOB_ID, or
fallback to $$ or a timestamp) and use that same identifier everywhere the
script references SERVE_ADDR_FILE and DONE_FILE (so functions that publish/read
addresses and the head trainer check the same namespaced paths); ensure the
chosen identifier is exported or passed to serve/trainer subprocesses so
concurrent launcher runs write/read distinct files.

In `@tools/launcher/core.py`:
- Around line 289-291: build_slurm_executor currently assumes slurm_config has a
.segment attribute which can raise AttributeError for older/custom config types
patched via set_slurm_config_type; change the access to be defensive (use
getattr(slurm_config, "segment", None)) and only pass the segment option when it
is not None (or otherwise omit it) so older Pydantic-based configs and
ModeloptBaseConfig remain compatible; update any use sites in
build_slurm_executor that reference slurm_config.segment to follow this pattern.

In `@tools/launcher/examples/moonshotai/Kimi-K2.5/specdec_bench.yaml`:
- Around line 76-81: The slurm_config in the specdec_bench.yaml currently sets
container to "vllm/vllm-openai:latest", which may not support DFLASH on
GB200/aarch64; update the container value to a GB200-compatible image tag (pin
to a specific image known to include DFLASH/GB200 support) instead of the
floating "latest" tag so the documented HSG path works out of the box — change
the slurm_config.container entry to that pinned GB200-compatible image.

---

Outside diff comments:
In `@examples/speculative_decoding/launch_train.sh`:
- Around line 80-92: The script currently uses sh -c "accelerate launch ...
$MULTI_NODE_ARGS ${SCRIPT_DIR}/main.py --config $CONFIG_FILE ${EXTRA_ARGS[*]}",
which causes word-splitting and unintended shell interpolation (affecting
MULTI_NODE_ARGS, MACHINE_RANK, HEAD_NODE_IP and EXTRA_ARGS). Fix by constructing
an argv array for the command (e.g., base args: "accelerate" "launch"
"--mixed_precision" "bf16" plus expanded MULTI_NODE_ARGS tokens,
"${SCRIPT_DIR}/main.py", "--config" "$CONFIG_FILE" and each element of
EXTRA_ARGS) and then invoke it directly (exec or run the array) instead of using
sh -c so argument boundaries and literal values are preserved; ensure you expand
EXTRA_ARGS as separate array elements rather than via ${EXTRA_ARGS[*]} or
unquoted expansions.

In `@modelopt/torch/speculative/plugins/hf_training_args.py`:
- Around line 30-35: Add an explicit __all__ declaration near the top of
hf_training_args.py (immediately after the imports) that lists the module's
public API: include the names of the Pydantic models and any helper functions or
constants you intend to export (i.e., the public class names that subclass
BaseModel and any functions that should be visible to consumers), so
star-imports are safe and the module follows the repo convention.

In `@tools/launcher/common/eagle3/train_eagle_streaming.sh`:
- Around line 299-337: Add an explicit sanity check that fails fast when
SERVE_NODES is >= NNODES (SLURM_NNODES) so we don't turn every node into a serve
replica and deadlock waiting for a trainer; locate the topology dispatch logic
that uses NNODES, NODEID and SERVE_NODES and before branching (or at start of
that block) validate that if SERVE_NODES is set it is strictly less than NNODES,
otherwise print an error and exit (include variables SERVE_NODES and NNODES in
the message) so the job is rejected up front instead of hanging waiting for
DONE_FILE or a trainer rendezvous.

---

Nitpick comments:
In `@tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py`:
- Around line 128-143: Add a new unit test alongside
test_server_urls_normalization that verifies round-robin dispatch across
multiple server_urls: construct an EagleVllmStreamingConfig with server_urls set
to two distinct endpoints, patch or mock the network/send/dispatch call used by
the streaming plugin (replace the HTTP client or the function that actually
sends requests) to record the target URL for each invocation, then invoke the
dataset/request dispatching method twice (or more) and assert that recorded
destinations alternate between the two endpoints (e.g., calls[0] == endpoint_a,
calls[1] == endpoint_b, calls[2] == endpoint_a). Ensure the test uses
pytest/monkeypatch to avoid real network I/O and name the test to reflect
round_robin behavior so regressions that pin traffic to a single replica are
caught.

In `@tools/launcher/core.py`:
- Around line 16-30: Add an explicit __all__ list at the top of this module
(immediately after the module docstring) that enumerates the public API symbols
exported by tools.launcher.core — specifically the launcher entry points,
dataclasses and any executor-builder and job-run-loop function/class names
defined in this file; be careful not to accidentally export imported modules
like the dataclasses module, nemo_run (imported as run), or yaml unless they are
intentionally part of the public surface. Also update the package __init__.py to
re-export the core module via from .core import * if you want core's public
names to be available at package level.

In `@tools/launcher/examples/moonshotai/Kimi-K2.5/hf_dflash_dryrun.yaml`:
- Around line 50-56: Add an explicit data.mode=offline entry to the dry-run
config so the recipe uses offline mode deterministically; update the HF dry-run
YAML that currently lists --dry_run, --config ..., model.* and
data.offline_data_path to also include data.mode=offline (rather than relying on
the presence of data.offline_data_path to infer mode).

In `@tools/launcher/examples/moonshotai/Kimi-K2.5/specdec_bench.yaml`:
- Around line 60-61: The example hardcodes a placeholder for the DFLASH draft
checkpoint (--draft_model_dir) which forces manual edits; update
specdec_bench.yaml so the draft checkpoint is provided from a reusable config
variable instead: add a global_vars entry (e.g., draft_model_dir) with a
sensible default path (the standard HF export location) and replace the inline
"- --draft_model_dir /hf-local/nvidia/Kimi-K2.5-DFlash" with a reference to that
global var, or alternatively set the draft path to the project’s standard export
path by default so the example runs out-of-the-box; ensure the variable name
matches any existing templating scheme used in this file.

In `@tools/launcher/slurm_config.py`:
- Around line 16-26: This module should explicitly declare its public API: add
an __all__ list near the top that exports the launcher-facing symbols, e.g.
include "SlurmConfig" and "slurm_factory" (matching the actual class/function
names in this file) so star-imports are safe; also update any package
__init__.py to re-export with from .slurm_config import * if you want the same
public surface at package level.
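
For reference, the `__all__` convention requested in several of the comments above looks roughly like the following. This is a sketch only: the names `SlurmConfig` and `slurm_factory` are taken from the comment's own example, while the fields and the private helper are placeholders rather than the module's real contents.

```python
"""Hypothetical module illustrating an explicit public API."""

import dataclasses  # imported helper, deliberately left out of __all__

# Only these names are picked up by `from module import *`.
__all__ = ["SlurmConfig", "slurm_factory"]


@dataclasses.dataclass
class SlurmConfig:
    # Placeholder fields; the real config has its own.
    host: str = "localhost"
    nodes: int = 1


def slurm_factory(**overrides) -> SlurmConfig:
    """Build a config, overriding the defaults with keyword arguments."""
    return SlurmConfig(**overrides)


def _private_helper() -> None:
    """The leading underscore and its absence from __all__ keep this internal."""
```

With this in place, a package `__init__.py` containing `from .slurm_config import *` re-exports exactly the two listed names and nothing else.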
ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 824b234f-9634-4bc3-9fea-29506a691a90

📥 Commits

Reviewing files that changed from the base of the PR and between 88fd7ff and 0265624.

📒 Files selected for processing (20)
  • examples/specdec_bench/specdec_bench/utils.py
  • examples/speculative_decoding/eagle_utils.py
  • examples/speculative_decoding/launch_train.sh
  • examples/speculative_decoding/main.py
  • modelopt/recipe/config.py
  • modelopt/torch/speculative/config.py
  • modelopt/torch/speculative/plugins/hf_streaming_dataset.py
  • modelopt/torch/speculative/plugins/hf_training_args.py
  • tests/examples/speculative_decoding/test_eagle_streaming.py
  • tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py
  • tools/launcher/common/eagle3/train_eagle_streaming.sh
  • tools/launcher/core.py
  • tools/launcher/examples/Qwen/Qwen3-8B/hf_streaming_eagle3_multi_node.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/hf_dflash_dryrun.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/hf_streaming_dflash.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/hf_streaming_dflash_multi_node.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/hf_streaming_eagle3.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/hf_streaming_eagle3_multi_node.yaml
  • tools/launcher/examples/moonshotai/Kimi-K2.5/specdec_bench.yaml
  • tools/launcher/slurm_config.py

@h-guo18

h-guo18 commented Jun 3, 2026

Contributor Author

/claude review

@coderabbitai coderabbitai Bot left a comment

Contributor

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
modelopt/torch/speculative/plugins/hf_training_args.py (1)

52-69: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Restore streaming_prefetch as a deprecated no-op for config compatibility.

Line 52 uses extra="forbid", and DataArguments no longer accepts streaming_prefetch. Any existing recipe YAML that still sets data.streaming_prefetch will now fail validation instead of being tolerated as a no-op.

Proposed patch
-from pydantic import BaseModel, ConfigDict, field_validator, model_validator
+from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
@@
 class DataArguments(BaseModel):
@@
     streaming_model_name: str | None = None
     # Mirror of the vLLM connector's ``shared_storage_path``; trainer-side allowlist.
     streaming_shared_storage_path: str | None = None
+    streaming_prefetch: int = Field(
+        default=64,
+        ge=1,
+        description=(
+            "Deprecated no-op kept for backward YAML compatibility. "
+            "Use dataloader_num_workers for fetch concurrency."
+        ),
+    )

As per coding guidelines: "Preserve config and checkpoint backward compatibility; explicitly handle ModeloptBaseConfig and other Pydantic-based configs to ensure older checkpoints remain loadable when configs change."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@modelopt/torch/speculative/plugins/hf_training_args.py` around lines 52 - 69,
Add back a declared `streaming_prefetch` field to the config so unknown-field
validation (ConfigDict(extra="forbid")) does not reject legacy YAML: declare
`streaming_prefetch: int | None = None` alongside the other fields (e.g., near
`mode`, `data_path`, etc.) and treat it as a deprecated no-op; additionally add
a minimal deprecation warning when it is set (via the model's init/validator for
the DataArguments / hf_training_args model) to inform users it is ignored while
preserving backwards compatibility.
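
A minimal sketch of the deprecated no-op field with a warning, assuming Pydantic v2 (which the `ConfigDict` and `field_validator` imports in the proposed patch imply). The model is a trimmed-down stand-in, the `mode` default and the validator name are illustrative, and the field uses the integer type from the proposed patch.

```python
import warnings
from typing import Optional

from pydantic import BaseModel, ConfigDict, field_validator


class DataArguments(BaseModel):
    """Trimmed-down stand-in; the real model declares many more fields."""

    model_config = ConfigDict(extra="forbid")  # unknown keys are rejected

    mode: str = "streaming"  # placeholder default for illustration
    # Deprecated no-op, declared only so legacy YAML still validates.
    streaming_prefetch: Optional[int] = None

    @field_validator("streaming_prefetch")
    @classmethod
    def _warn_streaming_prefetch(cls, value: Optional[int]) -> Optional[int]:
        # Validators skip defaults, so this fires only when the key is set.
        if value is not None:
            warnings.warn(
                "data.streaming_prefetch is deprecated and ignored; "
                "use dataloader_num_workers for fetch concurrency.",
                DeprecationWarning,
                stacklevel=2,
            )
        return value
```

With this shape, `DataArguments(streaming_prefetch=64)` validates and emits a `DeprecationWarning`, while a genuinely unknown key still fails because of `extra="forbid"`.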

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Enterprise

Run ID: 7f6214fe-beb9-4efe-a615-05835a00ffcc

📥 Commits

Reviewing files that changed from the base of the PR and between 9882ee0 and 9024ca9.

📒 Files selected for processing (5)
  • examples/speculative_decoding/eagle_utils.py
  • examples/speculative_decoding/main.py
  • modelopt/torch/speculative/plugins/hf_training_args.py
  • tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py
  • tools/launcher/examples/Qwen/Qwen3-8B/hf_streaming_eagle3.yaml
💤 Files with no reviewable changes (1)
  • tools/launcher/examples/Qwen/Qwen3-8B/hf_streaming_eagle3.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/unit/torch/speculative/plugins/test_hf_streaming_dataset.py

@h-guo18

h-guo18 commented Jun 4, 2026

Contributor Author

/claude review

@claude claude Bot left a comment

Claude review summary

Findings: CRITICAL: 0 · IMPORTANT: 1 · SUGGESTION: 2

Most impactful finding

  • [IMPORTANT Compatibility] streaming_prefetch was removed from DataArguments (which has extra="forbid"), but the PR description claims the field is "kept so existing yamls still validate". Existing recipe YAMLs with data.streaming_prefetch=… will now hard-fail Pydantic validation rather than be silently tolerated. Re-declare it as a deprecated no-op field. (CodeRabbit raised the same issue and it remains unaddressed.)

Suggestions

  • DataLoader workers all start their round-robin cursor at _rr=0, so cold-start sends the first request from every worker × every rank to server_urls[0] — exactly the flood pattern the PR's docstring warns about. Stagger initial cursor by worker_id (and/or rank).
  • train_eagle_streaming.sh falls back to /scratchspace/eagle3 when deriving out_dir, but it now serves DFlash runs too — the failure mode is silent if a user forgets to forward training.output_dir=.
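
The staggering suggested in the first bullet could look like the sketch below. It is an illustration under assumptions, not the plugin's code: `initial_cursor` and `RoundRobinCursor` are hypothetical names, and only the `_rr` cursor and `server_urls` come from the review text.

```python
from typing import List


def initial_cursor(rank: int, worker_id: int, num_workers: int, num_servers: int) -> int:
    """Starting index into server_urls for one DataLoader worker.

    Numbering workers globally (rank-major) and taking the remainder spreads
    the very first requests across replicas instead of all hitting index 0.
    """
    global_worker = rank * max(num_workers, 1) + worker_id
    return global_worker % num_servers


class RoundRobinCursor:
    """Hypothetical per-worker cursor over the serve replicas."""

    def __init__(self, server_urls: List[str], start: int = 0):
        self._urls = list(server_urls)
        self._rr = start % len(self._urls)

    def next_url(self) -> str:
        url = self._urls[self._rr]
        self._rr = (self._rr + 1) % len(self._urls)
        return url
```

Inside a real worker, `worker_id` and `num_workers` would come from `torch.utils.data.get_worker_info()` (its `id` and `num_workers` attributes). With 2 ranks, 2 workers each, and 2 replicas, the four first requests split evenly between the two servers.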

Overall assessment

Solid refactor — the map-style + DistributedSampler design is the right shape, the tests are good (especially the resume test that proves no re-fetch on skip_first_batches), and the writer-race retry + transient-error narrowing are nice clean-ups. The one IMPORTANT finding is a real but easy backward-compat fix; the rest are nice-to-have. Risk level: low-to-moderate (a single-line Pydantic addition closes the only blocking concern).

Algorithm correctness, mode/state composition, and HF/TRT-LLM export paths are unaffected by this PR (it stays inside the data-loading layer).

@copy-pr-bot

copy-pr-bot Bot commented Jun 8, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@h-guo18 h-guo18 changed the title from "[Feat]: Specdec Multinode Streaming" to "[Feat]: Specdec Streaming: Use RDMA + Multinode Support" on Jun 8, 2026
@h-guo18 h-guo18 marked this pull request as draft June 8, 2026 03:43
@h-guo18 h-guo18 changed the title from "[Feat]: Specdec Streaming: Use RDMA + Multinode Support" to "[Feat]: Specdec Streaming: RDMA + Multinode" on Jun 8, 2026
@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from d696144 to f16e927 Compare June 8, 2026 05:48
@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from f16e927 to 81f0c3d Compare June 8, 2026 05:51
@h-guo18 h-guo18 requested a review from benchislett June 8, 2026 06:53
@h-guo18 h-guo18 marked this pull request as ready for review June 10, 2026 00:11
@ChenhanYu

Collaborator

Need CHANGELOG and example README update.

@h-guo18 h-guo18 requested a review from a team as a code owner June 10, 2026 18:18
@h-guo18 h-guo18 requested a review from Edwardf0t1 June 10, 2026 18:18
@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from e433c35 to bd049f7 Compare June 10, 2026 21:05
h-guo18 added 12 commits June 10, 2026 21:59
Signed-off-by: h-guo18 <67671475+h-guo18@users.noreply.github.com>
@h-guo18 h-guo18 force-pushed the haoguo/multinode-streaming branch from bd049f7 to 24abee0 Compare June 10, 2026 22:00
@h-guo18 h-guo18 merged commit 46eddab into main Jun 11, 2026
52 checks passed
@h-guo18 h-guo18 deleted the haoguo/multinode-streaming branch June 11, 2026 01:35