Commit c1e27a8

feat: pad all struct fields to reduce contention
1 parent 6740b6c commit c1e27a8

File tree

1 file changed: +60 -64 lines changed


src/lib.rs

Lines changed: 60 additions & 64 deletions
@@ -8,27 +8,17 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 
 /// Padding to prevent false sharing
 #[repr(C, align(64))]
-struct CachePadded<T> {
-    value: T,
-}
-
-impl<T> CachePadded<T> {
-    #[inline(always)]
-    const fn new(value: T) -> Self {
-        Self { value }
-    }
-}
+struct CachePadded<T>(T);
 
-/// High-performance SPSC queue with direct value storage
 pub struct FastQueue<T> {
     /// Capacity mask (capacity - 1) for fast modulo
-    mask: usize,
+    mask: CachePadded<usize>,
 
     /// The actual capacity
-    capacity: usize,
+    capacity: CachePadded<usize>,
 
     /// Buffer storing elements directly
-    buffer: *mut MaybeUninit<T>,
+    buffer: CachePadded<*mut MaybeUninit<T>>,
 
     /// Producer cache line - head and cached tail together
     producer: CachePadded<ProducerCache>,
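
A note on the padding type: `#[repr(C, align(64))]` carries over to the tuple struct unchanged, so every `CachePadded` field still starts on its own 64-byte boundary and occupies a whole number of cache lines. A minimal standalone check (not part of this crate) of what the attribute guarantees:

    use std::mem::{align_of, size_of};

    #[repr(C, align(64))]
    struct CachePadded<T>(#[allow(dead_code)] T);

    fn main() {
        // align(64) rounds the struct's size up to a multiple of its
        // alignment, so two neighbouring CachePadded fields can never
        // land on the same cache line.
        assert_eq!(align_of::<CachePadded<usize>>(), 64);
        assert_eq!(size_of::<CachePadded<usize>>(), 64);
    }

64 bytes matches the line size of current x86 cores; CPUs with 128-byte lines (some recent ARM designs) would want a wider alignment.
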
@@ -70,14 +60,14 @@ impl<T> FastQueue<T> {
         }
 
         let queue = Arc::new(FastQueue {
-            mask,
-            capacity,
-            buffer,
-            producer: CachePadded::new(ProducerCache {
+            mask: CachePadded(mask),
+            capacity: CachePadded(capacity),
+            buffer: CachePadded(buffer),
+            producer: CachePadded(ProducerCache {
                 head: AtomicUsize::new(0),
                 cached_tail: UnsafeCell::new(0),
             }),
-            consumer: CachePadded::new(ConsumerCache {
+            consumer: CachePadded(ConsumerCache {
                 tail: AtomicUsize::new(0),
                 cached_head: UnsafeCell::new(0),
             }),
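
The `mask` stored here is `capacity - 1`, which is why the hot paths below can index with a single AND; this is only equivalent to a modulo when `capacity` is a power of two, which the constructor's validation (unchanged, so not shown in this hunk) presumably enforces. A quick illustration:

    fn main() {
        let capacity = 1024usize; // must be a power of two
        assert!(capacity.is_power_of_two());
        let mask = capacity - 1;

        // `pos & mask` equals `pos % capacity` for every position,
        // but compiles to one AND instead of a division.
        for pos in [0usize, 1, 1023, 1024, 1025, usize::MAX] {
            assert_eq!(pos & mask, pos % capacity);
        }
    }
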
@@ -96,22 +86,22 @@ impl<T> FastQueue<T> {
 
 impl<T> Drop for FastQueue<T> {
     fn drop(&mut self) {
-        let head = self.producer.value.head.load(Ordering::Relaxed);
-        let mut tail = self.consumer.value.tail.load(Ordering::Relaxed);
+        let head = self.producer.0.head.load(Ordering::Relaxed);
+        let mut tail = self.consumer.0.tail.load(Ordering::Relaxed);
 
         while tail != head {
             unsafe {
-                let index = tail & self.mask;
-                let slot = self.buffer.add(index);
+                let index = tail & self.mask.0;
+                let slot = self.buffer.0.add(index);
                 ptr::drop_in_place((*slot).as_mut_ptr());
             }
             tail = tail.wrapping_add(1);
         }
 
         unsafe {
             let layout =
-                Layout::array::<MaybeUninit<T>>(self.capacity).expect("Layout calculation failed");
-            dealloc(self.buffer as *mut u8, layout);
+                Layout::array::<MaybeUninit<T>>(self.capacity.0).expect("Layout calculation failed");
+            dealloc(self.buffer.0 as *mut u8, layout);
         }
     }
 }
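
Because the buffer stores `MaybeUninit<T>`, the compiler never runs `T`'s destructor on its own; the loop above therefore walks each still-initialized slot and drops it by hand before freeing the allocation. A self-contained sketch of why the explicit `drop_in_place` matters:

    use std::mem::MaybeUninit;
    use std::ptr;

    fn main() {
        let mut slot: MaybeUninit<String> = MaybeUninit::uninit();
        unsafe {
            // Producer side: write a value into the raw slot.
            slot.as_mut_ptr().write(String::from("queued value"));
            // Teardown: without this call the String's heap buffer
            // would simply leak, since MaybeUninit has no Drop glue.
            ptr::drop_in_place(slot.as_mut_ptr());
        }
    }
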
@@ -126,49 +116,50 @@ impl<T> Producer<T> {
     /// Returns Ok(()) on success, Err(value) if queue is full
     #[inline(always)]
     pub fn push(&mut self, value: T) -> Result<(), T> {
-        let head = self.queue.producer.value.head.load(Ordering::Relaxed);
+        let head = self.queue.producer.0.head.load(Ordering::Relaxed);
         let next_head = head.wrapping_add(1);
 
-        let cached_tail = unsafe { *self.queue.producer.value.cached_tail.get() };
+        let cached_tail = unsafe { *self.queue.producer.0.cached_tail.get() };
 
-        if next_head.wrapping_sub(cached_tail) > self.queue.capacity {
+        if next_head.wrapping_sub(cached_tail) > self.queue.capacity.0 {
             // Reload actual tail (slow path)
-            let tail = self.queue.consumer.value.tail.load(Ordering::Acquire);
+            let tail = self.queue.consumer.0.tail.load(Ordering::Acquire);
             unsafe {
-                *self.queue.producer.value.cached_tail.get() = tail;
+                *self.queue.producer.0.cached_tail.get() = tail;
             }
 
             // Check again with fresh tail
-            if next_head.wrapping_sub(tail) > self.queue.capacity {
+            if next_head.wrapping_sub(tail) > self.queue.capacity.0 {
                 return Err(value);
             }
         }
 
         unsafe {
-            let index = head & self.queue.mask;
-            let slot = self.queue.buffer.add(index);
-            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
-            {
-                let next_index = next_head & self.queue.mask;
-                let next_slot = self.queue.buffer.add(next_index);
-                prefetch_write(next_slot as *const u8);
-            }
+            let index = head & self.queue.mask.0;
+            let slot = self.queue.buffer.0.add(index);
             (*slot).as_mut_ptr().write(value);
         }
 
         self.queue
             .producer
-            .value
+            .0
             .head
             .store(next_head, Ordering::Release);
 
+        #[cfg(any(target_arch = "x86", all(target_arch = "x86_64", target_feature = "sse")))]
+        unsafe {
+            let next_index = next_head & self.queue.mask.0;
+            let next_slot = self.queue.buffer.0.add(next_index);
+            prefetch_write(next_slot as *const u8);
+        }
+
         Ok(())
     }
 
     #[inline(always)]
     pub fn len(&self) -> usize {
-        let head = self.queue.producer.value.head.load(Ordering::Relaxed);
-        let tail = self.queue.consumer.value.tail.load(Ordering::Relaxed);
+        let head = self.queue.producer.0.head.load(Ordering::Relaxed);
+        let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
         head.wrapping_sub(tail)
     }
 
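
The fullness check treats `head` and `tail` as free-running counters that may overflow; `wrapping_sub` keeps their distance meaningful across the wrap, so no extra state is needed. A small demonstration with made-up counter values:

    fn main() {
        let capacity = 8usize;

        // Consumer is about to wrap around usize::MAX; producer has
        // already filled all 8 slots ahead of it.
        let tail = usize::MAX - 2;
        let head = tail.wrapping_add(capacity);

        let next_head = head.wrapping_add(1);
        assert!(next_head.wrapping_sub(tail) > capacity); // full: push must fail

        // One pop later the distance shrinks and a push fits again.
        let tail = tail.wrapping_add(1);
        assert!(next_head.wrapping_sub(tail) <= capacity);
    }
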
@@ -179,7 +170,7 @@ impl<T> Producer<T> {
 
     #[inline(always)]
     pub fn is_full(&self) -> bool {
-        self.len() >= self.queue.capacity
+        self.len() >= self.queue.capacity.0
     }
 }
 
@@ -193,16 +184,16 @@ impl<T> Consumer<T> {
     /// Returns None if queue is empty or Some(T)
     #[inline(always)]
     pub fn pop(&mut self) -> Option<T> {
-        let tail = self.queue.consumer.value.tail.load(Ordering::Relaxed);
+        let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
 
         // Check cached head first (fast path)
-        let cached_head = unsafe { *self.queue.consumer.value.cached_head.get() };
+        let cached_head = unsafe { *self.queue.consumer.0.cached_head.get() };
 
         if tail == cached_head {
             // Reload actual head (slow path)
-            let head = self.queue.producer.value.head.load(Ordering::Acquire);
+            let head = self.queue.producer.0.head.load(Ordering::Acquire);
             unsafe {
-                *self.queue.consumer.value.cached_head.get() = head;
+                *self.queue.consumer.0.cached_head.get() = head;
             }
 
             // Check if still empty
@@ -212,49 +203,54 @@ impl<T> Consumer<T> {
         }
 
         let value = unsafe {
-            let index = tail & self.queue.mask;
-            let slot = self.queue.buffer.add(index);
-            #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
-            {
-                let next_index = (tail + 1) & self.queue.mask;
-                let next_slot = self.queue.buffer.add(next_index);
-                prefetch_read(next_slot as *const u8);
-            }
+            let index = tail & self.queue.mask.0;
+            let slot = self.queue.buffer.0.add(index);
             (*slot).as_ptr().read()
         };
 
         let next_tail = tail.wrapping_add(1);
         self.queue
             .consumer
-            .value
+            .0
             .tail
             .store(next_tail, Ordering::Release);
 
+        #[cfg(any(target_arch = "x86", all(target_arch = "x86_64", target_feature = "sse")))]
+        unsafe {
+            let next_index = next_tail & self.queue.mask.0;
+            let next_slot = self.queue.buffer.0.add(next_index);
+            prefetch_read(next_slot as *const u8);
+        }
+
         Some(value)
     }
 
     /// Peek at the front element without removing it
     #[inline(always)]
     pub fn peek(&self) -> Option<&T> {
-        let tail = self.queue.consumer.value.tail.load(Ordering::Relaxed);
-        let head = self.queue.producer.value.head.load(Ordering::Acquire);
+        let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
+        let head = self.queue.producer.0.head.load(Ordering::Acquire);
 
         if tail == head {
             return None;
        }
 
         unsafe {
-            let index = tail & self.queue.mask;
-            let slot = self.queue.buffer.add(index);
+            let index = tail & self.queue.mask.0;
+            let slot = self.queue.buffer.0.add(index);
+            #[cfg(any(target_arch = "x86", all(target_arch = "x86_64", target_feature = "sse")))]
+            {
+                prefetch_read(slot as *const u8);
+            }
             Some(&*(*slot).as_ptr())
         }
     }
 
     /// Get the current size of the queue (may be stale)
     #[inline(always)]
     pub fn len(&self) -> usize {
-        let head = self.queue.producer.value.head.load(Ordering::Relaxed);
-        let tail = self.queue.consumer.value.tail.load(Ordering::Relaxed);
+        let head = self.queue.producer.0.head.load(Ordering::Relaxed);
+        let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
         head.wrapping_sub(tail)
     }
 
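
`cached_head` plays the same role for the consumer as `cached_tail` does for the producer: each side consults the other's atomic only when its private copy says it must, so in steady state the two hot cache lines stay pinned to their own cores. A condensed, single-threaded sketch of the pattern (reduced to the two fields involved):

    use std::cell::UnsafeCell;
    use std::sync::atomic::{AtomicUsize, Ordering};

    struct ConsumerCache {
        tail: AtomicUsize,
        cached_head: UnsafeCell<usize>, // consumer-private copy of `head`
    }

    // Sketch only: `head` stands in for the producer's shared atomic.
    fn have_item(consumer: &ConsumerCache, head: &AtomicUsize) -> bool {
        let tail = consumer.tail.load(Ordering::Relaxed);

        // Fast path: trust the private copy; no cross-core traffic.
        if tail != unsafe { *consumer.cached_head.get() } {
            return true;
        }

        // Slow path: refresh the private copy from the shared atomic.
        let fresh = head.load(Ordering::Acquire);
        unsafe { *consumer.cached_head.get() = fresh };
        tail != fresh
    }

    fn main() {
        let consumer = ConsumerCache {
            tail: AtomicUsize::new(0),
            cached_head: UnsafeCell::new(0),
        };
        let head = AtomicUsize::new(1); // one element published
        assert!(have_item(&consumer, &head));
    }
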
@@ -265,7 +261,7 @@ impl<T> Consumer<T> {
     }
 }
 
-#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
+#[cfg(any(target_arch = "x86", all(target_arch = "x86_64", target_feature = "sse")))]
 #[inline(always)]
 fn prefetch_read(p: *const u8) {
     unsafe {
@@ -279,7 +275,7 @@ fn prefetch_read(p: *const u8) {
     }
 }
 
-#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
+#[cfg(any(target_arch = "x86", all(target_arch = "x86_64", target_feature = "sse")))]
 #[inline(always)]
 fn prefetch_write(p: *const u8) {
     unsafe {
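
Both helpers (bodies truncated in this diff) are thin wrappers around the `prefetcht0`-family hint instructions, which is why the gate now also asks for SSE. A standalone sketch of the usual shape, assuming `_mm_prefetch` with the T0 hint rather than the crate's exact body:

    #[cfg(target_arch = "x86_64")]
    fn prefetch_read(p: *const u8) {
        // SAFETY: prefetch is only a hint; it never faults, even on a
        // dangling address, and the CPU is free to ignore it.
        unsafe {
            use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
            _mm_prefetch::<_MM_HINT_T0>(p as *const i8);
        }
    }

    #[cfg(target_arch = "x86_64")]
    fn main() {
        let data = [0u8; 64];
        prefetch_read(data.as_ptr());
    }

    #[cfg(not(target_arch = "x86_64"))]
    fn main() {}
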

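Taken together, the intended usage is a single producer thread pushing while a single consumer thread pops. A sketch for orientation only: `FastQueue::new` returning a `(Producer, Consumer)` pair is a hypothetical signature, since only the `push` and `pop` signatures appear in this diff.

    use std::thread;

    fn main() {
        // Hypothetical constructor; the real API may differ.
        let (mut producer, mut consumer) = FastQueue::new(1024);

        let t = thread::spawn(move || {
            for i in 0..100_000u64 {
                let mut v = i;
                // push returns Err(value) while full; spin until it fits.
                while let Err(back) = producer.push(v) {
                    v = back;
                }
            }
        });

        let mut expected = 0u64;
        while expected < 100_000 {
            if let Some(v) = consumer.pop() {
                assert_eq!(v, expected); // FIFO order is preserved
                expected += 1;
            }
        }
        t.join().unwrap();
    }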