Skip to content

Commit bfa4f8f

Browse files
[storage/archive] Add Benchmarks (#845)
1 parent 8adf7d3 commit bfa4f8f

11 files changed

Lines changed: 342 additions & 8 deletions

File tree

storage/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ name="bmt"
4545
harness = false
4646
path = "src/bmt/benches/bench.rs"
4747

48+
[[bench]]
49+
name="index"
50+
harness = false
51+
path = "src/index/benches/bench.rs"
52+
4853
[[bench]]
4954
name="journal"
5055
harness = false
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
use criterion::criterion_main;
22

3-
mod hashmap_insert;
4-
mod hashmap_insert_fixed;
5-
mod hashmap_iteration;
3+
mod get;
4+
mod put;
5+
mod restart;
6+
mod utils;
67

7-
criterion_main!(
8-
hashmap_iteration::benches,
9-
hashmap_insert_fixed::benches,
10-
hashmap_insert::benches,
11-
);
8+
criterion_main!(put::benches, get::benches, restart::benches);

storage/src/archive/benches/get.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
//! Random key-lookup benchmark for Archive.
2+
3+
use super::utils::{append_random, get_archive, ArchiveType, Key};
4+
use commonware_runtime::{
5+
benchmarks::{context, tokio},
6+
tokio::Config,
7+
Runner,
8+
};
9+
use commonware_storage::archive::Identifier;
10+
use criterion::{black_box, criterion_group, Criterion};
11+
use futures::future::try_join_all;
12+
use rand::{rngs::StdRng, Rng, SeedableRng};
13+
use std::time::Instant;
14+
15+
/// Items pre-loaded into the archive.
16+
const ITEMS: u64 = 1_000_000;
17+
18+
fn select_keys(keys: &[Key], reads: usize) -> Vec<Key> {
19+
let mut rng = StdRng::seed_from_u64(42);
20+
let mut selected_keys = Vec::with_capacity(reads);
21+
for _ in 0..reads {
22+
selected_keys.push(keys[rng.gen_range(0..ITEMS as usize)].clone());
23+
}
24+
selected_keys
25+
}
26+
27+
fn select_indices(reads: usize) -> Vec<u64> {
28+
let mut rng = StdRng::seed_from_u64(42);
29+
let mut selected_indices = Vec::with_capacity(reads);
30+
for _ in 0..reads {
31+
selected_indices.push(rng.gen_range(0..ITEMS));
32+
}
33+
selected_indices
34+
}
35+
36+
async fn read_serial_keys(a: &ArchiveType, reads: &[Key]) {
37+
for k in reads {
38+
black_box(a.get(Identifier::Key(k)).await.unwrap().unwrap());
39+
}
40+
}
41+
42+
async fn read_serial_indices(a: &ArchiveType, indices: &[u64]) {
43+
for idx in indices {
44+
black_box(a.get(Identifier::Index(*idx)).await.unwrap().unwrap());
45+
}
46+
}
47+
48+
async fn read_concurrent_keys(a: &ArchiveType, reads: Vec<Key>) {
49+
let futures = reads.iter().map(|k| a.get(Identifier::Key(k)));
50+
black_box(try_join_all(futures).await.unwrap());
51+
}
52+
53+
async fn read_concurrent_indices(a: &ArchiveType, indices: &[u64]) {
54+
let mut futs = Vec::with_capacity(indices.len());
55+
for idx in indices {
56+
futs.push(a.get(Identifier::Index(*idx)));
57+
}
58+
black_box(try_join_all(futs).await.unwrap());
59+
}
60+
61+
fn bench_get(c: &mut Criterion) {
62+
// Create a config we can use across all benchmarks (with a fixed `storage_directory`).
63+
let cfg = Config::default();
64+
for compression in [None, Some(3)] {
65+
// Create a shared on-disk archive once so later setup is fast.
66+
let builder = commonware_runtime::tokio::Runner::new(cfg.clone());
67+
let keys = builder.start(|ctx| async move {
68+
let mut a = get_archive(ctx, compression).await;
69+
let keys = append_random(&mut a, ITEMS).await;
70+
a.close().await.unwrap();
71+
keys
72+
});
73+
74+
// Run the benchmarks.
75+
let runner = tokio::Runner::new(cfg.clone());
76+
for mode in ["serial", "concurrent"] {
77+
for pattern in ["key", "index"] {
78+
for reads in [1_000, 10_000, 100_000] {
79+
let label = format!(
80+
"{}/mode={} pattern={} comp={} reads={}",
81+
module_path!(),
82+
mode,
83+
pattern,
84+
compression
85+
.map(|l| l.to_string())
86+
.unwrap_or_else(|| "off".into()),
87+
reads
88+
);
89+
c.bench_function(&label, |b| {
90+
let keys = keys.clone();
91+
b.to_async(&runner).iter_custom(move |iters| {
92+
let keys = keys.clone();
93+
async move {
94+
let ctx = context::get::<commonware_runtime::tokio::Context>();
95+
let archive = get_archive(ctx, compression).await;
96+
if pattern == "key" {
97+
let selected_keys = select_keys(&keys, reads);
98+
let start = Instant::now();
99+
for _ in 0..iters {
100+
match mode {
101+
"serial" => {
102+
read_serial_keys(&archive, &selected_keys).await
103+
}
104+
"concurrent" => {
105+
read_concurrent_keys(
106+
&archive,
107+
selected_keys.clone(),
108+
)
109+
.await
110+
}
111+
_ => unreachable!(),
112+
}
113+
}
114+
start.elapsed()
115+
} else {
116+
let selected_indices = select_indices(reads);
117+
let start = Instant::now();
118+
for _ in 0..iters {
119+
match mode {
120+
"serial" => {
121+
read_serial_indices(&archive, &selected_indices)
122+
.await
123+
}
124+
"concurrent" => {
125+
read_concurrent_indices(&archive, &selected_indices)
126+
.await
127+
}
128+
_ => unreachable!(),
129+
}
130+
}
131+
start.elapsed()
132+
}
133+
}
134+
});
135+
});
136+
}
137+
}
138+
}
139+
140+
// Clean up shared artifacts.
141+
let cleaner = commonware_runtime::tokio::Runner::new(cfg.clone());
142+
cleaner.start(|ctx| async move {
143+
let a = get_archive(ctx, compression).await;
144+
a.destroy().await.unwrap();
145+
});
146+
}
147+
}
148+
149+
criterion_group! {
150+
name = benches;
151+
config = Criterion::default().sample_size(10);
152+
targets = bench_get
153+
}

storage/src/archive/benches/put.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use super::utils::{append_random, get_archive};
2+
use commonware_runtime::benchmarks::{context, tokio};
3+
use criterion::{criterion_group, Criterion};
4+
use std::time::{Duration, Instant};
5+
6+
fn bench_put(c: &mut Criterion) {
7+
let runner = tokio::Runner::default();
8+
for compression in [None, Some(3)] {
9+
for items in [10_000, 100_000, 250_000] {
10+
let label = format!(
11+
"{}/items={} comp={}",
12+
module_path!(),
13+
items,
14+
compression
15+
.map(|l| l.to_string())
16+
.unwrap_or_else(|| "off".into()),
17+
);
18+
c.bench_function(&label, |b| {
19+
b.to_async(&runner).iter_custom(move |iters| async move {
20+
let ctx = context::get::<commonware_runtime::tokio::Context>();
21+
let mut total = Duration::ZERO;
22+
for _ in 0..iters {
23+
let mut archive = get_archive(ctx.clone(), compression).await;
24+
25+
let start = Instant::now();
26+
append_random(&mut archive, items).await;
27+
total += start.elapsed();
28+
29+
archive.destroy().await.unwrap();
30+
}
31+
total
32+
});
33+
});
34+
}
35+
}
36+
}
37+
38+
criterion_group! {
39+
name = benches;
40+
config = Criterion::default().sample_size(10);
41+
targets = bench_put
42+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use super::utils::{append_random, get_archive};
2+
use commonware_runtime::{
3+
benchmarks::{context, tokio},
4+
tokio::Config,
5+
Runner,
6+
};
7+
use criterion::{criterion_group, Criterion};
8+
use std::time::{Duration, Instant};
9+
10+
fn bench_restart(c: &mut Criterion) {
11+
// Create a config we can use across all benchmarks (with a fixed `storage_directory`).
12+
let cfg = Config::default();
13+
for compression in [None, Some(3)] {
14+
for items in [10_000, 100_000, 250_000] {
15+
let builder = commonware_runtime::tokio::Runner::new(cfg.clone());
16+
builder.start(|ctx| async move {
17+
let mut a = get_archive(ctx, compression).await;
18+
append_random(&mut a, items).await;
19+
a.close().await.unwrap();
20+
});
21+
22+
// Run the benchmarks
23+
let runner = tokio::Runner::new(cfg.clone());
24+
c.bench_function(
25+
&format!(
26+
"{}/items={} comp={}",
27+
module_path!(),
28+
items,
29+
compression
30+
.map(|l| l.to_string())
31+
.unwrap_or_else(|| "off".into())
32+
),
33+
|b| {
34+
b.to_async(&runner).iter_custom(|iters| async move {
35+
let ctx = context::get::<commonware_runtime::tokio::Context>();
36+
let mut total = Duration::ZERO;
37+
for _ in 0..iters {
38+
let start = Instant::now();
39+
let a = get_archive(ctx.clone(), compression).await; // replay happens inside init
40+
total += start.elapsed();
41+
a.close().await.unwrap();
42+
}
43+
total
44+
});
45+
},
46+
);
47+
48+
// Tear down
49+
let cleaner = commonware_runtime::tokio::Runner::new(cfg.clone());
50+
cleaner.start(|ctx| async move {
51+
let a = get_archive(ctx, compression).await;
52+
a.destroy().await.unwrap();
53+
});
54+
}
55+
}
56+
}
57+
58+
criterion_group! {
59+
name = benches;
60+
config = Criterion::default().sample_size(10);
61+
targets = bench_restart
62+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//! Helpers shared by the Archive benchmarks.
2+
3+
use commonware_runtime::tokio::Context;
4+
use commonware_storage::{
5+
archive::{Archive, Config},
6+
index::translator::TwoCap,
7+
};
8+
use commonware_utils::array::FixedBytes;
9+
use rand::{rngs::StdRng, RngCore, SeedableRng};
10+
11+
/// Partition used across all archive benchmarks.
12+
pub const PARTITION: &str = "archive_bench_partition";
13+
14+
/// Number of buffered writes before a forced sync.
15+
const PENDING_WRITES: usize = 1_000;
16+
17+
/// Section-mask that yields reasonably small blobs for local testing.
18+
const SECTION_MASK: u64 = 0xffff_ffff_ffff_ff00u64;
19+
20+
/// Fixed-length key and value types.
21+
pub type Key = FixedBytes<64>;
22+
pub type Val = FixedBytes<32>;
23+
24+
/// Concrete archive type reused by every benchmark.
25+
pub type ArchiveType = Archive<TwoCap, Context, Key, (), Val>;
26+
27+
/// Open (or create) a fresh archive with optional compression.
28+
///
29+
/// The caller is responsible for closing or destroying it.
30+
pub async fn get_archive(ctx: Context, compression: Option<u8>) -> ArchiveType {
31+
let cfg = Config {
32+
partition: PARTITION.into(),
33+
translator: TwoCap,
34+
compression,
35+
codec_config: (),
36+
section_mask: SECTION_MASK,
37+
pending_writes: PENDING_WRITES,
38+
replay_concurrency: 1,
39+
};
40+
Archive::init(ctx, cfg).await.unwrap()
41+
}
42+
43+
/// Append `count` random (index,key,value) triples and sync once.
44+
pub async fn append_random(archive: &mut ArchiveType, count: u64) -> Vec<Key> {
45+
let mut rng = StdRng::seed_from_u64(0);
46+
let mut key_buf = [0u8; 64];
47+
let mut val_buf = [0u8; 32];
48+
49+
let mut keys = Vec::with_capacity(count as usize);
50+
for i in 0..count {
51+
rng.fill_bytes(&mut key_buf);
52+
let key = Key::new(key_buf);
53+
keys.push(key.clone());
54+
rng.fill_bytes(&mut val_buf);
55+
archive.put(i, key, Val::new(val_buf)).await.unwrap();
56+
}
57+
archive.sync().await.unwrap();
58+
keys
59+
}

storage/src/archive/storage.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,4 +417,9 @@ impl<T: Translator, E: Storage + Metrics, K: Array, VC: CodecConfig + Copy, V: C
417417
pub async fn close(self) -> Result<(), Error> {
418418
self.journal.close().await.map_err(Error::Journal)
419419
}
420+
421+
/// Remove all on-disk data created by this `Archive`.
422+
pub async fn destroy(self) -> Result<(), Error> {
423+
self.journal.destroy().await.map_err(Error::Journal)
424+
}
420425
}

storage/src/index/benches/bench.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use criterion::criterion_main;
2+
3+
mod hashmap_insert;
4+
mod hashmap_insert_fixed;
5+
mod hashmap_iteration;
6+
7+
criterion_main!(
8+
hashmap_iteration::benches,
9+
hashmap_insert_fixed::benches,
10+
hashmap_insert::benches,
11+
);
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)