Skip to content

Commit 0fdfd16

Browse files
danlaineclabby
andauthored
[Storage] Add read-many methods (#3574)
Co-authored-by: clabby <ben@clab.by>
1 parent d90fe22 commit 0fdfd16

5 files changed

Lines changed: 709 additions & 1 deletion

File tree

runtime/src/utils/buffer/paged/append.rs

Lines changed: 311 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,95 @@ impl<B: Blob> Append<B> {
451451
Ok((bufs, available))
452452
}
453453

454+
/// Read multiple fixed-size items at sorted byte offsets into a contiguous caller buffer.
455+
///
456+
/// `buf` must be exactly `offsets.len() * item_size` bytes. All offsets must be sorted,
457+
/// non-overlapping, and within bounds. This amortizes lock acquisition and avoids
458+
/// per-item buffer allocation compared to calling [`read_at`](Self::read_at) in a loop.
459+
pub async fn read_many_into(
460+
&self,
461+
buf: &mut [u8],
462+
offsets: &[u64],
463+
item_size: usize,
464+
) -> Result<(), Error> {
465+
debug_assert_eq!(buf.len(), offsets.len() * item_size);
466+
if offsets.is_empty() {
467+
return Ok(());
468+
}
469+
470+
let last_end = offsets[offsets.len() - 1]
471+
.checked_add(item_size as u64)
472+
.ok_or(Error::OffsetOverflow)?;
473+
474+
// Acquire the buffer lock once for all items.
475+
let buffer = self.buffer.read().await;
476+
if last_end > buffer.size() {
477+
return Err(Error::BlobInsufficientLength);
478+
}
479+
480+
// Resolve tip-buffer overlap for all items, tracking which indices need cache reads.
481+
// cache_indices stores (item_index, byte_len, offset) for items needing cache reads.
482+
let mut cache_indices: Vec<(usize, usize, u64)> = Vec::new();
483+
for (i, &offset) in offsets.iter().enumerate() {
484+
let item_buf = &mut buf[i * item_size..(i + 1) * item_size];
485+
let end = offset + item_size as u64;
486+
487+
if end <= buffer.offset {
488+
// Entirely below tip -- needs cache read.
489+
cache_indices.push((i, item_size, offset));
490+
} else if offset >= buffer.offset {
491+
// Entirely in tip buffer.
492+
let src = (offset - buffer.offset) as usize;
493+
item_buf.copy_from_slice(&buffer.as_ref()[src..src + item_size]);
494+
} else {
495+
// Straddles tip boundary.
496+
let prefix_len = (buffer.offset - offset) as usize;
497+
item_buf[prefix_len..].copy_from_slice(&buffer.as_ref()[..item_size - prefix_len]);
498+
cache_indices.push((i, prefix_len, offset));
499+
}
500+
}
501+
502+
drop(buffer);
503+
504+
if cache_indices.is_empty() {
505+
return Ok(());
506+
}
507+
508+
// Build mutable slices for the cache read. We split buf into non-overlapping
509+
// sub-slices using raw pointer arithmetic (items are at fixed strides).
510+
// SAFETY: Each (index, len) pair refers to a disjoint region of buf since
511+
// indices are unique and item_size-aligned.
512+
let mut cache_ranges: Vec<(&mut [u8], u64)> = cache_indices
513+
.iter()
514+
.map(|&(idx, len, offset)| {
515+
let start = idx * item_size;
516+
// SAFETY: start < buf.len() because idx < offsets.len() and
517+
// buf.len() == offsets.len() * item_size.
518+
let ptr = unsafe { buf.as_mut_ptr().add(start) };
519+
// SAFETY: Each idx is unique so the resulting slices are
520+
// non-overlapping, and len <= item_size keeps each within its slot.
521+
let slice = unsafe { core::slice::from_raw_parts_mut(ptr, len) };
522+
(slice, offset)
523+
})
524+
.collect();
525+
526+
// Fast path: try page cache for all ranges in a single lock acquisition.
527+
let fully_cached = self.cache_ref.read_cached_many(self.id, &mut cache_ranges);
528+
529+
if fully_cached == cache_ranges.len() {
530+
return Ok(());
531+
}
532+
533+
// Slow path: cache miss on some ranges. Fall back to per-range reads.
534+
let blob_guard = self.blob_state.read().await;
535+
for (item_buf, offset) in &mut cache_ranges[fully_cached..] {
536+
self.cache_ref
537+
.read(&blob_guard.blob, self.id, item_buf, *offset)
538+
.await?;
539+
}
540+
Ok(())
541+
}
542+
454543
/// Reads bytes starting at `logical_offset` into `buf`.
455544
///
456545
/// This method allows reading directly into a mutable slice without taking ownership of the
@@ -836,7 +925,9 @@ impl<B: Blob> Append<B> {
836925
#[cfg(test)]
837926
mod tests {
838927
use super::*;
839-
use crate::{deterministic, BufferPool, BufferPoolConfig, Runner as _, Storage as _};
928+
use crate::{
929+
deterministic, BufferPool, BufferPoolConfig, Metrics as _, Runner as _, Storage as _,
930+
};
840931
use commonware_codec::ReadExt;
841932
use commonware_macros::test_traced;
842933
use commonware_utils::{NZUsize, NZU16};
@@ -846,6 +937,225 @@ mod tests {
846937
const PAGE_SIZE: NonZeroU16 = NZU16!(103); // janky size to ensure we test page alignment
847938
const BUFFER_SIZE: usize = PAGE_SIZE.get() as usize * 2;
848939

940+
#[test_traced("DEBUG")]
941+
fn test_read_many_into_empty() {
942+
let executor = deterministic::Runner::default();
943+
executor.start(|context: deterministic::Context| async move {
944+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
945+
let cache_ref = CacheRef::from_pooler(
946+
&context.with_label("cache"),
947+
PAGE_SIZE,
948+
NZUsize!(BUFFER_SIZE),
949+
);
950+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
951+
.await
952+
.unwrap();
953+
954+
append.append(&[0u8; 8]).await.unwrap();
955+
assert_eq!(append.size().await, 8);
956+
957+
// Empty offsets should succeed immediately.
958+
let mut buf = [];
959+
append.read_many_into(&mut buf, &[], 4).await.unwrap();
960+
});
961+
}
962+
963+
#[test_traced("DEBUG")]
964+
fn test_read_many_into_all_in_tip() {
965+
// All items reside in the unflushed tip buffer.
966+
let executor = deterministic::Runner::default();
967+
executor.start(|context: deterministic::Context| async move {
968+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
969+
let cache_ref = CacheRef::from_pooler(
970+
&context.with_label("cache"),
971+
PAGE_SIZE,
972+
NZUsize!(BUFFER_SIZE),
973+
);
974+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
975+
.await
976+
.unwrap();
977+
978+
let data: Vec<u8> = (0..20).collect();
979+
append.append(&data).await.unwrap();
980+
assert_eq!(append.size().await, 20);
981+
982+
// Read 4-byte items at offsets 0, 4, 8, 12, 16.
983+
let offsets = [0u64, 4, 8, 12, 16];
984+
let mut buf = vec![0u8; 5 * 4];
985+
append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
986+
987+
for (i, &off) in offsets.iter().enumerate() {
988+
assert_eq!(
989+
&buf[i * 4..(i + 1) * 4],
990+
&data[off as usize..off as usize + 4],
991+
);
992+
}
993+
});
994+
}
995+
996+
#[test_traced("DEBUG")]
997+
fn test_read_many_into_all_from_cache() {
998+
// Sync data to disk so tip buffer is empty; reads go through page cache / blob.
999+
let executor = deterministic::Runner::default();
1000+
executor.start(|context: deterministic::Context| async move {
1001+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1002+
let cache_ref = CacheRef::from_pooler(
1003+
&context.with_label("cache"),
1004+
PAGE_SIZE,
1005+
NZUsize!(BUFFER_SIZE),
1006+
);
1007+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1008+
.await
1009+
.unwrap();
1010+
1011+
let data: Vec<u8> = (0..20).collect();
1012+
append.append(&data).await.unwrap();
1013+
append.sync().await.unwrap();
1014+
assert_eq!(append.size().await, 20);
1015+
1016+
let offsets = [0u64, 8, 16];
1017+
let mut buf = vec![0u8; 3 * 4];
1018+
append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1019+
1020+
for (i, &off) in offsets.iter().enumerate() {
1021+
assert_eq!(
1022+
&buf[i * 4..(i + 1) * 4],
1023+
&data[off as usize..off as usize + 4],
1024+
);
1025+
}
1026+
});
1027+
}
1028+
1029+
#[test_traced("DEBUG")]
1030+
fn test_read_many_into_mixed_tip_and_cache() {
1031+
// First chunk synced to disk, second chunk still in tip buffer.
1032+
let executor = deterministic::Runner::default();
1033+
executor.start(|context: deterministic::Context| async move {
1034+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1035+
let cache_ref = CacheRef::from_pooler(
1036+
&context.with_label("cache"),
1037+
PAGE_SIZE,
1038+
NZUsize!(BUFFER_SIZE),
1039+
);
1040+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1041+
.await
1042+
.unwrap();
1043+
1044+
let first: Vec<u8> = (0..16).collect();
1045+
append.append(&first).await.unwrap();
1046+
append.sync().await.unwrap();
1047+
1048+
let second: Vec<u8> = (16..32).collect();
1049+
append.append(&second).await.unwrap();
1050+
assert_eq!(append.size().await, 32);
1051+
1052+
// Offsets span both synced and unsynced regions.
1053+
let offsets = [0u64, 4, 16, 24];
1054+
let mut buf = vec![0u8; 4 * 4];
1055+
append.read_many_into(&mut buf, &offsets, 4).await.unwrap();
1056+
1057+
let all: Vec<u8> = (0..32).collect();
1058+
for (i, &off) in offsets.iter().enumerate() {
1059+
assert_eq!(
1060+
&buf[i * 4..(i + 1) * 4],
1061+
&all[off as usize..off as usize + 4],
1062+
);
1063+
}
1064+
});
1065+
}
1066+
1067+
#[test_traced("DEBUG")]
1068+
fn test_read_many_into_out_of_bounds() {
1069+
let executor = deterministic::Runner::default();
1070+
executor.start(|context: deterministic::Context| async move {
1071+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1072+
let cache_ref = CacheRef::from_pooler(
1073+
&context.with_label("cache"),
1074+
PAGE_SIZE,
1075+
NZUsize!(BUFFER_SIZE),
1076+
);
1077+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1078+
.await
1079+
.unwrap();
1080+
1081+
append.append(&[0u8; 8]).await.unwrap();
1082+
assert_eq!(append.size().await, 8);
1083+
1084+
// Last offset's end (8 + 4 = 12) exceeds size (8).
1085+
let mut buf = vec![0u8; 4];
1086+
let err = append.read_many_into(&mut buf, &[8], 4).await.unwrap_err();
1087+
assert!(matches!(err, Error::BlobInsufficientLength));
1088+
});
1089+
}
1090+
1091+
#[test_traced("DEBUG")]
1092+
fn test_read_many_into_single_item() {
1093+
let executor = deterministic::Runner::default();
1094+
executor.start(|context: deterministic::Context| async move {
1095+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1096+
let cache_ref = CacheRef::from_pooler(
1097+
&context.with_label("cache"),
1098+
PAGE_SIZE,
1099+
NZUsize!(BUFFER_SIZE),
1100+
);
1101+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1102+
.await
1103+
.unwrap();
1104+
1105+
let data = vec![0xAA; 8];
1106+
append.append(&data).await.unwrap();
1107+
assert_eq!(append.size().await, 8);
1108+
1109+
let mut buf = vec![0u8; 8];
1110+
append.read_many_into(&mut buf, &[0], 8).await.unwrap();
1111+
assert_eq!(&buf, &data);
1112+
});
1113+
}
1114+
1115+
#[test_traced("DEBUG")]
1116+
fn test_read_many_into_matches_read_at() {
1117+
// Verify read_many_into returns the same bytes as individual read_at calls.
1118+
let executor = deterministic::Runner::default();
1119+
executor.start(|context: deterministic::Context| async move {
1120+
let (blob, blob_size) = context.open("test_partition", b"rmany").await.unwrap();
1121+
let cache_ref = CacheRef::from_pooler(
1122+
&context.with_label("cache"),
1123+
PAGE_SIZE,
1124+
NZUsize!(BUFFER_SIZE),
1125+
);
1126+
let append = Append::new(blob, blob_size, BUFFER_SIZE, cache_ref)
1127+
.await
1128+
.unwrap();
1129+
1130+
// Write enough data to span multiple pages (PAGE_SIZE=103).
1131+
let data: Vec<u8> = (0u8..=255).cycle().take(300).collect();
1132+
append.append(&data).await.unwrap();
1133+
append.sync().await.unwrap();
1134+
// Add more in tip buffer.
1135+
let more: Vec<u8> = (0u8..50).collect();
1136+
append.append(&more).await.unwrap();
1137+
assert_eq!(append.size().await, 350);
1138+
1139+
let item_size = 10;
1140+
let offsets: Vec<u64> = (0..35).map(|i| i * item_size as u64).collect();
1141+
let mut batch_buf = vec![0u8; offsets.len() * item_size];
1142+
append
1143+
.read_many_into(&mut batch_buf, &offsets, item_size)
1144+
.await
1145+
.unwrap();
1146+
1147+
// Compare each item against individual read_at.
1148+
for (i, &off) in offsets.iter().enumerate() {
1149+
let single = append.read_at(off, item_size).await.unwrap().coalesce();
1150+
assert_eq!(
1151+
&batch_buf[i * item_size..(i + 1) * item_size],
1152+
single.as_ref(),
1153+
"mismatch at offset {off}",
1154+
);
1155+
}
1156+
});
1157+
}
1158+
8491159
#[test_traced("DEBUG")]
8501160
fn test_append_crc_empty() {
8511161
let executor = deterministic::Runner::default();

runtime/src/utils/buffer/paged/cache.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,32 @@ impl CacheRef {
237237
original_len - buf.len()
238238
}
239239

240+
/// Read multiple disjoint byte ranges from the page cache in a single lock acquisition.
241+
///
242+
/// Each element of `ranges` is `(dest_slice, logical_offset)`. Returns the number of
243+
/// ranges that were *fully* read from cache before encountering a miss. Ranges must be
244+
/// sorted by offset and non-overlapping.
245+
pub(super) fn read_cached_many(&self, blob_id: u64, ranges: &mut [(&mut [u8], u64)]) -> usize {
246+
let page_cache = self.cache.read();
247+
let mut fully_read = 0;
248+
for (buf, logical_offset) in ranges.iter_mut() {
249+
let mut remaining = buf.len();
250+
let mut offset = *logical_offset;
251+
let mut dst = 0;
252+
while remaining > 0 {
253+
let count = page_cache.read_at(blob_id, &mut buf[dst..], offset);
254+
if count == 0 {
255+
return fully_read;
256+
}
257+
offset += count as u64;
258+
dst += count;
259+
remaining -= count;
260+
}
261+
fully_read += 1;
262+
}
263+
fully_read
264+
}
265+
240266
/// Read the specified bytes, preferentially from the page cache. Bytes not found in the cache
241267
/// will be read from the provided `blob` and cached for future reads.
242268
pub(super) async fn read<B: Blob>(

0 commit comments

Comments
 (0)