Skip to content

Commit 18bae6f

Browse files
[storage/journal] batch read/append journal optimizations (#3709)
Co-authored-by: Dan Laine <dan@commonware.xyz>
1 parent 5ceb723 commit 18bae6f

13 files changed

Lines changed: 856 additions & 208 deletions

File tree

storage/src/journal/authenticated.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
2727
use commonware_cryptography::{Digest, Hasher};
2828
use commonware_parallel::{Sequential, Strategy};
2929
use core::num::NonZeroU64;
30-
use futures::{future::try_join_all, try_join, TryFutureExt as _};
30+
use futures::{try_join, TryFutureExt as _};
3131
use thiserror::Error;
3232
use tracing::{debug, warn};
3333

@@ -646,10 +646,8 @@ where
646646
)
647647
.await?;
648648

649-
let futures = (*start_loc..*end_loc)
650-
.map(|i| reader.read(i))
651-
.collect::<Vec<_>>();
652-
let ops = try_join_all(futures).await?;
649+
let positions: Vec<u64> = (*start_loc..*end_loc).collect();
650+
let ops = reader.read_many(&positions).await?;
653651

654652
Ok((proof, ops))
655653
}

storage/src/journal/benches/bench.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ mod fixed_append;
1919
mod fixed_read_random;
2020
mod fixed_read_sequential;
2121
mod fixed_replay;
22+
mod variable_read_random;
2223
mod variable_replay;
2324

2425
criterion_main!(
2526
fixed_append::benches,
2627
fixed_read_random::benches,
2728
fixed_read_sequential::benches,
2829
fixed_replay::benches,
30+
variable_read_random::benches,
2931
variable_replay::benches,
3032
);
3133

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
use crate::{append_fixed_random_data, get_variable_journal};
2+
use commonware_runtime::{
3+
benchmarks::{context, tokio},
4+
tokio::{Config, Context, Runner},
5+
Runner as _, Supervisor as _,
6+
};
7+
use commonware_storage::journal::contiguous::{variable::Journal, Reader as _};
8+
use commonware_utils::{sequence::FixedBytes, NZU64};
9+
use criterion::{criterion_group, Criterion};
10+
use futures::future::try_join_all;
11+
use rand::{rngs::StdRng, Rng, SeedableRng};
12+
use std::{
13+
hint::black_box,
14+
num::NonZeroU64,
15+
time::{Duration, Instant},
16+
};
17+
18+
/// Partition name to use in the journal config.
19+
const PARTITION: &str = "variable-random-test-partition";
20+
21+
/// Value of items_per_section to use in the journal config.
22+
const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(10_000);
23+
24+
/// Number of items to write to the journal we will be reading from.
25+
const ITEMS_TO_WRITE: u64 = 5_000_000;
26+
27+
/// Size of each journal item in bytes.
28+
const ITEM_SIZE: usize = 32;
29+
30+
/// Read `items_to_read` random items from the given `journal`, awaiting each
31+
/// result before continuing.
32+
async fn bench_run_serial(journal: &Journal<Context, FixedBytes<ITEM_SIZE>>, items_to_read: usize) {
33+
let reader = journal.reader().await;
34+
let mut rng = StdRng::seed_from_u64(0);
35+
for _ in 0..items_to_read {
36+
let pos = rng.gen_range(0..ITEMS_TO_WRITE);
37+
black_box(reader.read(pos).await.expect("failed to read data"));
38+
}
39+
}
40+
41+
/// Concurrently read (via try_join_all) `items_to_read` random items from the given `journal`.
42+
async fn bench_run_concurrent(
43+
journal: &Journal<Context, FixedBytes<ITEM_SIZE>>,
44+
items_to_read: usize,
45+
) {
46+
let reader = journal.reader().await;
47+
let mut rng = StdRng::seed_from_u64(0);
48+
let mut futures = Vec::with_capacity(items_to_read);
49+
for _ in 0..items_to_read {
50+
let pos = rng.gen_range(0..ITEMS_TO_WRITE);
51+
futures.push(reader.read(pos));
52+
}
53+
try_join_all(futures).await.expect("failed to read data");
54+
}
55+
56+
/// Batch-read `items_to_read` random items via `read_many`.
57+
async fn bench_run_read_many(
58+
journal: &Journal<Context, FixedBytes<ITEM_SIZE>>,
59+
items_to_read: usize,
60+
) {
61+
let reader = journal.reader().await;
62+
let mut rng = StdRng::seed_from_u64(0);
63+
let mut positions: Vec<u64> = (0..items_to_read)
64+
.map(|_| rng.gen_range(0..ITEMS_TO_WRITE))
65+
.collect();
66+
positions.sort_unstable();
67+
positions.dedup();
68+
black_box(
69+
reader
70+
.read_many(&positions)
71+
.await
72+
.expect("failed to read data"),
73+
);
74+
}
75+
76+
fn bench_variable_read_random(c: &mut Criterion) {
77+
let cfg = Config::default();
78+
let mut initialized = false;
79+
let runner = tokio::Runner::new(cfg.clone());
80+
for mode in ["serial", "concurrent", "read_many"] {
81+
for items_to_read in [100, 1_000, 10_000, 100_000] {
82+
c.bench_function(
83+
&format!(
84+
"{}/mode={} items={} size={}",
85+
module_path!(),
86+
mode,
87+
items_to_read,
88+
ITEM_SIZE
89+
),
90+
|b| {
91+
// Setup: populate journal (once, on first sample).
92+
if !initialized {
93+
Runner::new(cfg.clone()).start(|ctx| async move {
94+
let mut j =
95+
get_variable_journal(ctx, PARTITION, ITEMS_PER_SECTION).await;
96+
append_fixed_random_data::<_, ITEM_SIZE>(&mut j, ITEMS_TO_WRITE).await;
97+
j.sync().await.unwrap();
98+
});
99+
initialized = true;
100+
}
101+
102+
// Benchmark: measure read time.
103+
b.to_async(&runner).iter_custom(|iters| async move {
104+
let ctx = context::get::<commonware_runtime::tokio::Context>();
105+
let j = get_variable_journal(
106+
ctx.child("storage"),
107+
PARTITION,
108+
ITEMS_PER_SECTION,
109+
)
110+
.await;
111+
let mut duration = Duration::ZERO;
112+
for _ in 0..iters {
113+
let start = Instant::now();
114+
match mode {
115+
"serial" => bench_run_serial(&j, items_to_read).await,
116+
"concurrent" => bench_run_concurrent(&j, items_to_read).await,
117+
"read_many" => bench_run_read_many(&j, items_to_read).await,
118+
_ => unreachable!(),
119+
}
120+
duration += start.elapsed();
121+
}
122+
duration
123+
});
124+
},
125+
);
126+
}
127+
}
128+
129+
// Cleanup: destroy journal.
130+
if initialized {
131+
Runner::new(cfg).start(|context| async move {
132+
let j = get_variable_journal::<ITEM_SIZE>(context, PARTITION, ITEMS_PER_SECTION).await;
133+
j.destroy().await.unwrap();
134+
});
135+
}
136+
}
137+
138+
criterion_group! {
139+
name = benches;
140+
config = Criterion::default().sample_size(10);
141+
targets = bench_variable_read_random
142+
}

storage/src/journal/contiguous/fixed.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,20 @@ impl<E: Context, A: CodecFixedShared> Inner<E, A> {
158158

159159
/// Read an item if it can be done synchronously (e.g. without I/O), returning `None` otherwise.
160160
fn try_read_sync(&self, pos: u64, items_per_blob: u64) -> Option<A> {
161+
let mut buf = vec![0u8; SegmentedJournal::<E, A>::CHUNK_SIZE];
162+
self.try_read_sync_into(pos, items_per_blob, &mut buf)
163+
}
164+
165+
/// Read an item synchronously using caller-provided buffer.
166+
fn try_read_sync_into(&self, pos: u64, items_per_blob: u64, buf: &mut [u8]) -> Option<A> {
161167
if pos >= self.size || pos < self.pruning_boundary {
162168
return None;
163169
}
164170
let section = pos / items_per_blob;
165171
let section_start = section * items_per_blob;
166172
let first_in_section = self.pruning_boundary.max(section_start);
167173
let pos_in_section = pos - first_in_section;
168-
self.journal.try_get_sync(section, pos_in_section)
174+
self.journal.try_get_sync_into(section, pos_in_section, buf)
169175
}
170176
}
171177

@@ -238,8 +244,12 @@ impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
238244
let mut miss_indices: Vec<usize> = Vec::new();
239245
let mut miss_positions: Vec<u64> = Vec::new();
240246

247+
let mut sync_buf = vec![0u8; chunk_size];
241248
for (i, &pos) in positions.iter().enumerate() {
242-
if let Some(item) = self.guard.try_read_sync(pos, items_per_blob) {
249+
if let Some(item) = self
250+
.guard
251+
.try_read_sync_into(pos, items_per_blob, &mut sync_buf)
252+
{
243253
result.push(Some(item));
244254
} else {
245255
result.push(None);

storage/src/journal/contiguous/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ pub trait Reader: Send + Sync {
3535
/// Guaranteed not to return [Error::ItemPruned] for positions within `bounds()`.
3636
fn read(&self, position: u64) -> impl Future<Output = Result<Self::Item, Error>> + Send;
3737

38-
/// Read multiple items at the given positions, which must be sorted in ascending order.
38+
/// Read multiple items at the given positions. `positions` must be sorted in strictly ascending
39+
/// order (sorted and unique).
3940
///
40-
/// The default implementation calls [`read`](Self::read) in a loop.
41-
/// Fixed-size journal implementations override this to amortize lock
42-
/// acquisition and avoid per-item buffer allocation.
41+
/// The default implementation calls [`read`](Self::read) in a loop. Concrete journal
42+
/// implementations override this to amortize lock acquisition and batch I/O.
4343
fn read_many(
4444
&self,
4545
positions: &[u64],
@@ -111,6 +111,14 @@ pub enum Many<'a, T> {
111111
}
112112

113113
impl<T> Many<'_, T> {
114+
/// Returns the total number of items across all segments.
115+
pub fn len(&self) -> usize {
116+
match self {
117+
Self::Flat(items) => items.len(),
118+
Self::Nested(nested_items) => nested_items.iter().map(|items| items.len()).sum(),
119+
}
120+
}
121+
114122
/// Returns `true` if there are no items across all segments.
115123
pub fn is_empty(&self) -> bool {
116124
match self {

storage/src/journal/contiguous/tests.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ where
6868
test_persistence_basic(&indexed_factory).await;
6969
test_persistence_after_prune(&indexed_factory).await;
7070
test_read_by_position(&indexed_factory).await;
71+
test_read_many(&indexed_factory).await;
7172
test_read_out_of_range(&indexed_factory).await;
7273
test_read_after_prune(&indexed_factory).await;
7374
test_rewind_to_middle(&indexed_factory).await;
@@ -732,6 +733,26 @@ where
732733
journal.destroy().await.unwrap();
733734
}
734735

736+
/// Test reading multiple items by position.
737+
pub(super) async fn test_read_many<F, J>(factory: &F)
738+
where
739+
F: Fn(String) -> BoxFuture<'static, Result<J, Error>>,
740+
J: PersistableContiguous,
741+
{
742+
let mut journal = factory("read-many".into()).await.unwrap();
743+
744+
for i in 0..15u64 {
745+
journal.append(&(i * 100)).await.unwrap();
746+
}
747+
748+
let reader = journal.reader().await;
749+
let items = reader.read_many(&[1, 4, 12]).await.unwrap();
750+
assert_eq!(items, vec![100, 400, 1200]);
751+
drop(reader);
752+
753+
journal.destroy().await.unwrap();
754+
}
755+
735756
/// Test read errors for out-of-range positions.
736757
pub(super) async fn test_read_out_of_range<F, J>(factory: &F)
737758
where

0 commit comments

Comments
 (0)