Commit f4603ab

Milerius and claude committed
perf(queue): add push_shared/pop_shared &self API, bench all Rust variants
- Add push_shared/pop_shared on RawRing taking &self instead of &mut self to avoid LLVM noalias interference in two-thread benchmarks
- Raw bench now tests all 3 Rust queues: mantis-inline (push_shared), mantis-copy (push/pop &self), rtrb (push/pop Result)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 854ae89

2 files changed: 175 additions & 12 deletions
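
Background for the rationale in the commit message: materializing `&mut` references to the same ring from two threads is undefined behavior on its own, and rustc additionally tags `&mut` arguments as LLVM `noalias`, which can let the optimizer assume no other thread writes through that memory. Handing each side a shared reference and mutating only through interior mutability avoids both problems. A minimal sketch of the before/after patterns, using a hypothetical `Indices` type rather than anything from this codebase:

    use std::sync::atomic::{AtomicU64, Ordering};

    // Hypothetical stand-in for the ring's shared state.
    struct Indices {
        tail: AtomicU64,
    }

    fn wait_for_data(addr: usize, head: u64) -> u64 {
        // Before (sketch): `let ix = unsafe { &mut *(addr as *mut Indices) };`
        // in both threads is aliasing UB, and noalias may let LLVM hoist the
        // tail load out of the spin loop as if no one else could store to it.
        //
        // After: reconstruct only a shared reference; the atomic supplies the
        // mutation, and each iteration performs a fresh acquire load.
        let ix = unsafe { &*(addr as *const Indices) };
        loop {
            let tail = ix.tail.load(Ordering::Acquire);
            if tail != head {
                return tail;
            }
        }
    }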

benchmarks/rust/src/raw_bench.rs (144 additions, 12 deletions)
@@ -86,10 +86,10 @@ fn run_raw(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
     // SpscRing uses CacheLine-colocated fields internally.
     let mut ring = SpscRing::<Msg, 1024>::new();

-    // Cast to usize for Send across threads.
-    // SAFETY: ring lives on this stack frame and we join both threads before returning.
-    // SPSC protocol guarantees disjoint access (producer: push, consumer: pop_into).
-    let ring_addr = &mut ring as *mut SpscRing<Msg, 1024> as usize;
+    // Use &self shared references — no &mut aliasing UB.
+    // SAFETY: SPSC protocol guarantees disjoint access. Ring lives on stack
+    // and we join both threads before returning.
+    let ring_addr = &ring as *const SpscRing<Msg, 1024> as usize;

     let consumer_ready = AtomicBool::new(false);
     let ready_addr = &consumer_ready as *const AtomicBool as usize;
@@ -104,12 +104,13 @@ fn run_raw(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
         let latency = unsafe { &*(latency_addr as *const AtomicU64) };
         ready.store(true, Ordering::Release);

-        let rb = unsafe { &mut *(ring_addr as *mut SpscRing<Msg, 1024>) };
+        // SAFETY: SPSC consumer — only pops. &self avoids noalias interference.
+        let rb = unsafe { &*(ring_addr as *const SpscRing<Msg, 1024>) };
         let mut msg = Msg::default();
         let mut sum: u64 = 0;
         let mut count: u64 = 0;
         while count < total_ops {
-            if unsafe { rb.pop_into(&mut msg as *mut Msg) } {
+            if unsafe { rb.pop_shared(&mut msg as *mut Msg) } {
                 let now = rdtsc();
                 sum += now - msg.timestamp;
                 count += 1;
@@ -124,7 +125,8 @@ fn run_raw(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
         let ready = unsafe { &*(ready_addr as *const AtomicBool) };
         while !ready.load(Ordering::Acquire) {}

-        let rb = unsafe { &mut *(ring_addr as *mut SpscRing<Msg, 1024>) };
+        // SAFETY: SPSC producer — only pushes. &self avoids noalias interference.
+        let rb = unsafe { &*(ring_addr as *const SpscRing<Msg, 1024>) };
         for i in 0..total_ops {
             let msg = Msg {
                 timestamp: rdtsc(),
@@ -136,7 +138,7 @@ fn run_raw(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
                 quantity: ((i & 0xFF) + 1) as i64,
                 order_id: i as i64,
             };
-            while !rb.push(msg) {}
+            while !unsafe { rb.push_shared(msg) } {}
         }
     });

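One detail the new code keeps: the ring address still crosses `thread::spawn`'s `'static` bound as a plain `usize`; only the reference rebuilt on the far side is now shared instead of exclusive. The smuggling pattern in isolation, sketched with a bare atomic standing in for the ring:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;

    fn main() {
        let counter = AtomicU64::new(0);
        // usize is Send + 'static, so it can move into the closure; joining
        // before `counter` drops keeps the address valid for the whole run.
        let addr = &counter as *const AtomicU64 as usize;

        let t = thread::spawn(move || {
            // SAFETY: `counter` is alive until the join below; access is
            // shared-only, through the atomic.
            let c = unsafe { &*(addr as *const AtomicU64) };
            c.fetch_add(1, Ordering::Relaxed);
        });

        t.join().unwrap();
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }
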
@@ -146,14 +148,135 @@ fn run_raw(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
     total_latency.load(Ordering::Acquire)
 }

-/// Run multiple iterations and print cycles/op.
-pub fn run_raw_bench(producer_core: usize, consumer_core: usize, ops: u64, iterations: usize) {
+// ─── SpscRingCopy variant ────────────────────────────────────────────────────
+
+fn run_raw_copy(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
+    use mantis_queue::SpscRingCopy;
+
+    let ring = SpscRingCopy::<Msg, 1024>::new();
+    let ring_addr = &ring as *const SpscRingCopy<Msg, 1024> as usize;
+
+    let consumer_ready = AtomicBool::new(false);
+    let ready_addr = &consumer_ready as *const AtomicBool as usize;
+    let total_latency = AtomicU64::new(0);
+    let latency_addr = &total_latency as *const AtomicU64 as usize;
+
+    let consumer = thread::spawn(move || {
+        pin(consumer_core);
+        let ready = unsafe { &*(ready_addr as *const AtomicBool) };
+        let latency = unsafe { &*(latency_addr as *const AtomicU64) };
+        ready.store(true, Ordering::Release);
+
+        let rb = unsafe { &*(ring_addr as *const SpscRingCopy<Msg, 1024>) };
+        let mut msg = Msg::default();
+        let mut sum: u64 = 0;
+        let mut count: u64 = 0;
+        while count < total_ops {
+            if rb.pop(&mut msg) {
+                let now = rdtsc();
+                sum += now - msg.timestamp;
+                count += 1;
+            }
+        }
+        latency.store(sum, Ordering::Release);
+    });
+
+    let producer = thread::spawn(move || {
+        pin(producer_core);
+        let ready = unsafe { &*(ready_addr as *const AtomicBool) };
+        while !ready.load(Ordering::Acquire) {}
+
+        let rb = unsafe { &*(ring_addr as *const SpscRingCopy<Msg, 1024>) };
+        for i in 0..total_ops {
+            let mut msg = Msg {
+                timestamp: rdtsc(),
+                sequence: i,
+                symbol_id: (i & 0xFFF) as u32,
+                side: (i & 1) as u16,
+                _pad: 0,
+                price: (i * 100 + 1) as i64,
+                quantity: ((i & 0xFF) + 1) as i64,
+                order_id: i as i64,
+            };
+            while !rb.push(&msg) {}
+        }
+    });
+
+    producer.join().unwrap();
+    consumer.join().unwrap();
+    total_latency.load(Ordering::Acquire)
+}
+
+// ─── rtrb variant ────────────────────────────────────────────────────────────
+
+fn run_raw_rtrb(producer_core: usize, consumer_core: usize, total_ops: u64) -> u64 {
+    let (mut tx, mut rx) = rtrb::RingBuffer::<Msg>::new(1024);
+
+    let consumer_ready = AtomicBool::new(false);
+    let ready_addr = &consumer_ready as *const AtomicBool as usize;
+    let total_latency = AtomicU64::new(0);
+    let latency_addr = &total_latency as *const AtomicU64 as usize;
+
+    let consumer = thread::spawn(move || {
+        pin(consumer_core);
+        let ready = unsafe { &*(ready_addr as *const AtomicBool) };
+        let latency = unsafe { &*(latency_addr as *const AtomicU64) };
+        ready.store(true, Ordering::Release);
+
+        let mut sum: u64 = 0;
+        let mut count: u64 = 0;
+        while count < total_ops {
+            if let Ok(msg) = rx.pop() {
+                let now = rdtsc();
+                sum += now - msg.timestamp;
+                count += 1;
+            }
+        }
+        latency.store(sum, Ordering::Release);
+    });
+
+    let producer = thread::spawn(move || {
+        pin(producer_core);
+        let ready = unsafe { &*(ready_addr as *const AtomicBool) };
+        while !ready.load(Ordering::Acquire) {}
+
+        for i in 0..total_ops {
+            let msg = Msg {
+                timestamp: rdtsc(),
+                sequence: i,
+                symbol_id: (i & 0xFFF) as u32,
+                side: (i & 1) as u16,
+                _pad: 0,
+                price: (i * 100 + 1) as i64,
+                quantity: ((i & 0xFF) + 1) as i64,
+                order_id: i as i64,
+            };
+            while tx.push(msg).is_err() {}
+        }
+    });
+
+    producer.join().unwrap();
+    consumer.join().unwrap();
+    total_latency.load(Ordering::Acquire)
+}
+
+// ─── Runner ──────────────────────────────────────────────────────────────────
+
+fn run_variant(
+    name: &str,
+    run_fn: fn(usize, usize, u64) -> u64,
+    producer_core: usize,
+    consumer_core: usize,
+    ops: u64,
+    iterations: usize,
+) {
+    eprintln!("[{name}]");
     // Warmup
-    let _ = run_raw(producer_core, consumer_core, ops);
+    let _ = run_fn(producer_core, consumer_core, ops);

     let mut best = u64::MAX;
     for i in 1..=iterations {
-        let total_cycles = run_raw(producer_core, consumer_core, ops);
+        let total_cycles = run_fn(producer_core, consumer_core, ops);
         let cycles_per_op = total_cycles as f64 / ops as f64;
         if total_cycles < best {
             best = total_cycles;
@@ -163,3 +286,12 @@ pub fn run_raw_bench(producer_core: usize, consumer_core: usize, ops: u64, iterations: usize) {
     let best_per_op = best as f64 / ops as f64;
     eprintln!(" BEST: {best_per_op:.1} cycles/op");
 }
+
+/// Run all variants or a specific one.
+pub fn run_raw_bench(producer_core: usize, consumer_core: usize, ops: u64, iterations: usize) {
+    run_variant("mantis-inline (push_shared/pop_shared)", run_raw, producer_core, consumer_core, ops, iterations);
+    eprintln!();
+    run_variant("mantis-copy (push/pop &self)", run_raw_copy, producer_core, consumer_core, ops, iterations);
+    eprintln!();
+    run_variant("rtrb (push/pop Result)", run_raw_rtrb, producer_core, consumer_core, ops, iterations);
+}

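A note on reading the output: `run_variant` reports best-case TSC cycles per op, and turning that into wall-clock time needs the machine's TSC frequency, which the harness does not measure. A trivial helper for that conversion; the 3 GHz in the comment is an assumed example, not a measured value:

    /// Cycles-per-op to nanoseconds-per-op, given the TSC frequency in GHz.
    fn cycles_to_ns(cycles_per_op: f64, tsc_ghz: f64) -> f64 {
        cycles_per_op / tsc_ghz
    }

    // Example: 30.0 cycles/op at an assumed 3.0 GHz TSC is 10 ns/op.
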
crates/queue/src/handle.rs (31 additions, 0 deletions)
@@ -280,6 +280,37 @@ where
         unsafe { self.engine.pop(out) }
     }

+    /// Push via shared reference. Returns `true` on success, `false` if full.
+    ///
+    /// # Safety
+    ///
+    /// The caller must uphold the SPSC protocol: exactly one thread calls
+    /// `push_shared` (the producer), and a different thread calls `pop_shared`
+    /// (the consumer). No two threads may call the same method concurrently.
+    #[expect(
+        unsafe_code,
+        reason = "SPSC shared-reference push for zero-overhead two-thread use"
+    )]
+    #[inline(always)]
+    pub unsafe fn push_shared(&self, value: T) -> bool {
+        self.engine.push(value)
+    }
+
+    /// Pop via shared reference into `out`. Returns `true` on success, `false` if empty.
+    ///
+    /// # Safety
+    ///
+    /// Same SPSC contract as `push_shared`. Additionally, `out` must be
+    /// a valid, writeable, properly aligned pointer to `T`.
+    #[expect(
+        unsafe_code,
+        reason = "SPSC shared-reference pop for zero-overhead two-thread use"
+    )]
+    #[inline(always)]
+    pub unsafe fn pop_shared(&self, out: *mut T) -> bool {
+        unsafe { self.engine.pop(out) }
+    }
+
     /// Number of elements currently in the ring.
     #[must_use]
     pub fn len(&self) -> usize {

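For reviewers, a minimal end-to-end sketch of the new pair's calling convention, mirroring the address-smuggling pattern from raw_bench.rs. It assumes `SpscRing` from mantis_queue exposes these handle methods (the updated benchmark calls them on that type) and uses a `u64` payload for brevity:

    use mantis_queue::SpscRing;
    use std::thread;

    fn smoke_test() {
        let ring = SpscRing::<u64, 1024>::new();
        let addr = &ring as *const SpscRing<u64, 1024> as usize;

        let consumer = thread::spawn(move || {
            // SAFETY: sole consumer; `ring` outlives the join below.
            let rb = unsafe { &*(addr as *const SpscRing<u64, 1024>) };
            let mut out = 0u64;
            while !unsafe { rb.pop_shared(&mut out as *mut u64) } {}
            out
        });

        // SAFETY: sole producer, pushing through &self on the original binding.
        while !unsafe { ring.push_shared(7) } {}
        assert_eq!(consumer.join().unwrap(), 7);
    }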