Skip to content

Commit e8b9f68

Browse files
authored
feat(io-uring): Introduce zero-copy reads to socket using splice (#559)
Limited by the linux kernel version of 5.10 in anolisos8.10 . Now we have to use the `splice` to make zero-copy possible
1 parent fa27d71 commit e8b9f68

File tree

12 files changed

+355
-11
lines changed

12 files changed

+355
-11
lines changed

riffle-server/src/client_configs.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@ use std::collections::HashMap;
88
pub static SENDFILE_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
99
ClientConfigOption::key("spark.rss.riffle.urpcSendfileEnabled")
1010
.default_value(false)
11-
.with_description("This indicates whether the sendfile is enabled when urpc is activated")
11+
.with_description("This indicates whether the sendfile is enabled when urpc is activated without io-uring.")
12+
});
13+
14+
pub static SPLICE_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(|| {
15+
ClientConfigOption::key("spark.rss.riffle.urpcSpliceEnabled")
16+
.default_value(false)
17+
.with_description(
18+
"This indicates whether the splice is enabled when urpc is activated with io-uring.",
19+
)
1220
});
1321

1422
pub static HDFS_CLIENT_EAGER_LOADING_ENABLED_OPTION: Lazy<ClientConfigOption<bool>> = Lazy::new(

riffle-server/src/config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ pub struct LocalfileStoreConfig {
134134

135135
pub io_limiter: Option<IoLimiterConfig>,
136136

137-
// This is only for urpc
137+
// This is only for urpc without uring
138138
#[serde(default = "as_default_read_io_sendfile_enable")]
139139
pub read_io_sendfile_enable: bool,
140140

@@ -149,6 +149,9 @@ pub struct IoUringConfig {
149149
pub threads: usize,
150150
#[serde(default = "as_default_io_uring_io_depth")]
151151
pub io_depth: usize,
152+
153+
#[serde(default = "bool::default")]
154+
pub read_splice_enbaled: bool,
152155
}
153156

154157
fn as_default_io_uring_io_depth() -> usize {

riffle-server/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,5 @@ pub mod mini_riffle;
6565
mod partition_stats;
6666
mod raw_io;
6767
mod system_libc;
68+
69+
pub mod raw_pipe;

riffle-server/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ pub mod ddashmap;
111111

112112
pub mod partition_stats;
113113

114+
pub mod raw_pipe;
115+
114116
const MAX_MEMORY_ALLOCATION_SIZE_ENV_KEY: &str = "MAX_MEMORY_ALLOCATION_LIMIT_SIZE";
115117

116118
#[derive(Parser, Debug)]

riffle-server/src/raw_pipe.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use std::fs::File;
2+
use std::os::fd::{AsRawFd, RawFd};
3+
4+
#[derive(Debug)]
5+
pub struct RawPipe {
6+
pub pipe_in_fd: File,
7+
pub pipe_out_fd: File,
8+
pub length: usize,
9+
}
10+
11+
impl RawPipe {
12+
pub fn from(pipe_in_fd: File, pipe_out_fd: File, length: usize) -> Self {
13+
Self {
14+
pipe_in_fd,
15+
pipe_out_fd,
16+
length,
17+
}
18+
}
19+
}

riffle-server/src/store/local/read_options.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,12 @@ impl ReadRange {
6565
#[derive(Clone, Debug)]
6666
#[allow(non_camel_case_types)]
6767
pub enum IoMode {
68+
// only valid for read in linux
6869
SENDFILE,
6970
DIRECT_IO,
7071
BUFFER_IO,
72+
// only valid for read in linux io-uring engine
73+
SPLICE,
7174
}
7275

7376
impl Default for ReadOptions {
@@ -123,6 +126,11 @@ impl ReadOptions {
123126
self
124127
}
125128

129+
pub fn with_splice(mut self) -> Self {
130+
self.io_mode = IoMode::SPLICE;
131+
self
132+
}
133+
126134
pub fn with_task_id(mut self, task_id: i64) -> Self {
127135
self.task_id = task_id;
128136
self

riffle-server/src/store/local/sync_io.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl SyncLocalIO {
192192
DataBytes::Composed(composed) => {
193193
buf_writer.write_all(&composed.freeze())?;
194194
}
195-
DataBytes::RawIO(_) => todo!(),
195+
_ => todo!(),
196196
}
197197
buf_writer.flush()?;
198198

@@ -430,7 +430,7 @@ impl LocalIO for SyncLocalIO {
430430
options: ReadOptions,
431431
) -> anyhow::Result<DataBytes, WorkerError> {
432432
let data = match options.io_mode {
433-
IoMode::SENDFILE => {
433+
IoMode::SENDFILE | IoMode::SPLICE => {
434434
let (off, len) = match options.read_range {
435435
ReadRange::RANGE(off, len) => (off, len),
436436
_ => {

riffle-server/src/store/local/uring_io.rs

Lines changed: 157 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
// limitations under the License.
1414

1515
use crate::error::WorkerError;
16+
use crate::raw_pipe::RawPipe;
1617
use crate::store::local::options::{CreateOptions, WriteOptions};
17-
use crate::store::local::read_options::{ReadOptions, ReadRange};
18+
use crate::store::local::read_options::{IoMode, ReadOptions, ReadRange};
1819
use crate::store::local::sync_io::SyncLocalIO;
1920
use crate::store::local::{FileStat, LocalIO};
2021
use crate::store::DataBytes;
@@ -24,10 +25,10 @@ use clap::builder::Str;
2425
use core_affinity::CoreId;
2526
use io_uring::types::Fd;
2627
use io_uring::{opcode, squeue, IoUring};
27-
use libc::iovec;
28+
use libc::{fcntl, iovec, F_SETPIPE_SZ};
2829
use std::fs::OpenOptions;
2930
use std::io::{Bytes, IoSlice};
30-
use std::os::fd::AsRawFd;
31+
use std::os::fd::{AsRawFd, FromRawFd};
3132
use std::path::Path;
3233
use std::thread::sleep;
3334
use std::time::Duration;
@@ -224,6 +225,8 @@ impl UringIoEngineBuilder {
224225
enum UringIoType {
225226
Read,
226227
WriteV,
228+
// data zero-copy to socket for read
229+
Splice,
227230
}
228231

229232
#[cfg(any(target_family = "unix", target_family = "wasm"))]
@@ -251,6 +254,13 @@ struct UringIoCtx {
251254
w_iovecs: Vec<iovec>,
252255

253256
r_bufs: Vec<RawBuf>,
257+
258+
splice_pipe: Option<SplicePipe>,
259+
}
260+
261+
struct SplicePipe {
262+
sin: i32,
263+
len: usize,
254264
}
255265

256266
unsafe impl Send for UringIoCtx {}
@@ -329,6 +339,22 @@ impl UringIoEngineShard {
329339
.build()
330340
.into()
331341
}
342+
UringIoType::Splice => {
343+
// refer: https://github.com/tokio-rs/io-uring/blob/7ec7ae909f7eabcf03450e6b858919449f135ac3/io-uring-test/src/tests/fs.rs#L1084
344+
self.read_inflight += 1;
345+
let pipe = ctx.splice_pipe.as_ref().unwrap();
346+
let pipe_in = pipe.sin;
347+
let len: u32 = pipe.len as u32;
348+
opcode::Splice::new(
349+
fd,
350+
ctx.addr.offset as i64,
351+
Fd(pipe_in),
352+
-1, // pipe/socket destination must use -1
353+
len,
354+
)
355+
.build()
356+
.into()
357+
}
332358
};
333359
let data = Box::into_raw(ctx) as u64;
334360
let sqe = sqe.user_data(data);
@@ -346,6 +372,7 @@ impl UringIoEngineShard {
346372
match ctx.io_type {
347373
UringIoType::Read => self.read_inflight -= 1,
348374
UringIoType::WriteV => self.write_inflight -= 1,
375+
UringIoType::Splice => self.read_inflight -= 1,
349376
}
350377

351378
let res = cqe.result();
@@ -408,6 +435,7 @@ impl LocalIO for UringIo {
408435
w_bufs: bufs,
409436
w_iovecs: Vec::with_capacity(buf_len),
410437
r_bufs: vec![],
438+
splice_pipe: None,
411439
};
412440
ctx.w_iovecs = ctx
413441
.w_bufs
@@ -456,6 +484,61 @@ impl LocalIO for UringIo {
456484
let file = OpenOptions::new().read(true).open(path)?;
457485
let raw_fd = file.as_raw_fd();
458486

487+
// make the size as the optional config option in io-uring
488+
if matches!(options.io_mode, IoMode::SPLICE) && length < 16 * 1024 * 1024 {
489+
// init the pipe
490+
let (pipe_in, mut pipe_out) = {
491+
let mut pipes = [0, 0];
492+
let ret = unsafe { libc::pipe(pipes.as_mut_ptr()) };
493+
if (ret != 0) {
494+
return Err(WorkerError::Other(anyhow!(
495+
"Failed to create pipe for splice: {}",
496+
std::io::Error::last_os_error()
497+
)));
498+
}
499+
let pipe_out = unsafe { std::fs::File::from_raw_fd(pipes[0]) };
500+
let pipe_in = unsafe { fs::File::from_raw_fd(pipes[1]) };
501+
(pipe_in, pipe_out)
502+
};
503+
504+
use libc::fcntl;
505+
use libc::F_SETPIPE_SZ;
506+
unsafe {
507+
let pipe_size = 16 * 1024 * 1024;
508+
fcntl(pipe_in.as_raw_fd(), F_SETPIPE_SZ, pipe_size);
509+
}
510+
511+
let ctx = UringIoCtx {
512+
tx,
513+
io_type: UringIoType::Splice,
514+
addr: RawFileAddress {
515+
file: RawFile(raw_fd),
516+
offset,
517+
},
518+
w_bufs: vec![],
519+
w_iovecs: vec![],
520+
r_bufs: vec![],
521+
splice_pipe: Some(SplicePipe {
522+
sin: pipe_in.as_raw_fd(),
523+
len: length as _,
524+
}),
525+
};
526+
527+
let _ = shard.send(ctx);
528+
let _result = match rx.await {
529+
Ok(res) => res,
530+
Err(e) => {
531+
return Err(WorkerError::Other(anyhow::Error::from(e)));
532+
}
533+
}?;
534+
535+
return Ok(DataBytes::RawPipe(RawPipe::from(
536+
pipe_in,
537+
pipe_out,
538+
length as usize,
539+
)));
540+
}
541+
459542
// init buf with BytesMut for io_uring to write into
460543
let mut buf = BytesMut::zeroed(length as _);
461544

@@ -472,6 +555,7 @@ impl LocalIO for UringIo {
472555
ptr: buf.as_mut_ptr(),
473556
len: length as usize,
474557
}],
558+
splice_pipe: None,
475559
};
476560

477561
let _ = shard.send(ctx);
@@ -495,14 +579,15 @@ impl LocalIO for UringIo {
495579
}
496580

497581
#[cfg(test)]
498-
mod tests {
499-
use crate::composed_bytes::ComposedBytes;
582+
pub mod tests {
500583
use crate::runtime::manager::create_runtime;
501584
use crate::runtime::RuntimeRef;
502585
use crate::store::local::read_options::IoMode;
503586
use crate::store::local::sync_io::SyncLocalIO;
504587
use crate::store::local::uring_io::UringIoEngineBuilder;
505588
use crate::store::local::LocalIO;
589+
use crate::store::DataBytes;
590+
use crate::{composed_bytes::ComposedBytes, raw_pipe::RawPipe};
506591
use bytes::{BufMut, BytesMut};
507592
use log::info;
508593

@@ -596,4 +681,71 @@ mod tests {
596681

597682
Ok(())
598683
}
684+
685+
pub fn read_with_splice(content: String) -> anyhow::Result<DataBytes> {
686+
// create the data and then read with splice
687+
use crate::store::DataBytes;
688+
use bytes::Bytes;
689+
use std::io::Read;
690+
use tempdir::TempDir;
691+
let temp_dir = TempDir::new("test_read_splice")?;
692+
let temp_path = temp_dir.path().to_str().unwrap().to_string();
693+
println!("init local file path: {}", temp_path);
694+
let r_runtime = create_runtime(1, "r");
695+
let w_runtime = create_runtime(2, "w");
696+
let sync_io_engine =
697+
SyncLocalIO::new(&r_runtime, &w_runtime, temp_path.as_str(), None, None);
698+
let uring_io_engine = UringIoEngineBuilder::new().build(sync_io_engine)?;
699+
// 1. write
700+
println!("writing...");
701+
let write_options = crate::store::local::options::WriteOptions {
702+
append: true,
703+
offset: Some(0),
704+
data: DataBytes::Direct(Bytes::from(content)),
705+
};
706+
w_runtime.block_on(async {
707+
uring_io_engine
708+
.write("test_file_splice", write_options)
709+
.await
710+
.unwrap();
711+
});
712+
// 2. read with splice
713+
println!("reading with splice...");
714+
let read_options = crate::store::local::read_options::ReadOptions {
715+
io_mode: IoMode::SPLICE,
716+
task_id: 0,
717+
read_range: crate::store::local::read_options::ReadRange::ALL,
718+
ahead_options: None,
719+
};
720+
let result = r_runtime.block_on(async {
721+
uring_io_engine
722+
.read("test_file_splice", read_options)
723+
.await
724+
.unwrap()
725+
});
726+
Ok(result)
727+
}
728+
729+
#[test]
730+
fn test_read_with_splice() -> anyhow::Result<()> {
731+
use std::io::Read;
732+
733+
// case1: check the raw-pipe read
734+
let write_data = "helloworld!!!!!!";
735+
println!("validating with direct read...");
736+
let result = read_with_splice(write_data.to_owned())?;
737+
match result {
738+
DataBytes::RawPipe(raw_pipe) => {
739+
assert!(raw_pipe.length == write_data.len());
740+
let mut read_buf = vec![0u8; raw_pipe.length];
741+
let mut fd = raw_pipe.pipe_out_fd;
742+
fd.read_exact(&mut read_buf)?;
743+
// to compare
744+
assert_eq!(read_buf.as_slice(), write_data.as_bytes());
745+
}
746+
_ => panic!("Expected raw pipe bytes"),
747+
}
748+
749+
Ok(())
750+
}
599751
}

0 commit comments

Comments
 (0)