
Commit e0107cc

committed

remove adaptive behavior; it introduced a lot of complexity and the gains weren't tremendous

1 parent: 1f53f22

File tree

2 files changed: +13, -52 lines

tokio/src/runtime/blocking/pool.rs (3 additions, 6 deletions)
```diff
@@ -399,17 +399,14 @@ impl Spawner {
             return Err(SpawnError::ShuttingDown);
         }

-        // Get thread count for adaptive sharding
-        let num_threads = self.inner.metrics.num_threads();
-
-        // Push to the sharded queue - uses adaptive shard count based on thread count
-        self.inner.queue.push(task, num_threads);
+        // Push to the sharded queue
+        self.inner.queue.push(task);
         self.inner.metrics.inc_queue_depth();

         // Check if we need to spawn a new thread or notify an idle one
         if self.inner.metrics.num_idle_threads() == 0 {
             // No idle threads - might need to spawn one
-            if num_threads < self.inner.thread_cap {
+            if self.inner.metrics.num_threads() < self.inner.thread_cap {
                 // Try to spawn a new thread
                 let mut shared = self.inner.shared.lock();
```
tokio/src/runtime/blocking/sharded_queue.rs (10 additions, 46 deletions)
```diff
@@ -5,10 +5,6 @@
 //! The push operations use per-shard locking, while notifications use a global
 //! condvar for simplicity.
 //!
-//! The queue adapts to the current concurrency level by using fewer shards
-//! when there are few threads, which improves cache locality and reduces
-//! lock contention on the active shards.
-//!
 //! For shard selection, we use the same approach as `sync::watch`: prefer
 //! randomness when available to reduce contention, falling back to circular
 //! access when the random number generator is not available.
```
```diff
@@ -18,8 +14,10 @@ use crate::loom::sync::{Condvar, Mutex};
 use std::collections::VecDeque;
 #[cfg(loom)]
 use std::sync::atomic::Ordering::Relaxed;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::Ordering::{Acquire, Release};
+use std::sync::atomic::AtomicBool;
+#[cfg(loom)]
+use std::sync::atomic::AtomicUsize;
 use std::time::Duration;

 use super::pool::Task;
@@ -91,11 +89,6 @@ pub(super) struct ShardedQueue {
     /// Only used when randomness is not available (loom).
     #[cfg(loom)]
     push_index: AtomicUsize,
-    /// Tracks the highest shard index that has ever been pushed to.
-    /// This allows `pop()` to skip checking shards that have never had tasks,
-    /// which is important for maintaining low overhead at low concurrency.
-    /// Only increases, never decreases (even when shards become empty).
-    max_shard_pushed: AtomicUsize,
     /// Global shutdown flag.
     shutdown: AtomicBool,
     /// Global condition variable for worker notifications.
@@ -105,25 +98,12 @@
     condvar_mutex: Mutex<()>,
 }

-/// Calculate the effective number of shards to use based on thread count.
-/// Uses fewer shards at low concurrency for better cache locality.
-#[inline]
-fn effective_shards(num_threads: usize) -> usize {
-    match num_threads {
-        0..=2 => 2,
-        3..=4 => 4,
-        5..=8 => 8,
-        _ => NUM_SHARDS,
-    }
-}
-
 impl ShardedQueue {
     pub(super) fn new() -> Self {
         ShardedQueue {
             shards: std::array::from_fn(|_| Shard::new()),
             #[cfg(loom)]
             push_index: AtomicUsize::new(0),
-            max_shard_pushed: AtomicUsize::new(0),
             shutdown: AtomicBool::new(false),
             condvar: Condvar::new(),
             condvar_mutex: Mutex::new(()),
```
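For reference, the removed mapping was a step function from thread count to shard count. Reproduced standalone below to make its behavior concrete; the `NUM_SHARDS` value is an assumption here, since the real constant is defined elsewhere in the module:

```rust
const NUM_SHARDS: usize = 16; // assumed for illustration

// The removed adaptive mapping, copied from the diff above.
fn effective_shards(num_threads: usize) -> usize {
    match num_threads {
        0..=2 => 2,
        3..=4 => 4,
        5..=8 => 8,
        _ => NUM_SHARDS,
    }
}

fn main() {
    // e.g. 6 worker threads spread pushes over only 8 shards;
    // past 8 threads, every shard was in play.
    for n in [1, 3, 6, 12] {
        println!("{n} threads -> {} shards", effective_shards(n));
    }
}
```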
```diff
@@ -145,17 +125,8 @@ impl ShardedQueue {
     }

     /// Push a task to the queue.
-    ///
-    /// `num_threads` is a hint about the current thread count, used to
-    /// adaptively choose how many shards to distribute across.
-    pub(super) fn push(&self, task: Task, num_threads: usize) {
-        let num_shards = effective_shards(num_threads);
-        let index = self.next_push_index(num_shards);
-
-        // Update max_shard_pushed BEFORE pushing the task.
-        // AcqRel ensures the subsequent push cannot be reordered before this.
-        self.max_shard_pushed.fetch_max(index, AcqRel);
-
+    pub(super) fn push(&self, task: Task) {
+        let index = self.next_push_index(NUM_SHARDS);
         self.shards[index].push(task);
     }
```
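The deleted `fetch_max` was one half of a publish/observe pairing with the `Acquire` load removed from `pop` below. A minimal sketch of that high-water-mark pattern, with stand-in names rather than the removed tokio fields, assuming this is what the orderings were for:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire}};

// High-water mark: a writer raises the largest index it will touch
// before writing, so a reader that loads the mark afterwards never
// scans too few slots. `MAX_INDEX` is a stand-in name.
static MAX_INDEX: AtomicUsize = AtomicUsize::new(0);

fn publish(index: usize) {
    // Release half of AcqRel: the slot write that follows must not
    // become visible before the raised maximum.
    MAX_INDEX.fetch_max(index, AcqRel);
    // ... write to slot `index` here ...
}

fn slots_to_scan() -> usize {
    // Acquire pairs with the release above: any slot published before
    // the maximum we observe falls inside the scan range.
    MAX_INDEX.load(Acquire) + 1
}

fn main() {
    publish(5);
    assert_eq!(slots_to_scan(), 6);
}
```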
```diff
@@ -170,18 +141,11 @@
     }

     /// Try to pop a task, checking the preferred shard first, then others.
-    ///
-    /// Only checks shards up to `max_shard_pushed` since tasks can only exist
-    /// in shards that have been pushed to.
     pub(super) fn pop(&self, preferred_shard: usize) -> Option<Task> {
-        // Only check shards that have ever had tasks pushed to them
-        let max_shard = self.max_shard_pushed.load(Acquire);
-        let num_shards_to_check = max_shard + 1;
-
-        // Check shards starting from preferred, wrapping within active range
-        let start = preferred_shard % num_shards_to_check;
-        for i in 0..num_shards_to_check {
-            let index = (start + i) % num_shards_to_check;
+        // Check shards starting from preferred, wrapping around
+        let start = preferred_shard % NUM_SHARDS;
+        for i in 0..NUM_SHARDS {
+            let index = (start + i) % NUM_SHARDS;
             if let Some(task) = self.shards[index].pop() {
                 return Some(task);
             }
```
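The simplified scan always covers all `NUM_SHARDS` slots, starting at the caller's preferred shard. A small standalone check of the visit order, with the shard count assumed for illustration:

```rust
const NUM_SHARDS: usize = 8; // assumed for illustration

/// Order in which pop() visits shards for a given preferred shard.
fn scan_order(preferred: usize) -> Vec<usize> {
    let start = preferred % NUM_SHARDS;
    (0..NUM_SHARDS).map(|i| (start + i) % NUM_SHARDS).collect()
}

fn main() {
    // Starts at the preferred shard, then wraps through the rest.
    assert_eq!(scan_order(6), vec![6, 7, 0, 1, 2, 3, 4, 5]);
}
```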
