Skip to content

Commit 1598eee

Browse files
[runtime/storage] remove Blob::len(); add blob length to Storage::open return type (#809)
Co-authored-by: Patrick O'Grady <[email protected]>
1 parent 47eae0f commit 1598eee

14 files changed

Lines changed: 255 additions & 292 deletions

File tree

runtime/src/deterministic.rs

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -255,16 +255,6 @@ impl Auditor {
255255
*hash = hasher.finalize().to_vec();
256256
}
257257

258-
pub(crate) fn len(&self, partition: &str, name: &[u8]) {
259-
let mut hash = self.hash.lock().unwrap();
260-
let mut hasher = Sha256::new();
261-
hasher.update(&*hash);
262-
hasher.update(b"len");
263-
hasher.update(partition.as_bytes());
264-
hasher.update(name);
265-
*hash = hasher.finalize().to_vec();
266-
}
267-
268258
pub(crate) fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
269259
let mut hash = self.hash.lock().unwrap();
270260
let mut hasher = Sha256::new();
@@ -1338,7 +1328,7 @@ impl CryptoRng for Context {}
13381328
impl crate::Storage for Context {
13391329
type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
13401330

1341-
async fn open(&self, partition: &str, name: &[u8]) -> Result<Self::Blob, Error> {
1331+
async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
13421332
self.storage.open(partition, name).await
13431333
}
13441334

@@ -1461,7 +1451,7 @@ mod tests {
14611451

14621452
// Run some tasks, sync storage, and recover the runtime
14631453
let (context, state) = executor1.start(|context| async move {
1464-
let blob = context.open(partition, name).await.unwrap();
1454+
let (blob, _) = context.open(partition, name).await.unwrap();
14651455
blob.write_at(data, 0).await.unwrap();
14661456
blob.sync().await.unwrap();
14671457
let state = context.auditor().state();
@@ -1475,10 +1465,9 @@ mod tests {
14751465
// Check that synced storage persists after recovery
14761466
let executor = Runner::from(recovered_context);
14771467
executor.start(|context| async move {
1478-
let blob = context.open(partition, name).await.unwrap();
1479-
let len = blob.len().await.unwrap();
1468+
let (blob, len) = context.open(partition, name).await.unwrap();
14801469
assert_eq!(len, data.len() as u64);
1481-
let mut buf = vec![0; len as usize];
1470+
let mut buf = vec![0; data.len()];
14821471
blob.read_at(&mut buf, 0).await.unwrap();
14831472
assert_eq!(buf, data);
14841473
});
@@ -1495,7 +1484,7 @@ mod tests {
14951484
// Run some tasks without syncing storage
14961485
let context = executor.start(|context| async move {
14971486
let context = context.clone();
1498-
let blob = context.open(partition, name).await.unwrap();
1487+
let (blob, _) = context.open(partition, name).await.unwrap();
14991488
blob.write_at(&data, 0).await.unwrap();
15001489
// Intentionally do not call sync() here
15011490
context
@@ -1507,8 +1496,7 @@ mod tests {
15071496

15081497
// Check that unsynced storage does not persist after recovery
15091498
executor.start(|context| async move {
1510-
let blob = context.open(partition, name).await.unwrap();
1511-
let len = blob.len().await.unwrap();
1499+
let (_, len) = context.open(partition, name).await.unwrap();
15121500
assert_eq!(len, 0);
15131501
});
15141502
}

runtime/src/lib.rs

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -270,15 +270,16 @@ pub trait Storage: Clone + Send + Sync + 'static {
270270
/// The readable/writeable storage buffer that can be opened by this Storage.
271271
type Blob: Blob;
272272

273-
/// Open an existing blob in a given partition or create a new one.
273+
/// Open an existing blob in a given partition or create a new one, returning
274+
/// the blob and its length.
274275
///
275276
/// Multiple instances of the same blob can be opened concurrently, however,
276277
/// writing to the same blob concurrently may lead to undefined behavior.
277278
fn open(
278279
&self,
279280
partition: &str,
280281
name: &[u8],
281-
) -> impl Future<Output = Result<Self::Blob, Error>> + Send;
282+
) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
282283

283284
/// Remove a blob from a given partition.
284285
///
@@ -305,9 +306,6 @@ pub trait Storage: Clone + Send + Sync + 'static {
305306
/// and writing to both is undefined behavior.
306307
#[allow(clippy::len_without_is_empty)]
307308
pub trait Blob: Clone + Send + Sync + 'static {
308-
/// Get the length of the blob.
309-
fn len(&self) -> impl Future<Output = Result<u64, Error>> + Send;
310-
311309
/// Read from the blob at the given offset.
312310
///
313311
/// `read_at` does not return the number of bytes read because it
@@ -521,7 +519,7 @@ mod tests {
521519
let name = b"test_blob";
522520

523521
// Open a new blob
524-
let blob = context
522+
let (blob, _) = context
525523
.open(partition, name)
526524
.await
527525
.expect("Failed to open blob");
@@ -542,10 +540,6 @@ mod tests {
542540
.expect("Failed to read from blob");
543541
assert_eq!(&buffer, data);
544542

545-
// Get blob length
546-
let length = blob.len().await.expect("Failed to get blob length");
547-
assert_eq!(length, data.len() as u64);
548-
549543
// Close the blob
550544
blob.close().await.expect("Failed to close blob");
551545

@@ -557,10 +551,11 @@ mod tests {
557551
assert!(blobs.contains(&name.to_vec()));
558552

559553
// Reopen the blob
560-
let blob = context
554+
let (blob, len) = context
561555
.open(partition, name)
562556
.await
563557
.expect("Failed to reopen blob");
558+
assert_eq!(len, data.len() as u64);
564559

565560
// Read data part of message back
566561
let mut buffer = vec![0u8; 7];
@@ -606,7 +601,7 @@ mod tests {
606601
let name = b"test_blob_rw";
607602

608603
// Open a new blob
609-
let blob = context
604+
let (blob, _) = context
610605
.open(partition, name)
611606
.await
612607
.expect("Failed to open blob");
@@ -621,10 +616,6 @@ mod tests {
621616
.await
622617
.expect("Failed to write data2");
623618

624-
// Assert that length tracks pending data
625-
let length = blob.len().await.expect("Failed to get blob length");
626-
assert_eq!(length, 10);
627-
628619
// Read data back
629620
let mut buffer = vec![0u8; 10];
630621
blob.read_at(&mut buffer, 0)
@@ -638,13 +629,9 @@ mod tests {
638629
blob.write_at(data3, 5)
639630
.await
640631
.expect("Failed to write data3");
641-
let length = blob.len().await.expect("Failed to get blob length");
642-
assert_eq!(length, 10);
643632

644633
// Truncate the blob
645634
blob.truncate(5).await.expect("Failed to truncate blob");
646-
let length = blob.len().await.expect("Failed to get blob length");
647-
assert_eq!(length, 5);
648635
let mut buffer = vec![0u8; 5];
649636
blob.read_at(&mut buffer, 0)
650637
.await
@@ -654,7 +641,7 @@ mod tests {
654641
// Full read after truncation
655642
let mut buffer = vec![0u8; 10];
656643
let result = blob.read_at(&mut buffer, 0).await;
657-
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
644+
assert!(result.is_err());
658645

659646
// Close the blob
660647
blob.close().await.expect("Failed to close blob");
@@ -668,17 +655,17 @@ mod tests {
668655
runner.start(|context| async move {
669656
let partitions = ["partition1", "partition2", "partition3"];
670657
let name = b"test_blob_rw";
658+
let data1 = b"Hello";
659+
let data2 = b"World";
671660

672661
for (additional, partition) in partitions.iter().enumerate() {
673662
// Open a new blob
674-
let blob = context
663+
let (blob, _) = context
675664
.open(partition, name)
676665
.await
677666
.expect("Failed to open blob");
678667

679668
// Write data at different offsets
680-
let data1 = b"Hello";
681-
let data2 = b"World";
682669
blob.write_at(data1, 0)
683670
.await
684671
.expect("Failed to write data1");
@@ -692,10 +679,11 @@ mod tests {
692679

693680
for (additional, partition) in partitions.iter().enumerate() {
694681
// Open a new blob
695-
let blob = context
682+
let (blob, len) = context
696683
.open(partition, name)
697684
.await
698685
.expect("Failed to open blob");
686+
assert_eq!(len, (data1.len() + data2.len() + additional) as u64);
699687

700688
// Read data back
701689
let mut buffer = vec![0u8; 10 + additional];
@@ -720,15 +708,15 @@ mod tests {
720708
let name = b"test_blob_rw";
721709

722710
// Open a new blob
723-
let blob = context
711+
let (blob, _) = context
724712
.open(partition, name)
725713
.await
726714
.expect("Failed to open blob");
727715

728716
// Read data past file length (empty file)
729717
let mut buffer = vec![0u8; 10];
730718
let result = blob.read_at(&mut buffer, 0).await;
731-
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
719+
assert!(result.is_err());
732720

733721
// Write data to the blob
734722
let data = b"Hello, Storage!";
@@ -739,7 +727,7 @@ mod tests {
739727
// Read data past file length (non-empty file)
740728
let mut buffer = vec![0u8; 20];
741729
let result = blob.read_at(&mut buffer, 0).await;
742-
assert!(matches!(result, Err(Error::BlobInsufficientLength)));
730+
assert!(result.is_err());
743731
})
744732
}
745733

@@ -752,7 +740,7 @@ mod tests {
752740
let name = b"test_blob_rw";
753741

754742
// Open a new blob
755-
let blob = context
743+
let (blob, _) = context
756744
.open(partition, name)
757745
.await
758746
.expect("Failed to open blob");
@@ -800,10 +788,6 @@ mod tests {
800788
.expect("Failed to read from blob");
801789
assert_eq!(&buffer, data);
802790

803-
// Get blob length
804-
let length = blob.len().await.expect("Failed to get blob length");
805-
assert_eq!(length, data.len() as u64);
806-
807791
// Close the blob
808792
blob.close().await.expect("Failed to close blob");
809793

runtime/src/storage/audited.rs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,18 @@ impl<S: crate::Storage> Storage<S> {
1616
impl<S: crate::Storage> crate::Storage for Storage<S> {
1717
type Blob = Blob<S::Blob>;
1818

19-
async fn open(&self, partition: &str, name: &[u8]) -> Result<Self::Blob, Error> {
19+
async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
2020
self.auditor.open(partition, name);
21-
self.inner.open(partition, name).await.map(|blob| Blob {
22-
auditor: self.auditor.clone(),
23-
inner: blob,
24-
partition: partition.into(),
25-
name: name.to_vec(),
21+
self.inner.open(partition, name).await.map(|(blob, len)| {
22+
(
23+
Blob {
24+
auditor: self.auditor.clone(),
25+
inner: blob,
26+
partition: partition.into(),
27+
name: name.to_vec(),
28+
},
29+
len,
30+
)
2631
})
2732
}
2833

@@ -46,11 +51,6 @@ pub struct Blob<B: crate::Blob> {
4651
}
4752

4853
impl<B: crate::Blob> crate::Blob for Blob<B> {
49-
async fn len(&self) -> Result<u64, Error> {
50-
self.auditor.len(&self.partition, &self.name);
51-
self.inner.len().await
52-
}
53-
5454
async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
5555
self.auditor
5656
.read_at(&self.partition, &self.name, buf.len(), offset);
@@ -114,8 +114,8 @@ mod tests {
114114
let storage2 = AuditedStorage::new(inner2, auditor2.clone());
115115

116116
// Perform a sequence of operations on both storages simultaneously
117-
let blob1 = storage1.open("partition", b"test_blob").await.unwrap();
118-
let blob2 = storage2.open("partition", b"test_blob").await.unwrap();
117+
let (blob1, _) = storage1.open("partition", b"test_blob").await.unwrap();
118+
let (blob2, _) = storage2.open("partition", b"test_blob").await.unwrap();
119119

120120
// Write data to the blobs
121121
blob1.write_at(b"hello world", 0).await.unwrap();
@@ -142,10 +142,6 @@ mod tests {
142142
// Truncate the blobs
143143
blob1.truncate(5).await.unwrap();
144144
blob2.truncate(5).await.unwrap();
145-
let len1 = blob1.len().await.unwrap();
146-
let len2 = blob2.len().await.unwrap();
147-
assert_eq!(len1, 5, "Blob1 length after truncation is incorrect");
148-
assert_eq!(len2, 5, "Blob2 length after truncation is incorrect");
149145
assert_eq!(
150146
auditor1.state(),
151147
auditor2.state(),

runtime/src/storage/memory.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1+
use commonware_utils::hex;
12
use std::collections::HashMap;
23
use std::sync::{Arc, Mutex, RwLock};
34

4-
use commonware_utils::hex;
5-
65
/// In-memory storage implementation for the commonware runtime.
76
#[derive(Clone)]
87
pub struct Storage {
@@ -20,15 +19,18 @@ impl Default for Storage {
2019
impl crate::Storage for Storage {
2120
type Blob = Blob;
2221

23-
async fn open(&self, partition: &str, name: &[u8]) -> Result<Self::Blob, crate::Error> {
22+
async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), crate::Error> {
2423
let mut partitions = self.partitions.lock().unwrap();
2524
let partition_entry = partitions.entry(partition.into()).or_default();
2625
let content = partition_entry.entry(name.into()).or_default();
27-
Ok(Blob::new(
28-
self.partitions.clone(),
29-
partition.into(),
30-
name,
31-
content.clone(),
26+
Ok((
27+
Blob::new(
28+
self.partitions.clone(),
29+
partition.into(),
30+
name,
31+
content.clone(),
32+
),
33+
content.len() as u64,
3234
))
3335
}
3436

@@ -92,11 +94,6 @@ impl Blob {
9294
}
9395

9496
impl crate::Blob for Blob {
95-
async fn len(&self) -> Result<u64, crate::Error> {
96-
let content = self.content.read().unwrap();
97-
Ok(content.len() as u64)
98-
}
99-
10097
async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), crate::Error> {
10198
let offset = offset
10299
.try_into()

0 commit comments

Comments
 (0)