Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion riffle-server/src/client_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,15 @@ use std::collections::HashMap;
pub static SENDFILE_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
ClientConfigOption::key("spark.rss.riffle.urpcSendfileEnabled")
.default_value(false)
.with_description("This indicates whether the sendfile is enabled when urpc is activated")
.with_description("This indicates whether the sendfile is enabled when urpc is activated without io-uring.")
});

pub static SPLICE_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
ClientConfigOption::key("spark.rss.riffle.urpcSpliceEnabled")
.default_value(false)
.with_description(
"This indicates whether the splice is enabled when urpc is activated with io-uring.",
)
});

pub static HDFS_CLIENT_EAGER_LOADING_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(
Expand Down
5 changes: 4 additions & 1 deletion riffle-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub struct LocalfileStoreConfig {

pub io_limiter: Option<IoLimiterConfig>,

// This is only for urpc
// This is only for urpc without uring
#[serde(default = "as_default_read_io_sendfile_enable")]
pub read_io_sendfile_enable: bool,

Expand All @@ -149,6 +149,9 @@ pub struct IoUringConfig {
pub threads: usize,
#[serde(default = "as_default_io_uring_io_depth")]
pub io_depth: usize,

#[serde(default = "bool::default")]
pub read_splice_enbaled: bool,
}

fn as_default_io_uring_io_depth() -> usize {
Expand Down
2 changes: 2 additions & 0 deletions riffle-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,5 @@ pub mod mini_riffle;
mod partition_stats;
mod raw_io;
mod system_libc;

pub mod raw_pipe;
2 changes: 2 additions & 0 deletions riffle-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ pub mod ddashmap;

pub mod partition_stats;

pub mod raw_pipe;

const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_LIMIT_SIZE";

#[derive(Parser, Debug)]
Expand Down
19 changes: 19 additions & 0 deletions riffle-server/src/raw_pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::fs::File;
use std::os::fd::{AsRawFd, RawFd};

#[derive(Debug)]
pub struct RawPipe {
pub pipe_in_fd: File,
pub pipe_out_fd: File,
pub length: usize,
}

impl RawPipe {
pub fn from(pipe_in_fd: File, pipe_out_fd: File, length: usize) -> Self {
Self {
pipe_in_fd,
pipe_out_fd,
length,
}
}
}
8 changes: 8 additions & 0 deletions riffle-server/src/store/local/read_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ impl ReadRange {
#[derive(Clone, Debug)]
#[allow(non_camel_case_types)]
pub enum IoMode {
// only valid for read in linux
SENDFILE,
DIRECT_IO,
BUFFER_IO,
// only valid for read in linux io-uring engine
SPLICE,
}

impl Default for ReadOptions {
Expand Down Expand Up @@ -123,6 +126,11 @@ impl ReadOptions {
self
}

pub fn with_splice(mut self) -> Self {
self.io_mode = IoMode::SPLICE;
self
}

pub fn with_task_id(mut self, task_id: i64) -> Self {
self.task_id = task_id;
self
Expand Down
4 changes: 2 additions & 2 deletions riffle-server/src/store/local/sync_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl SyncLocalIO {
DataBytes::Composed(composed) => {
buf_writer.write_all(&composed.freeze())?;
}
DataBytes::RawIO(_) => todo!(),
_ => todo!(),
}
buf_writer.flush()?;

Expand Down Expand Up @@ -430,7 +430,7 @@ impl LocalIO for SyncLocalIO {
options: ReadOptions,
) -> anyhow::Result<DataBytes, WorkerError> {
let data = match options.io_mode {
IoMode::SENDFILE => {
IoMode::SENDFILE | IoMode::SPLICE => {
let (off, len) = match options.read_range {
ReadRange::RANGE(off, len) => (off, len),
_ => {
Expand Down
162 changes: 157 additions & 5 deletions riffle-server/src/store/local/uring_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use crate::error::WorkerError;
use crate::raw_pipe::RawPipe;
use crate::store::local::options::{CreateOptions, WriteOptions};
use crate::store::local::read_options::{ReadOptions, ReadRange};
use crate::store::local::read_options::{IoMode, ReadOptions, ReadRange};
use crate::store::local::sync_io::SyncLocalIO;
use crate::store::local::{FileStat, LocalIO};
use crate::store::DataBytes;
Expand All @@ -24,10 +25,10 @@ use clap::builder::Str;
use core_affinity::CoreId;
use io_uring::types::Fd;
use io_uring::{opcode, squeue, IoUring};
use libc::iovec;
use libc::{fcntl, iovec, F_SETPIPE_SZ};
use std::fs::OpenOptions;
use std::io::{Bytes, IoSlice};
use std::os::fd::AsRawFd;
use std::os::fd::{AsRawFd, FromRawFd};
use std::path::Path;
use std::thread::sleep;
use std::time::Duration;
Expand Down Expand Up @@ -224,6 +225,8 @@ impl UringIoEngineBuilder {
enum UringIoType {
Read,
WriteV,
// data zero-copy to socket for read
Splice,
}

#[cfg(any(target_family = "unix", target_family = "wasm"))]
Expand Down Expand Up @@ -251,6 +254,13 @@ struct UringIoCtx {
w_iovecs: Vec<iovec>,

r_bufs: Vec<RawBuf>,

splice_pipe: Option<SplicePipe>,
}

struct SplicePipe {
sin: i32,
len: usize,
}

unsafe impl Send for UringIoCtx {}
Expand Down Expand Up @@ -329,6 +339,22 @@ impl UringIoEngineShard {
.build()
.into()
}
UringIoType::Splice => {
// refer: https://github.com/tokio-rs/io-uring/blob/7ec7ae909f7eabcf03450e6b858919449f135ac3/io-uring-test/src/tests/fs.rs#L1084
self.read_inflight += 1;
let pipe = ctx.splice_pipe.as_ref().unwrap();
let pipe_in = pipe.sin;
let len: u32 = pipe.len as u32;
opcode::Splice::new(
fd,
ctx.addr.offset as i64,
Fd(pipe_in),
-1, // pipe/socket destination must use -1
len,
)
.build()
.into()
}
};
let data = Box::into_raw(ctx) as u64;
let sqe = sqe.user_data(data);
Expand All @@ -346,6 +372,7 @@ impl UringIoEngineShard {
match ctx.io_type {
UringIoType::Read => self.read_inflight -= 1,
UringIoType::WriteV => self.write_inflight -= 1,
UringIoType::Splice => self.read_inflight -= 1,
}

let res = cqe.result();
Expand Down Expand Up @@ -408,6 +435,7 @@ impl LocalIO for UringIo {
w_bufs: bufs,
w_iovecs: Vec::with_capacity(buf_len),
r_bufs: vec![],
splice_pipe: None,
};
ctx.w_iovecs = ctx
.w_bufs
Expand Down Expand Up @@ -456,6 +484,61 @@ impl LocalIO for UringIo {
let file = OpenOptions::new().read(true).open(path)?;
let raw_fd = file.as_raw_fd();

// make the size as the optional config option in io-uring
if matches!(options.io_mode, IoMode::SPLICE) && length < 16 * 1024 * 1024 {
// init the pipe
let (pipe_in, mut pipe_out) = {
let mut pipes = [0, 0];
let ret = unsafe { libc::pipe(pipes.as_mut_ptr()) };
if (ret != 0) {
return Err(WorkerError::Other(anyhow!(
"Failed to create pipe for splice: {}",
std::io::Error::last_os_error()
)));
}
let pipe_out = unsafe { std::fs::File::from_raw_fd(pipes[0]) };
let pipe_in = unsafe { fs::File::from_raw_fd(pipes[1]) };
(pipe_in, pipe_out)
};

use libc::fcntl;
use libc::F_SETPIPE_SZ;
unsafe {
let pipe_size = 16 * 1024 * 1024;
fcntl(pipe_in.as_raw_fd(), F_SETPIPE_SZ, pipe_size);
}

let ctx = UringIoCtx {
tx,
io_type: UringIoType::Splice,
addr: RawFileAddress {
file: RawFile(raw_fd),
offset,
},
w_bufs: vec![],
w_iovecs: vec![],
r_bufs: vec![],
splice_pipe: Some(SplicePipe {
sin: pipe_in.as_raw_fd(),
len: length as _,
}),
};

let _ = shard.send(ctx);
let _result = match rx.await {
Ok(res) => res,
Err(e) => {
return Err(WorkerError::Other(anyhow::Error::from(e)));
}
}?;

return Ok(DataBytes::RawPipe(RawPipe::from(
pipe_in,
pipe_out,
length as usize,
)));
}

// init buf with BytesMut for io_uring to write into
let mut buf = BytesMut::zeroed(length as _);

Expand All @@ -472,6 +555,7 @@ impl LocalIO for UringIo {
ptr: buf.as_mut_ptr(),
len: length as usize,
}],
splice_pipe: None,
};

let _ = shard.send(ctx);
Expand All @@ -495,14 +579,15 @@ impl LocalIO for UringIo {
}

#[cfg(test)]
mod tests {
use crate::composed_bytes::ComposedBytes;
pub mod tests {
use crate::runtime::manager::create_runtime;
use crate::runtime::RuntimeRef;
use crate::store::local::read_options::IoMode;
use crate::store::local::sync_io::SyncLocalIO;
use crate::store::local::uring_io::UringIoEngineBuilder;
use crate::store::local::LocalIO;
use crate::store::DataBytes;
use crate::{composed_bytes::ComposedBytes, raw_pipe::RawPipe};
use bytes::{BufMut, BytesMut};
use log::info;

Expand Down Expand Up @@ -596,4 +681,71 @@ mod tests {

Ok(())
}

pub fn read_with_splice(content: String) -> anyhow::Result<DataBytes> {
// create the data and then read with splice
use crate::store::DataBytes;
use bytes::Bytes;
use std::io::Read;
use tempdir::TempDir;
let temp_dir = TempDir::new("test_read_splice")?;
let temp_path = temp_dir.path().to_str().unwrap().to_string();
println!("init local file path: {}", temp_path);
let r_runtime = create_runtime(1, "r");
let w_runtime = create_runtime(2, "w");
let sync_io_engine =
SyncLocalIO::new(&r_runtime, &w_runtime, temp_path.as_str(), None, None);
let uring_io_engine = UringIoEngineBuilder::new().build(sync_io_engine)?;
// 1. write
println!("writing...");
let write_options = crate::store::local::options::WriteOptions {
append: true,
offset: Some(0),
data: DataBytes::Direct(Bytes::from(content)),
};
w_runtime.block_on(async {
uring_io_engine
.write("test_file_splice", write_options)
.await
.unwrap();
});
// 2. read with splice
println!("reading with splice...");
let read_options = crate::store::local::read_options::ReadOptions {
io_mode: IoMode::SPLICE,
task_id: 0,
read_range: crate::store::local::read_options::ReadRange::ALL,
ahead_options: None,
};
let result = r_runtime.block_on(async {
uring_io_engine
.read("test_file_splice", read_options)
.await
.unwrap()
});
Ok(result)
}

#[test]
fn test_read_with_splice() -> anyhow::Result<()> {
use std::io::Read;

// case1: check the raw-pipe read
let write_data = "helloworld!!!!!!";
println!("validating with direct read...");
let result = read_with_splice(write_data.to_owned())?;
match result {
DataBytes::RawPipe(raw_pipe) => {
assert!(raw_pipe.length == write_data.len());
let mut read_buf = vec![0u8; raw_pipe.length];
let mut fd = raw_pipe.pipe_out_fd;
fd.read_exact(&mut read_buf)?;
// to compare
assert_eq!(read_buf.as_slice(), write_data.as_bytes());
}
_ => panic!("Expected raw pipe bytes"),
}

Ok(())
}
}
Loading
Loading