Skip to content

Commit cb913a7

Browse files
authored
fix(io-uring): Data inconsistency due to dangling pointer to written bytes (#553)
1 parent 9488d97 commit cb913a7

File tree

2 files changed

+52
-16
lines changed

2 files changed

+52
-16
lines changed

riffle-server/src/store/local/delegator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::store::local::io_layer_throttle::{ThrottleLayer, ThroughputBasedRateL
1919
use crate::store::local::io_layer_timeout::TimeoutLayer;
2020
use crate::store::local::layers::{Handler, OperatorBuilder};
2121
use crate::store::local::options::WriteOptions;
22-
use crate::store::local::read_options::ReadOptions;
22+
use crate::store::local::read_options::{ReadOptions, ReadRange};
2323
use crate::store::local::sync_io::SyncLocalIO;
2424
#[cfg(feature = "io-uring")]
2525
use crate::store::local::uring_io::UringIoEngineBuilder;
@@ -313,7 +313,7 @@ impl LocalDiskDelegator {
313313
let write_time = timer.elapsed().as_millis();
314314

315315
let timer = Instant::now();
316-
let options = ReadOptions::default().with_read_all();
316+
let options = ReadOptions::default().with_read_range(ReadRange::RANGE(0, 1024 * 1024 * 10));
317317
let read_data = self.read(&detection_file, options).await?;
318318
let read_data = read_data.freeze();
319319
let read_time = timer.elapsed().as_millis();

riffle-server/src/store/local/uring_io.rs

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@ use clap::builder::Str;
2424
use core_affinity::CoreId;
2525
use io_uring::types::Fd;
2626
use io_uring::{opcode, squeue, IoUring};
27+
use libc::iovec;
2728
use std::fs::OpenOptions;
2829
use std::io::{Bytes, IoSlice};
2930
use std::os::fd::AsRawFd;
3031
use std::path::Path;
32+
use std::thread::sleep;
33+
use std::time::Duration;
3134
use std::{
3235
fmt::Debug,
3336
fs,
@@ -240,13 +243,19 @@ unsafe impl Send for RawBuf {}
240243
unsafe impl Sync for RawBuf {}
241244

242245
struct UringIoCtx {
243-
tx: oneshot::Sender<anyhow::Result<(), WorkerError>>,
246+
tx: oneshot::Sender<anyhow::Result<usize, WorkerError>>,
244247
io_type: UringIoType,
245248
addr: RawFileAddress,
249+
246250
w_bufs: Vec<bytes::Bytes>,
251+
w_iovecs: Vec<iovec>,
252+
247253
r_bufs: Vec<RawBuf>,
248254
}
249255

256+
unsafe impl Send for UringIoCtx {}
257+
unsafe impl Sync for UringIoCtx {}
258+
250259
struct UringIoEngineShard {
251260
read_rx: mpsc::Receiver<UringIoCtx>,
252261
write_rx: mpsc::Receiver<UringIoCtx>,
@@ -290,7 +299,11 @@ impl UringIoEngineShard {
290299

291300
let ctx = match ctx {
292301
Some(ctx) => ctx,
293-
None => break 'prepare,
302+
None => {
303+
// todo: we must sleep here to avoid busy-spinning the CPU, but a blocking recv may be better than try_recv
304+
sleep(Duration::from_millis(5));
305+
break 'prepare;
306+
}
294307
};
295308

296309
let mut ctx = Box::new(ctx);
@@ -307,15 +320,14 @@ impl UringIoEngineShard {
307320
UringIoType::WriteV => {
308321
self.write_inflight += 1;
309322
// https://github.com/tokio-rs/io-uring/blob/master/io-uring-test/src/utils.rs#L95
310-
let slices = ctx
311-
.w_bufs
312-
.iter()
313-
.map(|x| IoSlice::new(x.as_ref()))
314-
.collect::<Vec<_>>();
315-
opcode::Writev::new(fd, slices.as_ptr().cast(), slices.len() as _)
316-
.offset(ctx.addr.offset)
317-
.build()
318-
.flags(squeue::Flags::IO_LINK)
323+
opcode::Writev::new(
324+
fd,
325+
ctx.w_iovecs.as_ptr().cast(),
326+
ctx.w_iovecs.len() as _,
327+
)
328+
.offset(ctx.addr.offset)
329+
.build()
330+
.into()
319331
}
320332
};
321333
let data = Box::into_raw(ctx) as u64;
@@ -340,7 +352,7 @@ impl UringIoEngineShard {
340352
if res < 0 {
341353
let _ = ctx.tx.send(Err(WorkerError::RAW_IO_ERR(res)));
342354
} else {
343-
let _ = ctx.tx.send(Ok(()));
355+
let _ = ctx.tx.send(Ok(res as usize));
344356
}
345357
}
346358
}
@@ -373,28 +385,51 @@ impl LocalIO for UringIo {
373385
let (tx, rx) = oneshot::channel();
374386
let tag = options.data.len();
375387
let shard = &self.write_txs[tag % self.write_txs.len()];
388+
let byte_size = options.data.len();
376389
let bufs = options.data.always_bytes();
390+
let buf_len = bufs.len();
391+
let slices = bufs
392+
.iter()
393+
.map(|x| IoSlice::new(x.as_ref()))
394+
.collect::<Vec<_>>();
377395

378396
let path = self.with_root(path);
379397
let path = Path::new(&path);
380398
let mut file = OpenOptions::new().append(true).create(true).open(path)?;
381399
let raw_fd = file.as_raw_fd();
382400

383-
let ctx = UringIoCtx {
401+
let mut ctx = UringIoCtx {
384402
tx,
385403
io_type: UringIoType::WriteV,
386404
addr: RawFileAddress {
387405
file: RawFile(raw_fd),
388406
offset: options.offset.unwrap_or(0),
389407
},
390408
w_bufs: bufs,
409+
w_iovecs: Vec::with_capacity(buf_len),
391410
r_bufs: vec![],
392411
};
412+
ctx.w_iovecs = ctx
413+
.w_bufs
414+
.iter()
415+
.map(|b| iovec {
416+
iov_base: b.as_ptr() as *mut _,
417+
iov_len: b.len(),
418+
})
419+
.collect();
420+
393421
let _ = shard.send(ctx);
394-
let res = match rx.await {
422+
let written_bytes = match rx.await {
395423
Ok(res) => res,
396424
Err(e) => Err(WorkerError::Other(anyhow::Error::from(e))),
397425
}?;
426+
if (byte_size != written_bytes) {
427+
return Err(WorkerError::Other(anyhow!(
428+
"Unexpected io write. expected/written: {}/{}",
429+
byte_size,
430+
written_bytes
431+
)));
432+
}
398433
Ok(())
399434
}
400435

@@ -432,6 +467,7 @@ impl LocalIO for UringIo {
432467
offset,
433468
},
434469
w_bufs: vec![],
470+
w_iovecs: vec![],
435471
r_bufs: vec![RawBuf {
436472
ptr: buf.as_mut_ptr(),
437473
len: length as usize,

0 commit comments

Comments
 (0)