[model-gateway] new radix tree #16249
base: main
Introduce the RadixTree trait as a common interface for cache-aware routing trees. This provides a unified API for both character-based and token-based implementations.
- RadixTree trait with insert, prefix_match, evict, and reset methods
- MatchResult trait for polymorphic match results with hit_ratio
- Common TenantId type (Arc<str>) for efficient tenant identification
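A minimal sketch of what the trait surface described in this commit might look like. Only the method names, `hit_ratio`, and `TenantId = Arc<str>` come from the commit message; the signatures, the associated types, and the `SimpleMatch` struct are assumptions made for illustration and may differ from the code in the PR.

```rust
use std::sync::Arc;

/// Shared tenant identifier; cloning an `Arc<str>` only bumps a refcount.
pub type TenantId = Arc<str>;

/// Result of a prefix match (assumed shape, not the PR's definition).
pub trait MatchResult {
    fn tenant(&self) -> &TenantId;
    fn matched_len(&self) -> usize;
    fn input_len(&self) -> usize;

    /// Fraction of the input that was covered by a cached prefix.
    fn hit_ratio(&self) -> f32 {
        if self.input_len() == 0 {
            0.0
        } else {
            self.matched_len() as f32 / self.input_len() as f32
        }
    }
}

/// Common interface for the character-based and token-based trees
/// (assumed signatures).
pub trait RadixTree {
    /// `str` for a text tree, `[u32]` for a token tree.
    type Key: ?Sized;
    type Match: MatchResult;

    fn insert(&self, key: &Self::Key, tenant: &str);
    fn prefix_match(&self, key: &Self::Key) -> Option<Self::Match>;
    fn evict(&self, tenant: &TenantId, max_size: usize);
    fn reset(&self);
}

/// Hypothetical concrete match result used to exercise `hit_ratio`.
pub struct SimpleMatch {
    pub tenant: TenantId,
    pub matched: usize,
    pub total: usize,
}

impl MatchResult for SimpleMatch {
    fn tenant(&self) -> &TenantId {
        &self.tenant
    }
    fn matched_len(&self) -> usize {
        self.matched
    }
    fn input_len(&self) -> usize {
        self.total
    }
}
```

Making the key an unsized associated type is one way to let a single trait cover both `&str` and `&[u32]` inputs; the PR may well have chosen a different shape.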
Character-based radix tree optimized for HTTP router with text input. This is a refactored version of the existing policies::tree::Tree with improved structure and the new RadixTree trait interface.
Features:
- DashMap-based concurrent node storage with custom CharHasher
- Prefix compression for memory efficiency
- LRU-based tenant eviction with timestamp tracking
- Probabilistic cache updates to reduce write contention
- Comprehensive test suite including concurrent operations
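To make "prefix compression" concrete, here is a single-threaded sketch of a compressed radix node: each edge stores a run of characters, and an insert that diverges mid-run splits the node. It uses a plain `HashMap` rather than DashMap, and the names `Node`, `shared_prefix_len`, `insert`, and `match_len` are hypothetical, not taken from the PR.

```rust
use std::collections::HashMap;

/// Number of leading characters the two slices have in common.
fn shared_prefix_len(a: &[char], b: &[char]) -> usize {
    a.iter().zip(b).take_while(|(x, y)| x == y).count()
}

#[derive(Default)]
struct Node {
    /// Compressed edge label: a run of characters, not a single one.
    label: Vec<char>,
    /// Children keyed by the first character of their label.
    children: HashMap<char, Node>,
}

impl Node {
    fn insert(&mut self, text: &[char]) {
        if text.is_empty() {
            return;
        }
        match self.children.get_mut(&text[0]) {
            None => {
                // No edge starts with this character: store the whole run.
                let leaf = Node { label: text.to_vec(), children: HashMap::new() };
                self.children.insert(text[0], leaf);
            }
            Some(child) => {
                let n = shared_prefix_len(&child.label, text);
                if n < child.label.len() {
                    // Split: the child keeps the shared prefix, and the rest
                    // of its old label moves down into a new grandchild.
                    let tail = child.label.split_off(n);
                    let old_children = std::mem::take(&mut child.children);
                    child
                        .children
                        .insert(tail[0], Node { label: tail, children: old_children });
                }
                child.insert(&text[n..]);
            }
        }
    }

    /// Number of leading characters of `text` that are present in the tree.
    fn match_len(&self, text: &[char]) -> usize {
        let Some(first) = text.first() else { return 0 };
        let Some(child) = self.children.get(first) else { return 0 };
        let n = shared_prefix_len(&child.label, text);
        if n < child.label.len() {
            n
        } else {
            n + child.match_len(&text[n..])
        }
    }
}
```

Two prompts that share a long system prefix end up sharing one node for that prefix, which is where the memory saving comes from.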
Token-based radix tree optimized for gRPC router with pre-tokenized input. Operates on token IDs (u32) instead of characters, matching SGLang's Python scheduler which works with token arrays.
Performance benefits vs StringTree:
- ~4x fewer nodes to traverse (500 tokens vs 2000 chars for same content)
- 2.4-2.9x faster INSERT operations
- ~4x faster MATCH operations
- Significant memory savings from reduced node count
Features:
- Custom TokenHasher with golden ratio mixing for u32 keys
- O(1) split operations using DashMap::clone()
- Same concurrent safety guarantees as StringTree
- Comprehensive test suite with 47 test cases
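"Golden ratio mixing" usually refers to multiplicative (Fibonacci) hashing. The sketch below shows one way such a hasher for `u32` keys could be written against the standard `Hasher` trait. It is an assumption about the technique, not the PR's `TokenHasher`; the `TokenMap` alias and the bit-folding step in `finish` are illustrative choices.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasherDefault, Hasher};

/// 2^64 divided by the golden ratio: the usual Fibonacci-hashing multiplier.
const GOLDEN_RATIO_64: u64 = 0x9E37_79B9_7F4A_7C15;

#[derive(Default)]
struct TokenHasher(u64);

impl Hasher for TokenHasher {
    /// Fallback for keys that are not a single `u32`: fold in byte by byte.
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 = (self.0 ^ u64::from(b)).wrapping_mul(GOLDEN_RATIO_64);
        }
    }

    /// Fast path: `u32::hash` calls this, so a token ID costs one multiply.
    fn write_u32(&mut self, n: u32) {
        self.0 = (self.0 ^ u64::from(n)).wrapping_mul(GOLDEN_RATIO_64);
    }

    fn finish(&self) -> u64 {
        // A multiply mixes the high bits best; fold them into the low bits,
        // which hash tables commonly use to choose a bucket.
        self.0 ^ (self.0 >> 32)
    }
}

/// A map keyed by token ID that skips the default SipHash.
type TokenMap<V> = HashMap<u32, V, BuildHasherDefault<TokenHasher>>;
```

The trade-off is the usual one: far cheaper than SipHash for small integer keys, but with no resistance to adversarially chosen keys.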
…nTree
Criterion-based benchmark suite comparing the two radix tree implementations for cache-aware routing performance.
Benchmarks included:
- INSERT: Fresh tree insertions at various worker scales (10-500)
- MATCH: Prefix matching against pre-populated trees
- CONCURRENT: Mixed workload with 64 threads
- OLD vs NEW: Validation that StringTree matches legacy tree performance
Test configuration:
- StringTree: ~2000 chars per request (realistic LLM prompts)
- TokenTree: ~500 tokens per request (4:1 char-to-token ratio)
- 10,000 tree entries for MATCH tests
- Worker counts: 10, 50, 100, 500
Add benchmark-radix-tree job to pr-benchmark-rust.yml to run StringTree vs TokenTree comparison in CI.
- Runs on 4-gpu-a10 runner for multi-threaded benchmarks
- Triggered on push to main, workflow_dispatch, or PR with labels
- Results included in benchmark summary report
- Uploads Criterion results as artifacts
8aba36a to 26533e0
e5a513e to c58775c
d2ab348 to b4f255a
/gemini review
Code Review
This pull request introduces a new, highly-performant, and concurrent radix tree implementation, which is a significant piece of work. It includes two variants: one for strings (StringTree) and one for tokens (TokenTree), along with a comprehensive benchmark suite. The code is well-structured with clear separation of concerns, and the use of dashmap, custom hashers, and other performance optimizations is commendable.
However, I've identified a few issues that should be addressed. Most critically, the StringTree implementation has a memory leak due to Arc reference cycles with parent pointers. There are also some inconsistencies between the two tree implementations, an opportunity to improve performance in StringTree's tenant handling, and some minor issues in the benchmark code. Addressing these points will make the new radix tree implementation robust and production-ready.
    #[inline]
    fn intern_tenant(tenant: &str) -> TenantId {
        Arc::from(tenant)
    }
The intern_tenant function is not actually interning tenant strings. It creates a new Arc<str> for every call, even for the same tenant string. This leads to unnecessary allocations and increased memory usage, as multiple Arcs will point to different allocations of the same string data. This also reduces the effectiveness of using TenantId as a key in DashMap.
For true interning, you should use a shared pool to store and reuse Arc<str> instances. The token_tree.rs implementation already does this correctly using a global static DashMap. I recommend adopting the same pattern here for consistency and performance.
First, add a static pool at the module level:
    use once_cell::sync::Lazy; // Already a dependency
    static TENANT_INTERN_POOL: Lazy<DashMap<Arc<str>, ()>> = Lazy::new(DashMap::new);

Then, update intern_tenant to use this pool as shown in the suggestion.
    - #[inline]
    - fn intern_tenant(tenant: &str) -> TenantId {
    -     Arc::from(tenant)
    - }
    + #[inline]
    + fn intern_tenant(tenant: &str) -> TenantId {
    +     if let Some(entry) = TENANT_INTERN_POOL.get(tenant) {
    +         return Arc::clone(entry.key());
    +     }
    +     let interned: Arc<str> = Arc::from(tenant);
    +     TENANT_INTERN_POOL.insert(Arc::clone(&interned), ());
    +     interned
    + }
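The effect of interning can be checked with `Arc::ptr_eq`. The following std-only sketch swaps the DashMap pool for a `Mutex<HashSet<Arc<str>>>` so that it compiles without external crates; `TENANT_POOL` is a hypothetical name, and the locking strategy is an assumption, not what token_tree.rs does.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex, OnceLock};

type TenantId = Arc<str>;

/// Process-wide pool of interned tenant strings (std-only stand-in
/// for the DashMap-based pool in the suggestion).
static TENANT_POOL: OnceLock<Mutex<HashSet<Arc<str>>>> = OnceLock::new();

fn intern_tenant(tenant: &str) -> TenantId {
    let pool = TENANT_POOL.get_or_init(|| Mutex::new(HashSet::new()));
    // One lock covers both the lookup and the insert, so two racing
    // callers cannot each allocate their own copy of the same string.
    let mut guard = pool.lock().unwrap();
    if let Some(existing) = guard.get(tenant) {
        return Arc::clone(existing);
    }
    let interned: Arc<str> = Arc::from(tenant);
    guard.insert(Arc::clone(&interned));
    interned
}
```

Repeated calls with the same string return handles to one allocation, while different strings get distinct allocations. A sharded map such as DashMap trades this single lock for lower contention; in that case the entry API is one way to keep the lookup and insert atomic.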
    let mut rng = thread_rng();
    (0..count)
        .map(|_| {
            let prefix_idx = rng.random_range(0..CONVERSATION_PREFIXES.len());
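A note on naming: rand 0.9 renamed gen_range to random_range and deprecated the old name (thread_rng() was likewise renamed to rng()), so random_range is the current spelling on rand 0.9. The gen_range suggestion below applies only if the crate is pinned to rand 0.8, where random_range does not exist.
If the change is made, it should be applied in all places where random_range is used, including lines 83, 98, and 99 in this file, as well as in benches/tree_benchmark.rs.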
    - let prefix_idx = rng.random_range(0..CONVERSATION_PREFIXES.len());
    + let prefix_idx = rng.gen_range(0..CONVERSATION_PREFIXES.len());
        }
    }

    pub fn remove_tenant(&self, tenant: &str) {
The function remove_tenant is public, but it's not part of the RadixTree trait and is not implemented for TokenTree. The same applies to evict_tenant_by_size (line 733).
If these methods are intended for internal use or testing only, they should be made crate-private by changing pub to pub(crate). This would make the public API consistent across both tree implementations and clarify the intended usage.
    - pub fn remove_tenant(&self, tenant: &str) {
    + pub(crate) fn remove_tenant(&self, tenant: &str) {
    pub fn evict_tenant(&self, tenant: &TenantId, max_tokens: usize) {
        let current_count = self.tenant_token_count.get(tenant).map(|v| *v).unwrap_or(0);

        if current_count <= max_tokens {
            return;
        }

        let to_evict = current_count - max_tokens;
        let mut evicted = 0;

        // Collect nodes with timestamps for LRU eviction
        let mut nodes_with_time: Vec<(NodeRef, u64)> = Vec::new();
        self.collect_tenant_nodes(&self.root, tenant, &mut nodes_with_time);

        // Sort by timestamp (oldest first)
        nodes_with_time.sort_by_key(|(_, ts)| *ts);

        for (node, _) in nodes_with_time {
            if evicted >= to_evict {
                break;
            }

            let node_tokens = node.tokens.read().len();
            if self.remove_tenant_from_node(&node, tenant) {
                evicted += node_tokens;
            }
        }

        // Update tenant token count
        self.tenant_token_count
            .entry(tenant.clone())
            .and_modify(|count| *count = count.saturating_sub(evicted));

        debug!(
            tenant = %tenant.as_ref(),
            evicted = evicted,
            remaining = current_count.saturating_sub(evicted),
            "Evicted tokens from tenant"
        );
    }
The evict_tenant function correctly removes tenant ownership from nodes, but it doesn't prune the nodes themselves if they become empty (i.e., have no tenants and no children). Over time, this can lead to an accumulation of "dead" nodes in the tree, consuming memory without holding any data.
The string_tree.rs implementation attempts to prune empty nodes during eviction, which seems to be the desired behavior for long-running services.
To fix this, you would need to:
- Add parent pointers to the Node struct in token_tree.rs (using Weak<Node> to avoid reference cycles).
- Update the eviction logic to traverse up from an evicted leaf and remove parent nodes if they become empty.
This would make the behavior consistent with string_tree.rs and prevent memory bloat from empty nodes.
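The two steps above can be sketched in a few lines. This is a single-threaded illustration using std `Mutex` and `HashMap` in place of DashMap. The `Node` layout and the helpers `new_root`, `new_node`, and `remove_and_prune` are hypothetical, and a concurrent tree would additionally need to guard against a sibling being inserted between the emptiness check and the unlink.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};

struct Node {
    /// Key under which the parent stores this node.
    key: u32,
    /// Weak back-pointer: a child never keeps its parent alive.
    parent: Weak<Node>,
    children: Mutex<HashMap<u32, Arc<Node>>>,
    tenants: Mutex<Vec<String>>,
}

fn new_root() -> Arc<Node> {
    Arc::new(Node {
        key: 0,
        parent: Weak::new(),
        children: Mutex::new(HashMap::new()),
        tenants: Mutex::new(Vec::new()),
    })
}

fn new_node(key: u32, parent: &Arc<Node>, tenant: &str) -> Arc<Node> {
    let node = Arc::new(Node {
        key,
        parent: Arc::downgrade(parent),
        children: Mutex::new(HashMap::new()),
        tenants: Mutex::new(vec![tenant.to_string()]),
    });
    parent.children.lock().unwrap().insert(key, Arc::clone(&node));
    node
}

/// Remove `tenant` from `node`, then walk upward and unlink every node
/// left with no tenants and no children. Returns how many nodes were pruned.
fn remove_and_prune(node: &Arc<Node>, tenant: &str) -> usize {
    node.tenants.lock().unwrap().retain(|t| t.as_str() != tenant);
    let mut pruned = 0;
    let mut current = Arc::clone(node);
    loop {
        let empty = current.tenants.lock().unwrap().is_empty()
            && current.children.lock().unwrap().is_empty();
        // The root has no parent, so `upgrade` fails there and the walk stops.
        let Some(parent) = current.parent.upgrade() else { break };
        if !empty {
            break;
        }
        parent.children.lock().unwrap().remove(&current.key);
        pruned += 1;
        current = parent;
    }
    pruned
}
```

Once a node is unlinked from its parent's map and the caller drops its own handle, the allocation is freed, because the only remaining references to it are weak.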
94b04e1 to 7d610cf
/gemini review
Code Review
This pull request introduces new, highly-performant, and concurrent radix tree implementations (StringTree and TokenTree) for cache-aware routing, along with extensive benchmarks. The implementations showcase many advanced optimization techniques like custom hashers, tenant interning, and lock-free epoch counters. I've found some critical issues in the eviction logic for both StringTree and TokenTree that could lead to memory leaks over time due to not cleaning up empty nodes. Specifically, the TokenTree's Node struct is missing a parent pointer, which makes this cleanup impossible to implement correctly. I've also included some suggestions to improve the new benchmark file's maintainability and use of modern APIs.
    tokens: Box<[TokenId]>,
    /// Children nodes keyed by first PAGE_SIZE tokens (page key)
    children: DashMap<TokenPageKey, NodeRef, TokenPageHasherBuilder>,
    /// Tenants that own this node with last access timestamps
The Node struct is missing a parent pointer. This is a critical omission because it makes it impossible to traverse up the tree, which is necessary for cleaning up empty nodes during eviction. This will lead to memory leaks as evicted nodes that become empty cannot be removed from their parent's children map.
The string_tree.rs implementation correctly includes a parent pointer (RwLock<Option<Weak<Node>>>), and a similar field should be added here. You will also need to update the Node::new and Node::new_root methods to initialize this new field.
    - /// Tenants that own this node with last access timestamps
    + /// Parent pointer for upward traversal during eviction cleanup.
    + /// Uses Weak to avoid Arc reference cycles.
    + parent: ParkingLotRwLock<Option<Weak<Node>>>,
    + /// Tenants that own this node with last access timestamps
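Why the back-pointer should be `Weak` rather than `Arc` can be shown with a small std-only experiment. The two node types and both helper functions are hypothetical and exist only for this demonstration; the strong variant deliberately leaks one pair of nodes to show the cycle.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Parent link held as a strong `Arc`: parent and child keep each other alive.
struct StrongNode {
    parent: Mutex<Option<Arc<StrongNode>>>,
    child: Mutex<Option<Arc<StrongNode>>>,
}

/// Parent link held as `Weak`: only the parent-to-child edge owns anything.
struct WeakNode {
    parent: Mutex<Option<Weak<WeakNode>>>,
    child: Mutex<Option<Arc<WeakNode>>>,
}

/// Build a parent/child pair with a strong back-pointer, drop both handles,
/// and report whether the parent allocation is still alive afterwards.
fn strong_parent_survives_drop() -> bool {
    let parent = Arc::new(StrongNode {
        parent: Mutex::new(None),
        child: Mutex::new(None),
    });
    let child = Arc::new(StrongNode {
        parent: Mutex::new(Some(Arc::clone(&parent))),
        child: Mutex::new(None),
    });
    *parent.child.lock().unwrap() = Some(Arc::clone(&child));
    let observer = Arc::downgrade(&parent);
    drop(child);
    drop(parent);
    observer.upgrade().is_some()
}

/// Same construction, but the back-pointer is weak.
fn weak_parent_survives_drop() -> bool {
    let parent = Arc::new(WeakNode {
        parent: Mutex::new(None),
        child: Mutex::new(None),
    });
    let child = Arc::new(WeakNode {
        parent: Mutex::new(Some(Arc::downgrade(&parent))),
        child: Mutex::new(None),
    });
    *parent.child.lock().unwrap() = Some(Arc::clone(&child));
    let observer = Arc::downgrade(&parent);
    drop(child);
    drop(parent);
    observer.upgrade().is_some()
}
```

With strong back-pointers the pair stays allocated after every outside handle is gone, which is the kind of leak the first review flagged; with `Weak` both nodes are freed.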
    pub fn evict_tenant(&self, tenant: &TenantId, max_tokens: usize) {
        use std::{cmp::Reverse, collections::BinaryHeap};

        let current_count = self
            .tenant_token_count
            .get(tenant.as_ref())
            .map(|v| *v)
            .unwrap_or(0);

        if current_count <= max_tokens {
            return;
        }

        let to_evict = current_count - max_tokens;
        let mut evicted = 0;

        // Collect initial tenant-specific leaves
        let mut leaves: Vec<(NodeRef, u64)> = Vec::new();
        self.collect_tenant_leaves(&self.root, tenant, &mut leaves);

        // Build min-heap: oldest timestamp first (using Reverse for min-heap behavior)
        let mut heap: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::new();
        let mut leaf_nodes: Vec<NodeRef> = Vec::with_capacity(leaves.len());

        for (node, ts) in leaves.drain(..) {
            let idx = leaf_nodes.len();
            leaf_nodes.push(node);
            heap.push(Reverse((ts, idx)));
        }

        // Track which nodes we've already evicted (by index)
        let mut evicted_indices: std::collections::HashSet<usize> =
            std::collections::HashSet::new();

        while evicted < to_evict {
            // Pop oldest leaf
            let Some(Reverse((_, idx))) = heap.pop() else {
                // No more leaves - need to re-collect (parent became leaf)
                leaves.clear();
                self.collect_tenant_leaves(&self.root, tenant, &mut leaves);

                if leaves.is_empty() {
                    break; // No more nodes with this tenant
                }

                // Rebuild heap with new leaves
                leaf_nodes.clear();
                evicted_indices.clear();
                for (node, ts) in leaves.drain(..) {
                    let idx = leaf_nodes.len();
                    leaf_nodes.push(node);
                    heap.push(Reverse((ts, idx)));
                }
                continue;
            };

            // Skip if already evicted
            if evicted_indices.contains(&idx) {
                continue;
            }

            let node = &leaf_nodes[idx];
            let node_tokens = node.tokens.len();

            // Remove tenant from this leaf node
            if self.remove_tenant_from_node(node, tenant) {
                evicted += node_tokens;
                evicted_indices.insert(idx);
            }

            // Note: We don't immediately check if parent became leaf.
            // When heap empties, we re-collect leaves which will find new ones.
        }

        // Update tenant token count
        if let Some(mut count) = self.tenant_token_count.get_mut(tenant.as_ref()) {
            *count = count.saturating_sub(evicted);
        }

        debug!(
            tenant = %tenant.as_ref(),
            evicted = evicted,
            remaining = current_count.saturating_sub(evicted),
            "Evicted tokens from tenant (leaf-first LRU)"
        );
    }
The evict_tenant function removes a tenant's ownership from a node but doesn't clean up the node itself if it becomes empty (i.e., has no tenants and no children). This can lead to memory bloat as these empty nodes will remain in the tree.
After adding a parent pointer to the Node struct (as suggested in another comment), this function should be updated to traverse up from an evicted leaf and remove any nodes that become empty. The logic in string_tree.rs::remove_tenant can serve as a good example for implementing this cleanup.
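As an illustration of what parent links make possible, here is a leaf-first LRU eviction sketch over a flat array of nodes. It is a variation on the function above, not a copy of it: when the last owned child of a node is evicted, the parent is pushed onto the heap immediately instead of waiting for the heap to empty and re-collecting leaves. `Entry`, its fields, and `evict_leaf_first` are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

struct Entry {
    /// Index of the parent entry, `None` for a top-level node.
    parent: Option<usize>,
    /// Number of children still owned by the tenant.
    live_children: usize,
    /// Tokens stored on this node.
    tokens: usize,
    /// Last access timestamp for the tenant.
    last_access: u64,
    /// Whether the tenant still owns this node.
    owned: bool,
}

/// Evict the tenant's oldest leaves until at least `to_evict` tokens are
/// freed or nothing is left. Returns the number of tokens evicted.
fn evict_leaf_first(entries: &mut [Entry], to_evict: usize) -> usize {
    // `Reverse` turns the max-heap into a min-heap on (timestamp, index).
    let mut heap: BinaryHeap<Reverse<(u64, usize)>> = entries
        .iter()
        .enumerate()
        .filter(|(_, e)| e.owned && e.live_children == 0)
        .map(|(i, e)| Reverse((e.last_access, i)))
        .collect();

    let mut evicted = 0;
    while evicted < to_evict {
        let Some(Reverse((_, idx))) = heap.pop() else { break };
        entries[idx].owned = false;
        evicted += entries[idx].tokens;

        // If that was the parent's last owned child, the parent is now a
        // leaf for this tenant and becomes an eviction candidate itself.
        if let Some(p) = entries[idx].parent {
            entries[p].live_children -= 1;
            if entries[p].owned && entries[p].live_children == 0 {
                heap.push(Reverse((entries[p].last_access, p)));
            }
        }
    }
    evicted
}
```

An inner node is never evicted before its descendants, even when its own timestamp is the oldest, which is the property the "leaf-first" wording describes.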
| fn bench_summary(c: &mut Criterion) { | ||
| let mut group = c.benchmark_group("benchmark_summary"); | ||
|
|
||
| // Reduce warmup and measurement time for faster runs | ||
| group.warm_up_time(std::time::Duration::from_millis(500)); | ||
| group.measurement_time(std::time::Duration::from_secs(2)); | ||
| group.sample_size(50); | ||
|
|
||
| // Configuration constants | ||
| const TREE_SIZE: usize = 2_000; | ||
| const INSERT_POOL_SIZE: usize = 2_000; | ||
| const NUM_THREADS: usize = 32; | ||
| const OPS_PER_THREAD: usize = 100; | ||
|
|
||
| // Worker counts and sizes to test (reduced for faster runs) | ||
| const WORKER_COUNTS: [usize; 3] = [10, 100, 500]; | ||
| const TOKEN_SIZES: [usize; 3] = [1024, 4096, 16384]; | ||
| const CHAR_SIZES: [usize; 3] = [4096, 16384, 65536]; | ||
|
|
||
| // ======================================================================== | ||
| // OLD StringTree Benchmark | ||
| // ======================================================================== | ||
| for &num_workers in &WORKER_COUNTS { | ||
| let workers = generate_worker_endpoints(num_workers); | ||
|
|
||
| for &char_size in &CHAR_SIZES { | ||
| let fixed_strings = generate_fixed_char_strings(INSERT_POOL_SIZE, char_size); | ||
|
|
||
| // Pre-populate tree for MATCH | ||
| let old_tree = Arc::new(OldTree::new()); | ||
| for (i, s) in fixed_strings.iter().take(TREE_SIZE).enumerate() { | ||
| let tenant = &workers[i % workers.len()]; | ||
| old_tree.insert(s, tenant); | ||
| } | ||
|
|
||
| // OLD tree INSERT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("old_insert_{}w_{}c", num_workers, char_size); | ||
| let workers_clone = workers.clone(); | ||
| let strings_clone = fixed_strings.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let workers = workers_clone.clone(); | ||
| let strings = strings_clone.clone(); | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let tree = OldTree::new(); | ||
| let start = Instant::now(); | ||
| for i in 0..iters { | ||
| let tenant = &workers[i as usize % workers.len()]; | ||
| let text = &strings[i as usize % strings.len()]; | ||
| tree.insert(black_box(text), tenant); | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mb = (ops_per_sec * char_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "old", | ||
| format!( | ||
| "{:>3}w | {:>5} chars | INSERT | {:>8.0} ops/s | {:>6.1} µs | {:>7.1} MB/s", | ||
| num_workers, char_size, ops_per_sec, latency_us, throughput_mb | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
|
|
||
| // OLD tree MATCH | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("old_match_{}w_{}c", num_workers, char_size); | ||
| let tree_clone = old_tree.clone(); | ||
| let strings_clone = fixed_strings.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let tree = tree_clone.clone(); | ||
| let strings = strings_clone.clone(); | ||
| let mut idx = 0; | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let result = tree.prefix_match(black_box(&strings[idx % strings.len()])); | ||
| black_box(result); | ||
| idx += 1; | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mb = (ops_per_sec * char_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "old", | ||
| format!( | ||
| "{:>3}w | {:>5} chars | MATCH | {:>8.0} ops/s | {:>6.1} µs | {:>7.1} MB/s", | ||
| num_workers, char_size, ops_per_sec, latency_us, throughput_mb | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| // OLD tree CONCURRENT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("old_concurrent_{}w", num_workers); | ||
| let workers_clone = workers.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let printed = printed.clone(); | ||
| let workers = workers_clone.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let tree = Arc::new(OldTree::new()); | ||
| let workers_ref = &workers; | ||
| let handles: Vec<_> = (0..NUM_THREADS) | ||
| .map(|t| { | ||
| let tree = Arc::clone(&tree); | ||
| let worker = workers_ref[t % workers_ref.len()].clone(); | ||
| thread::spawn(move || { | ||
| for i in 0..OPS_PER_THREAD { | ||
| let text = format!( | ||
| "{}thread{}_request{}_padding{}", | ||
| CONVERSATION_PREFIXES[i % CONVERSATION_PREFIXES.len()], | ||
| t, | ||
| i, | ||
| "x".repeat(1000) | ||
| ); | ||
| if i % 3 == 0 { | ||
| tree.prefix_match(&text); | ||
| } else { | ||
| tree.insert(&text, &worker); | ||
| } | ||
| } | ||
| }) | ||
| }) | ||
| .collect(); | ||
|
|
||
| for h in handles { | ||
| h.join().unwrap(); | ||
| } | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let total_ops = iters * NUM_THREADS as u64 * OPS_PER_THREAD as u64; | ||
| let ops_per_sec = total_ops as f64 / duration.as_secs_f64(); | ||
| add_result( | ||
| "old", | ||
| format!( | ||
| "{:>3}w | CONCURRENT | {:>7.0} ops/s | {} threads x {} ops", | ||
| num_workers, ops_per_sec, NUM_THREADS, OPS_PER_THREAD | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| // ======================================================================== | ||
| // NEW StringTree Benchmark | ||
| // ======================================================================== | ||
| for &num_workers in &WORKER_COUNTS { | ||
| let workers = generate_worker_endpoints(num_workers); | ||
|
|
||
| for &char_size in &CHAR_SIZES { | ||
| let fixed_strings = generate_fixed_char_strings(INSERT_POOL_SIZE, char_size); | ||
|
|
||
| // Pre-populate tree for MATCH | ||
| let string_tree = Arc::new(StringTree::new()); | ||
| for (i, s) in fixed_strings.iter().take(TREE_SIZE).enumerate() { | ||
| let tenant = &workers[i % workers.len()]; | ||
| string_tree.insert_text(s, tenant); | ||
| } | ||
|
|
||
| // StringTree INSERT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("string_insert_{}w_{}c", num_workers, char_size); | ||
| let workers_clone = workers.clone(); | ||
| let strings_clone = fixed_strings.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let workers = workers_clone.clone(); | ||
| let strings = strings_clone.clone(); | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let tree = StringTree::new(); | ||
| let start = Instant::now(); | ||
| for i in 0..iters { | ||
| let tenant = &workers[i as usize % workers.len()]; | ||
| let text = &strings[i as usize % strings.len()]; | ||
| tree.insert_text(black_box(text), tenant); | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mb = (ops_per_sec * char_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "string", | ||
| format!( | ||
| "{:>3}w | {:>5} chars | INSERT | {:>8.0} ops/s | {:>6.1} µs | {:>7.1} MB/s", | ||
| num_workers, char_size, ops_per_sec, latency_us, throughput_mb | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
|
|
||
| // StringTree MATCH | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("string_match_{}w_{}c", num_workers, char_size); | ||
| let tree_clone = string_tree.clone(); | ||
| let strings_clone = fixed_strings.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let tree = tree_clone.clone(); | ||
| let strings = strings_clone.clone(); | ||
| let mut idx = 0; | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let result = tree.prefix_match_legacy(black_box(&strings[idx % strings.len()])); | ||
| black_box(result); | ||
| idx += 1; | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mb = (ops_per_sec * char_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "string", | ||
| format!( | ||
| "{:>3}w | {:>5} chars | MATCH | {:>8.0} ops/s | {:>6.1} µs | {:>7.1} MB/s", | ||
| num_workers, char_size, ops_per_sec, latency_us, throughput_mb | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| // StringTree CONCURRENT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("string_concurrent_{}w", num_workers); | ||
| let workers_clone = workers.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let printed = printed.clone(); | ||
| let workers = workers_clone.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let tree = Arc::new(StringTree::new()); | ||
| let workers_ref = &workers; | ||
| let handles: Vec<_> = (0..NUM_THREADS) | ||
| .map(|t| { | ||
| let tree = Arc::clone(&tree); | ||
| let worker = workers_ref[t % workers_ref.len()].clone(); | ||
| thread::spawn(move || { | ||
| for i in 0..OPS_PER_THREAD { | ||
| let text = format!( | ||
| "{}thread{}_request{}_padding{}", | ||
| CONVERSATION_PREFIXES[i % CONVERSATION_PREFIXES.len()], | ||
| t, | ||
| i, | ||
| "x".repeat(1000) | ||
| ); | ||
| if i % 3 == 0 { | ||
| tree.prefix_match_legacy(&text); | ||
| } else { | ||
| tree.insert_text(&text, &worker); | ||
| } | ||
| } | ||
| }) | ||
| }) | ||
| .collect(); | ||
|
|
||
| for h in handles { | ||
| h.join().unwrap(); | ||
| } | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let total_ops = iters * NUM_THREADS as u64 * OPS_PER_THREAD as u64; | ||
| let ops_per_sec = total_ops as f64 / duration.as_secs_f64(); | ||
| add_result( | ||
| "string", | ||
| format!( | ||
| "{:>3}w | CONCURRENT | {:>7.0} ops/s | {} threads x {} ops", | ||
| num_workers, ops_per_sec, NUM_THREADS, OPS_PER_THREAD | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| // ======================================================================== | ||
| // TokenTree Benchmark | ||
| // ======================================================================== | ||
| for &num_workers in &WORKER_COUNTS { | ||
| let workers = generate_worker_endpoints(num_workers); | ||
|
|
||
| for &token_size in &TOKEN_SIZES { | ||
| let fixed_sequences = generate_fixed_token_sequences(INSERT_POOL_SIZE, token_size); | ||
|
|
||
| // Pre-populate tree for MATCH | ||
| let token_tree = Arc::new(TokenTree::new()); | ||
| for (i, seq) in fixed_sequences.iter().take(TREE_SIZE).enumerate() { | ||
| let tenant = &workers[i % workers.len()]; | ||
| token_tree.insert_tokens(seq, tenant); | ||
| } | ||
|
|
||
| // TokenTree INSERT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("token_insert_{}w_{}tok", num_workers, token_size); | ||
| let workers_clone = workers.clone(); | ||
| let seqs_clone = fixed_sequences.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let workers = workers_clone.clone(); | ||
| let seqs = seqs_clone.clone(); | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let tree = TokenTree::new(); | ||
| let start = Instant::now(); | ||
| for i in 0..iters { | ||
| let tenant = &workers[i as usize % workers.len()]; | ||
| let tokens = &seqs[i as usize % seqs.len()]; | ||
| tree.insert_tokens(black_box(tokens), tenant); | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mtok = (ops_per_sec * token_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "token", | ||
| format!( | ||
| "{:>3}w | {:>5} tokens | INSERT | {:>8.0} ops/s | {:>6.1} µs | {:>6.1} Mtok/s", | ||
| num_workers, token_size, ops_per_sec, latency_us, throughput_mtok | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
|
|
||
| // TokenTree MATCH | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("token_match_{}w_{}tok", num_workers, token_size); | ||
| let tree_clone = token_tree.clone(); | ||
| let seqs_clone = fixed_sequences.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let tree = tree_clone.clone(); | ||
| let seqs = seqs_clone.clone(); | ||
| let mut idx = 0; | ||
| let printed = printed.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let result = tree.prefix_match_legacy(black_box(&seqs[idx % seqs.len()])); | ||
| black_box(result); | ||
| idx += 1; | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let ops_per_sec = iters as f64 / duration.as_secs_f64(); | ||
| let latency_us = duration.as_nanos() as f64 / iters as f64 / 1000.0; | ||
| let throughput_mtok = (ops_per_sec * token_size as f64) / 1_000_000.0; | ||
| add_result( | ||
| "token", | ||
| format!( | ||
| "{:>3}w | {:>5} tokens | MATCH | {:>8.0} ops/s | {:>6.1} µs | {:>6.1} Mtok/s", | ||
| num_workers, token_size, ops_per_sec, latency_us, throughput_mtok | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| // TokenTree CONCURRENT | ||
| let printed = Arc::new(AtomicBool::new(false)); | ||
| let bench_name = format!("token_concurrent_{}w", num_workers); | ||
| let workers_clone = workers.clone(); | ||
|
|
||
| group.bench_function(&bench_name, |b| { | ||
| let printed = printed.clone(); | ||
| let workers = workers_clone.clone(); | ||
|
|
||
| b.iter_custom(|iters| { | ||
| let start = Instant::now(); | ||
| for _ in 0..iters { | ||
| let tree = Arc::new(TokenTree::new()); | ||
| let workers_ref = &workers; | ||
| let handles: Vec<_> = (0..NUM_THREADS) | ||
| .map(|t| { | ||
| let tree = Arc::clone(&tree); | ||
| let worker = workers_ref[t % workers_ref.len()].clone(); | ||
| thread::spawn(move || { | ||
| for i in 0..OPS_PER_THREAD { | ||
| // Generate deterministic token sequence | ||
| let tokens: Vec<TokenId> = (0..1024) | ||
| .map(|j| (t * 10000 + i * 100 + j) as u32) | ||
| .collect(); | ||
| if i % 3 == 0 { | ||
| tree.prefix_match_legacy(&tokens); | ||
| } else { | ||
| tree.insert_tokens(&tokens, &worker); | ||
| } | ||
| } | ||
| }) | ||
| }) | ||
| .collect(); | ||
|
|
||
| for h in handles { | ||
| h.join().unwrap(); | ||
| } | ||
| } | ||
| let duration = start.elapsed(); | ||
|
|
||
| if !printed.swap(true, Ordering::Relaxed) { | ||
| let total_ops = iters * NUM_THREADS as u64 * OPS_PER_THREAD as u64; | ||
| let ops_per_sec = total_ops as f64 / duration.as_secs_f64(); | ||
| add_result( | ||
| "token", | ||
| format!( | ||
| "{:>3}w | CONCURRENT | {:>7.0} ops/s | {} threads x {} ops", | ||
| num_workers, ops_per_sec, NUM_THREADS, OPS_PER_THREAD | ||
| ), | ||
| ); | ||
| } | ||
|
|
||
| duration | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| group.finish(); | ||
| } |
This benchmark file contains a significant amount of duplicated code for testing the OldTree, StringTree, and TokenTree. The logic for INSERT, MATCH, and CONCURRENT benchmarks is repeated for each tree type with only minor variations.
To improve maintainability and reduce the code size, consider refactoring this into a more generic structure. You could define a trait that abstracts the tree operations needed for benchmarking and then implement it for each tree type. A generic benchmark function could then take any type that implements this trait, making it much easier to add new tree implementations or modify the benchmark logic in the future.
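One possible shape for that refactor, sketched with std only. `BenchTree`, `time_inserts`, and the toy `MapTree` are hypothetical names; the real benchmark would implement the trait for OldTree, StringTree, and TokenTree and call the generic function from inside Criterion's `iter_custom`.

```rust
use std::collections::HashMap;
use std::hint::black_box;
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical trait covering only what the benchmark loops need.
trait BenchTree: Default {
    /// `String` for the text trees, `Vec<u32>` for the token tree.
    type Input;
    fn bench_insert(&self, input: &Self::Input, tenant: &str);
    fn bench_match(&self, input: &Self::Input) -> bool;
}

/// One INSERT loop shared by every tree type: fresh tree, `iters` inserts
/// cycling through the input pool and the worker list.
fn time_inserts<T: BenchTree>(inputs: &[T::Input], workers: &[String], iters: u64) -> Duration {
    let tree = T::default();
    let start = Instant::now();
    for i in 0..iters {
        let tenant = &workers[i as usize % workers.len()];
        let input = &inputs[i as usize % inputs.len()];
        tree.bench_insert(black_box(input), tenant);
    }
    start.elapsed()
}

/// Toy stand-in so the sketch compiles without the real trees.
#[derive(Default)]
struct MapTree(Mutex<HashMap<String, String>>);

impl BenchTree for MapTree {
    type Input = String;

    fn bench_insert(&self, input: &String, tenant: &str) {
        self.0.lock().unwrap().insert(input.clone(), tenant.to_string());
    }

    fn bench_match(&self, input: &String) -> bool {
        self.0.lock().unwrap().contains_key(input)
    }
}
```

With this in place, each of the three near-identical INSERT sections reduces to a call such as `time_inserts::<MapTree>(&inputs, &workers, iters)`, and the MATCH and CONCURRENT loops could be factored the same way.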