Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: 2


updates:
- package-ecosystem: cargo
directory: "/"
schedule:
interval: daily
time: "02:00"
open-pull-requests-limit: 10
- package-ecosystem: github-actions
directory: "/"
schedule:
interval: daily
time: "02:00"
open-pull-requests-limit: 10
68 changes: 68 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
name: CI
on: pull_request

env:
CARGO_TERM_COLOR: always

jobs:
format:
name: format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: Swatinem/rust-cache@v2
- run: make format

check:
name: check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: make check

base-test:
name: base-test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: make base-test

loom:
name: loom
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: make loom

miri-spinlock:
name: miri / SpinLock
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
with:
components: miri
- uses: Swatinem/rust-cache@v2
- run: make miri-spinlock

miri-spscringbuffer:
name: miri / SPSCRingBuffer + SPSCRingBufferV2
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
with:
components: miri
- uses: Swatinem/rust-cache@v2
- run: make miri-spscringbuffer
19 changes: 17 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
RUN_MIRI = MIRIFLAGS=-Zmiri-backtrace=full cargo +nightly miri test


pre-commit: format check test build

tools:
Expand All @@ -18,7 +21,19 @@ build:
run:
cargo run --release

test:
base-test:
cargo test --release

loom:
RUST_BACKTRACE=full cargo test --features sanitizers --release
MIRIFLAGS=-Zmiri-backtrace=full cargo +nightly miri test

miri-spinlock:
$(RUN_MIRI) spinlock

miri-spscringbuffer: # v1 and v2
$(RUN_MIRI) spscringbuffer

miri:
$(RUN_MIRI)

test: base-test loom miri
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod spinlock;
mod spscringbuffer;
mod spscringbufferv2;
mod sync;

fn main() {
Expand Down
134 changes: 133 additions & 1 deletion src/spscringbuffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,133 @@
// TODO: ну ето, надо
use crate::sync::{AtomicUsize, Ordering};

use std::cell::UnsafeCell;
use std::ptr;

/// Single-Producer-Single-Consumer Ring Buffer
#[allow(dead_code)]
pub struct SPSCRingBuffer<T> {
capacity: usize,
buffer: UnsafeCell<Box<[T]>>,
head: AtomicUsize,
tail: AtomicUsize,
}

unsafe impl<T: Send> Send for SPSCRingBuffer<T> {}
unsafe impl<T: Send> Sync for SPSCRingBuffer<T> {}

#[allow(dead_code)]
impl<T> SPSCRingBuffer<T>
Comment thread
soltanoff marked this conversation as resolved.
where
T: Copy + Default,
{
pub fn new(capacity: usize) -> Self {
let buffer = vec![T::default(); capacity].into_boxed_slice();
Self {
capacity,
buffer: UnsafeCell::new(buffer),
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
}
}

pub fn try_produce(&self, value: T) -> bool {
let current_head = self.head.load(Ordering::Acquire);
let current_tail = self.tail.load(Ordering::Relaxed);

if self.is_full(current_head, current_tail) {
return false;
}

unsafe {
let slot_ptr = self.slot_ptr(current_tail);
ptr::write(slot_ptr, value);
}

self.tail.store(self.next(current_tail), Ordering::Release);

true
}

pub fn try_consume(&self) -> Option<T> {
let current_head = self.head.load(Ordering::Relaxed);
let current_tail = self.tail.load(Ordering::Acquire);

if self.is_empty(current_head, current_tail) {
return None;
}

let value = unsafe {
let slot_ptr = self.slot_ptr(current_head);
ptr::read(slot_ptr)
};

self.head.store(self.next(current_head), Ordering::Release);

Some(value)
}

/// Возвращает сырой указатель `*mut T` на элемент буфера по заданному индексу.
///
/// Обходит создание промежуточных ссылок (`&` / `&mut`) на весь слайс,
/// чтобы не нарушать правила Stacked Borrows при одновременном доступе
/// из потока-производителя и потока-потребителя к разным элементам буфера.
///
/// `index` должен быть строго меньше `self.capacity`.
fn slot_ptr(&self, index: usize) -> *mut T {
unsafe {
// self.buffer.get() -> *mut Box<[T]>
// *self.buffer.get() -> Box<[T]> (place expression, без перемещения)
// Сырой указатель на срез в куче, без промежуточной ссылки
// &raw mut **self.buffer.get() -> *mut [T]
let slice_ptr: *mut [T] = &raw mut **self.buffer.get();
(slice_ptr as *mut T).add(index)
}
}

fn next(&self, slot: usize) -> usize {
(slot + 1) % self.capacity
}

fn is_full(&self, head: usize, tail: usize) -> bool {
self.next(tail) == head
}

fn is_empty(&self, head: usize, tail: usize) -> bool {
tail == head
}
}

#[cfg(all(test, not(feature = "sanitizers")))]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;

#[test]
fn test_concurrent_reads_and_writes() {
let ring_buffer: Arc<SPSCRingBuffer<i32>> = Arc::new(SPSCRingBuffer::new(42));
let producer_buffer = Arc::clone(&ring_buffer);
let consumer_buffer = Arc::clone(&ring_buffer);

let values_count = 100500;

let producer_handle = thread::spawn(move || {
for i in 0..values_count {
while !producer_buffer.try_produce(i) {
// Retry
}
}
});

let consumer_handle = thread::spawn(move || {
for _ in 0..values_count {
while consumer_buffer.try_consume().is_none() {
// Retry
}
}
});

producer_handle.join().unwrap();
consumer_handle.join().unwrap();
}
}
Loading