Skip to content

Commit 06245a2

Browse files
committed
impl WakerCell to replace ArrayQueue(size=1)
for faster mpsc/spsc crossfire_bounded_100_async_n_1 mpsc/1 20.298 Melem/s +60% mpsc/2 17.767 Melem/s +67%
1 parent 0d7ee6a commit 06245a2

File tree

3 files changed

+113
-37
lines changed

3 files changed

+113
-37
lines changed

src/collections.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,58 @@
11
use parking_lot::Mutex;
22
use std::collections::VecDeque;
3-
use std::sync::atomic::{AtomicBool, Ordering};
3+
use std::ptr;
4+
use std::sync::{
5+
atomic::{AtomicBool, AtomicPtr, Ordering},
6+
Arc, Weak,
7+
};
8+
9+
pub struct WeakCell<T> {
10+
ptr: AtomicPtr<T>,
11+
}
12+
13+
unsafe impl<T> Send for WeakCell<T> {}
14+
unsafe impl<T> Sync for WeakCell<T> {}
15+
16+
impl<T> WeakCell<T> {
17+
#[inline(always)]
18+
pub fn new() -> Self {
19+
Self { ptr: AtomicPtr::new(ptr::null_mut()) }
20+
}
21+
22+
#[inline(always)]
23+
pub fn exists(&self) -> bool {
24+
self.ptr.load(Ordering::Acquire) == ptr::null_mut()
25+
}
26+
27+
#[inline(always)]
28+
pub fn pop(&self) -> Option<Arc<T>> {
29+
if self.ptr.load(Ordering::Acquire) == ptr::null_mut() {
30+
return None;
31+
}
32+
let ptr = self.ptr.swap(ptr::null_mut(), Ordering::SeqCst);
33+
if ptr != ptr::null_mut() {
34+
return unsafe { Weak::from_raw(ptr) }.upgrade();
35+
} else {
36+
None
37+
}
38+
}
39+
40+
pub fn clear(&self) {
41+
let ptr = self.ptr.swap(ptr::null_mut(), Ordering::SeqCst);
42+
if ptr != ptr::null_mut() {
43+
// Convert into Weak and drop
44+
let _ = unsafe { Weak::from_raw(ptr) };
45+
}
46+
}
47+
48+
#[inline(always)]
49+
pub fn put(&self, item: Weak<T>) {
50+
let old_ptr = self.ptr.swap(item.into_raw() as *mut T, Ordering::SeqCst);
51+
if old_ptr != ptr::null_mut() {
52+
let _ = unsafe { Weak::from_raw(old_ptr) };
53+
}
54+
}
55+
}
456

557
pub struct LockedQueue<T> {
658
empty: AtomicBool,

src/locked_waker.rs

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::collections::WeakCell;
12
use std::fmt;
23
use std::sync::{
34
atomic::{AtomicBool, Ordering},
@@ -21,9 +22,7 @@ struct LockedWakerInner {
2122
seq: u64,
2223
}
2324

24-
pub struct LockedWakerRef {
25-
w: Weak<LockedWakerInner>,
26-
}
25+
pub struct LockedWakerRef(Weak<LockedWakerInner>);
2726

2827
impl fmt::Debug for LockedWakerRef {
2928
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -61,7 +60,7 @@ impl LockedWaker {
6160

6261
#[inline(always)]
6362
pub(crate) fn weak(&self) -> LockedWakerRef {
64-
LockedWakerRef { w: Arc::downgrade(&self.0) }
63+
LockedWakerRef(Arc::downgrade(&self.0))
6564
}
6665

6766
#[inline(always)]
@@ -84,7 +83,7 @@ impl LockedWaker {
8483
impl LockedWakerRef {
8584
#[inline(always)]
8685
pub(crate) fn wake(&self) -> bool {
87-
if let Some(_self) = self.w.upgrade() {
86+
if let Some(_self) = self.0.upgrade() {
8887
return LockedWaker(_self).wake();
8988
} else {
9089
return false;
@@ -93,7 +92,7 @@ impl LockedWakerRef {
9392

9493
/// return true to stop; return false to continue the search.
9594
pub(crate) fn try_to_clear(&self, seq: u64) -> bool {
96-
if let Some(w) = self.w.upgrade() {
95+
if let Some(w) = self.0.upgrade() {
9796
let waker = LockedWaker(w);
9897
let _seq = waker.get_seq();
9998
if _seq == seq {
@@ -111,17 +110,56 @@ impl LockedWakerRef {
111110
return false;
112111
}
113112

113+
#[allow(dead_code)]
114114
#[inline(always)]
115115
pub(crate) fn check_eq(&self, other: LockedWakerRef) -> bool {
116-
if self.w.ptr_eq(&other.w) {
116+
if self.0.ptr_eq(&other.0) {
117117
return true;
118118
}
119119
other.wake();
120120
false
121121
}
122122
}
123123

124-
#[test]
125-
fn test_waker() {
126-
println!("waker size {}", std::mem::size_of::<LockedWakerRef>());
124+
pub struct WakerCell(WeakCell<LockedWakerInner>);
125+
126+
impl WakerCell {
127+
#[inline(always)]
128+
pub fn new() -> Self {
129+
Self(WeakCell::new())
130+
}
131+
132+
#[inline(always)]
133+
pub fn wake(&self) -> bool {
134+
if let Some(waker) = self.0.pop() {
135+
return LockedWaker(waker).wake();
136+
}
137+
false
138+
}
139+
140+
#[inline(always)]
141+
pub fn clear(&self) {
142+
self.0.clear();
143+
}
144+
145+
#[inline(always)]
146+
pub fn put(&self, waker: LockedWakerRef) {
147+
self.0.put(waker.0);
148+
}
149+
150+
#[inline(always)]
151+
pub fn exists(&self) -> bool {
152+
self.0.exists()
153+
}
154+
}
155+
156+
#[cfg(test)]
157+
mod tests {
158+
use super::*;
159+
#[test]
160+
fn test_waker() {
161+
println!("waker size {}", std::mem::size_of::<LockedWakerRef>());
162+
println!("arc size {}", std::mem::size_of::<Arc<WakerCell>>());
163+
println!("arc size {}", std::mem::size_of::<Weak<WakerCell>>());
164+
}
127165
}

src/waker_registry.rs

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::collections::LockedQueue;
22
use crate::locked_waker::*;
3-
use crossbeam::queue::ArrayQueue;
43
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
54
use std::task::Context;
65

@@ -64,13 +63,13 @@ impl RegistryTrait for RegistryDummy {
6463
}
6564

6665
pub struct RegistrySingle {
67-
queue: ArrayQueue<LockedWakerRef>,
66+
cell: WakerCell,
6867
}
6968

7069
impl RegistrySingle {
7170
#[inline(always)]
7271
pub fn new() -> Registry {
73-
Registry::Single(Self { queue: ArrayQueue::new(1) })
72+
Registry::Single(Self { cell: WakerCell::new() })
7473
}
7574
}
7675

@@ -79,42 +78,25 @@ impl RegistryTrait for RegistrySingle {
7978
#[inline(always)]
8079
fn reg_async(&self, ctx: &mut Context) -> LockedWaker {
8180
let waker = LockedWaker::new(ctx, 0);
82-
match self.queue.push(waker.weak()) {
83-
Ok(_) => {}
84-
Err(_weak) => {
85-
if let Some(old_waker) = self.queue.pop() {
86-
_weak.check_eq(old_waker);
87-
self.queue.push(_weak).expect("Do not misuse mpsc as mpmc");
88-
} else {
89-
self.queue.push(_weak).expect("Do not misuse mpsc as mpmc");
90-
}
91-
}
92-
}
81+
self.cell.put(waker.weak());
9382
waker
9483
}
9584

9685
#[inline(always)]
97-
fn cancel_waker(&self, waker: LockedWaker) {
86+
fn cancel_waker(&self, _waker: LockedWaker) {
9887
// Got to be it, because only one single thread.
99-
if let Some(waker_ref) = self.queue.pop() {
100-
// protect miss-use of multi thread
101-
waker.weak().check_eq(waker_ref);
102-
}
88+
self.cell.clear();
10389
}
10490

10591
#[inline(always)]
10692
fn clear_wakers(&self, _seq: u64) {
10793
// Got to be it, because only one single thread.
108-
if let Some(_waker) = self.queue.pop() {
109-
_waker.wake();
110-
}
94+
self.cell.clear();
11195
}
11296

11397
#[inline(always)]
11498
fn fire(&self) {
115-
if let Some(waker) = self.queue.pop() {
116-
waker.wake();
117-
}
99+
self.cell.wake();
118100
}
119101

120102
#[inline(always)]
@@ -125,7 +107,11 @@ impl RegistryTrait for RegistrySingle {
125107
/// return waker queue size
126108
#[inline(always)]
127109
fn get_size(&self) -> usize {
128-
self.queue.len()
110+
if self.cell.exists() {
111+
1
112+
} else {
113+
0
114+
}
129115
}
130116
}
131117

0 commit comments

Comments
 (0)