Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions runtime/src/utils/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ mod tests {
/// Prefix/ranges of `data` that would survive a crash.
durable: Vec<u8>,

/// Number of write operations.
writes: usize,

/// Number of full sync barriers.
full_syncs: usize,

Expand All @@ -153,20 +156,29 @@ mod tests {
/// of `data` to `durable`. This lets tests assert that `Write::sync` uses range
/// sync only when no earlier unsynced mutation needs a full durability barrier.
#[derive(Clone)]
struct RangeSyncBlob {
pub struct SyncTrackingBlob {
state: Arc<Mutex<RangeSyncState>>,
}

impl RangeSyncBlob {
fn new() -> Self {
impl SyncTrackingBlob {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(RangeSyncState::default())),
}
}

fn snapshot(&self) -> (Vec<u8>, usize, usize) {
pub fn snapshot(&self) -> (Vec<u8>, usize, usize, usize) {
let state = self.state.lock();
(state.durable.clone(), state.full_syncs, state.range_syncs)
(
state.durable.clone(),
state.writes,
state.full_syncs,
state.range_syncs,
)
}

pub fn size(&self) -> u64 {
self.state.lock().data.len() as u64
}

fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
Expand All @@ -180,7 +192,7 @@ mod tests {
}
}

impl crate::Blob for RangeSyncBlob {
impl crate::Blob for SyncTrackingBlob {
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
self.read_at_buf(offset, len, IoBufMut::default()).await
}
Expand All @@ -206,7 +218,9 @@ mod tests {
async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
let buf = buf.into().coalesce();
let mut state = self.state.lock();
Self::write(&mut state.data, offset, buf.as_ref())
Self::write(&mut state.data, offset, buf.as_ref())?;
state.writes += 1;
Ok(())
}

async fn write_at_sync(
Expand All @@ -218,6 +232,7 @@ mod tests {
let mut state = self.state.lock();
Self::write(&mut state.data, offset, buf.as_ref())?;
Self::write(&mut state.durable, offset, buf.as_ref())?;
state.writes += 1;
state.range_syncs += 1;
Ok(())
}
Expand Down Expand Up @@ -1488,13 +1503,14 @@ mod tests {
fn test_write_sync_uses_range_sync_for_buffer_only_write() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let blob = RangeSyncBlob::new();
let blob = SyncTrackingBlob::new();
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));

// A fresh writer preserves one sync barrier for mutations that predate wrapping.
writer.sync().await.unwrap();
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert!(durable.is_empty());
assert_eq!(writes, 0);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 0);

Expand All @@ -1503,15 +1519,17 @@ mod tests {
writer.sync().await.unwrap();

// No prior plain blob mutation required another full sync barrier.
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abc");
assert_eq!(writes, 1);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 1);

// The prior sync used write_at_sync, so there is still no pending full-sync barrier.
writer.sync().await.unwrap();
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abc");
assert_eq!(writes, 1);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 1);
});
Expand All @@ -1521,7 +1539,7 @@ mod tests {
fn test_write_sync_persists_pre_wrapped_blob_mutation() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let blob = RangeSyncBlob::new();
let blob = SyncTrackingBlob::new();

// Simulate a plain blob mutation before the writer wraps it.
blob.write_at(0, b"abc").await.unwrap();
Expand All @@ -1530,17 +1548,19 @@ mod tests {
writer.sync().await.unwrap();

// The first sync must use a full barrier to make the pre-wrapped write durable.
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abc");
assert_eq!(writes, 1);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 0);

// After the barrier is clear, a buffered tip-only write can use range sync again.
writer.write_at(3, b"d").await.unwrap();
writer.sync().await.unwrap();

let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abcd");
assert_eq!(writes, 2);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 1);
});
Expand Down Expand Up @@ -1572,7 +1592,7 @@ mod tests {
fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let blob = RangeSyncBlob::new();
let blob = SyncTrackingBlob::new();
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));

// This exceeds the buffer and forces a plain write before the final buffered tip.
Expand All @@ -1581,24 +1601,27 @@ mod tests {
writer.sync().await.unwrap();

// The final sync must cover both the prior plain write and the buffered tip.
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abcdefg");
assert_eq!(writes, 2);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 0);

// With no new writes, sync has no work left.
writer.sync().await.unwrap();
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abcdefg");
assert_eq!(writes, 2);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 0);

// After the full sync, the next buffer-only write can use range sync again.
writer.write_at(7, b"h").await.unwrap();
writer.sync().await.unwrap();

let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abcdefgh");
assert_eq!(writes, 3);
assert_eq!(full_syncs, 1);
assert_eq!(range_syncs, 1);
});
Expand All @@ -1608,7 +1631,7 @@ mod tests {
fn test_write_sync_uses_full_sync_after_resize() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let blob = RangeSyncBlob::new();
let blob = SyncTrackingBlob::new();
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
writer.sync().await.unwrap();

Expand All @@ -1621,8 +1644,9 @@ mod tests {
writer.sync().await.unwrap();

// The resized contents require a full sync barrier to become durable.
let (durable, full_syncs, range_syncs) = blob.snapshot();
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
assert_eq!(durable.as_slice(), b"abcd");
assert_eq!(writes, 1);
assert_eq!(full_syncs, 2);
assert_eq!(range_syncs, 1);
});
Expand Down
Loading
Loading