Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions riffle-server/src/store/local/delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::store::local::io_layer_throttle::{ThrottleLayer, ThroughputBasedRateL
use crate::store::local::io_layer_timeout::TimeoutLayer;
use crate::store::local::layers::{Handler, OperatorBuilder};
use crate::store::local::options::WriteOptions;
use crate::store::local::read_options::ReadOptions;
use crate::store::local::read_options::{ReadOptions, ReadRange};
use crate::store::local::sync_io::SyncLocalIO;
#[cfg(feature = "io-uring")]
use crate::store::local::uring_io::UringIoEngineBuilder;
Expand Down Expand Up @@ -311,7 +311,7 @@ impl LocalDiskDelegator {
let write_time = timer.elapsed().as_millis();

let timer = Instant::now();
let options = ReadOptions::default().with_read_all();
let options = ReadOptions::default().with_read_range(ReadRange::RANGE(0, 1024 * 1024 * 10));
let read_data = self.read(&detection_file, options).await?;
let read_data = read_data.freeze();
let read_time = timer.elapsed().as_millis();
Expand Down
64 changes: 50 additions & 14 deletions riffle-server/src/store/local/uring_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ use clap::builder::Str;
use core_affinity::CoreId;
use io_uring::types::Fd;
use io_uring::{opcode, squeue, IoUring};
use libc::iovec;
use std::fs::OpenOptions;
use std::io::{Bytes, IoSlice};
use std::os::fd::AsRawFd;
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;
use std::{
fmt::Debug,
fs,
Expand Down Expand Up @@ -235,13 +238,19 @@ unsafe impl Send for RawBuf {}
unsafe impl Sync for RawBuf {}

struct UringIoCtx {
tx: oneshot::Sender<anyhow::Result<(), WorkerError>>,
tx: oneshot::Sender<anyhow::Result<usize, WorkerError>>,
io_type: UringIoType,
addr: RawFileAddress,

w_bufs: Vec<bytes::Bytes>,
w_iovecs: Vec<iovec>,

r_bufs: Vec<RawBuf>,
}

unsafe impl Send for UringIoCtx {}
unsafe impl Sync for UringIoCtx {}

struct UringIoEngineShard {
read_rx: mpsc::Receiver<UringIoCtx>,
write_rx: mpsc::Receiver<UringIoCtx>,
Expand Down Expand Up @@ -285,7 +294,11 @@ impl UringIoEngineShard {

let ctx = match ctx {
Some(ctx) => ctx,
None => break 'prepare,
None => {
// todo: we must sleep to avoid cpu busy, but maybe recv is better rather than try_recv
sleep(Duration::from_millis(5));
break 'prepare;
}
};

let mut ctx = Box::new(ctx);
Expand All @@ -302,15 +315,14 @@ impl UringIoEngineShard {
UringIoType::WriteV => {
self.write_inflight += 1;
// https://github.com/tokio-rs/io-uring/blob/master/io-uring-test/src/utils.rs#L95
let slices = ctx
.w_bufs
.iter()
.map(|x| IoSlice::new(x.as_ref()))
.collect::<Vec<_>>();
opcode::Writev::new(fd, slices.as_ptr().cast(), slices.len() as _)
.offset(ctx.addr.offset)
.build()
.flags(squeue::Flags::IO_LINK)
opcode::Writev::new(
fd,
ctx.w_iovecs.as_ptr().cast(),
ctx.w_iovecs.len() as _,
)
.offset(ctx.addr.offset)
.build()
.into()
}
};
let data = Box::into_raw(ctx) as u64;
Expand All @@ -335,7 +347,7 @@ impl UringIoEngineShard {
if res < 0 {
let _ = ctx.tx.send(Err(WorkerError::RAW_IO_ERR(res)));
} else {
let _ = ctx.tx.send(Ok(()));
let _ = ctx.tx.send(Ok(res as usize));
}
}
}
Expand Down Expand Up @@ -368,28 +380,51 @@ impl LocalIO for UringIo {
let (tx, rx) = oneshot::channel();
let tag = options.data.len();
let shard = &self.write_txs[tag % self.write_txs.len()];
let byte_size = options.data.len();
let bufs = options.data.always_bytes();
let buf_len = bufs.len();
let slices = bufs
.iter()
.map(|x| IoSlice::new(x.as_ref()))
.collect::<Vec<_>>();

let path = self.with_root(path);
let path = Path::new(&path);
let mut file = OpenOptions::new().append(true).create(true).open(path)?;
let raw_fd = file.as_raw_fd();

let ctx = UringIoCtx {
let mut ctx = UringIoCtx {
tx,
io_type: UringIoType::WriteV,
addr: RawFileAddress {
file: RawFile(raw_fd),
offset: options.offset.unwrap_or(0),
},
w_bufs: bufs,
w_iovecs: Vec::with_capacity(buf_len),
r_bufs: vec![],
};
ctx.w_iovecs = ctx
.w_bufs
.iter()
.map(|b| iovec {
iov_base: b.as_ptr() as *mut _,
iov_len: b.len(),
})
.collect();

let _ = shard.send(ctx);
let res = match rx.await {
let written_bytes = match rx.await {
Ok(res) => res,
Err(e) => Err(WorkerError::Other(anyhow::Error::from(e))),
}?;
if (byte_size != written_bytes) {
return Err(WorkerError::Other(anyhow!(
"Unexpected io write. expected/written: {}/{}",
byte_size,
written_bytes
)));
}
Ok(())
}

Expand Down Expand Up @@ -427,6 +462,7 @@ impl LocalIO for UringIo {
offset,
},
w_bufs: vec![],
w_iovecs: vec![],
r_bufs: vec![RawBuf {
ptr: buf.as_mut_ptr(),
len: length as usize,
Expand Down
Loading