Skip to content

Commit 6740b6c

Browse files
committed
feat: add prefetches for x86/x86_64
1 parent 8cac2cd commit 6740b6c

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

benches/bench.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ fn bench_concurrent_spsc_large_messages(c: &mut Criterion) {
125125
let mut group = c.benchmark_group("concurrent_spsc_large_messages");
126126

127127
struct LargeMessage {
128-
val1: u128,
129-
val2: String,
128+
_val1: u128,
129+
_val2: String,
130130
}
131131

132132
for messages in [1000, 10000, 100000].iter() {
@@ -143,8 +143,8 @@ fn bench_concurrent_spsc_large_messages(c: &mut Criterion) {
143143
for i in 0..messages {
144144
while producer
145145
.push(black_box(LargeMessage {
146-
val1: i as u128,
147-
val2: format!("Message {i}"),
146+
_val1: i as u128,
147+
_val2: format!("Message {i}"),
148148
}))
149149
.is_err()
150150
{

src/lib.rs

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ unsafe impl<T: Send> Sync for FastQueue<T> {}
5959
impl<T> FastQueue<T> {
6060
/// Capacity will be rounded up to the next power of two
6161
pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
62-
assert!(capacity > 0, "Capacity must be greater than 0");
63-
6462
let capacity = capacity.next_power_of_two();
6563
let mask = capacity - 1;
6664

@@ -149,6 +147,12 @@ impl<T> Producer<T> {
149147
unsafe {
150148
let index = head & self.queue.mask;
151149
let slot = self.queue.buffer.add(index);
150+
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
151+
{
152+
let next_index = next_head & self.queue.mask;
153+
let next_slot = self.queue.buffer.add(next_index);
154+
prefetch_write(next_slot as *const u8);
155+
}
152156
(*slot).as_mut_ptr().write(value);
153157
}
154158

@@ -210,6 +214,12 @@ impl<T> Consumer<T> {
210214
let value = unsafe {
211215
let index = tail & self.queue.mask;
212216
let slot = self.queue.buffer.add(index);
217+
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
218+
{
219+
let next_index = (tail + 1) & self.queue.mask;
220+
let next_slot = self.queue.buffer.add(next_index);
221+
prefetch_read(next_slot as *const u8);
222+
}
213223
(*slot).as_ptr().read()
214224
};
215225

@@ -255,13 +265,47 @@ impl<T> Consumer<T> {
255265
}
256266
}
257267

268+
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
269+
#[inline(always)]
270+
fn prefetch_read(p: *const u8) {
271+
unsafe {
272+
#[cfg(target_arch = "x86")]
273+
use std::arch::x86::_mm_prefetch;
274+
#[cfg(target_arch = "x86_64")]
275+
use std::arch::x86_64::_mm_prefetch;
276+
277+
const _MM_HINT_T0: i32 = 3; // Prefetch to all cache levels as read
278+
_mm_prefetch(p as *const i8, _MM_HINT_T0);
279+
}
280+
}
281+
282+
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
283+
#[inline(always)]
284+
fn prefetch_write(p: *const u8) {
285+
unsafe {
286+
#[cfg(target_arch = "x86")]
287+
use std::arch::x86::_mm_prefetch;
288+
#[cfg(target_arch = "x86_64")]
289+
use std::arch::x86_64::_mm_prefetch;
290+
291+
const _MM_HINT_ET0: i32 = 7; // Prefetch to all cache levels as write
292+
_mm_prefetch(p as *const i8, _MM_HINT_ET0);
293+
}
294+
}
295+
258296
impl<T> Iterator for Consumer<T> {
259297
type Item = T;
260298

261299
#[inline(always)]
262300
fn next(&mut self) -> Option<Self::Item> {
263301
self.pop()
264302
}
303+
304+
#[inline(always)]
305+
fn size_hint(&self) -> (usize, Option<usize>) {
306+
let len = self.len();
307+
(len, Some(len))
308+
}
265309
}
266310

267311
#[cfg(test)]

0 commit comments

Comments
 (0)