
Commit 3d50312

Add array queue
1 parent 6d28d1b commit 3d50312

8 files changed: +559 −23 lines changed

crossbeam-circbuf/src/buffer.rs

+30-13
@@ -6,7 +6,14 @@ use std::mem;
 /// A slot in buffer.
 #[derive(Debug)]
 pub struct Slot<T> {
+    /// The index of this slot's data.
+    ///
+    /// An index consists of an offset into the buffer and a lap, packed into a single `usize`.
+    /// The lower bits represent the offset, while the upper bits represent the lap. For example,
+    /// if the buffer capacity is 4, then index 6 represents offset 2 in lap 1.
     index: AtomicUsize,
+
+    /// The value in this slot.
     data: UnsafeCell<mem::ManuallyDrop<T>>,
 }

@@ -16,6 +23,11 @@ pub struct Slot<T> {
 #[derive(Debug)]
 pub struct Buffer<T> {
     /// Pointer to the allocated memory.
+    ///
+    /// The allocated memory exhibits data races caused by `read`, `write`, `read_index`, and
+    /// `write_index`, which technically invoke undefined behavior. We should be using relaxed
+    /// accesses, but that would cost too much performance. Hence, as a HACK, we use volatile
+    /// accesses instead. Experimental evidence shows that this works.
     ptr: *mut Slot<T>,

     /// Capacity of the buffer. Always a power of two.
@@ -68,10 +80,6 @@ impl<T> Buffer<T> {
     /// Reads a value from the specified `index`.
     ///
     /// Returns `Some(v)` if `v` is at `index`; or `None` if there's no valid value for `index`.
-    ///
-    /// Using this concurrently with a `write` is technically speaking UB due to data races. We
-    /// should be using relaxed accesses, but that would cost too much performance. Hence, as a
-    /// HACK, we use volatile accesses instead. Experimental evidence shows that this works.
     pub unsafe fn read(&self, index: usize) -> Option<mem::ManuallyDrop<T>> {
         let slot = self.at(index);

@@ -90,10 +98,6 @@ impl<T> Buffer<T> {
     /// Reads a value from the specified `index` without checking the index.
     ///
     /// Returns the value at `index` regardless of whether it's valid or not.
-    ///
-    /// Using this concurrently with a `write` is technically speaking UB due to data races. We
-    /// should be using relaxed accesses, but that would cost too much performance. Hence, as a
-    /// HACK, we use volatile accesses instead. Experimental evidence shows that this works.
     pub unsafe fn read_unchecked(&self, index: usize) -> mem::ManuallyDrop<T> {
         let slot = self.at(index);

@@ -102,11 +106,6 @@ impl<T> Buffer<T> {
     }

     /// Writes `value` into the specified `index`.
-    ///
-    /// Using this concurrently with another `read` or `write` is technically
-    /// speaking UB due to data races. We should be using relaxed accesses, but
-    /// that would cost too much performance. Hence, as a HACK, we use volatile
-    /// accesses instead. Experimental evidence shows that this works.
     pub unsafe fn write(&self, index: usize, value: T) {
         let slot = self.at(index);

@@ -116,4 +115,22 @@ impl<T> Buffer<T> {
         // Writes the index with `Release`.
         (*slot).index.store(index, Ordering::Release);
     }
+
+    /// Reads the index from the specified slot.
+    pub unsafe fn read_index(&self, offset: usize, ord: Ordering) -> usize {
+        let slot = self.at(offset);
+        (*slot).index.load(ord)
+    }
+
+    /// Writes the specified `index` into the slot.
+    pub unsafe fn write_index(&self, index: usize, ord: Ordering) {
+        let slot = self.at(index);
+        (*slot).index.store(index, ord);
+    }
+
+    /// Reads the value from the specified slot.
+    pub unsafe fn read_value(&self, offset: usize) -> mem::ManuallyDrop<T> {
+        let slot = self.at(offset);
+        (*slot).data.get().read_volatile()
+    }
 }
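
The new `index` documentation describes a packing scheme: with a power-of-two capacity, the low bits of a slot index are the offset and the remaining high bits are the lap, and the queue code in this commit splits them with `index & (lap_size - 1)` and `index & !(lap_size - 1)`. As a minimal standalone sketch of that arithmetic (the `unpack` helper is hypothetical, not part of the crate):

    // Splits a packed index into (offset, lap), assuming `lap_size` is a power of two.
    fn unpack(index: usize, lap_size: usize) -> (usize, usize) {
        let offset = index & (lap_size - 1); // low bits
        let lap = index / lap_size;          // high bits
        (offset, lap)
    }

    fn main() {
        // Matches the doc comment's example: with capacity 4, index 6 is offset 2 in lap 1.
        assert_eq!(unpack(6, 4), (2, 1));
    }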

crossbeam-circbuf/src/lib.rs

+3
@@ -14,9 +14,12 @@ mod buffer;

 #[doc(hidden)] // for doc-tests
 pub mod sp;
+#[doc(hidden)] // for doc-tests
+pub mod mp;

 pub use sp::mc as spmc;
 pub use sp::sc as spsc;
+pub use mp::mc as mpmc;

 /// The return type for `try_recv` methods.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
crossbeam-circbuf/src/mp/mc.rs

+270
@@ -0,0 +1,270 @@
+//! Concurrent multiple-producer multiple-consumer queues based on a circular buffer.
+
+/// Bounded MPMC queue based on a fixed-sized concurrent circular buffer.
+///
+/// The implementation is based on Dmitry Vyukov's bounded MPMC queue:
+///
+/// * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
+/// * https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub
+///
+/// # Examples
+///
+/// ```
+/// use crossbeam_circbuf::mpmc::bounded::{Queue, TryRecv};
+/// use std::thread;
+/// use std::sync::Arc;
+///
+/// let q = Arc::new(Queue::<char>::new(16));
+/// let r = q.clone();
+///
+/// q.send('a').unwrap();
+/// r.send('b').unwrap();
+///
+/// assert_eq!(q.recv(), Some('a'));
+/// assert_eq!(r.recv(), Some('b'));
+/// ```
+pub mod bounded {
+    use std::sync::atomic::Ordering;
+    use std::sync::atomic::AtomicUsize;
+    use std::marker::PhantomData;
+    use std::mem::ManuallyDrop;
+    use utils::CachePadded;
+
+    use buffer::Buffer;
+    pub use TryRecv;
+
+    /// A bounded MPMC queue.
+    #[derive(Debug)]
+    pub struct Queue<T> {
+        /// The head index from which values are popped.
+        ///
+        /// The lap of the head index is always an odd number.
+        head: CachePadded<AtomicUsize>,
+
+        /// The tail index into which values are pushed.
+        ///
+        /// The lap of the tail index is always an even number.
+        tail: CachePadded<AtomicUsize>,
+
+        /// The underlying buffer.
+        buffer: Buffer<T>,
+
+        /// The capacity of the queue.
+        cap: usize,
+
+        /// Indicates that dropping a `Queue<T>` may drop values of type `T`.
+        _marker: PhantomData<T>,
+    }
+
+    unsafe impl<T: Send> Sync for Queue<T> {}
+    unsafe impl<T: Send> Send for Queue<T> {}
+
+    impl<T> Queue<T> {
+        /// Creates a new queue of capacity at least `cap`.
+        pub fn new(cap: usize) -> Self {
+            // One lap is the smallest power of two greater than or equal to `cap`.
+            let lap = cap.next_power_of_two();
+
+            // Head is initialized to `{ lap: 1, offset: 0 }`.
+            // Tail is initialized to `{ lap: 0, offset: 0 }`.
+            let head = lap;
+            let tail = 0;
+
+            // Allocate a buffer of `lap` slots.
+            let buffer = Buffer::new(lap);
+
+            // Initialize stamps in the slots.
+            for i in 0..lap {
+                unsafe {
+                    // Set the index to `{ lap: 0, offset: i }`.
+                    buffer.write_index(i, Ordering::Relaxed);
+                }
+            }
+
+            Self {
+                head: CachePadded::new(AtomicUsize::new(head)),
+                tail: CachePadded::new(AtomicUsize::new(tail)),
+                buffer,
+                cap,
+                _marker: PhantomData,
+            }
+        }
+
+        /// Returns the size of one lap.
+        #[inline]
+        pub fn lap(&self) -> usize {
+            self.buffer.cap()
+        }
+
+        /// Returns the capacity of the queue.
+        #[inline]
+        pub fn cap(&self) -> usize {
+            self.cap
+        }
+
+        /// Attempts to send a value to the queue.
+        pub fn send(&self, value: T) -> Result<(), T> {
+            loop {
+                // Loads the tail and deconstructs it.
+                let tail = self.tail.load(Ordering::Acquire);
+                let offset = tail & (self.lap() - 1);
+                let lap = tail & !(self.lap() - 1);
+
+                // Inspects the corresponding slot.
+                let index = unsafe { self.buffer.read_index(offset, Ordering::Acquire) };
+
+                // If the tail and the stamp match, we may attempt to push.
+                if tail == index {
+                    let new_tail = if offset + 1 < self.cap() {
+                        // Same lap, incremented index.
+                        // Set to `{ lap: lap, offset: offset + 1 }`.
+                        tail + 1
+                    } else {
+                        // Two laps forward, index wraps around to zero.
+                        // Set to `{ lap: lap.wrapping_add(2), offset: 0 }`.
+                        lap.wrapping_add(self.lap().wrapping_mul(2))
+                    };
+
+                    // Tries moving the tail.
+                    if self
+                        .tail
+                        .compare_exchange_weak(tail, new_tail, Ordering::AcqRel, Ordering::Relaxed)
+                        .is_ok()
+                    {
+                        // Writes the value into the slot and updates the stamp.
+                        unsafe { self.buffer.write(index.wrapping_add(self.lap()), value) };
+                        return Ok(());
+                    }
+                // But if the slot lags one lap behind the tail...
+                } else if index.wrapping_add(self.lap()) == tail {
+                    let head = self.head.load(Ordering::Acquire);
+
+                    // ...and if the head lags one lap behind the tail as well...
+                    if head.wrapping_add(self.lap()) == tail {
+                        // ...then the queue is full.
+                        return Err(value);
+                    }
+                }
+            }
+        }
+
+        /// Attempts to pop a value from the queue.
+        pub fn recv(&self) -> Option<T> {
+            loop {
+                // Loads the head and deconstructs it.
+                let head = self.head.load(Ordering::Acquire);
+                let offset = head & (self.lap() - 1);
+                let lap = head & !(self.lap() - 1);
+
+                // Inspects the corresponding slot.
+                let index = unsafe { self.buffer.read_index(offset, Ordering::Acquire) };
+
+                // If the head and the stamp match, we may attempt to pop.
+                if head == index {
+                    let new = if offset + 1 < self.cap() {
+                        // Same lap, incremented index.
+                        // Set to `{ lap: lap, offset: offset + 1 }`.
+                        head + 1
+                    } else {
+                        // Two laps forward, index wraps around to zero.
+                        // Set to `{ lap: lap.wrapping_add(2), offset: 0 }`.
+                        lap.wrapping_add(self.lap().wrapping_mul(2))
+                    };
+
+                    // Tries moving the head.
+                    if self
+                        .head
+                        .compare_exchange_weak(head, new, Ordering::AcqRel, Ordering::Relaxed)
+                        .is_ok()
+                    {
+                        // Reads the value from the slot and updates the stamp.
+                        let value = unsafe { self.buffer.read_value(index) };
+                        unsafe { self.buffer.write_index(index.wrapping_add(self.lap()), Ordering::Release) };
+                        return Some(ManuallyDrop::into_inner(value));
+                    }
+                // But if the slot lags one lap behind the head...
+                } else if index.wrapping_add(self.lap()) == head {
+                    let tail = self.tail.load(Ordering::Acquire);
+
+                    // ...and if the tail lags one lap behind the head as well, that means the queue
+                    // is empty.
+                    if tail.wrapping_add(self.lap()) == head {
+                        return None;
+                    }
+                }
+            }
+        }
+
+        /// Returns `true` if the queue is empty.
+        ///
+        /// Inaccurate in the presence of concurrent method invocations.
+        pub fn is_empty(&self) -> bool {
+            let head = self.head.load(Ordering::Relaxed);
+            let tail = self.tail.load(Ordering::Relaxed);
+
+            // Is the tail lagging one lap behind the head?
+            //
+            // Note: If the head changes just before we load the tail, that means there was a moment
+            // when the queue was not empty, so it is safe to just return `false`.
+            tail.wrapping_add(self.lap()) == head
+        }
+
+        /// Returns `true` if the queue is full.
+        ///
+        /// Inaccurate in the presence of concurrent method invocations.
+        pub fn is_full(&self) -> bool {
+            let tail = self.tail.load(Ordering::Relaxed);
+            let head = self.head.load(Ordering::Relaxed);
+
+            // Is the head lagging one lap behind the tail?
+            //
+            // Note: If the tail changes just before we load the head, that means there was a moment
+            // when the queue was not full, so it is safe to just return `false`.
+            head.wrapping_add(self.lap()) == tail
+        }
+
+        /// Returns the current number of elements inside the queue.
+        ///
+        /// Inaccurate in the presence of concurrent method invocations.
+        pub fn len(&self) -> usize {
+            // Load the tail, then load the head.
+            let tail = self.tail.load(Ordering::Relaxed);
+            let head = self.head.load(Ordering::Relaxed);
+
+            let hix = head & (self.lap() - 1);
+            let tix = tail & (self.lap() - 1);
+
+            if hix < tix {
+                tix - hix
+            } else if hix > tix {
+                self.cap() - hix + tix
+            } else if tail.wrapping_add(self.lap()) == head {
+                0
+            } else {
+                self.cap()
+            }
+        }
+    }
+
+    impl<T> Drop for Queue<T> {
+        fn drop(&mut self) {
+            // Get the index of the head.
+            let hix = self.head.load(Ordering::Relaxed) & (self.lap() - 1);
+
+            // Loop over all slots that hold a message and drop them.
+            for i in 0..self.len() {
+                // Compute the index of the next slot holding a message.
+                let index = if hix + i < self.cap() {
+                    hix + i
+                } else {
+                    hix + i - self.cap()
+                };
+
+                unsafe {
+                    let mut value = self.buffer.read_unchecked(index);
+                    ManuallyDrop::drop(&mut value);
+                }
+            }
+        }
+    }
+}
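
For context, here is a usage sketch built from the doc-test at the top of the new module: two producers running on spawned threads share the queue through `Arc`, `send` reports a full queue by handing the value back as `Err`, and `recv` returns `None` once the queue is empty. The thread and element counts are arbitrary; only `Queue::new`, `send`, and `recv` are taken from this commit.

    use crossbeam_circbuf::mpmc::bounded::Queue;
    use std::sync::Arc;
    use std::thread;

    fn main() {
        let q = Arc::new(Queue::<usize>::new(16));

        // Two producers push 8 values each.
        let producers: Vec<_> = (0..2)
            .map(|t| {
                let q = q.clone();
                thread::spawn(move || {
                    for i in 0..8 {
                        // 16 sends into a capacity-16 queue never hit the "full" case,
                        // so unwrapping the `Result<(), T>` is fine here.
                        q.send(t * 8 + i).unwrap();
                    }
                })
            })
            .collect();
        for p in producers {
            p.join().unwrap();
        }

        // Drain the queue on the main thread; `recv` yields `None` when it is empty.
        let mut count = 0;
        while q.recv().is_some() {
            count += 1;
        }
        assert_eq!(count, 16);
    }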

crossbeam-circbuf/src/mp/mod.rs

+3
@@ -0,0 +1,3 @@
+//! Concurrent multiple-producer queues based on a circular buffer.
+
+pub mod mc;

crossbeam-circbuf/src/sp/internal.rs

+3-4
@@ -16,6 +16,7 @@ use buffer::Buffer;
 pub use TryRecv;

 /// Internal data shared among a circular buffer and its receivers.
+#[derive(Debug)]
 struct Inner<T> {
     /// The rx index.
     rx: CachePadded<AtomicUsize>,
@@ -50,11 +51,9 @@ impl<T> Drop for Inner<T> {
         let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());

         // Drops the values from rx to tx in the buffer.
-        let mut i = rx;
-        while i != tx {
-            let mut value = buffer.deref().read_unchecked(i);
+        for i in 0..tx.wrapping_sub(rx) {
+            let mut value = buffer.deref().read_unchecked(rx.wrapping_add(i));
             ManuallyDrop::drop(&mut value);
-            i = i.wrapping_add(1);
         }

         // Free the memory allocated by the buffer.
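
The rewritten `Drop` loop iterates `tx.wrapping_sub(rx)` times instead of advancing a cursor until it equals `tx`; both forms visit the same indices even when the counters have wrapped around `usize::MAX`. A small standalone sketch (not part of the crate) illustrating the equivalence:

    // Old form: advance a cursor until it reaches `tx`.
    fn old_style(rx: usize, tx: usize) -> Vec<usize> {
        let mut out = Vec::new();
        let mut i = rx;
        while i != tx {
            out.push(i);
            i = i.wrapping_add(1);
        }
        out
    }

    // New form: count how many values lie between `rx` and `tx`, then offset from `rx`.
    fn new_style(rx: usize, tx: usize) -> Vec<usize> {
        (0..tx.wrapping_sub(rx)).map(|i| rx.wrapping_add(i)).collect()
    }

    fn main() {
        // A window that straddles the wrap-around point of `usize`.
        let (rx, tx) = (usize::MAX - 1, 2usize);
        assert_eq!(old_style(rx, tx), new_style(rx, tx));
        assert_eq!(new_style(rx, tx), vec![usize::MAX - 1, usize::MAX, 0, 1]);
    }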
