Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions runtime/src/iouring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,30 @@ impl Handle {
offset,
written: 0,
write: bufs.into(),
sync: false,
result: None,
sender: tx,
}))
.await
.map_err(|_| Error::WriteFailed)?;
rx.await.map_err(|_| Error::WriteFailed)?
}

/// Submit a logical positioned write with per-write sync and wait for its completion.
#[cfg_attr(not(feature = "iouring-storage"), allow(dead_code))]
pub async fn write_at_sync(
&self,
file: Arc<File>,
offset: u64,
bufs: IoBufs,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.enqueue(Request::WriteAt(WriteAtRequest {
file,
offset,
written: 0,
write: bufs.into(),
sync: true,
result: None,
sender: tx,
}))
Expand Down
49 changes: 47 additions & 2 deletions runtime/src/iouring/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,17 +522,29 @@ pub(super) struct WriteAtRequest {
pub(super) written: usize,
/// Write cursor and buffers that still need to be written.
pub(super) write: WriteBuffers,
/// Whether the write should be durably persisted before completion.
pub(super) sync: bool,
/// Terminal result captured by `on_cqe` and delivered by `finish`.
pub(super) result: Option<Result<(), Error>>,
/// Completion channel for the top-level caller.
pub(super) sender: oneshot::Sender<Result<(), Error>>,
}

impl WriteAtRequest {
/// Compute the `rw_flags` value to attach to this request's write SQE.
///
/// Yields `libc::RWF_SYNC` when the request's `sync` field is set and `0`
/// (no extra flags) otherwise.
const fn rw_flags(&self) -> i32 {
    // Plain writes carry no per-write flags.
    if !self.sync {
        return 0;
    }
    libc::RWF_SYNC
}

/// Build the next positioned write SQE for the remaining bytes.
fn build_sqe(&mut self) -> SqueueEntry {
let fd = Fd(self.file.as_raw_fd());
let offset = self.offset + self.written as u64;
let rw_flags = self.rw_flags();
match &mut self.write {
WriteBuffers::Single { buf } => {
let ptr = buf.as_ptr();
Expand All @@ -545,6 +557,7 @@ impl WriteAtRequest {
.expect("single-buffer SQE length exceeds u32"),
)
.offset(offset)
.rw_flags(rw_flags)
.build()
}
WriteBuffers::Vectored { bufs, iovecs } => {
Expand All @@ -563,6 +576,7 @@ impl WriteAtRequest {

opcode::Writev::new(fd, iovecs.as_ptr(), iovecs_len)
.offset(offset)
.rw_flags(rw_flags)
.build()
}
}
Expand Down Expand Up @@ -1170,14 +1184,17 @@ mod tests {

// Retryable CQEs should requeue the positioned write.
let (tx, _rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
let write = WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
};
assert_eq!(write.rw_flags(), 0);
let mut request = Request::WriteAt(write);
assert!(!request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EAGAIN));

// Single-buffer writes should track partial progress until complete.
Expand All @@ -1187,6 +1204,7 @@ mod tests {
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand All @@ -1207,6 +1225,7 @@ mod tests {
offset: 0,
written: 0,
write: vectored.into(),
sync: false,
result: None,
sender: tx,
});
Expand All @@ -1224,6 +1243,7 @@ mod tests {
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand All @@ -1240,6 +1260,7 @@ mod tests {
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand All @@ -1250,13 +1271,35 @@ mod tests {
Err(Error::WriteFailed)
));

// Synchronous writes use the same logical error surface as regular
// writes; `sync` only changes the SQE flags.
let (tx, rx) = oneshot::channel();
let write = WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: true,
result: None,
sender: tx,
};
assert_eq!(write.rw_flags(), libc::RWF_SYNC);
let mut request = Request::WriteAt(write);
assert!(request.on_cqe(WaiterState::Active { target_tick: None }, -libc::EINVAL));
request.complete();
assert!(matches!(
block_on(rx).expect("missing sync write failure"),
Err(Error::WriteFailed)
));

// Timeout cancellation should also surface as a write failure.
let (tx, rx) = oneshot::channel();
let mut request = Request::WriteAt(WriteAtRequest {
file: make_file_fd(),
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand Down Expand Up @@ -1408,6 +1451,7 @@ mod tests {
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand Down Expand Up @@ -1492,6 +1536,7 @@ mod tests {
offset: 0,
written: 0,
write: IoBufs::from(IoBuf::from(b"hello")).into(),
sync: false,
result: None,
sender: tx,
});
Expand Down
12 changes: 12 additions & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,18 @@ stability_scope!(BETA {
bufs: impl Into<IoBufs> + Send,
) -> impl Future<Output = Result<(), Error>> + Send;

/// Write `bufs` to the blob at the given offset and durably persist that write.
///
/// This is not a durability barrier for previous operations. When it completes,
/// only the bytes submitted to this call are guaranteed durable. Earlier unsynced
/// [`Blob::write_at`] or [`Blob::resize`] calls require [`Blob::sync`] to become
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? Does sync on the blob here also sync other pending writes?

Copy link
Copy Markdown
Member Author

@andresilva andresilva May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the only thing that this API guarantees. On Linux, where we actually implement this with RWF_SYNC, the kernel only guarantees that the range written by that specific syscall is durable. On non-Linux, where we fall back to write+sync, we'll sync all pending writes, but that's incidental and users shouldn't lean on it. For example, even on non-Linux, if you issue a write_at_sync([]) we return early since there's nothing to write, which avoids syncing previous writes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know...I would've definitely messed this up.

/// durable.
fn write_at_sync(
&self,
offset: u64,
bufs: impl Into<IoBufs> + Send,
) -> impl Future<Output = Result<(), Error>> + Send;

/// Resize the blob to the given length.
///
/// If the length is greater than the current length, the blob is extended with zeros.
Expand Down
59 changes: 53 additions & 6 deletions runtime/src/storage/audited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ impl<B: crate::Blob> crate::Blob for Blob<B> {
self.inner.write_at(offset, bufs).await
}

/// Record a `write_at_sync` audit event, then forward the write to the inner blob.
///
/// The event hashes, in order: the partition bytes, the blob name, the
/// big-endian encoding of `offset`, and every chunk of `bufs`. The converted
/// buffers are then passed unchanged to the inner blob's `write_at_sync`,
/// whose result is returned.
async fn write_at_sync(
    &self,
    offset: u64,
    bufs: impl Into<IoBufs> + Send,
) -> Result<(), Error> {
    // Convert once so the exact buffers that are hashed are the ones forwarded.
    let payload = bufs.into();
    self.auditor.event(b"write_at_sync", |digest| {
        digest.update(self.partition.as_bytes());
        digest.update(&self.name);
        digest.update(&offset.to_be_bytes());
        payload.for_each_chunk(|piece| digest.update(piece));
    });
    self.inner.write_at_sync(offset, payload).await
}

async fn resize(&self, len: u64) -> Result<(), Error> {
self.auditor.event(b"resize", |hasher| {
hasher.update(self.partition.as_bytes());
Expand Down Expand Up @@ -269,7 +284,8 @@ mod tests {

#[derive(Clone)]
struct RecordingBlob {
chunk_counts: Arc<Mutex<Vec<usize>>>,
write_chunk_counts: Arc<Mutex<Vec<usize>>>,
sync_write_chunk_counts: Arc<Mutex<Vec<usize>>>,
}

impl crate::Blob for RecordingBlob {
Expand All @@ -291,7 +307,20 @@ mod tests {
_offset: u64,
bufs: impl Into<IoBufs> + Send,
) -> Result<(), Error> {
self.chunk_counts.lock().push(bufs.into().chunk_count());
self.write_chunk_counts
.lock()
.push(bufs.into().chunk_count());
Ok(())
}

/// Record how many chunks a synchronous write was submitted with.
///
/// The offset is ignored; the chunk count of `bufs` is appended to
/// `sync_write_chunk_counts` and the call always succeeds.
async fn write_at_sync(
    &self,
    _offset: u64,
    bufs: impl Into<IoBufs> + Send,
) -> Result<(), Error> {
    let bufs: IoBufs = bufs.into();
    let observed = bufs.chunk_count();
    self.sync_write_chunk_counts.lock().push(observed);
    Ok(())
}

Expand All @@ -305,14 +334,16 @@ mod tests {
}

#[tokio::test]
async fn test_audited_blob_write_preserves_chunking() {
let chunk_counts = Arc::new(Mutex::new(Vec::new()));
async fn test_audited_blob_writes_preserve_chunking() {
let write_chunk_counts = Arc::new(Mutex::new(Vec::new()));
let sync_write_chunk_counts = Arc::new(Mutex::new(Vec::new()));
let blob = super::Blob {
auditor: Arc::new(crate::deterministic::Auditor::default()),
partition: "partition".into(),
name: b"blob".to_vec(),
inner: RecordingBlob {
chunk_counts: chunk_counts.clone(),
write_chunk_counts: write_chunk_counts.clone(),
sync_write_chunk_counts: sync_write_chunk_counts.clone(),
},
};

Expand All @@ -328,6 +359,22 @@ mod tests {
.await
.unwrap();

assert_eq!(*chunk_counts.lock(), vec![4]);
assert_eq!(*write_chunk_counts.lock(), vec![4]);
assert!(sync_write_chunk_counts.lock().is_empty());

blob.write_at_sync(
0,
IoBufs::from(vec![
IoBuf::from(b"a".to_vec()),
IoBuf::from(b"b".to_vec()),
IoBuf::from(b"c".to_vec()),
IoBuf::from(b"d".to_vec()),
]),
)
.await
.unwrap();

assert_eq!(*write_chunk_counts.lock(), vec![4]);
assert_eq!(*sync_write_chunk_counts.lock(), vec![4]);
}
}
Loading
Loading