Skip to content

Commit b9c5957

Browse files
authored
[Storage] Move tokio-based storage implementation to its own module (#736)
1 parent 2a4fe82 commit b9c5957

5 files changed

Lines changed: 335 additions & 266 deletions

File tree

runtime/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod mocks;
3030
cfg_if::cfg_if! {
3131
if #[cfg(not(target_arch = "wasm32"))] {
3232
pub mod tokio;
33+
mod storage;
3334
}
3435
}
3536
pub mod telemetry;

runtime/src/storage/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
//! Implementations of the `Storage` trait that can be used by the runtime.
2+
pub mod tokio;

runtime/src/storage/tokio.rs

Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
use crate::Error;
2+
use commonware_utils::{from_hex, hex};
3+
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
4+
use prometheus_client::registry::Registry;
5+
use std::io::SeekFrom;
6+
use std::{path::PathBuf, sync::Arc};
7+
use tokio::{
8+
fs,
9+
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
10+
sync::Mutex as AsyncMutex,
11+
};
12+
13+
pub struct Metrics {
14+
pub open_blobs: Gauge,
15+
pub storage_reads: Counter,
16+
pub storage_read_bytes: Counter,
17+
pub storage_writes: Counter,
18+
pub storage_write_bytes: Counter,
19+
}
20+
21+
impl Metrics {
22+
/// Initialize the `Metrics` struct and register the metrics in the provided registry.
23+
pub fn new(registry: &mut Registry) -> Self {
24+
let metrics = Self {
25+
open_blobs: Gauge::default(),
26+
storage_reads: Counter::default(),
27+
storage_read_bytes: Counter::default(),
28+
storage_writes: Counter::default(),
29+
storage_write_bytes: Counter::default(),
30+
};
31+
32+
registry.register(
33+
"open_blobs",
34+
"Number of open blobs",
35+
metrics.open_blobs.clone(),
36+
);
37+
registry.register(
38+
"storage_reads",
39+
"Total number of disk reads",
40+
metrics.storage_reads.clone(),
41+
);
42+
registry.register(
43+
"storage_read_bytes",
44+
"Total amount of data read from disk",
45+
metrics.storage_read_bytes.clone(),
46+
);
47+
registry.register(
48+
"storage_writes",
49+
"Total number of disk writes",
50+
metrics.storage_writes.clone(),
51+
);
52+
registry.register(
53+
"storage_write_bytes",
54+
"Total amount of data written to disk",
55+
metrics.storage_write_bytes.clone(),
56+
);
57+
58+
metrics
59+
}
60+
}
61+
62+
/// Configuration for the tokio-based storage implementation.
#[derive(Debug, Clone)]
pub struct Config {
    /// Directory under which partition directories and blob files are created.
    pub storage_directory: PathBuf,
    /// Value passed to `set_max_buf_size` on every file opened by the storage.
    pub maximum_buffer_size: usize,
}

impl Config {
    /// Build a `Config` from the storage directory and the maximum buffer size.
    pub fn new(storage_directory: PathBuf, maximum_buffer_size: usize) -> Self {
        Self {
            storage_directory,
            maximum_buffer_size,
        }
    }
}
76+
77+
#[derive(Clone)]
78+
pub struct Storage {
79+
metrics: Arc<Metrics>,
80+
fs: Arc<AsyncMutex<()>>,
81+
cfg: Config,
82+
}
83+
84+
impl Storage {
85+
pub fn new(metrics: &mut Registry, cfg: Config) -> Self {
86+
Self {
87+
metrics: Arc::new(Metrics::new(metrics)),
88+
fs: AsyncMutex::new(()).into(),
89+
cfg,
90+
}
91+
}
92+
}
93+
94+
pub struct Blob {
95+
metrics: Arc<Metrics>,
96+
partition: String,
97+
name: Vec<u8>,
98+
// Files must be seeked prior to any read or write operation and are thus
99+
// not safe to concurrently interact with. If we switched to mapping files
100+
// we could remove this lock.
101+
//
102+
// We also track the virtual file size because metadata isn't updated until
103+
// the file is synced (not to mention it is a lot less fs calls).
104+
file: Arc<AsyncMutex<(fs::File, u64)>>,
105+
}
106+
107+
impl Blob {
108+
fn new(
109+
metrics: Arc<Metrics>,
110+
partition: String,
111+
name: &[u8],
112+
file: fs::File,
113+
len: u64,
114+
) -> Self {
115+
metrics.open_blobs.inc();
116+
Self {
117+
metrics,
118+
partition,
119+
name: name.into(),
120+
file: Arc::new(AsyncMutex::new((file, len))),
121+
}
122+
}
123+
}
124+
125+
impl Clone for Blob {
126+
fn clone(&self) -> Self {
127+
// We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
128+
self.metrics.open_blobs.inc();
129+
Self {
130+
metrics: self.metrics.clone(),
131+
partition: self.partition.clone(),
132+
name: self.name.clone(),
133+
file: self.file.clone(),
134+
}
135+
}
136+
}
137+
138+
impl crate::Storage for Storage {
139+
type Blob = Blob;
140+
141+
async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
142+
// Acquire the filesystem lock
143+
let _guard = self.fs.lock().await;
144+
145+
// Construct the full path
146+
let path = self.cfg.storage_directory.join(partition).join(hex(name));
147+
let parent = match path.parent() {
148+
Some(parent) => parent,
149+
None => return Err(Error::PartitionCreationFailed(partition.into())),
150+
};
151+
152+
// Create the partition directory if it does not exist
153+
fs::create_dir_all(parent)
154+
.await
155+
.map_err(|_| Error::PartitionCreationFailed(partition.into()))?;
156+
157+
// Open the file in read-write mode, create if it does not exist
158+
let mut file = fs::OpenOptions::new()
159+
.read(true)
160+
.write(true)
161+
.create(true)
162+
.truncate(false)
163+
.open(&path)
164+
.await
165+
.map_err(|_| Error::BlobOpenFailed(partition.into(), hex(name)))?;
166+
167+
// Set the maximum buffer size
168+
file.set_max_buf_size(self.cfg.maximum_buffer_size);
169+
170+
// Get the file length
171+
let len = file.metadata().await.map_err(|_| Error::ReadFailed)?.len();
172+
173+
// Construct the blob
174+
Ok(Blob::new(
175+
self.metrics.clone(),
176+
partition.into(),
177+
name,
178+
file,
179+
len,
180+
))
181+
}
182+
183+
async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
184+
// Acquire the filesystem lock
185+
let _guard = self.fs.lock().await;
186+
187+
// Remove all related files
188+
let path = self.cfg.storage_directory.join(partition);
189+
if let Some(name) = name {
190+
let blob_path = path.join(hex(name));
191+
fs::remove_file(blob_path)
192+
.await
193+
.map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?;
194+
} else {
195+
fs::remove_dir_all(path)
196+
.await
197+
.map_err(|_| Error::PartitionMissing(partition.into()))?;
198+
}
199+
Ok(())
200+
}
201+
202+
async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
203+
// Acquire the filesystem lock
204+
let _guard = self.fs.lock().await;
205+
206+
// Scan the partition directory
207+
let path = self.cfg.storage_directory.join(partition);
208+
let mut entries = fs::read_dir(path)
209+
.await
210+
.map_err(|_| Error::PartitionMissing(partition.into()))?;
211+
let mut blobs = Vec::new();
212+
while let Some(entry) = entries.next_entry().await.map_err(|_| Error::ReadFailed)? {
213+
let file_type = entry.file_type().await.map_err(|_| Error::ReadFailed)?;
214+
if !file_type.is_file() {
215+
return Err(Error::PartitionCorrupt(partition.into()));
216+
}
217+
if let Some(name) = entry.file_name().to_str() {
218+
let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?;
219+
blobs.push(name);
220+
}
221+
}
222+
Ok(blobs)
223+
}
224+
}
225+
226+
impl crate::Blob for Blob {
227+
async fn len(&self) -> Result<u64, Error> {
228+
let (_, len) = *self.file.lock().await;
229+
Ok(len)
230+
}
231+
232+
async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
233+
// Ensure the read is within bounds
234+
let mut file = self.file.lock().await;
235+
if offset + buf.len() as u64 > file.1 {
236+
return Err(Error::BlobInsufficientLength);
237+
}
238+
239+
// Perform the read
240+
file.0
241+
.seek(SeekFrom::Start(offset))
242+
.await
243+
.map_err(|_| Error::ReadFailed)?;
244+
file.0
245+
.read_exact(buf)
246+
.await
247+
.map_err(|_| Error::ReadFailed)?;
248+
self.metrics.storage_reads.inc();
249+
self.metrics.storage_read_bytes.inc_by(buf.len() as u64);
250+
Ok(())
251+
}
252+
253+
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
254+
// Perform the write
255+
let mut file = self.file.lock().await;
256+
file.0
257+
.seek(SeekFrom::Start(offset))
258+
.await
259+
.map_err(|_| Error::WriteFailed)?;
260+
file.0
261+
.write_all(buf)
262+
.await
263+
.map_err(|_| Error::WriteFailed)?;
264+
265+
// Update the virtual file size
266+
let max_len = offset + buf.len() as u64;
267+
if max_len > file.1 {
268+
file.1 = max_len;
269+
}
270+
self.metrics.storage_writes.inc();
271+
self.metrics.storage_write_bytes.inc_by(buf.len() as u64);
272+
Ok(())
273+
}
274+
275+
async fn truncate(&self, len: u64) -> Result<(), Error> {
276+
// Perform the truncate
277+
let mut file = self.file.lock().await;
278+
file.0
279+
.set_len(len)
280+
.await
281+
.map_err(|_| Error::BlobTruncateFailed(self.partition.clone(), hex(&self.name)))?;
282+
283+
// Update the virtual file size
284+
file.1 = len;
285+
Ok(())
286+
}
287+
288+
async fn sync(&self) -> Result<(), Error> {
289+
let file = self.file.lock().await;
290+
file.0
291+
.sync_all()
292+
.await
293+
.map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))
294+
}
295+
296+
async fn close(self) -> Result<(), Error> {
297+
let mut file = self.file.lock().await;
298+
file.0
299+
.sync_all()
300+
.await
301+
.map_err(|_| Error::BlobSyncFailed(self.partition.clone(), hex(&self.name)))?;
302+
file.0
303+
.shutdown()
304+
.await
305+
.map_err(|_| Error::BlobCloseFailed(self.partition.clone(), hex(&self.name)))
306+
}
307+
}
308+
309+
impl Drop for Blob {
310+
fn drop(&mut self) {
311+
self.metrics.open_blobs.dec();
312+
}
313+
}

0 commit comments

Comments
 (0)