Skip to content

Commit 3c08216

Browse files
[runtime/utils/buffer/write] use Blob::write_at_sync (#3852)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 527763b commit 3c08216

2 files changed

Lines changed: 366 additions & 34 deletions

File tree

runtime/src/utils/buffer/mod.rs

Lines changed: 253 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,114 @@ mod tests {
128128
}
129129
}
130130

131+
#[derive(Default)]
132+
struct RangeSyncState {
133+
/// All data currently stored in the blob.
134+
///
135+
/// This includes every durable byte plus any newer bytes that have not
136+
/// been made durable yet.
137+
data: Vec<u8>,
138+
139+
/// Prefix/ranges of `data` that would survive a crash.
140+
durable: Vec<u8>,
141+
142+
/// Number of full sync barriers.
143+
full_syncs: usize,
144+
145+
/// Number of range-scoped write syncs.
146+
range_syncs: usize,
147+
}
148+
149+
/// Test blob with separate visible and durable state.
150+
///
151+
/// Plain writes and resizes only update `data`. `write_at_sync` updates `data`
152+
/// and then copies only that submitted range into `durable`. `sync` copies all
153+
/// of `data` to `durable`. This lets tests assert that `Write::sync` uses range
154+
/// sync only when no earlier unsynced mutation needs a full durability barrier.
155+
#[derive(Clone)]
156+
struct RangeSyncBlob {
157+
state: Arc<Mutex<RangeSyncState>>,
158+
}
159+
160+
impl RangeSyncBlob {
161+
fn new() -> Self {
162+
Self {
163+
state: Arc::new(Mutex::new(RangeSyncState::default())),
164+
}
165+
}
166+
167+
fn snapshot(&self) -> (Vec<u8>, usize, usize) {
168+
let state = self.state.lock();
169+
(state.durable.clone(), state.full_syncs, state.range_syncs)
170+
}
171+
172+
fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
173+
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
174+
let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
175+
if end > data.len() {
176+
data.resize(end, 0);
177+
}
178+
data[start..end].copy_from_slice(buf);
179+
Ok(())
180+
}
181+
}
182+
183+
impl crate::Blob for RangeSyncBlob {
184+
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
185+
self.read_at_buf(offset, len, IoBufMut::default()).await
186+
}
187+
188+
async fn read_at_buf(
189+
&self,
190+
offset: u64,
191+
len: usize,
192+
buf: impl Into<IoBufsMut> + Send,
193+
) -> Result<IoBufsMut, Error> {
194+
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
195+
let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
196+
let state = self.state.lock();
197+
if end > state.data.len() {
198+
return Err(Error::BlobInsufficientLength);
199+
}
200+
201+
let mut out = buf.into();
202+
out.put_slice(&state.data[start..end]);
203+
Ok(out)
204+
}
205+
206+
async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
207+
let buf = buf.into().coalesce();
208+
let mut state = self.state.lock();
209+
Self::write(&mut state.data, offset, buf.as_ref())
210+
}
211+
212+
async fn write_at_sync(
213+
&self,
214+
offset: u64,
215+
buf: impl Into<IoBufs> + Send,
216+
) -> Result<(), Error> {
217+
let buf = buf.into().coalesce();
218+
let mut state = self.state.lock();
219+
Self::write(&mut state.data, offset, buf.as_ref())?;
220+
Self::write(&mut state.durable, offset, buf.as_ref())?;
221+
state.range_syncs += 1;
222+
Ok(())
223+
}
224+
225+
async fn resize(&self, len: u64) -> Result<(), Error> {
226+
let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
227+
self.state.lock().data.resize(len, 0);
228+
Ok(())
229+
}
230+
231+
async fn sync(&self) -> Result<(), Error> {
232+
let mut state = self.state.lock();
233+
state.durable = state.data.clone();
234+
state.full_syncs += 1;
235+
Ok(())
236+
}
237+
}
238+
131239
#[test_traced]
132240
fn test_read_basic() {
133241
let executor = deterministic::Runner::default();
@@ -1329,7 +1437,7 @@ mod tests {
13291437
}
13301438

13311439
#[test_traced]
1332-
fn test_resize_then_append_at_size() {
1440+
fn test_write_resize_then_append_at_size() {
13331441
let executor = deterministic::Runner::default();
13341442
executor.start(|context| async move {
13351443
// Test truncating, then appending at the new size
@@ -1375,4 +1483,148 @@ mod tests {
13751483
assert_eq!(read.as_ref(), b"01234XXXXX");
13761484
});
13771485
}
1486+
1487+
#[test_traced]
1488+
fn test_write_sync_uses_range_sync_for_buffer_only_write() {
1489+
let executor = deterministic::Runner::default();
1490+
executor.start(|context| async move {
1491+
let blob = RangeSyncBlob::new();
1492+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1493+
1494+
// A fresh writer preserves one sync barrier for mutations that predate wrapping.
1495+
writer.sync().await.unwrap();
1496+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1497+
assert!(durable.is_empty());
1498+
assert_eq!(full_syncs, 1);
1499+
assert_eq!(range_syncs, 0);
1500+
1501+
// The write remains entirely buffered, so sync can make just this range durable.
1502+
writer.write_at(0, b"abc").await.unwrap();
1503+
writer.sync().await.unwrap();
1504+
1505+
// No prior plain blob mutation required another full sync barrier.
1506+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1507+
assert_eq!(durable.as_slice(), b"abc");
1508+
assert_eq!(full_syncs, 1);
1509+
assert_eq!(range_syncs, 1);
1510+
1511+
// The prior sync used write_at_sync, so there is still no pending full-sync barrier.
1512+
writer.sync().await.unwrap();
1513+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1514+
assert_eq!(durable.as_slice(), b"abc");
1515+
assert_eq!(full_syncs, 1);
1516+
assert_eq!(range_syncs, 1);
1517+
});
1518+
}
1519+
1520+
#[test_traced]
1521+
fn test_write_sync_persists_pre_wrapped_blob_mutation() {
1522+
let executor = deterministic::Runner::default();
1523+
executor.start(|context| async move {
1524+
let blob = RangeSyncBlob::new();
1525+
1526+
// Simulate a plain blob mutation before the writer wraps it.
1527+
blob.write_at(0, b"abc").await.unwrap();
1528+
1529+
let writer = Write::from_pooler(&context, blob.clone(), 3, NZUsize!(8));
1530+
writer.sync().await.unwrap();
1531+
1532+
// The first sync must use a full barrier to make the pre-wrapped write durable.
1533+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1534+
assert_eq!(durable.as_slice(), b"abc");
1535+
assert_eq!(full_syncs, 1);
1536+
assert_eq!(range_syncs, 0);
1537+
1538+
// After the barrier is clear, a buffered tip-only write can use range sync again.
1539+
writer.write_at(3, b"d").await.unwrap();
1540+
writer.sync().await.unwrap();
1541+
1542+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1543+
assert_eq!(durable.as_slice(), b"abcd");
1544+
assert_eq!(full_syncs, 1);
1545+
assert_eq!(range_syncs, 1);
1546+
});
1547+
}
1548+
1549+
#[test_traced]
1550+
fn test_write_sync_failed_range_sync_does_not_mark_clean() {
1551+
let executor = deterministic::Runner::default();
1552+
executor.start(|context| async move {
1553+
let name = b"failed_range_sync";
1554+
let (blob, size) = context.open("partition", name).await.unwrap();
1555+
let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1556+
writer.sync().await.unwrap();
1557+
1558+
// Keep the write buffered so sync attempts the clean `write_at_sync` path.
1559+
writer.write_at(0, b"abc").await.unwrap();
1560+
1561+
// Removing the blob makes the range-sync flush fail.
1562+
context.remove("partition", Some(name)).await.unwrap();
1563+
assert!(writer.sync().await.is_err());
1564+
1565+
// The failed `write_at_sync` must leave a pending full-sync barrier, so a
1566+
// later sync cannot report success.
1567+
assert!(writer.sync().await.is_err());
1568+
});
1569+
}
1570+
1571+
#[test_traced]
1572+
fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
1573+
let executor = deterministic::Runner::default();
1574+
executor.start(|context| async move {
1575+
let blob = RangeSyncBlob::new();
1576+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));
1577+
1578+
// This exceeds the buffer and forces a plain write before the final buffered tip.
1579+
writer.write_at(0, b"abcdef").await.unwrap();
1580+
writer.write_at(6, b"g").await.unwrap();
1581+
writer.sync().await.unwrap();
1582+
1583+
// The final sync must cover both the prior plain write and the buffered tip.
1584+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1585+
assert_eq!(durable.as_slice(), b"abcdefg");
1586+
assert_eq!(full_syncs, 1);
1587+
assert_eq!(range_syncs, 0);
1588+
1589+
// With no new writes, sync has no work left.
1590+
writer.sync().await.unwrap();
1591+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1592+
assert_eq!(durable.as_slice(), b"abcdefg");
1593+
assert_eq!(full_syncs, 1);
1594+
assert_eq!(range_syncs, 0);
1595+
1596+
// After the full sync, the next buffer-only write can use range sync again.
1597+
writer.write_at(7, b"h").await.unwrap();
1598+
writer.sync().await.unwrap();
1599+
1600+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1601+
assert_eq!(durable.as_slice(), b"abcdefgh");
1602+
assert_eq!(full_syncs, 1);
1603+
assert_eq!(range_syncs, 1);
1604+
});
1605+
}
1606+
1607+
#[test_traced]
1608+
fn test_write_sync_uses_full_sync_after_resize() {
1609+
let executor = deterministic::Runner::default();
1610+
executor.start(|context| async move {
1611+
let blob = RangeSyncBlob::new();
1612+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1613+
writer.sync().await.unwrap();
1614+
1615+
// Establish already-durable data with a range sync.
1616+
writer.write_at(0, b"abcdef").await.unwrap();
1617+
writer.sync().await.unwrap();
1618+
1619+
// Resize alone is an unsynced blob mutation.
1620+
writer.resize(4).await.unwrap();
1621+
writer.sync().await.unwrap();
1622+
1623+
// The resized contents require a full sync barrier to become durable.
1624+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1625+
assert_eq!(durable.as_slice(), b"abcd");
1626+
assert_eq!(full_syncs, 2);
1627+
assert_eq!(range_syncs, 1);
1628+
});
1629+
}
13781630
}

0 commit comments

Comments
 (0)