rt: improve spawn_blocking scalability with sharded queue #7757
base: master
Conversation
Force-pushed 9537dda to 016f6ca
(FreeBSD failures look unrelated.)
Force-pushed f4416fb to 21ff5ce
Please rebase to latest
Force-pushed 21ff5ce to 694fa6b
Force-pushed 694fa6b to edd5e10
Force-pushed edd5e10 to 126cb78
```rust
// Update max_shard_pushed BEFORE pushing the task.
self.max_shard_pushed.fetch_max(index, Release);

self.shards[index].push(task);
```
With the `Release` ordering, the compiler might reorder `self.shards[index].push(task)` and the `fetch_max`, which means that the `push(task)` might end up sequenced before the `fetch_max`.
I think you're right. (I hate atomic orderings :-/) AcqRel I think is what we want.
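As a concrete sketch of the fix being discussed (the surrounding types are hypothetical; only `max_shard_pushed`, `shards`, and the `fetch_max` call come from the PR):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::AcqRel};
use std::sync::Mutex;

struct Task;

struct Shards {
    max_shard_pushed: AtomicUsize,
    shards: Vec<Mutex<VecDeque<Task>>>,
}

impl Shards {
    fn push(&self, index: usize, task: Task) {
        // AcqRel: the Release half publishes the new watermark to any
        // reader that Acquire-loads it; the Acquire half prevents the
        // queue push below from being reordered before this RMW.
        self.max_shard_pushed.fetch_max(index, AcqRel);
        self.shards[index].lock().unwrap().push_back(task);
    }
}
```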
This is still wrong. Consider the following scenario:
```text
Thread A                  Thread B                  Thread C
                                                    preferred_shard = 0
                          preferred_shard = 1
max_shard_pushed = 0
shards[0].push(_)
condvar.notify_one()
                          wakes up ...
                          max_shard = 0
max_shard_pushed = 1
shards[1].push(_)
condvar.notify_one()
                                                    wakes up ...
                                                    max_shard = 1
                                                    shards[0].pop() = Some
                          shards[0].pop() = None
```
In this case, Thread B does not check shards[1] because it read max_shard with the value of zero. This means that two tasks were spawned, but only one gets picked up.
Well, I guess in principle Thread C will see the second task after it finishes executing the first one.
Hmm, I think it's always the case that if a push happens concurrently with a pop that the pop might miss it, and we'll have to "fall back" to catching it in the wait_for_task loop.
I think in principle we could address this one by reloading max_shard_pushed, but of course you can still have a race condition.
In the scenario you've got here, what would happen is that after thread B returns None from pop, it'll go wait_for_task and then the task will get picked up.
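The "reload" idea could look roughly like this (a hypothetical sketch, not the PR's code; as noted above, it narrows the window but a racing push can still be missed, so the `wait_for_task` fallback is still required):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::Acquire};
use std::sync::Mutex;

struct Task;

struct Shards {
    max_shard_pushed: AtomicUsize,
    shards: Vec<Mutex<VecDeque<Task>>>,
}

impl Shards {
    /// Pop, re-scanning if a producer raised `max_shard_pushed` mid-scan.
    fn pop(&self) -> Option<Task> {
        loop {
            let seen = self.max_shard_pushed.load(Acquire);
            for shard in &self.shards[..=seen.min(self.shards.len() - 1)] {
                if let Some(task) = shard.lock().unwrap().pop_front() {
                    return Some(task);
                }
            }
            // A push that raced with the scan may have landed behind us.
            // Re-scan only if the watermark moved; otherwise give up and
            // let the wait_for_task loop (plus notify_one) catch it.
            if self.max_shard_pushed.load(Acquire) == seen {
                return None;
            }
        }
    }
}
```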
Hmm, I think it's always the case that if a push happens concurrently with a pop that the pop might miss it, and we'll have to "fall back" to catching it in the wait_for_task loop.
Your notify_one() call ensures that for every push(), there will be a subsequent call to pop() that is not concurrent and hence guaranteed to see the pushed message. So there's at least one thread that's guaranteed to pick up each message.
If we imagine that the max_shard_pushed logic was removed, then Thread B would in fact be guaranteed to see the message in shards[1].
- `shards[1].push(_)` on A happens-before `shards[0].pop() = Some` on thread C, because thread C is the thread woken up by the second `notify_one()` call.
- `shards[0].pop() = Some` on thread C happens-before `shards[0].pop() = None` on thread B, since otherwise thread B would have gotten `Some` when calling `pop()`.
- After `shards[0].pop() = None`, thread B would attempt to call `shards[1].pop()`.
So by this logic, the shards[1].pop() call would in fact happen after shards[1].push(_), and is hence guaranteed to see the message that was pushed.
Force-pushed 855e269 to 046d5c6
```rust
// Acquire the condvar mutex before waiting
let guard = self.condvar_mutex.lock();

// Double-check shutdown and tasks after acquiring lock, as state may
// have changed while we were waiting for the lock
if self.is_shutdown() {
    return WaitResult::Shutdown;
}
if let Some(task) = self.pop(preferred_shard) {
    return WaitResult::Task(task);
}
```
Here, we attempt to acquire the shard lock while holding the condvar lock; this is the nested locking pattern. In general, we should avoid this pattern, as it is error-prone.
Hmm, is there a preferred pattern to avoid the nested locking?
In this case we want to ensure that when we wait for a notification, there wasn't already a task that's made pending concurrently.
Nested locking is fine as long as locks are always taken in the same order.
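To illustrate the rule, here is a minimal sketch in which `condvar_mutex` is always acquired before any shard lock, and no path acquires them in the reverse order while holding both (hypothetical types; not the PR's code):

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

struct Task;

struct Pool {
    condvar: Condvar,
    condvar_mutex: Mutex<()>,
    shards: Vec<Mutex<VecDeque<Task>>>,
}

impl Pool {
    /// Lock order: condvar_mutex, then a shard lock. Deadlock-free as
    /// long as no path holds a shard lock while acquiring condvar_mutex.
    fn wait_for_task(&self) -> Task {
        let mut guard = self.condvar_mutex.lock().unwrap();
        loop {
            // Nested: shard locks taken while holding condvar_mutex.
            for shard in &self.shards {
                if let Some(task) = shard.lock().unwrap().pop_front() {
                    return task;
                }
            }
            guard = self.condvar.wait(guard).unwrap();
        }
    }

    fn spawn(&self, task: Task) {
        // The shard guard is released before condvar_mutex is taken,
        // so the two locks are never held simultaneously here.
        self.shards[0].lock().unwrap().push_back(task);
        let _g = self.condvar_mutex.lock().unwrap();
        self.condvar.notify_one();
    }
}
```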
(I don't think the netlify failures are related)
Force-pushed 81d7f25 to 54330c3
ADD-SP left a comment:
Sorry for the late review. I'd like to take some time to think about the nested locking issue. We may have a better choice, or not.
No problem -- the rebase was to pick up a fix for the netlify failures. Let me know if there are other experiments it'd be useful for me to try.
```rust
/// Calculate the effective number of shards to use based on thread count.
/// Uses fewer shards at low concurrency for better cache locality.
#[inline]
fn effective_shards(num_threads: usize) -> usize {
    match num_threads {
```
This logic seems error-prone and likely to lead to missed tasks. Does it actually matter for your benchmark?
It matters at N_THREADS=1 -- I don't personally care about that case at all. If we're ok with a small pessimization there (10% iirc?), I'd be delighted to delete this max shard logic and just always use a fixed number.
Does this sound ok to you? I'd love to delete it because it's responsible for a lot of the complexity.
I've gone ahead and dropped this behavior.
Force-pushed 08376f4 to 3fd9e25
The blocking pool's task queue was protected by a single mutex, causing severe contention when many threads spawn blocking tasks concurrently. This resulted in nearly linear degradation: 16 concurrent threads took ~18x longer than a single thread.

Replace the single-mutex queue with a sharded queue that distributes tasks across 16 lock-protected shards. The implementation adapts to concurrency levels by using fewer shards when thread count is low, maintaining cache locality while avoiding contention at scale.

Benchmark results (spawning 100 batches of 16 tasks per thread):

| Concurrency | Before  | After  | Improvement |
|-------------|---------|--------|-------------|
| 1 thread    | 13.3ms  | 17.8ms | +34%        |
| 2 threads   | 26.0ms  | 20.1ms | -23%        |
| 4 threads   | 45.4ms  | 27.5ms | -39%        |
| 8 threads   | 111.5ms | 20.3ms | -82%        |
| 16 threads  | 247.8ms | 22.4ms | -91%        |

The slight overhead at 1 thread is due to the sharded infrastructure, but this is acceptable given the dramatic improvement at higher concurrency where the original design suffered from lock contention.
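For readers skimming the description, the core structure can be sketched as follows (a simplified stand-in for the PR's implementation; the round-robin cursor and scan order here are illustrative, not the PR's exact policy):

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Mutex;

const NUM_SHARDS: usize = 16;

struct Task;

struct ShardedQueue {
    next: AtomicUsize, // round-robin cursor for producers
    shards: [Mutex<VecDeque<Task>>; NUM_SHARDS],
}

impl ShardedQueue {
    fn new() -> Self {
        Self {
            next: AtomicUsize::new(0),
            shards: std::array::from_fn(|_| Mutex::new(VecDeque::new())),
        }
    }

    /// Producers spread pushes across shards so concurrent spawners
    /// rarely contend on the same mutex.
    fn push(&self, task: Task) {
        let i = self.next.fetch_add(1, Relaxed) % NUM_SHARDS;
        self.shards[i].lock().unwrap().push_back(task);
    }

    /// Consumers start at a per-thread preferred shard and scan the rest.
    fn pop(&self, preferred: usize) -> Option<Task> {
        for offset in 0..NUM_SHARDS {
            let i = (preferred + offset) % NUM_SHARDS;
            if let Some(task) = self.shards[i].lock().unwrap().pop_front() {
                return Some(task);
            }
        }
        None
    }
}
```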
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Force-pushed 3fd9e25 to a7c341e
Use the same approach as sync::watch: prefer thread_rng_n() for shard selection to reduce contention on the atomic counter, falling back to round-robin when the RNG is not available (loom mode or missing features).

🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
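The selection policy this commit message describes can be sketched as below; the `rng` parameter stands in for tokio's internal `thread_rng_n` helper (not public API), which is absent under loom or with some feature sets:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

const NUM_SHARDS: usize = 16;

/// Pick a shard for a newly spawned task. `rng` is `Some(n)` when a
/// per-thread RNG produced `n`; `None` models the loom / missing-feature
/// fallback.
fn choose_shard(rng: Option<u32>, round_robin: &AtomicUsize) -> usize {
    match rng {
        // Random placement touches no shared state at all.
        Some(n) => n as usize % NUM_SHARDS,
        // Fallback: round-robin on a shared counter -- the contended
        // atomic that the random path is meant to avoid.
        None => round_robin.fetch_add(1, Relaxed) % NUM_SHARDS,
    }
}
```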
Force-pushed a7c341e to fa4e0a6
Force-pushed 25439f8 to 98e0071
this allows readers to proceed concurrently
…ains weren't tremendous
Force-pushed e0107cc to 502a6d8
(Notwithstanding that this shows as a commit from claude, every line is human reviewed. If there's a mistake, it's Alex's fault.)
Closes #2528.