Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions lib/llm/src/kv_router/prefill_router.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::sync::{Arc, OnceLock};
use std::sync::{
Arc, OnceLock,
atomic::{AtomicU64, Ordering},
};

use anyhow::Result;
use futures::StreamExt;
use rand::Rng;
use tokio::sync::{OwnedSemaphorePermit, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
Expand Down Expand Up @@ -293,11 +295,26 @@ impl PrefillRouter {
let host = endpoint.bootstrap_host?;
let port = endpoint.bootstrap_port?;

let bootstrap_room: u64 = rand::rng().random();
let prefill_dp_size = match self
.prefill_router
.get()
.expect("prefill_router not initialized")
{
InnerPrefillRouter::KvRouter(router) => {
router.chooser.client().instances().len() as u64
}
InnerPrefillRouter::SimpleRouter(router) => router.client.instances().len() as u64,
};

// Encode the chosen DP rank into bootstrap_room
static BOOTSTRAP_ROOM_COUNTER: AtomicU64 = AtomicU64::new(0);
let base = BOOTSTRAP_ROOM_COUNTER.fetch_add(prefill_dp_size, Ordering::Relaxed);
let bootstrap_room = base + (dp_rank as u64);

tracing::info!(
worker_id = worker_id,
dp_rank = dp_rank,
prefill_dp_size = prefill_dp_size,
bootstrap_host = %host,
bootstrap_port = port,
bootstrap_room = bootstrap_room,
Expand Down
Loading