Skip to content

Commit 3b3758d

Browse files
authored
feat(localfile): Add support of io_uring engine (experimental) (#549)
1 parent cab0d2a commit 3b3758d

File tree

13 files changed

+657
-3
lines changed

13 files changed

+657
-3
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ datafusion-postgres = { git = "https://github.com/zuston/datafusion-postgres.git
8989
sqlparser = "0.56.0"
9090
nix = { version = "0.29.0" }
9191
dirs = { version = "6.0.0" }
92+
io-uring = { version = "0.7.0" }
9293

9394
[profile.release]
9495
strip = true

dev/anolisos8/amd64/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ services:
1212
- ./../../../target-docker:/R1/target:rw
1313
# environment:
1414
# RUSTFLAGS: "-C target-cpu=skylake"
15-
command: "bash -c 'source ~/.bashrc && cd /R1 && cargo build --features hdrs,logforth,memory-prof --release'"
15+
command: "bash -c 'source ~/.bashrc && cd /R1 && cargo build --features hdrs,logforth,memory-prof,io-uring --release'"

riffle-server/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ logforth = ["dep:logforth"]
5959

6060
deadlock-detection = ["parking_lot/deadlock_detection"]
6161

62+
io-uring = ["dep:io-uring"]
63+
6264
[dependencies]
6365
anyhow = { workspace = true }
6466
tokio = { workspace = true, features = ["full"] }
@@ -134,6 +136,7 @@ tikv-jemalloc-sys= { workspace = true, features = ["stats", "profiling", "unpref
134136
tikv-jemallocator= { workspace = true, features = ["profiling", "unprefixed_malloc_on_supported_platforms"], optional = true }
135137
jemalloc_pprof= { workspace = true, features = ["symbolize", "flamegraph"], optional = true }
136138
nix= { workspace = true, optional = true }
139+
io-uring = { workspace = true, optional = true }
137140

138141
[build-dependencies]
139142
tonic-build = { workspace = true }

riffle-server/src/config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,18 @@ pub struct LocalfileStoreConfig {
139139
pub read_io_sendfile_enable: bool,
140140

141141
pub read_ahead_options: Option<ReadAheadConfig>,
142+
143+
pub io_uring_options: Option<IoUringConfig>,
144+
}
145+
146+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
147+
pub struct IoUringConfig {
148+
#[serde(default = "as_default_io_uring_threads")]
149+
pub threads: usize,
150+
}
151+
152+
fn as_default_io_uring_threads() -> usize {
153+
1
142154
}
143155

144156
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
@@ -253,6 +265,7 @@ impl LocalfileStoreConfig {
253265
io_limiter: None,
254266
read_io_sendfile_enable: as_default_read_io_sendfile_enable(),
255267
read_ahead_options: None,
268+
io_uring_options: None,
256269
}
257270
}
258271
}

riffle-server/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ pub enum WorkerError {
111111

112112
#[error("HDFS client initialization failed")]
113113
HDFS_CLIENT_INIT_FAILED,
114+
115+
#[error("raw io error code: {0}")]
116+
RAW_IO_ERR(i32),
114117
}
115118

116119
impl From<AcquireError> for WorkerError {

riffle-server/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#![allow(dead_code, unused)]
1919
#![feature(impl_trait_in_assoc_type)]
2020
extern crate core;
21-
2221
pub mod app_manager;
2322
pub mod await_tree;
2423
pub mod client_configs;

riffle-server/src/runtime/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ mod tests {
215215
use std::thread;
216216
use std::time::Duration;
217217

218-
fn create_runtime(pool_size: usize, name: &str) -> Arc<Runtime> {
218+
pub fn create_runtime(pool_size: usize, name: &str) -> Arc<Runtime> {
219219
let runtime = Builder::default()
220220
.worker_threads(pool_size as usize)
221221
.thread_name(name)

riffle-server/src/store/local/delegator.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use crate::store::local::layers::{Handler, OperatorBuilder};
2121
use crate::store::local::options::WriteOptions;
2222
use crate::store::local::read_options::ReadOptions;
2323
use crate::store::local::sync_io::SyncLocalIO;
24+
#[cfg(feature = "io-uring")]
25+
use crate::store::local::uring_io::UringIoEngineBuilder;
2426
use crate::store::local::{DiskStat, FileStat, LocalDiskStorage, LocalIO};
2527
use crate::store::DataBytes;
2628
use crate::util;
@@ -95,7 +97,24 @@ impl LocalDiskDelegator {
9597
Some(read_capacity.as_u64() as usize),
9698
);
9799

100+
#[cfg(feature = "io-uring")]
101+
info!("Binary compiled with the io-uring feature enabled.");
102+
103+
#[cfg(feature = "io-uring")]
104+
let mut operator_builder = if let Some(cfg) = &config.io_uring_options {
105+
info!("io-uring engine is activated!");
106+
OperatorBuilder::new(Arc::new(Box::new(
107+
UringIoEngineBuilder::new()
108+
.build(underlying_io_handler)
109+
.unwrap(),
110+
)))
111+
} else {
112+
OperatorBuilder::new(Arc::new(Box::new(underlying_io_handler)))
113+
};
114+
115+
#[cfg(not(feature = "io-uring"))]
98116
let mut operator_builder = OperatorBuilder::new(Arc::new(Box::new(underlying_io_handler)));
117+
99118
if let Some(conf) = config.io_limiter.as_ref() {
100119
operator_builder = operator_builder.layer(ThrottleLayer::new(
101120
&runtime_manager.localfile_write_runtime,

riffle-server/src/store/local/mod.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub mod options;
3636
pub mod read_options;
3737
pub mod sync_io;
3838

39+
#[cfg(feature = "io-uring")]
40+
pub mod uring_io;
41+
3942
pub struct FileStat {
4043
pub content_length: u64,
4144
}
@@ -79,3 +82,34 @@ impl Default for LocalfileStoreStat {
7982
Self { stats: vec![] }
8083
}
8184
}
85+
86+
#[cfg(test)]
87+
mod tests {
88+
use bytes::BytesMut;
89+
90+
#[test]
91+
fn test_bytes_as_mut_ptr() {
92+
struct RawBuf<T> {
93+
ptr: T,
94+
len: usize,
95+
}
96+
97+
// 1. use bytesMut to write
98+
let mut bytes = BytesMut::zeroed(10);
99+
let ptr = bytes.as_mut_ptr();
100+
let raw_buf = RawBuf { ptr, len: 10 };
101+
unsafe {
102+
for i in 0..bytes.len() {
103+
*raw_buf.ptr.add(i) = 1;
104+
}
105+
}
106+
for &b in bytes.iter() {
107+
assert_eq!(b, 1);
108+
}
109+
110+
// 2. use bytes to read
111+
let mut bytes = bytes.freeze();
112+
let prt = bytes.as_ptr();
113+
let raw_buf = RawBuf { ptr, len: 10 };
114+
}
115+
}

0 commit comments

Comments
 (0)