Skip to content

Commit 47ffc17

Browse files
committed
Reorganize dirctories
1 parent 016626a commit 47ffc17

File tree

8 files changed

+106
-112
lines changed

8 files changed

+106
-112
lines changed

crossbeam-circbuf/src/lib.rs

+6-27
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,13 @@ extern crate crossbeam_epoch as epoch;
1212
extern crate crossbeam_utils as utils;
1313

1414
mod buffer;
15+
mod try_recv;
1516

1617
#[doc(hidden)] // for doc-tests
17-
pub mod mp;
18-
#[doc(hidden)] // for doc-tests
19-
pub mod sp;
20-
21-
pub use mp::mc as mpmc;
22-
pub use sp::mc as spmc;
23-
pub use sp::sc as spsc;
18+
pub mod sp_inner;
2419

25-
/// The return type for `try_recv` methods.
26-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27-
pub enum TryRecv<T> {
28-
/// Received a value.
29-
Data(T),
30-
/// Not received a value because the buffer is empty.
31-
Empty,
32-
/// Lost the race to a concurrent operation. Try again.
33-
Retry,
34-
}
20+
pub use try_recv::TryRecv;
3521

36-
impl<T> TryRecv<T> {
37-
/// Applies a function to the content of `TryRecv::Data`.
38-
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> TryRecv<U> {
39-
match self {
40-
TryRecv::Data(v) => TryRecv::Data(f(v)),
41-
TryRecv::Empty => TryRecv::Empty,
42-
TryRecv::Retry => TryRecv::Retry,
43-
}
44-
}
45-
}
22+
pub mod mpmc;
23+
pub mod spmc;
24+
pub mod spsc;

crossbeam-circbuf/src/mp/mod.rs

-3
This file was deleted.
File renamed without changes.

crossbeam-circbuf/src/sp/mod.rs

-52
This file was deleted.

crossbeam-circbuf/src/sp/internal.rs crossbeam-circbuf/src/sp_inner.rs

+61-12
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,53 @@
11
//! Internal data structures used in single-producer queues.
2+
//!
3+
//! TODO: move
4+
//!
5+
//! Concurrent single-producer queues based on circular buffer.
6+
//!
7+
//! [`CircBuf`] is a circular buffer, which is basically a fixed-sized array that has two ends: tx
8+
//! and rx. A [`CircBuf`] can [`send`] values into the tx end and [`CircBuf::try_recv`] values from
9+
//! the rx end. A [`CircBuf`] doesn't implement `Sync` so it cannot be shared among multiple
10+
//! threads. However, it can create [`Receiver`]s, and those can be easily cloned, shared, and sent
11+
//! to other threads. [`Receiver`]s can only [`Receiver::try_recv`] values from the rx end.
12+
//!
13+
//! Here's a visualization of a [`CircBuf`] of capacity 4, consisting of 2 values `a` and `b`.
14+
//!
15+
//! ```text
16+
//! ___
17+
//! | a | <- rx (CircBuf::try_recv, Receiver::try_recv)
18+
//! | b |
19+
//! | | <- tx (CircBuf::send)
20+
//! | |
21+
//! ¯¯¯
22+
//! ```
23+
//!
24+
//! [`DynamicCircBuf`] is a dynamically growable and shrinkable circular buffer. Internally,
25+
//! [`DynamicCircBuf`] has a [`CircBuf`] and resizes it when necessary.
26+
//!
27+
//!
28+
//! # Usage: fair work-stealing schedulers
29+
//!
30+
//! This data structure can be used in fair work-stealing schedulers for multiple threads as
31+
//! follows.
32+
//!
33+
//! Each thread owns a [`CircBuf`] (or [`DynamicCircBuf`]) and creates a [`Receiver`] that is shared
34+
//! among all other threads (or creates one [`Receiver`] for each of the other threads).
35+
//!
36+
//! Each thread is executing a loop in which it attempts to [`CircBuf::try_recv`] some task from its
37+
//! own [`CircBuf`] and perform it. If the buffer is empty, it attempts to [`Receiver::try_recv`]
38+
//! work from other threads instead. When performing a task, a thread may produce more tasks by
39+
//! [`send`]ing them to its buffer.
40+
//!
41+
//! It is worth noting that it is discouraged to use work-stealing deque for fair schedulers,
42+
//! because its `pop()` may return a task that is just `push()`ed, effectively scheduling the same
43+
//! work repeatedly.
44+
//!
45+
//! [`CircBuf`]: struct.CircBuf.html
46+
//! [`DynamicCircBuf`]: struct.DynamicCircBuf.html
47+
//! [`Receiver`]: struct.Receiver.html
48+
//! [`send`]: struct.CircBuf.html#method.send
49+
//! [`CircBuf::try_recv`]: struct.CircBuf.html#method.try_recv
50+
//! [`Receiver::try_recv`]: struct.Receiver.html#method.try_recv
251
352
use std::cell::Cell;
453
use std::fmt;
@@ -104,7 +153,7 @@ impl<T> CircBuf<T> {
104153
/// # Examples
105154
///
106155
/// ```
107-
/// use crossbeam_circbuf::sp::internal::CircBuf;
156+
/// use crossbeam_circbuf::sp_inner::CircBuf;
108157
///
109158
/// // The capacity will be rounded up to 1024.
110159
/// let cb = CircBuf::<i32>::new(1000);
@@ -128,7 +177,7 @@ impl<T> CircBuf<T> {
128177
/// # Examples
129178
///
130179
/// ```
131-
/// use crossbeam_circbuf::sp::internal::CircBuf;
180+
/// use crossbeam_circbuf::sp_inner::CircBuf;
132181
///
133182
/// let cb = CircBuf::new(16);
134183
/// cb.send(1).unwrap();
@@ -147,7 +196,7 @@ impl<T> CircBuf<T> {
147196
/// # Examples
148197
///
149198
/// ```
150-
/// use crossbeam_circbuf::sp::internal::{CircBuf, TryRecv};
199+
/// use crossbeam_circbuf::sp_inner::{CircBuf, TryRecv};
151200
///
152201
/// let cb = CircBuf::new(16);
153202
/// cb.send(1).unwrap();
@@ -268,7 +317,7 @@ impl<T> CircBuf<T> {
268317
/// # Examples
269318
///
270319
/// ```
271-
/// use crossbeam_circbuf::sp::internal::{CircBuf, TryRecv};
320+
/// use crossbeam_circbuf::sp_inner::{CircBuf, TryRecv};
272321
/// use std::thread;
273322
///
274323
/// let cb = CircBuf::new(16);
@@ -351,7 +400,7 @@ impl<T> DynamicCircBuf<T> {
351400
/// # Examples
352401
///
353402
/// ```
354-
/// use crossbeam_circbuf::sp::internal::DynamicCircBuf;
403+
/// use crossbeam_circbuf::sp_inner::DynamicCircBuf;
355404
///
356405
/// let cb = DynamicCircBuf::<i32>::new();
357406
/// ```
@@ -367,7 +416,7 @@ impl<T> DynamicCircBuf<T> {
367416
/// # Examples
368417
///
369418
/// ```
370-
/// use crossbeam_circbuf::sp::internal::DynamicCircBuf;
419+
/// use crossbeam_circbuf::sp_inner::DynamicCircBuf;
371420
///
372421
/// // The minimum capacity will be rounded up to 1024.
373422
/// let cb = DynamicCircBuf::<i32>::with_min_capacity(1000);
@@ -391,7 +440,7 @@ impl<T> DynamicCircBuf<T> {
391440
/// # Examples
392441
///
393442
/// ```
394-
/// use crossbeam_circbuf::sp::internal::DynamicCircBuf;
443+
/// use crossbeam_circbuf::sp_inner::DynamicCircBuf;
395444
///
396445
/// let cb = DynamicCircBuf::new();
397446
/// cb.send(1);
@@ -430,7 +479,7 @@ impl<T> DynamicCircBuf<T> {
430479
/// # Examples
431480
///
432481
/// ```
433-
/// use crossbeam_circbuf::sp::internal::{DynamicCircBuf, TryRecv};
482+
/// use crossbeam_circbuf::sp_inner::{DynamicCircBuf, TryRecv};
434483
///
435484
/// let cb = DynamicCircBuf::new();
436485
/// cb.send(1);
@@ -464,7 +513,7 @@ impl<T> DynamicCircBuf<T> {
464513
/// # Examples
465514
///
466515
/// ```
467-
/// use crossbeam_circbuf::sp::internal::{DynamicCircBuf, TryRecv};
516+
/// use crossbeam_circbuf::sp_inner::{DynamicCircBuf, TryRecv};
468517
/// use std::thread;
469518
///
470519
/// let cb = DynamicCircBuf::new();
@@ -558,7 +607,7 @@ impl<T> Receiver<T> {
558607
/// # Examples
559608
///
560609
/// ```
561-
/// use crossbeam_circbuf::sp::internal::{DynamicCircBuf, TryRecv};
610+
/// use crossbeam_circbuf::sp_inner::{DynamicCircBuf, TryRecv};
562611
///
563612
/// let cb = DynamicCircBuf::new();
564613
/// let r = cb.receiver();
@@ -615,7 +664,7 @@ impl<T> Receiver<T> {
615664
/// # Examples
616665
///
617666
/// ```
618-
/// use crossbeam_circbuf::sp::internal::{DynamicCircBuf, TryRecv};
667+
/// use crossbeam_circbuf::sp_inner::{DynamicCircBuf, TryRecv};
619668
///
620669
/// let cb = DynamicCircBuf::new();
621670
/// let r = cb.receiver();
@@ -706,7 +755,7 @@ impl<T> Receiver<T> {
706755
/// # Examples
707756
///
708757
/// ```
709-
/// use crossbeam_circbuf::sp::internal::{DynamicCircBuf, TryRecv};
758+
/// use crossbeam_circbuf::sp_inner::{DynamicCircBuf, TryRecv};
710759
///
711760
/// let cb = DynamicCircBuf::new();
712761
/// let r = cb.receiver();

crossbeam-circbuf/src/sp/mc.rs crossbeam-circbuf/src/spmc.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@
2424
/// }).join().unwrap();
2525
/// ```
2626
pub mod bounded {
27-
use super::super::internal;
27+
use crate::sp_inner;
2828
pub use TryRecv;
2929

3030
/// A bounded SPMC queue.
3131
#[derive(Debug)]
32-
pub struct Queue<T>(internal::CircBuf<T>);
32+
pub struct Queue<T>(sp_inner::CircBuf<T>);
3333

3434
/// The receiver of a bounded SPMC queue.
3535
#[derive(Debug)]
36-
pub struct Receiver<T>(internal::Receiver<T>);
36+
pub struct Receiver<T>(sp_inner::Receiver<T>);
3737

3838
impl<T> Queue<T> {
3939
/// Creates a bounded SPMC queue with the specified capacity.
@@ -49,7 +49,7 @@ pub mod bounded {
4949
/// ```
5050
pub fn new(cap: usize) -> Self {
5151
Queue {
52-
0: internal::CircBuf::new(cap),
52+
0: sp_inner::CircBuf::new(cap),
5353
}
5454
}
5555

@@ -202,16 +202,16 @@ pub mod bounded {
202202
/// }).join().unwrap();
203203
/// ```
204204
pub mod unbounded {
205-
use super::super::internal;
205+
use crate::sp_inner;
206206
pub use TryRecv;
207207

208208
/// an unbounded SPMC queue.
209209
#[derive(Debug)]
210-
pub struct Queue<T>(internal::DynamicCircBuf<T>);
210+
pub struct Queue<T>(sp_inner::DynamicCircBuf<T>);
211211

212212
/// The receiver of an unbounded SPMC queue.
213213
#[derive(Debug)]
214-
pub struct Receiver<T>(internal::Receiver<T>);
214+
pub struct Receiver<T>(sp_inner::Receiver<T>);
215215

216216
impl<T> Queue<T> {
217217
/// Creates an unbounded SPMC queue.
@@ -225,7 +225,7 @@ pub mod unbounded {
225225
/// ```
226226
pub fn new() -> Self {
227227
Queue {
228-
0: internal::DynamicCircBuf::new(),
228+
0: sp_inner::DynamicCircBuf::new(),
229229
}
230230
}
231231

@@ -243,7 +243,7 @@ pub mod unbounded {
243243
/// ```
244244
pub fn with_min_capacity(min_cap: usize) -> Self {
245245
Queue {
246-
0: internal::DynamicCircBuf::with_min_capacity(min_cap),
246+
0: sp_inner::DynamicCircBuf::with_min_capacity(min_cap),
247247
}
248248
}
249249

crossbeam-circbuf/src/sp/sc.rs crossbeam-circbuf/src/spsc.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
/// }).join().unwrap();
2424
/// ```
2525
pub mod bounded {
26-
use sp::internal;
26+
use crate::sp_inner;
2727
pub use TryRecv;
2828

2929
/// The sender of a bounded SPSC queue.
3030
#[derive(Debug)]
31-
pub struct Sender<T>(internal::CircBuf<T>);
31+
pub struct Sender<T>(sp_inner::CircBuf<T>);
3232

3333
/// The receiver of a bounded SPSC queue.
3434
#[derive(Debug)]
35-
pub struct Receiver<T>(internal::Receiver<T>);
35+
pub struct Receiver<T>(sp_inner::Receiver<T>);
3636

3737
unsafe impl<T> Send for Receiver<T> {}
3838

@@ -49,7 +49,7 @@ pub mod bounded {
4949
/// let (tx, rx) = spsc::new::<u32>(16);
5050
/// ```
5151
pub fn new<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
52-
let circbuf = internal::CircBuf::new(cap);
52+
let circbuf = sp_inner::CircBuf::new(cap);
5353
let receiver = circbuf.receiver();
5454
let sender = Sender { 0: circbuf };
5555
let receiver = Receiver { 0: receiver };
@@ -121,16 +121,16 @@ pub mod bounded {
121121
/// }).join().unwrap();
122122
/// ```
123123
pub mod unbounded {
124-
use sp::internal;
124+
use crate::sp_inner;
125125
pub use TryRecv;
126126

127127
/// The sender of an unbounded SPSC queue.
128128
#[derive(Debug)]
129-
pub struct Sender<T>(internal::DynamicCircBuf<T>);
129+
pub struct Sender<T>(sp_inner::DynamicCircBuf<T>);
130130

131131
/// The receiver of an unbounded SPSC queue.
132132
#[derive(Debug)]
133-
pub struct Receiver<T>(internal::Receiver<T>);
133+
pub struct Receiver<T>(sp_inner::Receiver<T>);
134134

135135
unsafe impl<T> Send for Receiver<T> {}
136136

@@ -144,7 +144,7 @@ pub mod unbounded {
144144
/// let (tx, rx) = spsc::new::<u32>();
145145
/// ```
146146
pub fn new<T>() -> (Sender<T>, Receiver<T>) {
147-
let circbuf = internal::DynamicCircBuf::new();
147+
let circbuf = sp_inner::DynamicCircBuf::new();
148148
let receiver = circbuf.receiver();
149149
let sender = Sender { 0: circbuf };
150150
let receiver = Receiver { 0: receiver };
@@ -165,7 +165,7 @@ pub mod unbounded {
165165
/// let (tx, rx) = spsc::with_min_capacity::<u32>(1000);
166166
/// ```
167167
pub fn with_min_capacity<T>(min_cap: usize) -> (Sender<T>, Receiver<T>) {
168-
let circbuf = internal::DynamicCircBuf::with_min_capacity(min_cap);
168+
let circbuf = sp_inner::DynamicCircBuf::with_min_capacity(min_cap);
169169
let receiver = circbuf.receiver();
170170
let sender = Sender { 0: circbuf };
171171
let receiver = Receiver { 0: receiver };

0 commit comments

Comments
 (0)