Skip to content

Commit eaa0c03

Browse files
[commonware-runtime/commonware-storage benches] reuse large journal across benchmarks, fix Runner naming (#816)
Co-authored-by: Patrick O'Grady <me@patrickogrady.xyz>
1 parent 1598eee commit eaa0c03

7 files changed

Lines changed: 65 additions & 64 deletions

File tree

runtime/src/benchmarks/tokio.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Implements a [criterion]-compatible executor for the [tokio] runtime.
22
33
use super::context;
4-
use crate::{tokio, Runner};
4+
use crate::{tokio, Runner as _};
55
use criterion::async_executor::AsyncExecutor;
66
use futures::Future;
77

@@ -15,7 +15,7 @@ use futures::Future;
1515
/// use std::time::Duration;
1616
///
1717
/// fn my_benchmark(c: &mut Criterion) {
18-
/// let executor = tokio::Executor::default();
18+
/// let executor = tokio::Runner::default();
1919
/// c.bench_function("sleep_benchmark", |b| {
2020
/// b.to_async(&executor).iter_batched(|| (),
2121
/// |_| async {
@@ -28,28 +28,28 @@ use futures::Future;
2828
/// }
2929
/// ```
3030
#[derive(Clone)]
31-
pub struct Executor {
31+
pub struct Runner {
3232
cfg: tokio::Config,
3333
}
3434

35-
impl Executor {
35+
impl Runner {
3636
/// Create a new bencher with the given configuration
3737
pub fn new(cfg: tokio::Config) -> Self {
3838
Self { cfg }
3939
}
4040
}
4141

42-
impl Default for Executor {
42+
impl Default for Runner {
4343
fn default() -> Self {
4444
Self::new(tokio::Config::default())
4545
}
4646
}
4747

48-
impl AsyncExecutor for &Executor {
48+
impl AsyncExecutor for &Runner {
4949
fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
50-
let executor = tokio::Runner::new(self.cfg.clone());
50+
let runner = tokio::Runner::new(self.cfg.clone());
5151

52-
let result = executor.start(|ctx| {
52+
let result = runner.start(|ctx| {
5353
// Create and store our context
5454
context::set(ctx);
5555

storage/src/journal/benches/bench.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,25 @@ criterion_main!(
1616
fixed_replay::benches,
1717
);
1818

19-
/// Append `items_to_write` random items to a journal of items with ITEM_SIZE bytes each. The journal
20-
/// is configured to use `items_per_blob` items per blob.
21-
async fn append_random_data<const ITEM_SIZE: usize>(
19+
/// Open and return a temp journal with the given config parameters and items of size ITEM_SIZE.
20+
async fn get_journal<const ITEM_SIZE: usize>(
2221
context: Context,
2322
partition_name: &str,
2423
items_per_blob: u64,
25-
items_to_write: u64,
2624
) -> Journal<Context, FixedBytes<ITEM_SIZE>> {
2725
// Initialize the journal at the given partition.
2826
let journal_config = JConfig {
2927
partition: partition_name.to_string(),
3028
items_per_blob,
3129
};
32-
let mut journal = Journal::init(context, journal_config).await.unwrap();
30+
Journal::init(context, journal_config).await.unwrap()
31+
}
3332

33+
/// Append `items_to_write` random items to the given journal, syncing the changes before returning.
34+
async fn append_random_data<const ITEM_SIZE: usize>(
35+
journal: &mut Journal<Context, FixedBytes<ITEM_SIZE>>,
36+
items_to_write: u64,
37+
) {
3438
// Append `items_to_write` random items to the journal.
3539
let mut rng = StdRng::seed_from_u64(0);
3640
let mut arr = [0; ITEM_SIZE];
@@ -44,6 +48,4 @@ async fn append_random_data<const ITEM_SIZE: usize>(
4448

4549
// Sync the journal to ensure all data is written to disk.
4650
journal.sync().await.unwrap();
47-
48-
journal
4951
}

storage/src/journal/benches/fixed_append.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::append_random_data;
1+
use crate::{append_random_data, get_journal};
22
use commonware_runtime::benchmarks::{context, tokio};
33
use criterion::{criterion_group, Criterion};
44
use std::time::{Duration, Instant};
@@ -13,7 +13,7 @@ const ITEMS_PER_BLOB: u64 = 10_000;
1313
const ITEM_SIZE: usize = 32;
1414

1515
fn bench_fixed_append(c: &mut Criterion) {
16-
let executor = tokio::Executor::default();
16+
let runner = tokio::Runner::default();
1717
for items_to_write in [1_000, 10_000, 100_000, 1_000_000] {
1818
c.bench_function(
1919
&format!(
@@ -23,18 +23,17 @@ fn bench_fixed_append(c: &mut Criterion) {
2323
ITEM_SIZE
2424
),
2525
|b| {
26-
b.to_async(&executor).iter_custom(|iters| async move {
26+
b.to_async(&runner).iter_custom(|iters| async move {
2727
let ctx = context::get::<commonware_runtime::tokio::Context>();
2828
let mut duration = Duration::ZERO;
2929
for _ in 0..iters {
30+
// Create a new journal for each iteration
31+
let mut j =
32+
get_journal::<ITEM_SIZE>(ctx.clone(), PARTITION, ITEMS_PER_BLOB).await;
33+
34+
// Append random data to the journal
3035
let start = Instant::now();
31-
let j = append_random_data::<ITEM_SIZE>(
32-
ctx.clone(),
33-
PARTITION,
34-
ITEMS_PER_BLOB,
35-
items_to_write,
36-
)
37-
.await;
36+
append_random_data(&mut j, items_to_write).await;
3837
duration += start.elapsed();
3938

4039
// Destroy the journal after appending to avoid polluting the next iteration

storage/src/journal/benches/fixed_read_random.rs

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use super::append_random_data;
1+
use super::{append_random_data, get_journal};
22
use commonware_runtime::{
33
benchmarks::{context, tokio},
4-
tokio::Context,
4+
tokio::{Config, Context, Runner},
5+
Runner as _,
56
};
67
use commonware_storage::journal::fixed::Journal;
78
use commonware_utils::array::FixedBytes;
@@ -47,7 +48,21 @@ async fn bench_run_concurrent(
4748
}
4849

4950
fn bench_fixed_read_random(c: &mut Criterion) {
50-
let executor = tokio::Executor::default();
51+
// Create a config we can use across all benchmarks (with a fixed `storage_directory`), allowing the
52+
// same test file to be re-used.
53+
let cfg = Config::default();
54+
55+
// Generate a large temp journal with random data.
56+
let runner = Runner::new(cfg.clone());
57+
runner.start(|ctx| async move {
58+
// Create a large temp journal with random data.
59+
let mut j = get_journal(ctx, PARTITION, ITEMS_PER_BLOB).await;
60+
append_random_data::<ITEM_SIZE>(&mut j, ITEMS_TO_WRITE).await;
61+
j.close().await.unwrap();
62+
});
63+
64+
// Run the benchmarks
65+
let runner = tokio::Runner::new(cfg.clone());
5166
for mode in ["serial", "concurrent"] {
5267
for items_to_read in [100, 1_000, 10_000, 100_000] {
5368
c.bench_function(
@@ -59,18 +74,9 @@ fn bench_fixed_read_random(c: &mut Criterion) {
5974
ITEM_SIZE
6075
),
6176
|b| {
62-
b.to_async(&executor).iter_custom(|iters| async move {
63-
// Append random data to the journal
77+
b.to_async(&runner).iter_custom(|iters| async move {
6478
let ctx = context::get::<commonware_runtime::tokio::Context>();
65-
let j = append_random_data(
66-
ctx.clone(),
67-
PARTITION,
68-
ITEMS_PER_BLOB,
69-
ITEMS_TO_WRITE,
70-
)
71-
.await;
72-
73-
// Run the benchmark
79+
let j = get_journal(ctx.clone(), PARTITION, ITEMS_PER_BLOB).await;
7480
let mut duration = Duration::ZERO;
7581
for _ in 0..iters {
7682
let start = Instant::now();
@@ -81,16 +87,19 @@ fn bench_fixed_read_random(c: &mut Criterion) {
8187
}
8288
duration += start.elapsed();
8389
}
84-
85-
// Destroy the journal after reading to avoid polluting the next iteration
86-
j.destroy().await.unwrap();
87-
8890
duration
8991
});
9092
},
9193
);
9294
}
9395
}
96+
97+
// Clean up the temp journal
98+
let runner = Runner::new(cfg);
99+
runner.start(|context| async move {
100+
let j = get_journal::<ITEM_SIZE>(context, PARTITION, ITEMS_PER_BLOB).await;
101+
j.destroy().await.unwrap();
102+
});
94103
}
95104

96105
criterion_group! {

storage/src/journal/benches/fixed_read_sequential.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::append_random_data;
1+
use super::{append_random_data, get_journal};
22
use commonware_runtime::{
33
benchmarks::{context, tokio},
44
tokio::Context,
@@ -27,21 +27,16 @@ async fn bench_run(journal: &Journal<Context, FixedBytes<ITEM_SIZE>>, items_to_r
2727
/// Benchmark the sequential read of items from a journal containing exactly that
2828
/// number of items.
2929
fn bench_fixed_read_sequential(c: &mut Criterion) {
30-
let executor = tokio::Executor::default();
30+
let runner = tokio::Runner::default();
3131
for items in [1_000, 10_000, 100_000, 500_000] {
3232
c.bench_function(
3333
&format!("{}/items={} size={}", module_path!(), items, ITEM_SIZE),
3434
|b| {
35-
b.to_async(&executor).iter_custom(|iters| async move {
35+
b.to_async(&runner).iter_custom(|iters| async move {
3636
// Append random data to the journal
3737
let ctx = context::get::<commonware_runtime::tokio::Context>();
38-
let j = append_random_data::<ITEM_SIZE>(
39-
ctx.clone(),
40-
PARTITION,
41-
ITEMS_PER_BLOB,
42-
items,
43-
)
44-
.await;
38+
let mut j = get_journal::<ITEM_SIZE>(ctx, PARTITION, ITEMS_PER_BLOB).await;
39+
append_random_data::<ITEM_SIZE>(&mut j, items).await;
4540
let sz = j.size().await.unwrap();
4641
assert_eq!(sz, items);
4742

storage/src/journal/benches/fixed_replay.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::append_random_data;
1+
use super::{append_random_data, get_journal};
22
use commonware_runtime::{
33
benchmarks::{context, tokio},
44
tokio::Context,
@@ -20,7 +20,7 @@ const ITEM_SIZE: usize = 32;
2020

2121
/// Replay all items in the given `journal`.
2222
async fn bench_run(journal: &mut Journal<Context, FixedBytes<ITEM_SIZE>>, items_to_read: u64) {
23-
let concurrency = (items_to_read / ITEMS_PER_BLOB) as usize;
23+
let concurrency = std::cmp::max(1, (items_to_read / ITEMS_PER_BLOB) as usize);
2424
let stream = journal
2525
.replay(concurrency)
2626
.await
@@ -39,21 +39,16 @@ async fn bench_run(journal: &mut Journal<Context, FixedBytes<ITEM_SIZE>>, items_
3939
/// Benchmark the replaying of items from a journal containing exactly that
4040
/// number of items.
4141
fn bench_fixed_replay(c: &mut Criterion) {
42-
let executor = tokio::Executor::default();
42+
let runner = tokio::Runner::default();
4343
for items in [1_000, 10_000, 100_000, 500_000] {
4444
c.bench_function(
4545
&format!("{}/items={} size={}", module_path!(), items, ITEM_SIZE),
4646
|b| {
47-
b.to_async(&executor).iter_custom(|iters| async move {
47+
b.to_async(&runner).iter_custom(|iters| async move {
4848
// Append random data to the journal
4949
let ctx = context::get::<commonware_runtime::tokio::Context>();
50-
let mut j = append_random_data::<ITEM_SIZE>(
51-
ctx.clone(),
52-
PARTITION,
53-
ITEMS_PER_BLOB,
54-
items,
55-
)
56-
.await;
50+
let mut j = get_journal::<ITEM_SIZE>(ctx, PARTITION, ITEMS_PER_BLOB).await;
51+
append_random_data::<ITEM_SIZE>(&mut j, items).await;
5752
let sz = j.size().await.unwrap();
5853
assert_eq!(sz, items);
5954

storage/src/journal/fixed.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ impl<E: Storage + Metrics, A: Array> Journal<E, A> {
346346
&mut self,
347347
concurrency: usize,
348348
) -> Result<impl Stream<Item = Result<(u64, A), Error>> + '_, Error> {
349+
assert!(concurrency > 0);
349350
// Collect all blobs to replay
350351
let mut blobs = Vec::with_capacity(self.blobs.len());
351352
let (newest_blob_index, _) = self.newest_blob();

0 commit comments

Comments
 (0)