Skip to content

Commit 7e7fcca

Browse files
committed
[runtime/utils/buffer/write] optimistically use Blob::write_at_sync
1 parent d8589a0 commit 7e7fcca

2 files changed

Lines changed: 230 additions & 21 deletions

File tree

runtime/src/utils/buffer/mod.rs

Lines changed: 180 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,114 @@ mod tests {
128128
}
129129
}
130130

131+
/// Shared bookkeeping behind the range-sync test blob: visible bytes, durable
/// bytes, and a counter for each kind of sync that has been issued.
#[derive(Default)]
struct RangeSyncState {
    /// Everything currently stored in the blob, i.e. every durable byte plus
    /// any newer bytes that have not been made durable yet.
    data: Vec<u8>,

    /// The bytes that would survive a crash.
    durable: Vec<u8>,

    /// How many full sync barriers have been issued.
    full_syncs: usize,

    /// How many range-scoped write syncs have been issued.
    range_syncs: usize,
}
148+
149+
/// Test blob with separate visible and durable state.
150+
///
151+
/// Plain writes and resizes only update `data`. `write_at_sync` updates `data`
152+
/// and then copies only that submitted range into `durable`. `sync` copies all
153+
/// of `data` to `durable`. This lets tests assert that `Write::sync` uses range
154+
/// sync only when no earlier unsynced mutation needs a full durability barrier.
155+
#[derive(Clone)]
156+
struct RangeSyncBlob {
157+
state: Arc<Mutex<RangeSyncState>>,
158+
}
159+
160+
impl RangeSyncBlob {
161+
fn new() -> Self {
162+
Self {
163+
state: Arc::new(Mutex::new(RangeSyncState::default())),
164+
}
165+
}
166+
167+
fn snapshot(&self) -> (Vec<u8>, usize, usize) {
168+
let state = self.state.lock();
169+
(state.durable.clone(), state.full_syncs, state.range_syncs)
170+
}
171+
172+
fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
173+
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
174+
let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
175+
if end > data.len() {
176+
data.resize(end, 0);
177+
}
178+
data[start..end].copy_from_slice(buf);
179+
Ok(())
180+
}
181+
}
182+
183+
impl crate::Blob for RangeSyncBlob {
184+
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
185+
self.read_at_buf(offset, len, IoBufMut::default()).await
186+
}
187+
188+
async fn read_at_buf(
189+
&self,
190+
offset: u64,
191+
len: usize,
192+
buf: impl Into<IoBufsMut> + Send,
193+
) -> Result<IoBufsMut, Error> {
194+
let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
195+
let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
196+
let state = self.state.lock();
197+
if end > state.data.len() {
198+
return Err(Error::BlobInsufficientLength);
199+
}
200+
201+
let mut out = buf.into();
202+
out.put_slice(&state.data[start..end]);
203+
Ok(out)
204+
}
205+
206+
async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
207+
let buf = buf.into().coalesce();
208+
let mut state = self.state.lock();
209+
Self::write(&mut state.data, offset, buf.as_ref())
210+
}
211+
212+
async fn write_at_sync(
213+
&self,
214+
offset: u64,
215+
buf: impl Into<IoBufs> + Send,
216+
) -> Result<(), Error> {
217+
let buf = buf.into().coalesce();
218+
let mut state = self.state.lock();
219+
Self::write(&mut state.data, offset, buf.as_ref())?;
220+
Self::write(&mut state.durable, offset, buf.as_ref())?;
221+
state.range_syncs += 1;
222+
Ok(())
223+
}
224+
225+
async fn resize(&self, len: u64) -> Result<(), Error> {
226+
let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
227+
self.state.lock().data.resize(len, 0);
228+
Ok(())
229+
}
230+
231+
async fn sync(&self) -> Result<(), Error> {
232+
let mut state = self.state.lock();
233+
state.durable = state.data.clone();
234+
state.full_syncs += 1;
235+
Ok(())
236+
}
237+
}
238+
131239
#[test_traced]
132240
fn test_read_basic() {
133241
let executor = deterministic::Runner::default();
@@ -1329,7 +1437,7 @@ mod tests {
13291437
}
13301438

13311439
#[test_traced]
1332-
fn test_resize_then_append_at_size() {
1440+
fn test_write_resize_then_append_at_size() {
13331441
let executor = deterministic::Runner::default();
13341442
executor.start(|context| async move {
13351443
// Test truncating, then appending at the new size
@@ -1375,4 +1483,75 @@ mod tests {
13751483
assert_eq!(read.as_ref(), b"01234XXXXX");
13761484
});
13771485
}
1486+
1487+
#[test_traced]
1488+
fn test_write_sync_uses_range_sync_for_buffer_only_write() {
1489+
let executor = deterministic::Runner::default();
1490+
executor.start(|context| async move {
1491+
let blob = RangeSyncBlob::new();
1492+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1493+
1494+
// The write remains entirely buffered, so sync can make just this range durable.
1495+
writer.write_at(0, b"abc").await.unwrap();
1496+
writer.sync().await.unwrap();
1497+
1498+
// No prior plain blob mutation required a full sync barrier.
1499+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1500+
assert_eq!(durable.as_slice(), b"abc");
1501+
assert_eq!(full_syncs, 0);
1502+
assert_eq!(range_syncs, 1);
1503+
});
1504+
}
1505+
1506+
#[test_traced]
1507+
fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
1508+
let executor = deterministic::Runner::default();
1509+
executor.start(|context| async move {
1510+
let blob = RangeSyncBlob::new();
1511+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));
1512+
1513+
// This exceeds the buffer and forces a plain write before the final buffered tip.
1514+
writer.write_at(0, b"abcdef").await.unwrap();
1515+
writer.write_at(6, b"g").await.unwrap();
1516+
writer.sync().await.unwrap();
1517+
1518+
// The final sync must cover both the prior plain write and the buffered tip.
1519+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1520+
assert_eq!(durable.as_slice(), b"abcdefg");
1521+
assert_eq!(full_syncs, 1);
1522+
assert_eq!(range_syncs, 0);
1523+
1524+
// After the full sync, the next buffer-only write can use range sync again.
1525+
writer.write_at(7, b"h").await.unwrap();
1526+
writer.sync().await.unwrap();
1527+
1528+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1529+
assert_eq!(durable.as_slice(), b"abcdefgh");
1530+
assert_eq!(full_syncs, 1);
1531+
assert_eq!(range_syncs, 1);
1532+
});
1533+
}
1534+
1535+
#[test_traced]
1536+
fn test_write_sync_uses_full_sync_after_resize() {
1537+
let executor = deterministic::Runner::default();
1538+
executor.start(|context| async move {
1539+
let blob = RangeSyncBlob::new();
1540+
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1541+
1542+
// Establish already-durable data with a range sync.
1543+
writer.write_at(0, b"abcdef").await.unwrap();
1544+
writer.sync().await.unwrap();
1545+
1546+
// Resize alone is an unsynced blob mutation.
1547+
writer.resize(4).await.unwrap();
1548+
writer.sync().await.unwrap();
1549+
1550+
// The resized contents require a full sync barrier to become durable.
1551+
let (durable, full_syncs, range_syncs) = blob.snapshot();
1552+
assert_eq!(durable.as_slice(), b"abcd");
1553+
assert_eq!(full_syncs, 1);
1554+
assert_eq!(range_syncs, 1);
1555+
});
1556+
}
13781557
}

runtime/src/utils/buffer/write.rs

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@ use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoB
22
use commonware_utils::sync::AsyncRwLock;
33
use std::{num::NonZeroUsize, sync::Arc};
44

5+
/// Buffered tip data and durability bookkeeping.
6+
struct State {
7+
/// The buffer containing the data yet to be appended to the tip of the
8+
/// underlying blob.
9+
buffer: Buffer,
10+
11+
/// Whether prior blob mutation requires a full sync barrier.
12+
///
13+
/// Set after plain [Blob::write_at] or [Blob::resize]. While set, [Write::sync]
14+
/// cannot use [Blob::write_at_sync] because that only makes the current write
15+
/// durable.
16+
needs_sync: bool,
17+
}
18+
519
/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
620
/// updating data.
721
///
@@ -48,8 +62,8 @@ pub struct Write<B: Blob> {
4862
/// The underlying blob to write to.
4963
blob: B,
5064

51-
/// The buffer containing the data yet to be appended to the tip of the underlying blob.
52-
buffer: Arc<AsyncRwLock<Buffer>>,
65+
/// Shared tip buffer and durability state.
66+
state: Arc<AsyncRwLock<State>>,
5367
}
5468

5569
impl<B: Blob> Write<B> {
@@ -58,7 +72,10 @@ impl<B: Blob> Write<B> {
5872
pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
5973
Self {
6074
blob,
61-
buffer: Arc::new(AsyncRwLock::new(Buffer::new(size, capacity.get(), pool))),
75+
state: Arc::new(AsyncRwLock::new(State {
76+
buffer: Buffer::new(size, capacity.get(), pool),
77+
needs_sync: false,
78+
})),
6279
}
6380
}
6481

@@ -75,10 +92,9 @@ impl<B: Blob> Write<B> {
7592
/// Returns the current logical size of the blob including any buffered data.
7693
///
7794
/// This represents the total size of data that would be present after flushing.
78-
#[allow(clippy::len_without_is_empty)]
7995
pub async fn size(&self) -> u64 {
80-
let buffer = self.buffer.read().await;
81-
buffer.size()
96+
let state = self.state.read().await;
97+
state.buffer.size()
8298
}
8399

84100
/// Read exactly `len` immutable bytes starting at `offset`.
@@ -88,8 +104,9 @@ impl<B: Blob> Write<B> {
88104
.checked_add(len as u64)
89105
.ok_or(Error::OffsetOverflow)?;
90106

91-
// Acquire a read lock on the buffer.
92-
let buffer = self.buffer.read().await;
107+
// Acquire a read lock on the buffer state.
108+
let state = self.state.read().await;
109+
let buffer = &state.buffer;
93110

94111
// If the data required is beyond the size of the blob, return an error.
95112
if end_offset > buffer.size() {
@@ -138,8 +155,8 @@ impl<B: Blob> Write<B> {
138155
.checked_add(bufs.remaining() as u64)
139156
.ok_or(Error::OffsetOverflow)?;
140157

141-
// Acquire a write lock on the buffer.
142-
let mut buffer = self.buffer.write().await;
158+
// Acquire a write lock on the buffer state.
159+
let mut state = self.state.write().await;
143160

144161
// Process each chunk of the input buffer, attempting to merge into the tip buffer
145162
// or writing directly to the underlying blob.
@@ -149,7 +166,7 @@ impl<B: Blob> Write<B> {
149166
let chunk_len = chunk.len();
150167

151168
// Chunk falls entirely within the buffer's current range and can be merged.
152-
if buffer.merge(chunk, current_offset) {
169+
if state.buffer.merge(chunk, current_offset) {
153170
bufs.advance(chunk_len);
154171
current_offset += chunk_len as u64;
155172
continue;
@@ -158,10 +175,11 @@ impl<B: Blob> Write<B> {
158175
// Chunk cannot be merged, so flush the buffer if the range overlaps, and check
159176
// if merge is possible after.
160177
let chunk_end = current_offset + chunk_len as u64;
161-
if buffer.offset < chunk_end {
162-
if let Some((old_buf, old_offset)) = buffer.take() {
178+
if state.buffer.offset < chunk_end {
179+
if let Some((old_buf, old_offset)) = state.buffer.take() {
163180
self.blob.write_at(old_offset, old_buf).await?;
164-
if buffer.merge(chunk, current_offset) {
181+
state.needs_sync = true;
182+
if state.buffer.merge(chunk, current_offset) {
165183
bufs.advance(chunk_len);
166184
current_offset += chunk_len as u64;
167185
continue;
@@ -175,11 +193,12 @@ impl<B: Blob> Write<B> {
175193
// below. Removing this inefficiency may not be worth the additional complexity.
176194
let direct = bufs.split_to(chunk_len);
177195
self.blob.write_at(current_offset, direct).await?;
196+
state.needs_sync = true;
178197
current_offset += chunk_len as u64;
179198

180199
// Maintain the "buffer at tip" invariant by advancing offset to the end of this
181200
// write if it extended the underlying blob.
182-
buffer.offset = buffer.offset.max(current_offset);
201+
state.buffer.offset = state.buffer.offset.max(current_offset);
183202
}
184203

185204
Ok(())
@@ -191,27 +210,38 @@ impl<B: Blob> Write<B> {
191210
/// before resizing the underlying blob.
192211
pub async fn resize(&self, len: u64) -> Result<(), Error> {
193212
// Acquire a write lock on the buffer.
194-
let mut buffer = self.buffer.write().await;
213+
let mut state = self.state.write().await;
195214

196215
// Flush buffered data to the underlying blob.
197216
//
198217
// This can only happen if the new size is greater than the current size.
199-
if let Some((buf, offset)) = buffer.resize(len) {
218+
if let Some((buf, offset)) = state.buffer.resize(len) {
200219
self.blob.write_at(offset, buf).await?;
220+
state.needs_sync = true;
201221
}
202222

203223
// Resize the underlying blob.
204224
self.blob.resize(len).await?;
225+
state.needs_sync = true;
205226

206227
Ok(())
207228
}
208229

209230
/// Flush buffered bytes and durably sync the underlying blob.
210231
pub async fn sync(&self) -> Result<(), Error> {
211-
let mut buffer = self.buffer.write().await;
212-
if let Some((buf, offset)) = buffer.take() {
232+
let mut state = self.state.write().await;
233+
if let Some((buf, offset)) = state.buffer.take() {
234+
if !state.needs_sync {
235+
return self.blob.write_at_sync(offset, buf).await;
236+
}
237+
213238
self.blob.write_at(offset, buf).await?;
239+
self.blob.sync().await?;
240+
state.needs_sync = false;
241+
return Ok(());
214242
}
215-
self.blob.sync().await
243+
self.blob.sync().await?;
244+
state.needs_sync = false;
245+
Ok(())
216246
}
217247
}

0 commit comments

Comments
 (0)