Skip to content

Commit 2a94555

Browse files
committed
Use AtomicU64 for head/tail index in deque, channel, and queues
1 parent 85d0bdc commit 2a94555

23 files changed

+334
-101
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ default-features = false
6868
optional = true
6969

7070
[dependencies.crossbeam-utils]
71-
version = "0.8.5"
71+
version = "0.8.6"
7272
path = "./crossbeam-utils"
7373
default-features = false
7474

ci/no_atomic.sh

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ cat >"${file}" <<EOF
4141
// This file is @generated by $(basename "$0").
4242
// It is not intended for manual editing.
4343
44+
#[allow(dead_code)] // Only crossbeam-{epoch,queue,skiplist,utils} use this.
4445
const NO_ATOMIC_CAS: &[&str] = &[
4546
EOF
4647
for target in "${no_atomic_cas[@]}"; do
@@ -49,7 +50,7 @@ done
4950
cat >>"${file}" <<EOF
5051
];
5152
52-
#[allow(dead_code)] // Only crossbeam-utils uses this.
53+
#[allow(dead_code)] // Only crossbeam-{channel,deque,queue,utils} use this.
5354
const NO_ATOMIC_64: &[&str] = &[
5455
EOF
5556
for target in "${no_atomic_64[@]}"; do

crossbeam-channel/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ std = ["crossbeam-utils/std"]
2727
cfg-if = "1"
2828

2929
[dependencies.crossbeam-utils]
30-
version = "0.8"
30+
version = "0.8.6"
3131
path = "../crossbeam-utils"
3232
default-features = false
3333
optional = true

crossbeam-channel/build.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// The rustc-cfg listed below are considered public API, but it is *unstable*
2+
// and outside of the normal semver guarantees:
3+
//
4+
// - `crossbeam_no_atomic_64`
5+
// Assume the target does *not* support AtomicU64/AtomicI64.
6+
// This is usually detected automatically by the build script, but you may
7+
// need to enable it manually when building for custom targets or using
8+
// non-cargo build systems that don't run the build script.
9+
//
10+
// With the exceptions mentioned above, the rustc-cfg emitted by the build
11+
// script are *not* public API.
12+
13+
#![warn(rust_2018_idioms)]
14+
15+
use std::env;
16+
17+
include!("no_atomic.rs");
18+
19+
fn main() {
20+
let target = match env::var("TARGET") {
21+
Ok(target) => target,
22+
Err(e) => {
23+
println!(
24+
"cargo:warning={}: unable to get TARGET environment variable: {}",
25+
env!("CARGO_PKG_NAME"),
26+
e
27+
);
28+
return;
29+
}
30+
};
31+
32+
// Note that this is `no_*`, not `has_*`. This allows treating
33+
// "max-atomic-width" as 64 when the build script doesn't run. This is
34+
// needed for compatibility with non-cargo build systems that don't run the
35+
// build script.
36+
if NO_ATOMIC_64.contains(&&*target) {
37+
println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
38+
} else {
39+
// Otherwise, assuming `"max-atomic-width" == 64` or `"max-atomic-width" == 128`.
40+
}
41+
42+
println!("cargo:rerun-if-changed=no_atomic.rs");
43+
}

crossbeam-channel/no_atomic.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../no_atomic.rs

crossbeam-channel/src/flavors/array.rs

+20-19
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,21 @@
1111
use std::cell::UnsafeCell;
1212
use std::mem::MaybeUninit;
1313
use std::ptr;
14-
use std::sync::atomic::{self, AtomicUsize, Ordering};
14+
use std::sync::atomic::{self, Ordering};
1515
use std::time::Instant;
1616

1717
use crossbeam_utils::{Backoff, CachePadded};
1818

1919
use crate::context::Context;
2020
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
2121
use crate::select::{Operation, SelectHandle, Selected, Token};
22+
use crate::utils::AtomicU64;
2223
use crate::waker::SyncWaker;
2324

2425
/// A slot in a channel.
2526
struct Slot<T> {
2627
/// The current stamp.
27-
stamp: AtomicUsize,
28+
stamp: AtomicU64,
2829

2930
/// The message in this slot.
3031
msg: UnsafeCell<MaybeUninit<T>>,
@@ -37,7 +38,7 @@ pub(crate) struct ArrayToken {
3738
slot: *const u8,
3839

3940
/// Stamp to store into the slot after reading or writing.
40-
stamp: usize,
41+
stamp: u64,
4142
}
4243

4344
impl Default for ArrayToken {
@@ -55,20 +56,20 @@ pub(crate) struct Channel<T> {
5556
/// The head of the channel.
5657
///
5758
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
58-
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
59+
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
5960
/// represent the lap. The mark bit in the head is always zero.
6061
///
6162
/// Messages are popped from the head of the channel.
62-
head: CachePadded<AtomicUsize>,
63+
head: CachePadded<AtomicU64>,
6364

6465
/// The tail of the channel.
6566
///
6667
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
67-
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
68+
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
6869
/// represent the lap. The mark bit indicates that the channel is disconnected.
6970
///
7071
/// Messages are pushed into the tail of the channel.
71-
tail: CachePadded<AtomicUsize>,
72+
tail: CachePadded<AtomicU64>,
7273

7374
/// The buffer holding slots.
7475
buffer: Box<[Slot<T>]>,
@@ -77,10 +78,10 @@ pub(crate) struct Channel<T> {
7778
cap: usize,
7879

7980
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
80-
one_lap: usize,
81+
one_lap: u64,
8182

8283
/// If this bit is set in the tail, that means the channel is disconnected.
83-
mark_bit: usize,
84+
mark_bit: u64,
8485

8586
/// Senders waiting while the channel is full.
8687
senders: SyncWaker,
@@ -95,7 +96,7 @@ impl<T> Channel<T> {
9596
assert!(cap > 0, "capacity must be positive");
9697

9798
// Compute constants `mark_bit` and `one_lap`.
98-
let mark_bit = (cap + 1).next_power_of_two();
99+
let mark_bit = (cap as u64 + 1).next_power_of_two();
99100
let one_lap = mark_bit * 2;
100101

101102
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
@@ -105,11 +106,11 @@ impl<T> Channel<T> {
105106

106107
// Allocate a buffer of `cap` slots initialized
107108
// with stamps.
108-
let buffer: Box<[Slot<T>]> = (0..cap)
109+
let buffer: Box<[Slot<T>]> = (0..cap as u64)
109110
.map(|i| {
110111
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
111112
Slot {
112-
stamp: AtomicUsize::new(i),
113+
stamp: AtomicU64::new(i),
113114
msg: UnsafeCell::new(MaybeUninit::uninit()),
114115
}
115116
})
@@ -120,8 +121,8 @@ impl<T> Channel<T> {
120121
cap,
121122
one_lap,
122123
mark_bit,
123-
head: CachePadded::new(AtomicUsize::new(head)),
124-
tail: CachePadded::new(AtomicUsize::new(tail)),
124+
head: CachePadded::new(AtomicU64::new(head)),
125+
tail: CachePadded::new(AtomicU64::new(tail)),
125126
senders: SyncWaker::new(),
126127
receivers: SyncWaker::new(),
127128
}
@@ -151,7 +152,7 @@ impl<T> Channel<T> {
151152
}
152153

153154
// Deconstruct the tail.
154-
let index = tail & (self.mark_bit - 1);
155+
let index = (tail & (self.mark_bit - 1)) as usize;
155156
let lap = tail & !(self.one_lap - 1);
156157

157158
// Inspect the corresponding slot.
@@ -234,7 +235,7 @@ impl<T> Channel<T> {
234235

235236
loop {
236237
// Deconstruct the head.
237-
let index = head & (self.mark_bit - 1);
238+
let index = (head & (self.mark_bit - 1)) as usize;
238239
let lap = head & !(self.one_lap - 1);
239240

240241
// Inspect the corresponding slot.
@@ -452,8 +453,8 @@ impl<T> Channel<T> {
452453

453454
// If the tail didn't change, we've got consistent values to work with.
454455
if self.tail.load(Ordering::SeqCst) == tail {
455-
let hix = head & (self.mark_bit - 1);
456-
let tix = tail & (self.mark_bit - 1);
456+
let hix = (head & (self.mark_bit - 1)) as usize;
457+
let tix = (tail & (self.mark_bit - 1)) as usize;
457458

458459
return if hix < tix {
459460
tix - hix
@@ -522,7 +523,7 @@ impl<T> Channel<T> {
522523
impl<T> Drop for Channel<T> {
523524
fn drop(&mut self) {
524525
// Get the index of the head.
525-
let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
526+
let hix = (self.head.load(Ordering::Relaxed) & (self.mark_bit - 1)) as usize;
526527

527528
// Loop over all slots that hold a message and drop them.
528529
for i in 0..self.len() {

crossbeam-channel/src/flavors/list.rs

+16-15
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crossbeam_utils::{Backoff, CachePadded};
1212
use crate::context::Context;
1313
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
1414
use crate::select::{Operation, SelectHandle, Selected, Token};
15+
use crate::utils::AtomicU64;
1516
use crate::waker::SyncWaker;
1617

1718
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
@@ -29,15 +30,15 @@ const READ: usize = 2;
2930
const DESTROY: usize = 4;
3031

3132
// Each block covers one "lap" of indices.
32-
const LAP: usize = 32;
33+
const LAP: u64 = 32;
3334
// The maximum number of messages a block can hold.
34-
const BLOCK_CAP: usize = LAP - 1;
35+
const BLOCK_CAP: usize = LAP as usize - 1;
3536
// How many lower bits are reserved for metadata.
36-
const SHIFT: usize = 1;
37+
const SHIFT: u64 = 1;
3738
// Has two different purposes:
3839
// * If set in head, indicates that the block is not the last one.
3940
// * If set in tail, indicates that the channel is disconnected.
40-
const MARK_BIT: usize = 1;
41+
const MARK_BIT: u64 = 1;
4142

4243
/// A slot in a block.
4344
struct Slot<T> {
@@ -66,7 +67,7 @@ struct Block<T> {
6667
next: AtomicPtr<Block<T>>,
6768

6869
/// Slots for messages.
69-
slots: [Slot<T>; BLOCK_CAP],
70+
slots: [Slot<T>; BLOCK_CAP as usize],
7071
}
7172

7273
impl<T> Block<T> {
@@ -97,7 +98,7 @@ impl<T> Block<T> {
9798
unsafe fn destroy(this: *mut Block<T>, start: usize) {
9899
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
99100
// begun destruction of the block.
100-
for i in start..BLOCK_CAP - 1 {
101+
for i in start..BLOCK_CAP as usize - 1 {
101102
let slot = (*this).slots.get_unchecked(i);
102103

103104
// Mark the `DESTROY` bit if a thread is still using the slot.
@@ -118,7 +119,7 @@ impl<T> Block<T> {
118119
#[derive(Debug)]
119120
struct Position<T> {
120121
/// The index in the channel.
121-
index: AtomicUsize,
122+
index: AtomicU64,
122123

123124
/// The block in the linked list.
124125
block: AtomicPtr<Block<T>>,
@@ -171,11 +172,11 @@ impl<T> Channel<T> {
171172
Channel {
172173
head: CachePadded::new(Position {
173174
block: AtomicPtr::new(ptr::null_mut()),
174-
index: AtomicUsize::new(0),
175+
index: AtomicU64::new(0),
175176
}),
176177
tail: CachePadded::new(Position {
177178
block: AtomicPtr::new(ptr::null_mut()),
178-
index: AtomicUsize::new(0),
179+
index: AtomicU64::new(0),
179180
}),
180181
receivers: SyncWaker::new(),
181182
_marker: PhantomData,
@@ -207,7 +208,7 @@ impl<T> Channel<T> {
207208
}
208209

209210
// Calculate the offset of the index into the block.
210-
let offset = (tail >> SHIFT) % LAP;
211+
let offset = ((tail >> SHIFT) % LAP) as usize;
211212

212213
// If we reached the end of the block, wait until the next one is installed.
213214
if offset == BLOCK_CAP {
@@ -302,7 +303,7 @@ impl<T> Channel<T> {
302303

303304
loop {
304305
// Calculate the offset of the index into the block.
305-
let offset = (head >> SHIFT) % LAP;
306+
let offset = ((head >> SHIFT) % LAP) as usize;
306307

307308
// If we reached the end of the block, wait until the next one is installed.
308309
if offset == BLOCK_CAP {
@@ -520,7 +521,7 @@ impl<T> Channel<T> {
520521
head >>= SHIFT;
521522

522523
// Return the difference minus the number of blocks between tail and head.
523-
return tail - head - tail / LAP;
524+
return (tail - head - tail / LAP) as usize;
524525
}
525526
}
526527
}
@@ -567,7 +568,7 @@ impl<T> Channel<T> {
567568
let backoff = Backoff::new();
568569
let mut tail = self.tail.index.load(Ordering::Acquire);
569570
loop {
570-
let offset = (tail >> SHIFT) % LAP;
571+
let offset = ((tail >> SHIFT) % LAP) as usize;
571572
if offset != BLOCK_CAP {
572573
break;
573574
}
@@ -585,7 +586,7 @@ impl<T> Channel<T> {
585586
unsafe {
586587
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
587588
while head >> SHIFT != tail >> SHIFT {
588-
let offset = (head >> SHIFT) % LAP;
589+
let offset = ((head >> SHIFT) % LAP) as usize;
589590

590591
if offset < BLOCK_CAP {
591592
// Drop the message in the slot.
@@ -645,7 +646,7 @@ impl<T> Drop for Channel<T> {
645646
unsafe {
646647
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
647648
while head != tail {
648-
let offset = (head >> SHIFT) % LAP;
649+
let offset = ((head >> SHIFT) % LAP) as usize;
649650

650651
if offset < BLOCK_CAP {
651652
// Drop the message in the slot.

crossbeam-channel/src/utils.rs

+40
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,46 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
6161
}
6262
}
6363

64+
#[cfg(not(crossbeam_no_atomic_64))]
65+
pub(crate) use core::sync::atomic::AtomicU64;
66+
67+
#[cfg(crossbeam_no_atomic_64)]
68+
#[derive(Debug)]
69+
#[repr(transparent)]
70+
pub(crate) struct AtomicU64 {
71+
inner: crossbeam_utils::atomic::AtomicCell<u64>,
72+
}
73+
74+
#[cfg(crossbeam_no_atomic_64)]
75+
impl AtomicU64 {
76+
pub(crate) const fn new(v: u64) -> Self {
77+
Self {
78+
inner: crossbeam_utils::atomic::AtomicCell::new(v),
79+
}
80+
}
81+
pub(crate) fn load(&self, _order: Ordering) -> u64 {
82+
self.inner.load()
83+
}
84+
pub(crate) fn store(&self, val: u64, _order: Ordering) {
85+
self.inner.store(val);
86+
}
87+
pub(crate) fn compare_exchange_weak(
88+
&self,
89+
current: u64,
90+
new: u64,
91+
_success: Ordering,
92+
_failure: Ordering,
93+
) -> Result<u64, u64> {
94+
self.inner.compare_exchange(current, new)
95+
}
96+
pub(crate) fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
97+
self.inner.fetch_add(val)
98+
}
99+
pub(crate) fn fetch_or(&self, val: u64, _order: Ordering) -> u64 {
100+
self.inner.fetch_or(val)
101+
}
102+
}
103+
64104
/// A simple spinlock.
65105
pub(crate) struct Spinlock<T> {
66106
flag: AtomicBool,

0 commit comments

Comments
 (0)