Skip to content

Commit 5eb93de

Browse files
committed
[runtime/utils/buffer/write] cleanup
1 parent 858babd commit 5eb93de

1 file changed

Lines changed: 56 additions & 33 deletions

File tree

runtime/src/utils/buffer/write.rs

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use crate::{buffer::tip::Buffer, Blob, Buf, BufferPool, BufferPooler, Error, IoB
22
use commonware_utils::sync::AsyncRwLock;
33
use std::{num::NonZeroUsize, sync::Arc};
44

5-
/// Buffered tip data and durability bookkeeping.
6-
struct State {
5+
/// Buffered tip data, blob handle, and durability bookkeeping.
6+
struct State<B: Blob> {
7+
/// The underlying blob to write to.
8+
blob: B,
9+
710
/// The buffer containing the data yet to be appended to the tip of the
811
/// underlying blob.
912
buffer: Buffer,
@@ -16,6 +19,46 @@ struct State {
1619
needs_sync: bool,
1720
}
1821

22+
impl<B: Blob> State<B> {
23+
async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufs, Error> {
24+
Ok(self.blob.read_at(offset, len).await?.freeze())
25+
}
26+
27+
async fn write_at(&mut self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
28+
self.blob.write_at(offset, bufs).await?;
29+
self.needs_sync = true;
30+
Ok(())
31+
}
32+
33+
async fn write_at_sync(
34+
&mut self,
35+
offset: u64,
36+
bufs: impl Into<IoBufs> + Send,
37+
) -> Result<(), Error> {
38+
if self.needs_sync {
39+
self.write_at(offset, bufs).await?;
40+
self.sync().await
41+
} else {
42+
self.blob.write_at_sync(offset, bufs).await
43+
}
44+
}
45+
46+
async fn resize(&mut self, len: u64) -> Result<(), Error> {
47+
self.blob.resize(len).await?;
48+
self.needs_sync = true;
49+
Ok(())
50+
}
51+
52+
async fn sync(&mut self) -> Result<(), Error> {
53+
if !self.needs_sync {
54+
return Ok(());
55+
}
56+
self.blob.sync().await?;
57+
self.needs_sync = false;
58+
Ok(())
59+
}
60+
}
61+
1962
/// A writer that buffers the raw content of a [Blob] to optimize the performance of appending or
2063
/// updating data.
2164
///
@@ -59,20 +102,17 @@ struct State {
59102
/// ```
60103
#[derive(Clone)]
61104
pub struct Write<B: Blob> {
62-
/// The underlying blob to write to.
63-
blob: B,
64-
65-
/// Shared tip buffer and durability state.
66-
state: Arc<AsyncRwLock<State>>,
105+
/// Shared blob, tip buffer, and durability state.
106+
state: Arc<AsyncRwLock<State<B>>>,
67107
}
68108

69109
impl<B: Blob> Write<B> {
70110
/// Creates a new [Write] that buffers up to `capacity` bytes of data to be appended to the tip
71111
/// of `blob` with the provided `size`.
72112
pub fn new(blob: B, size: u64, capacity: NonZeroUsize, pool: BufferPool) -> Self {
73113
Self {
74-
blob,
75114
state: Arc::new(AsyncRwLock::new(State {
115+
blob,
76116
buffer: Buffer::new(size, capacity.get(), pool),
77117
needs_sync: false,
78118
})),
@@ -128,15 +168,15 @@ impl<B: Blob> Write<B> {
128168

129169
// Entirely in blob.
130170
if end_offset <= buffer.offset {
131-
return Ok(self.blob.read_at(offset, len).await?.freeze());
171+
return state.read_at(offset, len).await;
132172
}
133173

134174
// Overlaps blob and buffered tip.
135175
let blob_len = (buffer.offset - offset) as usize;
136176
let tip_len = len - blob_len;
137177
let tip = buffer.slice(..tip_len);
138178

139-
let mut blob = self.blob.read_at(offset, blob_len).await?.freeze();
179+
let mut blob = state.read_at(offset, blob_len).await?;
140180
blob.append(tip);
141181
Ok(blob)
142182
}
@@ -177,8 +217,7 @@ impl<B: Blob> Write<B> {
177217
let chunk_end = current_offset + chunk_len as u64;
178218
if state.buffer.offset < chunk_end {
179219
if let Some((old_buf, old_offset)) = state.buffer.take() {
180-
self.blob.write_at(old_offset, old_buf).await?;
181-
state.needs_sync = true;
220+
state.write_at(old_offset, old_buf).await?;
182221
if state.buffer.merge(chunk, current_offset) {
183222
bufs.advance(chunk_len);
184223
current_offset += chunk_len as u64;
@@ -192,8 +231,7 @@ impl<B: Blob> Write<B> {
192231
// once when the buffer is flushed above, then again when we write the chunk
193232
// below. Removing this inefficiency may not be worth the additional complexity.
194233
let direct = bufs.split_to(chunk_len);
195-
self.blob.write_at(current_offset, direct).await?;
196-
state.needs_sync = true;
234+
state.write_at(current_offset, direct).await?;
197235
current_offset += chunk_len as u64;
198236

199237
// Maintain the "buffer at tip" invariant by advancing offset to the end of this
@@ -216,13 +254,11 @@ impl<B: Blob> Write<B> {
216254
//
217255
// This can only happen if the new size is greater than the current size.
218256
if let Some((buf, offset)) = state.buffer.resize(len) {
219-
self.blob.write_at(offset, buf).await?;
220-
state.needs_sync = true;
257+
state.write_at(offset, buf).await?;
221258
}
222259

223260
// Resize the underlying blob.
224-
self.blob.resize(len).await?;
225-
state.needs_sync = true;
261+
state.resize(len).await?;
226262

227263
Ok(())
228264
}
@@ -231,22 +267,9 @@ impl<B: Blob> Write<B> {
231267
pub async fn sync(&self) -> Result<(), Error> {
232268
let mut state = self.state.write().await;
233269
if let Some((buf, offset)) = state.buffer.take() {
234-
if !state.needs_sync {
235-
return self.blob.write_at_sync(offset, buf).await;
236-
}
237-
238-
self.blob.write_at(offset, buf).await?;
239-
self.blob.sync().await?;
240-
state.needs_sync = false;
241-
return Ok(());
242-
}
243-
244-
if !state.needs_sync {
245-
return Ok(());
270+
return state.write_at_sync(offset, buf).await;
246271
}
247272

248-
self.blob.sync().await?;
249-
state.needs_sync = false;
250-
Ok(())
273+
state.sync().await
251274
}
252275
}

0 commit comments

Comments
 (0)