Skip to content

Commit d9f674e

Browse files
committed
feat: try different ownership paradigm
1 parent c74d126 commit d9f674e

File tree

2 files changed

+76
-86
lines changed

2 files changed

+76
-86
lines changed

benches/bench.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ fn bench_concurrent_spsc(c: &mut Criterion) {
9191
messages,
9292
|b, &messages| {
9393
b.iter(|| {
94-
let (mut producer, mut consumer) = FastQueue::<u64>::new(1024);
94+
let (mut producer, mut consumer) = FastQueue::<usize>::new(1024);
9595

9696
let producer_handle = thread::spawn(move || {
9797
for i in 0..messages {
98-
while black_box(producer.push(black_box(i as u64))).is_err() {
98+
while black_box(producer.push(black_box(i as usize))).is_err() {
9999
std::hint::spin_loop();
100100
}
101101
}
@@ -179,7 +179,7 @@ fn bench_burst_operations(c: &mut Criterion) {
179179

180180
group.bench_function("burst_push_pop_16", |b| {
181181
b.iter(|| {
182-
let (mut producer, mut consumer) = FastQueue::<u64>::new(32);
182+
let (mut producer, mut consumer) = FastQueue::<usize>::new(32);
183183

184184
// Burst push 16 items
185185
for i in 0..16 {
@@ -195,7 +195,7 @@ fn bench_burst_operations(c: &mut Criterion) {
195195

196196
group.bench_function("alternating_push_pop", |b| {
197197
b.iter(|| {
198-
let (mut producer, mut consumer) = FastQueue::<u64>::new(16);
198+
let (mut producer, mut consumer) = FastQueue::<usize>::new(16);
199199

200200
for i in 0..1000 {
201201
black_box(producer.push(black_box(i))).unwrap();
@@ -213,7 +213,7 @@ fn bench_wraparound(c: &mut Criterion) {
213213
group.bench_function("wraparound_operations", |b| {
214214
b.iter_batched(
215215
|| {
216-
let (mut producer, mut consumer) = FastQueue::<u64>::new(64);
216+
let (mut producer, mut consumer) = FastQueue::<usize>::new(64);
217217
// Fill queue to near capacity
218218
for i in 0..60 {
219219
producer.push(i).unwrap();
@@ -254,7 +254,7 @@ fn bench_capacity_scaling(c: &mut Criterion) {
254254
|b, &capacity| {
255255
b.iter(|| {
256256
let (_producer, _consumer) =
257-
black_box(FastQueue::<u64>::new(black_box(capacity)));
257+
black_box(FastQueue::<usize>::new(black_box(capacity)));
258258
});
259259
},
260260
);
@@ -269,7 +269,7 @@ fn bench_peek_operations(c: &mut Criterion) {
269269
group.bench_function("peek_vs_pop", |b| {
270270
b.iter_batched(
271271
|| {
272-
let (mut producer, consumer) = FastQueue::<u64>::new(1024);
272+
let (mut producer, consumer) = FastQueue::<usize>::new(1024);
273273
for i in 0..500 {
274274
producer.push(i).unwrap();
275275
}
@@ -296,7 +296,7 @@ fn bench_len_operations(c: &mut Criterion) {
296296
group.bench_function("len_check_overhead", |b| {
297297
b.iter_batched(
298298
|| {
299-
let (mut producer, consumer) = FastQueue::<u64>::new(1024);
299+
let (mut producer, consumer) = FastQueue::<usize>::new(1024);
300300
for i in 0..500 {
301301
producer.push(i).unwrap();
302302
}

src/lib.rs

Lines changed: 68 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -53,27 +53,13 @@ pub struct FastQueue<T> {
5353
/// Buffer storing elements directly
5454
buffer: CachePadded<*mut MaybeUninit<T>>,
5555

56-
/// Producer cache line with head and cached tail together
57-
producer: CachePadded<ProducerCache>,
56+
/// Written by producer, read by consumer.
57+
head: CachePadded<AtomicUsize>,
5858

59-
/// Consumer cache line with tail and cached head together
60-
consumer: CachePadded<ConsumerCache>,
59+
/// Written by consumer, read by producer.
60+
tail: CachePadded<AtomicUsize>,
6161

62-
_phantom: PhantomData<T>,
63-
}
64-
65-
struct ProducerCache {
66-
/// Write position
67-
head: AtomicUsize,
68-
/// Cached read position to avoid loading tail
69-
cached_tail: UnsafeCell<usize>,
70-
}
71-
72-
struct ConsumerCache {
73-
/// Read position
74-
tail: AtomicUsize,
75-
/// Cached write position to avoid loading head
76-
cached_head: UnsafeCell<usize>,
62+
_pd: PhantomData<T>,
7763
}
7864

7965
unsafe impl<T: Send> Send for FastQueue<T> {}
@@ -108,22 +94,24 @@ impl<T> FastQueue<T> {
10894
mask: CachePadded(mask),
10995
capacity: CachePadded(capacity),
11096
buffer: CachePadded(buffer),
111-
producer: CachePadded(ProducerCache {
112-
head: AtomicUsize::new(0),
113-
cached_tail: UnsafeCell::new(0),
114-
}),
115-
consumer: CachePadded(ConsumerCache {
116-
tail: AtomicUsize::new(0),
117-
cached_head: UnsafeCell::new(0),
118-
}),
119-
_phantom: PhantomData,
97+
head: CachePadded(AtomicUsize::new(0)),
98+
tail: CachePadded(AtomicUsize::new(0)),
99+
_pd: PhantomData,
120100
});
121101

122102
let producer = Producer {
123-
queue: Arc::clone(&queue),
103+
queue: CachePadded(Arc::clone(&queue)),
104+
head: CachePadded(UnsafeCell::new(0)),
105+
cached_tail: CachePadded(UnsafeCell::new(0)),
106+
_pd: PhantomData,
124107
};
125108

126-
let consumer = Consumer { queue };
109+
let consumer = Consumer {
110+
queue: CachePadded(queue),
111+
tail: CachePadded(UnsafeCell::new(0)),
112+
cached_head: CachePadded(UnsafeCell::new(0)),
113+
_pd: PhantomData,
114+
};
127115

128116
(producer, consumer)
129117
}
@@ -132,8 +120,8 @@ impl<T> FastQueue<T> {
132120
impl<T> Drop for FastQueue<T> {
133121
/// Drops all elements in the queue. This will only drop the elements, not the queue itself.
134122
fn drop(&mut self) {
135-
let head = self.producer.0.head.load(Ordering::Relaxed);
136-
let mut tail = self.consumer.0.tail.load(Ordering::Relaxed);
123+
let head = self.head.0.load(Ordering::Relaxed);
124+
let mut tail = self.tail.0.load(Ordering::Relaxed);
137125

138126
while tail != head {
139127
unsafe {
@@ -153,7 +141,10 @@ impl<T> Drop for FastQueue<T> {
153141

154142
/// A producer for the `FastQueue`. This is used to push values into the queue.
155143
pub struct Producer<T> {
156-
queue: Arc<FastQueue<T>>,
144+
queue: CachePadded<Arc<FastQueue<T>>>,
145+
head: CachePadded<UnsafeCell<usize>>,
146+
cached_tail: CachePadded<UnsafeCell<usize>>,
147+
_pd: PhantomData<T>,
157148
}
158149

159150
unsafe impl<T: Send> Send for Producer<T> {}
@@ -170,7 +161,7 @@ impl<T> Producer<T> {
170161
/// ```
171162
#[inline(always)]
172163
pub fn push(&mut self, value: T) -> Result<(), T> {
173-
let head = self.queue.producer.0.head.load(Ordering::Relaxed);
164+
let head = unsafe { *self.head.0.get() };
174165
let next_head = head.wrapping_add(1);
175166

176167
#[cfg(any(
@@ -183,39 +174,40 @@ impl<T> Producer<T> {
183174
prefetch_write(next_slot as *const u8);
184175
}
185176

186-
let cached_tail = unsafe { *self.queue.producer.0.cached_tail.get() };
177+
let cached_tail = unsafe { *self.cached_tail.0.get() };
187178

188-
if next_head.wrapping_sub(cached_tail) > self.queue.capacity.0 {
179+
if next_head.wrapping_sub(cached_tail) > self.queue.0.capacity.0 {
189180
// Reload actual tail (slow path)
190-
let tail = self.queue.consumer.0.tail.load(Ordering::Acquire);
191-
unsafe {
192-
*self.queue.producer.0.cached_tail.get() = tail;
181+
let tail = self.queue.0.tail.0.load(Ordering::Acquire);
182+
183+
if tail != cached_tail {
184+
// Update cached tail
185+
unsafe {
186+
*self.cached_tail.0.get() = tail;
187+
}
193188
}
194189

195190
// Check again with fresh tail
196-
if next_head.wrapping_sub(tail) > self.queue.capacity.0 {
191+
if next_head.wrapping_sub(tail) > self.queue.0.capacity.0 {
197192
return Err(value);
198193
}
199194
}
200195

201196
unsafe {
202-
let index = head & self.queue.mask.0;
203-
let slot = self.queue.buffer.0.add(index);
197+
let index = head & self.queue.0.mask.0;
198+
let slot = self.queue.0.buffer.0.add(index);
204199
(*slot).as_mut_ptr().write(value);
200+
*self.head.0.get() = next_head;
205201
}
206202

207-
self.queue
208-
.producer
209-
.0
210-
.head
211-
.store(next_head, Ordering::Release);
203+
self.queue.0.head.0.store(next_head, Ordering::Release);
212204

213205
Ok(())
214206
}
215207

216208
/// Returns the current number of elements in the queue (may be stale)
217209
///
218-
/// This function will return stale data when holding a lock on the queue.
210+
/// This function may return stale data when holding a lock on the queue.
219211
/// # Example
220212
/// ```
221213
/// use fq::FastQueue;
@@ -225,8 +217,8 @@ impl<T> Producer<T> {
225217
/// ```
226218
#[inline(always)]
227219
pub fn len(&self) -> usize {
228-
let head = self.queue.producer.0.head.load(Ordering::Relaxed);
229-
let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
220+
let head = unsafe { *self.head.0.get() };
221+
let tail = self.queue.0.tail.0.load(Ordering::Relaxed);
230222
head.wrapping_sub(tail)
231223
}
232224

@@ -256,12 +248,15 @@ impl<T> Producer<T> {
256248
/// ```
257249
#[inline(always)]
258250
pub fn is_full(&self) -> bool {
259-
self.len() >= self.queue.capacity.0
251+
self.len() >= self.queue.0.capacity.0
260252
}
261253
}
262254

263255
pub struct Consumer<T> {
264-
queue: Arc<FastQueue<T>>,
256+
queue: CachePadded<Arc<FastQueue<T>>>,
257+
tail: CachePadded<UnsafeCell<usize>>,
258+
cached_head: CachePadded<UnsafeCell<usize>>,
259+
_pd: PhantomData<T>,
265260
}
266261

267262
unsafe impl<T: Send> Send for Consumer<T> {}
@@ -279,26 +274,31 @@ impl<T> Consumer<T> {
279274
/// ```
280275
#[inline(always)]
281276
pub fn pop(&mut self) -> Option<T> {
282-
let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
277+
let tail = unsafe { *self.tail.0.get() };
283278

284279
#[cfg(any(
285280
target_arch = "x86",
286281
all(target_arch = "x86_64", target_feature = "sse")
287282
))]
288283
unsafe {
289-
let next_index = (tail + 1) & self.queue.mask.0;
284+
let next_tail = tail.wrapping_add(1);
285+
let next_index = next_tail & self.queue.mask.0;
290286
let next_slot = self.queue.buffer.0.add(next_index);
291287
prefetch_read(next_slot as *const u8);
292288
}
293289

294290
// Check cached head first (fast path)
295-
let cached_head = unsafe { *self.queue.consumer.0.cached_head.get() };
291+
let cached_head = unsafe { *self.cached_head.0.get() };
296292

297293
if tail == cached_head {
298294
// Reload actual head (slow path)
299-
let head = self.queue.producer.0.head.load(Ordering::Acquire);
300-
unsafe {
301-
*self.queue.consumer.0.cached_head.get() = head;
295+
let head = self.queue.0.head.0.load(Ordering::Acquire);
296+
297+
if head != cached_head {
298+
// Update cached head
299+
unsafe {
300+
*self.cached_head.0.get() = head;
301+
}
302302
}
303303

304304
// Check if still empty
@@ -308,17 +308,14 @@ impl<T> Consumer<T> {
308308
}
309309

310310
let value = unsafe {
311-
let index = tail & self.queue.mask.0;
312-
let slot = self.queue.buffer.0.add(index);
311+
let index = tail & self.queue.0.mask.0;
312+
let slot = self.queue.0.buffer.0.add(index);
313313
(*slot).as_ptr().read()
314314
};
315315

316316
let next_tail = tail.wrapping_add(1);
317-
self.queue
318-
.consumer
319-
.0
320-
.tail
321-
.store(next_tail, Ordering::Release);
317+
unsafe { *self.tail.0.get() = next_tail };
318+
self.queue.0.tail.0.store(next_tail, Ordering::Release);
322319

323320
Some(value)
324321
}
@@ -334,23 +331,16 @@ impl<T> Consumer<T> {
334331
/// ```
335332
#[inline(always)]
336333
pub fn peek(&self) -> Option<&T> {
337-
let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
338-
let head = self.queue.producer.0.head.load(Ordering::Acquire);
334+
let tail = unsafe { *self.tail.0.get() };
335+
let head = self.queue.0.head.0.load(Ordering::Relaxed);
339336

340337
if tail == head {
341338
return None;
342339
}
343340

344341
unsafe {
345-
let index = tail & self.queue.mask.0;
346-
let slot = self.queue.buffer.0.add(index);
347-
#[cfg(any(
348-
target_arch = "x86",
349-
all(target_arch = "x86_64", target_feature = "sse")
350-
))]
351-
{
352-
prefetch_read(slot as *const u8);
353-
}
342+
let index = tail & self.queue.0.mask.0;
343+
let slot = self.queue.0.buffer.0.add(index);
354344
Some(&*(*slot).as_ptr())
355345
}
356346
}
@@ -367,8 +357,8 @@ impl<T> Consumer<T> {
367357
/// ```
368358
#[inline(always)]
369359
pub fn len(&self) -> usize {
370-
let head = self.queue.producer.0.head.load(Ordering::Relaxed);
371-
let tail = self.queue.consumer.0.tail.load(Ordering::Relaxed);
360+
let head = self.queue.0.head.0.load(Ordering::Relaxed);
361+
let tail = unsafe { *self.tail.0.get() };
372362
head.wrapping_sub(tail)
373363
}
374364

0 commit comments

Comments
 (0)