Commit 08376f4

alexclaude and Claude Opus 4.5 committed
use randomness for shard selection when available
Use the same approach as `sync::watch`: prefer `thread_rng_n()` for shard
selection to reduce contention on the atomic counter, falling back to
round-robin when the RNG is not available (loom mode or missing features).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 54330c3 commit 08376f4
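
For illustration, here is a minimal, self-contained sketch of the selection pattern described in the commit message. The xorshift-based `thread_rng_n` stand-in and the `RNG_AVAILABLE` toggle are hypothetical; the actual change (shown in the diff below) uses tokio's internal `thread_rng_n` and picks between the two strategies at compile time with `cfg` attributes rather than a runtime flag.

use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Hypothetical stand-in for tokio's internal `thread_rng_n`: a per-thread
// xorshift generator that returns a value in `0..n` without touching any
// shared state.
fn thread_rng_n(n: u32) -> u32 {
    thread_local! {
        static STATE: Cell<u32> = Cell::new(0x9E37_79B9);
    }
    STATE.with(|s| {
        let mut x = s.get();
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        s.set(x);
        x % n
    })
}

// Shared counter used only by the round-robin fallback; every push
// contends on this one atomic.
static PUSH_INDEX: AtomicUsize = AtomicUsize::new(0);

// Runtime toggle standing in for the real compile-time `cfg` gate
// (not(loom) plus the features that enable the runtime RNG).
const RNG_AVAILABLE: bool = true;

fn select_shard(num_shards: usize) -> usize {
    if RNG_AVAILABLE {
        // Preferred path: random selection, no shared atomic on the hot path.
        thread_rng_n(num_shards as u32) as usize
    } else {
        // Fallback path: round-robin; `num_shards` must be a power of two
        // for the mask to be equivalent to `% num_shards`.
        PUSH_INDEX.fetch_add(1, Relaxed) & (num_shards - 1)
    }
}

fn main() {
    for _ in 0..4 {
        let idx = select_shard(8);
        assert!(idx < 8);
        println!("push goes to shard {idx}");
    }
}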

File tree

1 file changed: 25 additions, 3 deletions


tokio/src/runtime/blocking/sharded_queue.rs

Lines changed: 25 additions & 3 deletions
@@ -8,11 +8,17 @@
 //! The queue adapts to the current concurrency level by using fewer shards
 //! when there are few threads, which improves cache locality and reduces
 //! lock contention on the active shards.
+//!
+//! For shard selection, we use the same approach as `sync::watch`: prefer
+//! randomness when available to reduce contention, falling back to circular
+//! access when the random number generator is not available.
 
 use crate::loom::sync::{Condvar, Mutex};
 
 use std::collections::VecDeque;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+#[cfg(not(all(not(loom), any(feature = "macros", all(feature = "sync", feature = "rt")))))]
+use std::sync::atomic::Ordering::Relaxed;
 use std::sync::atomic::{AtomicBool, AtomicUsize};
 use std::time::Duration;
 
@@ -52,6 +58,8 @@ pub(super) struct ShardedQueue {
     /// The shards - each with its own mutex-protected queue.
     shards: [Shard; NUM_SHARDS],
     /// Atomic counter for round-robin task distribution.
+    /// Only used when randomness is not available (loom or missing features).
+    #[cfg(not(all(not(loom), any(feature = "macros", all(feature = "sync", feature = "rt")))))]
     push_index: AtomicUsize,
     /// Tracks the highest shard index that has ever been pushed to.
     /// This allows `pop()` to skip checking shards that have never had tasks,
@@ -83,6 +91,7 @@ impl ShardedQueue {
     pub(super) fn new() -> Self {
         ShardedQueue {
             shards: std::array::from_fn(|_| Shard::new()),
+            #[cfg(not(all(not(loom), any(feature = "macros", all(feature = "sync", feature = "rt")))))]
             push_index: AtomicUsize::new(0),
             max_shard_pushed: AtomicUsize::new(0),
             shutdown: AtomicBool::new(false),
@@ -91,14 +100,27 @@ impl ShardedQueue {
         }
     }
 
+    /// Select the next shard index for pushing a task -- when the RNG is
+    /// available.
+    #[cfg(all(not(loom), any(feature = "macros", all(feature = "sync", feature = "rt"))))]
+    fn next_push_index(&self, num_shards: usize) -> usize {
+        crate::runtime::context::thread_rng_n(num_shards as u32) as usize
+    }
+
+    /// Select the next shard index for pushing a task -- when the RNG is not
+    /// available.
+    #[cfg(not(all(not(loom), any(feature = "macros", all(feature = "sync", feature = "rt")))))]
+    fn next_push_index(&self, num_shards: usize) -> usize {
+        self.push_index.fetch_add(1, Relaxed) & (num_shards - 1)
+    }
+
     /// Push a task to the queue.
     ///
     /// `num_threads` is a hint about the current thread count, used to
     /// adaptively choose how many shards to distribute across.
     pub(super) fn push(&self, task: Task, num_threads: usize) {
         let num_shards = effective_shards(num_threads);
-        let mask = num_shards - 1;
-        let index = self.push_index.fetch_add(1, Relaxed) & mask;
+        let index = self.next_push_index(num_shards);
 
         // Update max_shard_pushed BEFORE pushing the task.
         // AcqRel ensures the subsequent push cannot be reordered before this.
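
One detail in the fallback path worth spelling out: `fetch_add(1, Relaxed) & (num_shards - 1)` only behaves like a modulo when `num_shards` is a power of two, which the queue presumably guarantees via its fixed `NUM_SHARDS` array size and `effective_shards`. A quick standalone check of that equivalence:

fn main() {
    // The mask trick: for a power-of-two shard count, `counter & (n - 1)`
    // and `counter % n` select the same shard.
    let num_shards = 8usize;
    for counter in 0..64usize {
        assert_eq!(counter & (num_shards - 1), counter % num_shards);
    }
    println!("mask and modulo agree for num_shards = {num_shards}");
}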
