Skip to content

Commit 7e6d730

Browse files
committed
Add OneMpsc
1 parent f81369d commit 7e6d730

File tree

3 files changed

+267
-2
lines changed

3 files changed

+267
-2
lines changed

src/flavor/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ mod list;
99
pub use list::*;
1010
mod one;
1111
pub use one::*;
12+
mod one_mpsc;
13+
pub use one_mpsc::OneMpsc;
1214
mod one_spmc;
1315
pub use one_spmc::{OneSpmc, OneSpsc};
1416

src/flavor/one_mpsc.rs

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
use super::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
2+
use crate::backoff::*;
3+
use core::cell::UnsafeCell;
4+
use core::mem::{needs_drop, MaybeUninit};
5+
use core::ptr;
6+
use core::sync::atomic::{
7+
AtomicU16, AtomicU32,
8+
Ordering::{self, Acquire, Relaxed, Release, SeqCst},
9+
};
10+
use crossbeam_utils::CachePadded;
11+
12+
/// A simplify ArrayQueue specialized for size=1
13+
pub struct OneMpsc<T> {
14+
pos: CachePadded<AtomicU32>,
15+
16+
/// The value in this slot.
17+
slots: [Slot<T>; 2],
18+
}
19+
20+
unsafe impl<T: Send> Sync for OneMpsc<T> {}
21+
unsafe impl<T: Send> Send for OneMpsc<T> {}
22+
23+
impl<T: Send + Unpin + 'static> Queue for OneMpsc<T> {
24+
type Item = T;
25+
26+
#[inline(always)]
27+
fn pop(&self) -> Option<T> {
28+
self._pop(Ordering::SeqCst)
29+
}
30+
31+
#[inline(always)]
32+
fn push(&self, item: T) -> Result<(), T> {
33+
let _item = MaybeUninit::new(item);
34+
if unsafe { self._try_push(SeqCst, _item.as_ptr(), Acquire).is_ok() } {
35+
Ok(())
36+
} else {
37+
Err(unsafe { _item.assume_init_read() })
38+
}
39+
}
40+
41+
#[inline(always)]
42+
fn len(&self) -> usize {
43+
if self.is_full() {
44+
1
45+
} else {
46+
0
47+
}
48+
}
49+
50+
#[inline(always)]
51+
fn capacity(&self) -> Option<usize> {
52+
Some(1)
53+
}
54+
55+
#[inline(always)]
56+
fn is_full(&self) -> bool {
57+
!self.is_empty()
58+
}
59+
60+
#[inline(always)]
61+
fn is_empty(&self) -> bool {
62+
let pos = self.pos.load(SeqCst);
63+
let (head, tail) = Self::unpack(pos);
64+
head == tail
65+
}
66+
}
67+
68+
impl<T> OneMpsc<T> {
69+
#[inline]
70+
pub fn new() -> Self {
71+
Self { pos: CachePadded::new(AtomicU32::new(0)), slots: [Slot::init(0), Slot::init(1)] }
72+
}
73+
74+
#[inline(always)]
75+
fn unpack(pos: u32) -> (u16, u16) {
76+
let head = (pos >> 16) as u16;
77+
let tail = pos as u16;
78+
(head, tail)
79+
}
80+
81+
#[inline(always)]
82+
fn pack(head: u16, tail: u16) -> u32 {
83+
((head as u32) << 16) | (tail as u32)
84+
}
85+
86+
/// return Ok(true) on ok, Ok(false) on full, Err(()) to spin
87+
#[inline(always)]
88+
unsafe fn _try_push(
89+
&self, order: Ordering, value: *const T, failure: Ordering,
90+
) -> Result<(), ()> {
91+
let mut pos = self.pos.load(order);
92+
loop {
93+
let (head, tail) = Self::unpack(pos);
94+
if head == tail {
95+
let new_pos = Self::pack(head, tail.wrapping_add(1));
96+
match self.pos.compare_exchange_weak(pos, new_pos, SeqCst, failure) {
97+
Ok(_) => {
98+
let index = tail & 0x1;
99+
self.slots[index as usize].write(tail, value);
100+
return Ok(());
101+
}
102+
Err(_pos) => {
103+
pos = _pos;
104+
}
105+
}
106+
} else {
107+
return Err(());
108+
}
109+
}
110+
}
111+
112+
#[inline(always)]
113+
fn _start_read(&self, order: Ordering) -> Option<(u16, u16)> {
114+
let pos = self.pos.load(order);
115+
let (head, tail) = Self::unpack(pos);
116+
if head == tail {
117+
return None;
118+
}
119+
let next_head = head.wrapping_add(1);
120+
let index = head & 0x1;
121+
return Some((index, next_head));
122+
}
123+
124+
#[inline(always)]
125+
fn _read(&self, slot: &Slot<T>, next_head: u16) -> T {
126+
let new_pos = Self::pack(next_head, next_head);
127+
// Because we have two slot, the sender will write to next index,
128+
// it's safe to update the pos before we read, so that sender may begin to write
129+
self.pos.store(new_pos, SeqCst);
130+
let msg = slot.read(next_head);
131+
msg
132+
}
133+
134+
#[inline(always)]
135+
fn _pop(&self, order: Ordering) -> Option<T> {
136+
if let Some((index, new_head)) = self._start_read(order) {
137+
Some(self._read(&self.slots[index as usize], new_head))
138+
} else {
139+
None
140+
}
141+
}
142+
}
143+
144+
struct Slot<T> {
145+
value: UnsafeCell<MaybeUninit<T>>,
146+
stamp: CachePadded<AtomicU16>,
147+
}
148+
149+
impl<T> Slot<T> {
150+
#[inline]
151+
fn init(i: u16) -> Self {
152+
Self {
153+
value: UnsafeCell::new(MaybeUninit::uninit()),
154+
stamp: CachePadded::new(AtomicU16::new(i)),
155+
}
156+
}
157+
158+
#[inline(always)]
159+
fn write(&self, tail: u16, value: *const T) {
160+
unsafe { (*self.value.get()).write(ptr::read(value)) };
161+
self.stamp.store(tail.wrapping_add(1), Release);
162+
}
163+
164+
#[inline(always)]
165+
fn read(&self, head: u16) -> T {
166+
let mut stamp = self.stamp.load(Acquire);
167+
if stamp != head {
168+
let mut backoff = Backoff::new();
169+
loop {
170+
backoff.snooze();
171+
stamp = self.stamp.load(Acquire);
172+
if stamp == head {
173+
break;
174+
}
175+
}
176+
}
177+
let msg = unsafe { self.value.get().read().assume_init() };
178+
msg
179+
}
180+
181+
#[inline(always)]
182+
fn drop(&self) {
183+
unsafe { self.value.get().read().assume_init_drop() };
184+
}
185+
}
186+
187+
impl<T> Drop for OneMpsc<T> {
188+
#[inline(always)]
189+
fn drop(&mut self) {
190+
if needs_drop::<T>() {
191+
let pos = *self.pos.get_mut();
192+
let (head, tail) = Self::unpack(pos);
193+
if head != tail {
194+
let index = head & 0x1;
195+
self.slots[index as usize].drop();
196+
}
197+
}
198+
}
199+
}
200+
201+
impl<T: Send + 'static + Unpin> FlavorImpl for OneMpsc<T> {
202+
#[inline(always)]
203+
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
204+
// Will always double-check with is_full or try_send_oneshot()
205+
unsafe { self._try_push(Acquire, item.as_ptr(), Acquire).is_ok() }
206+
}
207+
208+
#[inline(always)]
209+
fn try_send_oneshot(&self, item: *const T) -> Option<bool> {
210+
Some(unsafe { self._try_push(SeqCst, item, Acquire).is_ok() })
211+
}
212+
213+
#[inline(always)]
214+
fn try_recv(&self) -> Option<T> {
215+
self._pop(Acquire)
216+
}
217+
218+
#[inline(always)]
219+
fn try_recv_final(&self) -> Option<T> {
220+
self._pop(SeqCst)
221+
}
222+
223+
#[inline]
224+
fn backoff_limit(&self) -> u16 {
225+
// Due to bound is too small,
226+
// yield with MAX_LIMIT to prevent collapse in high contention
227+
crate::backoff::MAX_LIMIT
228+
}
229+
230+
#[inline]
231+
fn may_direct_copy(&self) -> bool {
232+
true
233+
}
234+
}
235+
236+
impl<T> FlavorNew for OneMpsc<T> {
237+
#[inline]
238+
fn new() -> Self {
239+
OneMpsc::new()
240+
}
241+
}
242+
243+
impl<T: Send + Unpin + 'static> FlavorSelect for OneMpsc<T> {
244+
#[inline]
245+
fn try_select(&self, final_check: bool) -> Option<Token> {
246+
if let Some((index, head)) =
247+
self._start_read(if final_check { Ordering::SeqCst } else { Ordering::Acquire })
248+
{
249+
Some(Token::new(
250+
&self.slots[index as usize] as *const Slot<T> as *const u8,
251+
head as usize,
252+
))
253+
} else {
254+
None
255+
}
256+
}
257+
258+
#[inline(always)]
259+
fn read_with_token(&self, token: Token) -> T {
260+
let slot: &Slot<T> = unsafe { &*token.pos.cast::<Slot<T>>() };
261+
self._read(slot, token.stamp as u16)
262+
}
263+
}

src/mpsc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,14 @@ pub type One<T> = FlavorWrap<crate::flavor::One<T>, RegistryMultiSend<T>, Regist
5757
/// Flavor Type alias for bounded MPSC channel wrapped with specified One impl
5858
pub enum Array<T> {
5959
Array(crate::flavor::ArrayMpsc<T>),
60-
One(crate::flavor::One<T>),
60+
One(crate::flavor::OneMpsc<T>),
6161
}
6262

6363
impl<T: Send + Unpin + 'static> Array<T> {
6464
#[inline]
6565
pub fn new(size: usize) -> Self {
6666
if size <= 1 {
67-
Self::One(crate::flavor::One::new())
67+
Self::One(crate::flavor::OneMpsc::new())
6868
} else {
6969
Self::Array(crate::flavor::ArrayMpsc::<T>::new(size))
7070
}

0 commit comments

Comments
 (0)