Skip to content

Commit 8817c7f

Browse files
authored
Merge pull request #7 from soltanoff/feat/spscringbuffer
feat: implement Single-Producer-Single-Consumer Ring Buffer
2 parents 53c2728 + 4646a0b commit 8817c7f

7 files changed

Lines changed: 391 additions & 5 deletions

File tree

.github/dependabot.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
version: 2
2+
3+
4+
updates:
5+
- package-ecosystem: cargo
6+
directory: "/"
7+
schedule:
8+
interval: daily
9+
time: "02:00"
10+
open-pull-requests-limit: 10
11+
- package-ecosystem: github-actions
12+
directory: "/"
13+
schedule:
14+
interval: daily
15+
time: "02:00"
16+
open-pull-requests-limit: 10

.github/workflows/ci.yml

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
name: CI
2+
on: pull_request
3+
4+
env:
5+
CARGO_TERM_COLOR: always
6+
7+
jobs:
8+
format:
9+
name: format
10+
runs-on: ubuntu-latest
11+
steps:
12+
- uses: actions/checkout@v4
13+
- uses: dtolnay/rust-toolchain@stable
14+
with:
15+
components: clippy
16+
- uses: Swatinem/rust-cache@v2
17+
- run: make format
18+
19+
check:
20+
name: check
21+
runs-on: ubuntu-latest
22+
steps:
23+
- uses: actions/checkout@v4
24+
- uses: dtolnay/rust-toolchain@stable
25+
- uses: Swatinem/rust-cache@v2
26+
- run: make check
27+
28+
base-test:
29+
name: base-test
30+
runs-on: ubuntu-latest
31+
steps:
32+
- uses: actions/checkout@v4
33+
- uses: dtolnay/rust-toolchain@stable
34+
- uses: Swatinem/rust-cache@v2
35+
- run: make base-test
36+
37+
loom:
38+
name: loom
39+
runs-on: ubuntu-latest
40+
steps:
41+
- uses: actions/checkout@v4
42+
- uses: dtolnay/rust-toolchain@stable
43+
- uses: Swatinem/rust-cache@v2
44+
- run: make loom
45+
46+
miri-spinlock:
47+
name: miri / SpinLock
48+
runs-on: ubuntu-latest
49+
timeout-minutes: 10
50+
steps:
51+
- uses: actions/checkout@v4
52+
- uses: dtolnay/rust-toolchain@nightly
53+
with:
54+
components: miri
55+
- uses: Swatinem/rust-cache@v2
56+
- run: make miri-spinlock
57+
58+
miri-spscringbuffer:
59+
name: miri / SPSCRingBuffer + SPSCRingBufferV2
60+
runs-on: ubuntu-latest
61+
timeout-minutes: 10
62+
steps:
63+
- uses: actions/checkout@v4
64+
- uses: dtolnay/rust-toolchain@nightly
65+
with:
66+
components: miri
67+
- uses: Swatinem/rust-cache@v2
68+
- run: make miri-spscringbuffer

Makefile

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
RUN_MIRI = MIRIFLAGS=-Zmiri-backtrace=full cargo +nightly miri test
2+
3+
14
pre-commit: format check test build
25

36
tools:
@@ -18,7 +21,19 @@ build:
1821
run:
1922
cargo run --release
2023

21-
test:
24+
base-test:
2225
cargo test --release
26+
27+
loom:
2328
RUST_BACKTRACE=full cargo test --features sanitizers --release
24-
MIRIFLAGS=-Zmiri-backtrace=full cargo +nightly miri test
29+
30+
miri-spinlock:
31+
$(RUN_MIRI) spinlock
32+
33+
miri-spscringbuffer: # v1 and v2
34+
$(RUN_MIRI) spscringbuffer
35+
36+
miri:
37+
$(RUN_MIRI)
38+
39+
test: base-test loom miri

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod spinlock;
22
mod spscringbuffer;
3+
mod spscringbufferv2;
34
mod sync;
45

56
fn main() {

src/spscringbuffer/mod.rs

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,133 @@
1-
// TODO: ну ето, надо
1+
use crate::sync::{AtomicUsize, Ordering};
2+
3+
use std::cell::UnsafeCell;
4+
use std::ptr;
5+
6+
/// Single-Producer-Single-Consumer Ring Buffer
7+
#[allow(dead_code)]
8+
pub struct SPSCRingBuffer<T> {
9+
capacity: usize,
10+
buffer: UnsafeCell<Box<[T]>>,
11+
head: AtomicUsize,
12+
tail: AtomicUsize,
13+
}
14+
15+
unsafe impl<T: Send> Send for SPSCRingBuffer<T> {}
16+
unsafe impl<T: Send> Sync for SPSCRingBuffer<T> {}
17+
18+
#[allow(dead_code)]
19+
impl<T> SPSCRingBuffer<T>
20+
where
21+
T: Copy + Default,
22+
{
23+
pub fn new(capacity: usize) -> Self {
24+
let buffer = vec![T::default(); capacity].into_boxed_slice();
25+
Self {
26+
capacity,
27+
buffer: UnsafeCell::new(buffer),
28+
head: AtomicUsize::new(0),
29+
tail: AtomicUsize::new(0),
30+
}
31+
}
32+
33+
pub fn try_produce(&self, value: T) -> bool {
34+
let current_head = self.head.load(Ordering::Acquire);
35+
let current_tail = self.tail.load(Ordering::Relaxed);
36+
37+
if self.is_full(current_head, current_tail) {
38+
return false;
39+
}
40+
41+
unsafe {
42+
let slot_ptr = self.slot_ptr(current_tail);
43+
ptr::write(slot_ptr, value);
44+
}
45+
46+
self.tail.store(self.next(current_tail), Ordering::Release);
47+
48+
true
49+
}
50+
51+
pub fn try_consume(&self) -> Option<T> {
52+
let current_head = self.head.load(Ordering::Relaxed);
53+
let current_tail = self.tail.load(Ordering::Acquire);
54+
55+
if self.is_empty(current_head, current_tail) {
56+
return None;
57+
}
58+
59+
let value = unsafe {
60+
let slot_ptr = self.slot_ptr(current_head);
61+
ptr::read(slot_ptr)
62+
};
63+
64+
self.head.store(self.next(current_head), Ordering::Release);
65+
66+
Some(value)
67+
}
68+
69+
/// Возвращает сырой указатель `*mut T` на элемент буфера по заданному индексу.
70+
///
71+
/// Обходит создание промежуточных ссылок (`&` / `&mut`) на весь слайс,
72+
/// чтобы не нарушать правила Stacked Borrows при одновременном доступе
73+
/// из потока-производителя и потока-потребителя к разным элементам буфера.
74+
///
75+
/// `index` должен быть строго меньше `self.capacity`.
76+
fn slot_ptr(&self, index: usize) -> *mut T {
77+
unsafe {
78+
// self.buffer.get() -> *mut Box<[T]>
79+
// *self.buffer.get() -> Box<[T]> (place expression, без перемещения)
80+
// Сырой указатель на срез в куче, без промежуточной ссылки
81+
// &raw mut **self.buffer.get() -> *mut [T]
82+
let slice_ptr: *mut [T] = &raw mut **self.buffer.get();
83+
(slice_ptr as *mut T).add(index)
84+
}
85+
}
86+
87+
fn next(&self, slot: usize) -> usize {
88+
(slot + 1) % self.capacity
89+
}
90+
91+
fn is_full(&self, head: usize, tail: usize) -> bool {
92+
self.next(tail) == head
93+
}
94+
95+
fn is_empty(&self, head: usize, tail: usize) -> bool {
96+
tail == head
97+
}
98+
}
99+
100+
#[cfg(all(test, not(feature = "sanitizers")))]
101+
mod tests {
102+
use super::*;
103+
use std::sync::Arc;
104+
use std::thread;
105+
106+
#[test]
107+
fn test_concurrent_reads_and_writes() {
108+
let ring_buffer: Arc<SPSCRingBuffer<i32>> = Arc::new(SPSCRingBuffer::new(42));
109+
let producer_buffer = Arc::clone(&ring_buffer);
110+
let consumer_buffer = Arc::clone(&ring_buffer);
111+
112+
let values_count = 100500;
113+
114+
let producer_handle = thread::spawn(move || {
115+
for i in 0..values_count {
116+
while !producer_buffer.try_produce(i) {
117+
// Retry
118+
}
119+
}
120+
});
121+
122+
let consumer_handle = thread::spawn(move || {
123+
for _ in 0..values_count {
124+
while consumer_buffer.try_consume().is_none() {
125+
// Retry
126+
}
127+
}
128+
});
129+
130+
producer_handle.join().unwrap();
131+
consumer_handle.join().unwrap();
132+
}
133+
}

0 commit comments

Comments
 (0)