Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"lib/runtime",
"lib/config",
"lib/tokens",
"lib/kv-router",
"lib/memory",
"lib/async-openai",
"lib/parsers",
Expand Down Expand Up @@ -47,6 +48,7 @@ dynamo-runtime = { path = "lib/runtime", version = "0.9.0" }
dynamo-llm = { path = "lib/llm", version = "0.9.0" }
dynamo-config = { path = "lib/config", version = "0.9.0" }
dynamo-tokens = { path = "lib/tokens", version = "0.9.0" }
dynamo-kv-router = { path = "lib/kv-router", version = "0.9.0", features = ["metrics"] }
dynamo-async-openai = { path = "lib/async-openai", version = "0.9.0", features = [
"byot",
"rustls",
Expand Down
48 changes: 48 additions & 0 deletions lib/kv-router/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

[package]
name = "dynamo-kv-router"
description = "KV Router - Radix tree for LLM KV cache routing"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true

[features]
default = []
metrics = ["dep:dynamo-runtime"]
bench = ["dep:clap", "dep:indicatif"]

[dependencies]
# repo
dynamo-runtime = { workspace = true, optional = true }
dynamo-tokens = { workspace = true }

# workspace
anyhow = { workspace = true }
async-trait = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
xxhash-rust = { workspace = true }

# bench (optional)
clap = { version = "4.5", features = ["derive"], optional = true }
indicatif = { version = "0.18.0", optional = true }

[dev-dependencies]
rstest = "0.18.2"
rstest_reuse = "0.7.0"
serde_json = { workspace = true }
tokio = { workspace = true, features = ["rt", "macros", "time"] }

[[bench]]
name = "radix_tree_microbench"
harness = false
required-features = ["bench"]
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
//! Size is defined as total (worker, block) pairs in the tree.
//! Depth is the number of blocks per sequence (depth = (isl + osl) / block_size).
//!
//! Run with: cargo run --package dynamo-llm --bin radix_tree_microbench --features kv-router-stress -- --help
//! Run with: cargo bench --package dynamo-kv-router --bench radix_tree_microbench --features bench -- --help

use clap::{Parser, ValueEnum};
use dynamo_llm::kv_router::{
use dynamo_kv_router::{
compute_block_hash_for_seq,
indexer::{RadixTree, RouterEvent},
protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData,
KvCacheStoreData, KvCacheStoredBlockData, LocalBlockHash, WorkerId,
compute_block_hash_for_seq,
},
};
use rand::rngs::StdRng;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::collections::{BinaryHeap, HashMap};
use std::hash::Hash;
use tokio::time::{Duration, Instant};

use crate::kv_router::indexer::KvRouterError;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, WorkerWithDpRank};
use crate::indexer::KvRouterError;
use crate::protocols::{ExternalSequenceBlockHash, WorkerWithDpRank};

/// Block entry to be inserted in the [`PruneManager::expirations`] heap.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -222,8 +222,8 @@ impl<K: Clone + Hash + Eq + Ord> PruneManager<K> {
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_router::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use crate::kv_router::protocols::{TokensWithHashes, WorkerId, WorkerWithDpRank};
use crate::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use crate::protocols::{TokensWithHashes, WorkerId, WorkerWithDpRank};
use std::sync::Arc;
use tokio::time::{self, Duration, Instant};
use tokio_util::sync::CancellationToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,42 @@
//! This module provides a scalable and efficient way to manage and retrieve data blocks for LLM inference, leveraging a global KV cache to optimize performance.

use async_trait::async_trait;
#[cfg(feature = "metrics")]
pub use dynamo_runtime::protocols::maybe_error::MaybeError;
#[cfg(feature = "metrics")]
use dynamo_runtime::{
component::Component,
metrics::{MetricsHierarchy, prometheus_names::kvrouter},
protocols::maybe_error::MaybeError,
};
use prometheus::{IntCounterVec, Opts};

/// Trait for types that may represent an error response.
/// Used for RPC-style responses that can indicate success or failure.
#[cfg(not(feature = "metrics"))]
pub trait MaybeError {
/// Construct an instance from an error.
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self;
/// Convert to an error instance if this represents an error.
fn err(&self) -> Option<anyhow::Error>;
}
use serde::{Deserialize, Serialize};
#[cfg(feature = "metrics")]
use std::sync::OnceLock;
use std::{
cell::RefCell,
collections::{HashMap, HashSet, VecDeque},
iter,
rc::Rc,
sync::{Arc, Mutex, OnceLock},
sync::{Arc, Mutex},
thread::JoinHandle,
time::{Duration, Instant},
};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::kv_router::approx::{BlockEntry, PruneConfig, PruneManager};
use crate::kv_router::protocols::*;
use crate::tokens::SequenceHash;
use crate::approx::{BlockEntry, PruneConfig, PruneManager};
use crate::protocols::*;
use dynamo_tokens::SequenceHash;

/// Errors that can occur in the KV Router.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -622,9 +636,14 @@ pub const METRIC_EVENT_STORED: &str = "stored";
pub const METRIC_EVENT_REMOVED: &str = "removed";
pub const METRIC_EVENT_CLEARED: &str = "cleared";

/// Metric name for KV cache events applied counter.
const KV_CACHE_EVENTS_APPLIED_NAME: &str = "dynamo_kvrouter_kv_cache_events_applied";

#[cfg(feature = "metrics")]
static KV_INDEXER_METRICS: OnceLock<Arc<KvIndexerMetrics>> = OnceLock::new();

impl KvIndexerMetrics {
#[cfg(feature = "metrics")]
fn new(kv_cache_events_applied: IntCounterVec) -> Self {
Self {
kv_cache_events_applied,
Expand All @@ -633,6 +652,7 @@ impl KvIndexerMetrics {

/// Creates a new KvIndexerMetrics from a Component, memoizing the result in
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
#[cfg(feature = "metrics")]
pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| {
match component.metrics().create_intcountervec(
Expand All @@ -656,7 +676,7 @@ impl KvIndexerMetrics {
Self {
kv_cache_events_applied: IntCounterVec::new(
Opts::new(
kvrouter::KV_CACHE_EVENTS_APPLIED,
KV_CACHE_EVENTS_APPLIED_NAME,
"Total number of KV cache events applied to index",
),
&["event_type", "status"],
Expand Down Expand Up @@ -2038,14 +2058,14 @@ impl Drop for KvIndexerSharded {
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use crate::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use rstest::rstest;
use rstest_reuse::{self, *};
use tokio::time;
use tokio_util::sync::CancellationToken;

fn setup() {
dynamo_runtime::logging::init();
// Logging init removed to avoid dynamo-runtime dependency
}

fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
Expand Down
15 changes: 15 additions & 0 deletions lib/kv-router/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! KV Router - Radix tree data structures for LLM KV cache routing.
//!
//! This crate provides the core radix tree implementation and protocols for
//! efficient KV cache lookup and routing in distributed LLM inference systems.
pub mod approx;
pub mod indexer;
pub mod protocols;

// Re-export key types for convenience
pub use indexer::{MaybeError, RadixTree, RouterEvent};
pub use protocols::{LocalBlockHash, WorkerId, compute_block_hash_for_seq};
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::tokens::{SequenceHash, Token};
use dynamo_tokens::{SequenceHash, Token};
use serde::{Deserialize, Serialize};
use xxhash_rust::xxh3;

Expand Down
7 changes: 2 additions & 5 deletions lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ required-features = ["block-manager", "testing-cuda"]
[dependencies]
# repo
dynamo-runtime = { workspace = true }
dynamo-tokens = { workspace = true }
dynamo-kv-router = { workspace = true, features = ["metrics"] }
dynamo-memory = { path = "../memory", optional = true }

# workspace
Expand Down Expand Up @@ -208,8 +210,3 @@ name = "bench_local_transfer_v2"
path = "bin/bench_local_transfer_v2.rs"
required-features = ["block-manager-bench"]

[[bench]]
name = "radix_tree_microbench"
path = "benches/radix_tree_microbench.rs"
harness = false
required-features = ["kv-router-stress"]
8 changes: 5 additions & 3 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_json::json;

pub mod approx;
pub mod indexer;
// Re-export from dynamo-kv-router crate
pub use dynamo_kv_router::approx;
pub use dynamo_kv_router::indexer;
pub use dynamo_kv_router::protocols;

pub mod prefill_router;
pub mod protocols;
pub mod publisher;
pub mod recorder;
pub mod scheduler;
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::indexer::OverlapScores;
use super::protocols::{DpRank, WorkerId, WorkerSelectionResult, WorkerWithDpRank};
use super::sequence::{ActiveSequencesMultiWorker, SequenceError};

use crate::tokens::SequenceHash;
use dynamo_tokens::SequenceHash;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KVHitRateEvent {
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
//! requests share common prefixes (e.g., system prompts, few-shot examples).

use crate::kv_router::indexer::OverlapScores;
use crate::tokens::SequenceHash;
use anyhow::Result;
use dashmap::DashMap;
use derive_getters::Getters;
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::{EventPublisher, EventSubscriber};
use dynamo_tokens::SequenceHash;
use std::collections::{HashMap, HashSet};
use std::rc::{Rc, Weak};
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/mocker/kv_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ use crate::kv_router::publisher::KvEventPublisher;
use crate::mocker::evictor::LRUEvictor;
use crate::mocker::protocols::{MoveBlock, PrefillCost};
use crate::mocker::sequence::ActiveSequence;
use crate::tokens::blocks::UniqueBlock;
use crate::tokens::{BlockHash, SequenceHash};
use derive_getters::Getters;
use dynamo_runtime::component::Component;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/mocker/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::sync::Arc;
use uuid::Uuid;

use crate::mocker::perf_model::PerfModel;
use crate::tokens::blocks::UniqueBlock;
use crate::tokens::{BlockHash, SequenceHash, Token};
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{BlockHash, SequenceHash, Token};

pub type NumBlocks = usize;

Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/mocker/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::mocker::protocols::{
};
use crate::mocker::running_mean::RunningMean;
use crate::mocker::sequence::ActiveSequence;
use crate::tokens::blocks::UniqueBlock;
use dynamo_tokens::blocks::UniqueBlock;
use std::collections::{HashMap, VecDeque};
use tokio::sync::mpsc;
use tokio::time::Duration;
Expand Down
4 changes: 2 additions & 2 deletions lib/llm/src/mocker/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::mocker::protocols::MoveBlock;
use crate::tokens::blocks::UniqueBlock;
use crate::tokens::{TokenBlockSequence, Tokens};
use derive_getters::Getters;
use dynamo_tokens::blocks::UniqueBlock;
use dynamo_tokens::{TokenBlockSequence, Tokens};
use rand::random;

/// Create unique blocks from a TokenBlockSequence
Expand Down
2 changes: 0 additions & 2 deletions lib/llm/src/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use derive_getters::Dissolve;
use rayon::prelude::*;
use std::ops::Range;

pub mod blocks;

/// A token is represented as a 32-bit unsigned integer.
pub type Token = u32;

Expand Down
1 change: 1 addition & 0 deletions lib/tokens/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dashmap = { workspace = true }
derive-getters = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
xxhash-rust = { workspace = true }
bs58 = "0.5"

Expand Down
Loading
Loading