Skip to content

Commit 3ad003c

Browse files
committed
[runtime/utils/buffer/append] use Blob::write_at_sync
1 parent c42be7a commit 3ad003c

2 files changed

Lines changed: 496 additions & 104 deletions

File tree

runtime/src/utils/buffer/mod.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ mod tests {
129129
}
130130

131131
#[derive(Default)]
132-
struct RangeSyncState {
132+
struct SyncTrackingState {
133133
/// All data currently stored in the blob.
134134
///
135135
/// This includes every durable byte plus any newer bytes that have not
@@ -139,6 +139,9 @@ mod tests {
139139
/// Prefix/ranges of `data` that would survive a crash.
140140
durable: Vec<u8>,
141141

142+
/// Number of write operations.
143+
writes: usize,
144+
142145
/// Number of full sync barriers.
143146
full_syncs: usize,
144147

@@ -150,23 +153,32 @@ mod tests {
150153
///
151154
/// Plain writes and resizes only update `data`. `write_at_sync` updates `data`
152155
/// and then copies only that submitted range into `durable`. `sync` copies all
153-
/// of `data` to `durable`. This lets tests assert that `Write::sync` uses range
154-
/// sync only when no earlier unsynced mutation needs a full durability barrier.
156+
/// of `data` to `durable`. This lets buffer tests assert range sync is only
157+
/// used when no earlier unsynced mutation needs a full durability barrier.
155158
#[derive(Clone)]
156-
struct RangeSyncBlob {
157-
state: Arc<Mutex<RangeSyncState>>,
159+
pub struct SyncTrackingBlob {
160+
state: Arc<Mutex<SyncTrackingState>>,
158161
}
159162

160-
impl RangeSyncBlob {
161-
fn new() -> Self {
163+
impl SyncTrackingBlob {
164+
pub fn new() -> Self {
162165
Self {
163-
state: Arc::new(Mutex::new(RangeSyncState::default())),
166+
state: Arc::new(Mutex::new(SyncTrackingState::default())),
164167
}
165168
}
166169

167-
fn snapshot(&self) -> (Vec<u8>, usize, usize) {
170+
pub fn snapshot(&self) -> (Vec<u8>, usize, usize, usize) {
168171
let state = self.state.lock();
169-
(state.durable.clone(), state.full_syncs, state.range_syncs)
172+
(
173+
state.durable.clone(),
174+
state.writes,
175+
state.full_syncs,
176+
state.range_syncs,
177+
)
178+
}
179+
180+
pub fn size(&self) -> u64 {
181+
self.state.lock().data.len() as u64
170182
}
171183

172184
fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
@@ -180,7 +192,7 @@ mod tests {
180192
}
181193
}
182194

183-
impl crate::Blob for RangeSyncBlob {
195+
impl crate::Blob for SyncTrackingBlob {
184196
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
185197
self.read_at_buf(offset, len, IoBufMut::default()).await
186198
}
@@ -206,7 +218,9 @@ mod tests {
206218
async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
207219
let buf = buf.into().coalesce();
208220
let mut state = self.state.lock();
209-
Self::write(&mut state.data, offset, buf.as_ref())
221+
Self::write(&mut state.data, offset, buf.as_ref())?;
222+
state.writes += 1;
223+
Ok(())
210224
}
211225

212226
async fn write_at_sync(
@@ -218,6 +232,7 @@ mod tests {
218232
let mut state = self.state.lock();
219233
Self::write(&mut state.data, offset, buf.as_ref())?;
220234
Self::write(&mut state.durable, offset, buf.as_ref())?;
235+
state.writes += 1;
221236
state.range_syncs += 1;
222237
Ok(())
223238
}
@@ -1488,13 +1503,14 @@ mod tests {
14881503
fn test_write_sync_uses_range_sync_for_buffer_only_write() {
14891504
let executor = deterministic::Runner::default();
14901505
executor.start(|context| async move {
1491-
let blob = RangeSyncBlob::new();
1506+
let blob = SyncTrackingBlob::new();
14921507
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
14931508

14941509
// A fresh writer has no buffered bytes or prior plain blob mutation, so sync is a no-op.
14951510
writer.sync().await.unwrap();
1496-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1511+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
14971512
assert!(durable.is_empty());
1513+
assert_eq!(writes, 0);
14981514
assert_eq!(full_syncs, 0);
14991515
assert_eq!(range_syncs, 0);
15001516

@@ -1503,15 +1519,17 @@ mod tests {
15031519
writer.sync().await.unwrap();
15041520

15051521
// No prior plain blob mutation required a full sync barrier.
1506-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1522+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15071523
assert_eq!(durable.as_slice(), b"abc");
1524+
assert_eq!(writes, 1);
15081525
assert_eq!(full_syncs, 0);
15091526
assert_eq!(range_syncs, 1);
15101527

15111528
// The prior sync used write_at_sync, so there is still no pending full-sync barrier.
15121529
writer.sync().await.unwrap();
1513-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1530+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15141531
assert_eq!(durable.as_slice(), b"abc");
1532+
assert_eq!(writes, 1);
15151533
assert_eq!(full_syncs, 0);
15161534
assert_eq!(range_syncs, 1);
15171535
});
@@ -1521,7 +1539,7 @@ mod tests {
15211539
fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
15221540
let executor = deterministic::Runner::default();
15231541
executor.start(|context| async move {
1524-
let blob = RangeSyncBlob::new();
1542+
let blob = SyncTrackingBlob::new();
15251543
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));
15261544

15271545
// This exceeds the buffer and forces a plain write before the final buffered tip.
@@ -1530,24 +1548,27 @@ mod tests {
15301548
writer.sync().await.unwrap();
15311549

15321550
// The final sync must cover both the prior plain write and the buffered tip.
1533-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1551+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15341552
assert_eq!(durable.as_slice(), b"abcdefg");
1553+
assert_eq!(writes, 2);
15351554
assert_eq!(full_syncs, 1);
15361555
assert_eq!(range_syncs, 0);
15371556

15381557
// With no new writes, sync has no work left.
15391558
writer.sync().await.unwrap();
1540-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1559+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15411560
assert_eq!(durable.as_slice(), b"abcdefg");
1561+
assert_eq!(writes, 2);
15421562
assert_eq!(full_syncs, 1);
15431563
assert_eq!(range_syncs, 0);
15441564

15451565
// After the full sync, the next buffer-only write can use range sync again.
15461566
writer.write_at(7, b"h").await.unwrap();
15471567
writer.sync().await.unwrap();
15481568

1549-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1569+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15501570
assert_eq!(durable.as_slice(), b"abcdefgh");
1571+
assert_eq!(writes, 3);
15511572
assert_eq!(full_syncs, 1);
15521573
assert_eq!(range_syncs, 1);
15531574
});
@@ -1557,7 +1578,7 @@ mod tests {
15571578
fn test_write_sync_uses_full_sync_after_resize() {
15581579
let executor = deterministic::Runner::default();
15591580
executor.start(|context| async move {
1560-
let blob = RangeSyncBlob::new();
1581+
let blob = SyncTrackingBlob::new();
15611582
let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
15621583

15631584
// Establish already-durable data with a range sync.
@@ -1569,8 +1590,9 @@ mod tests {
15691590
writer.sync().await.unwrap();
15701591

15711592
// The resized contents require a full sync barrier to become durable.
1572-
let (durable, full_syncs, range_syncs) = blob.snapshot();
1593+
let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
15731594
assert_eq!(durable.as_slice(), b"abcd");
1595+
assert_eq!(writes, 1);
15741596
assert_eq!(full_syncs, 1);
15751597
assert_eq!(range_syncs, 1);
15761598
});

0 commit comments

Comments
 (0)