Skip to content

Commit 5a63d1f

Browse files
authored
[Storage] Add contiguous journal and QMDB metrics (#3721)
1 parent b155ef4 commit 5a63d1f

27 files changed

Lines changed: 1496 additions & 157 deletions

File tree

runtime/src/telemetry/metrics/histogram.rs

Lines changed: 89 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
//! Utilities for working with histograms.
22
3-
use super::{raw, Histogram};
4-
use crate::Clock;
5-
use std::time::SystemTime;
3+
use super::{raw, Histogram, MetricsExt as _};
4+
use crate::{Clock, Metrics};
5+
use std::{sync::Arc, time::SystemTime};
66

77
/// Convenience methods for Prometheus histograms.
88
pub trait HistogramExt {
@@ -101,3 +101,89 @@ impl Timer {
101101
self.histogram.observe_between(self.start, clock.current());
102102
}
103103
}
104+
105+
/// A timer guard that observes its duration when dropped.
106+
///
107+
/// Built on top of [`Timer`]. Useful for `?`-heavy async code where every early-return path
108+
/// would otherwise need to remember to call [`Timer::observe`]. Validation failures after the
109+
/// guard is created are still part of the recorded duration; if a code path should not record
110+
/// a sample, call [`ScopedTimer::cancel`] before the guard is dropped.
111+
pub struct ScopedTimer<C: Clock> {
112+
/// Pending measurement. Cleared by [`ScopedTimer::cancel`] and taken by the `Drop` impl, so a
/// guard observes at most one sample.
timer: Option<Timer>,
113+
/// Shared clock handle, passed to [`Timer::observe`] when the guard is dropped.
clock: Arc<C>,
114+
}
115+
116+
impl<C: Clock> ScopedTimer<C> {
117+
/// Cancel the guard so it does not observe a sample on drop.
///
/// Takes the guard by value: the inner timer is cleared first, and the `Drop` impl that runs
/// when `self` goes out of scope at the end of this method then finds nothing to observe.
118+
pub fn cancel(mut self) {
119+
// Clearing the option is what suppresses the sample; `Drop` still runs afterwards.
self.timer = None;
120+
}
121+
}
122+
123+
impl<C: Clock> Drop for ScopedTimer<C> {
124+
/// Observe the pending timer against the stored clock, unless the guard was cancelled.
125+
fn drop(&mut self) {
// `take` leaves `None` behind, so the timer is consumed exactly once.
if let Some(timer) = self.timer.take() {
126+
timer.observe(self.clock.as_ref());
127+
}
128+
}
129+
}
130+
131+
impl Timed {
132+
/// Start a timer guard that observes the elapsed duration when dropped.
///
/// The timer is started through `self.timer` using `clock`, and the guard keeps its own
/// clone of the `Arc` so the same clock can be consulted again at drop time.
133+
pub fn scoped<C: Clock>(&self, clock: &Arc<C>) -> ScopedTimer<C> {
134+
ScopedTimer {
135+
timer: Some(self.timer(clock.as_ref())),
136+
// Cloning the `Arc` shares the clock with the caller rather than copying it.
clock: clock.clone(),
137+
}
138+
}
139+
}
140+
141+
/// Register a duration histogram using [`Buckets::LOCAL`] (storage-style work).
///
/// Thin wrapper over `context.histogram` that fixes the bucket layout so callers only supply
/// the metric `name` and `help` text. Returns the [`Histogram`] produced by that call.
142+
pub fn duration_histogram<M: Metrics>(
143+
context: &M,
144+
name: &'static str,
145+
help: &'static str,
146+
) -> Histogram {
147+
context.histogram(name, help, Buckets::LOCAL)
148+
}
149+
150+
#[cfg(test)]
151+
mod tests {
152+
use super::*;
153+
use crate::{deterministic, Runner as _, Supervisor as _};
154+
use std::time::Duration;
155+
156+
/// Each of the three scopes below creates a guard and lets it fall out of scope, so the
/// histogram is expected to report three samples regardless of how the scope ended.
#[test]
157+
fn duration_records_all_calls() {
158+
deterministic::Runner::default().start(|context| async move {
159+
let histogram = duration_histogram(&context, "test_duration", "test duration");
160+
let timed = Timed::new(histogram);
161+
let clock = Arc::new(context.child("timer"));
162+
163+
// Scope 1: models a path that ends with `Ok`.
{
164+
let _timer = timed.scoped(&clock);
165+
context.sleep(Duration::from_millis(1)).await;
166+
let result: Result<(), ()> = Ok(());
167+
assert!(result.is_ok());
168+
}
169+
170+
// Scope 2: models a path that ends with `Err`; the guard is still dropped.
{
171+
let _timer = timed.scoped(&clock);
172+
context.sleep(Duration::from_millis(1)).await;
173+
let result: Result<(), ()> = Err(());
174+
assert!(result.is_err());
175+
}
176+
177+
// Scope 3: no result value at all, only the guard and a sleep.
{
178+
let _timer = timed.scoped(&clock);
179+
context.sleep(Duration::from_millis(1)).await;
180+
}
181+
182+
// One sample per dropped guard.
let metrics = context.encode();
183+
assert!(
184+
metrics.contains("test_duration_count 3"),
185+
"unexpected metrics: {metrics}"
186+
);
187+
});
188+
}
189+
}

storage/src/journal/contiguous/fixed.rs

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
use super::Reader as _;
5858
use crate::{
5959
journal::{
60-
contiguous::{Many, Mutable},
60+
contiguous::{metrics::FixedMetrics as Metrics, Many, Mutable},
6161
segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
6262
Error,
6363
},
@@ -198,12 +198,16 @@ pub struct Journal<E: Context, A: CodecFixedShared> {
198198

199199
/// The maximum number of items per blob (section).
200200
items_per_blob: u64,
201+
202+
/// Metrics for monitoring journal state and activity.
203+
metrics: Metrics<E>,
201204
}
202205

203206
/// A reader guard that holds a consistent snapshot of the journal's bounds.
204207
pub struct Reader<'a, E: Context, A: CodecFixedShared> {
205208
guard: AsyncRwLockReadGuard<'a, Inner<E, A>>,
206209
items_per_blob: u64,
210+
metrics: &'a Metrics<E>,
207211
}
208212

209213
impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
@@ -214,16 +218,27 @@ impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
214218
}
215219

216220
async fn read(&self, pos: u64) -> Result<A, Error> {
217-
self.guard.read(pos, self.items_per_blob).await
221+
let _timer = self.metrics.read_timer();
222+
self.metrics.read_calls.inc();
223+
let result = match self.guard.read(pos, self.items_per_blob).await {
224+
Ok(item) => {
225+
self.metrics.items_read.inc();
226+
Ok(item)
227+
}
228+
Err(error) => Err(error),
229+
};
230+
result
218231
}
219232

220233
async fn read_many(&self, positions: &[u64]) -> Result<Vec<A>, Error> {
221234
if positions.is_empty() {
222235
return Ok(Vec::new());
223236
}
237+
let _timer = self.metrics.read_many_timer();
238+
self.metrics.read_many_calls.inc();
224239
debug_assert!(
225240
positions.windows(2).all(|w| w[0] < w[1]),
226-
"positions must be sorted and unique"
241+
"positions must be strictly increasing"
227242
);
228243
// Validate all positions.
229244
for &pos in positions {
@@ -259,8 +274,14 @@ impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
259274
}
260275

261276
if miss_positions.is_empty() {
277+
self.metrics.record_cache_hits(positions.len() as u64);
278+
self.metrics.items_read.inc_by(positions.len() as u64);
262279
return Ok(result.into_iter().map(|r| r.unwrap()).collect());
263280
}
281+
self.metrics
282+
.record_cache_hits((positions.len() - miss_positions.len()) as u64);
283+
self.metrics
284+
.record_cache_misses(miss_positions.len() as u64);
264285

265286
// Phase 2: Read cache misses grouped by section (sequential).
266287
let mut reusable_buf = vec![0u8; miss_positions.len() * chunk_size];
@@ -308,11 +329,25 @@ impl<E: Context, A: CodecFixedShared> super::Reader for Reader<'_, E, A> {
308329
group_start = group_end;
309330
}
310331

332+
self.metrics.items_read.inc_by(positions.len() as u64);
311333
Ok(result.into_iter().map(|r| r.unwrap()).collect())
312334
}
313335

314336
fn try_read_sync(&self, pos: u64) -> Option<A> {
315-
self.guard.try_read_sync(pos, self.items_per_blob)
337+
self.guard
338+
.try_read_sync(pos, self.items_per_blob)
339+
.map_or_else(
340+
|| {
341+
self.metrics.record_cache_misses(1);
342+
None
343+
},
344+
|item| {
345+
self.metrics.record_cache_hits(1);
346+
self.metrics.try_read_sync_hits.inc();
347+
self.metrics.items_read.inc();
348+
Some(item)
349+
},
350+
)
316351
}
317352

318353
async fn replay(
@@ -467,6 +502,9 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
467502
let tail_section = size / items_per_blob;
468503
journal.ensure_section_exists(tail_section).await?;
469504

505+
let metrics = Metrics::new(context);
506+
metrics.update(size, pruning_boundary, items_per_blob);
507+
470508
Ok(Self {
471509
inner: UpgradableAsyncRwLock::new(Inner {
472510
journal,
@@ -475,6 +513,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
475513
pruning_boundary,
476514
}),
477515
items_per_blob,
516+
metrics,
478517
})
479518
}
480519

@@ -676,6 +715,9 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
676715
metadata.sync().await?;
677716
}
678717

718+
let metrics = Metrics::new(context);
719+
metrics.update(size, size, items_per_blob);
720+
679721
Ok(Self {
680722
inner: UpgradableAsyncRwLock::new(Inner {
681723
journal,
@@ -684,6 +726,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
684726
pruning_boundary: size, // No data exists yet
685727
}),
686728
items_per_blob,
729+
metrics,
687730
})
688731
}
689732

@@ -700,6 +743,8 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
700743
/// Only the tail section can have pending updates since historical sections are synced
701744
/// when they become full.
702745
pub async fn sync(&self) -> Result<(), Error> {
746+
let _timer = self.metrics.sync_timer();
747+
self.metrics.sync_calls.inc();
703748
// Serialize with append/prune/rewind to ensure section selection is stable, while still allowing
704749
// concurrent readers.
705750
let inner = self.inner.upgradable_read().await;
@@ -750,6 +795,7 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
750795
Reader {
751796
guard: self.inner.read().await,
752797
items_per_blob: self.items_per_blob,
798+
metrics: &self.metrics,
753799
}
754800
}
755801

@@ -762,7 +808,9 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
762808
/// Append a new item to the journal. Return the item's position in the journal, or error if the
763809
/// operation fails.
764810
pub async fn append(&self, item: &A) -> Result<u64, Error> {
765-
self.append_many(Many::Flat(std::slice::from_ref(item)))
811+
let _timer = self.metrics.append_timer();
812+
self.metrics.append_calls.inc();
813+
self.append_many_inner(Many::Flat(std::slice::from_ref(item)))
766814
.await
767815
}
768816

@@ -771,6 +819,13 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
771819
/// Acquires the write lock once for all items instead of per-item.
772820
/// Returns [Error::EmptyAppend] if items is empty.
773821
pub async fn append_many<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
822+
// Bound to a named local so the guard lives until this function returns.
// NOTE(review): presumably a drop-based duration guard; confirm against `FixedMetrics`.
let _timer = self.metrics.append_many_timer();
823+
// Incremented before delegating, so calls that fail (e.g. `Error::EmptyAppend`) are counted.
self.metrics.append_many_calls.inc();
824+
self.append_many_inner(items).await
825+
}
826+
827+
// Shared implementation for `append` and `append_many`; public wrappers record metrics.
828+
async fn append_many_inner<'a>(&'a self, items: Many<'a, A>) -> Result<u64, Error> {
774829
if items.is_empty() {
775830
return Err(Error::EmptyAppend);
776831
}
@@ -825,6 +880,8 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
825880
}
826881
}
827882

883+
self.metrics
884+
.update(inner.size, inner.pruning_boundary, self.items_per_blob);
828885
Ok(inner.size - 1)
829886
}
830887

@@ -859,6 +916,8 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
859916

860917
inner.journal.rewind(section, byte_offset).await?;
861918
inner.size = size;
919+
self.metrics
920+
.update(inner.size, inner.pruning_boundary, self.items_per_blob);
862921

863922
Ok(())
864923
}
@@ -898,6 +957,8 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
898957
// Pruning boundary only moves forward
899958
assert!(inner.pruning_boundary < new_oldest * self.items_per_blob);
900959
inner.pruning_boundary = new_oldest * self.items_per_blob;
960+
self.metrics
961+
.update(inner.size, inner.pruning_boundary, self.items_per_blob);
901962
}
902963

903964
Ok(pruned)
@@ -947,6 +1008,8 @@ impl<E: Context, A: CodecFixedShared> Journal<E, A> {
9471008
inner.metadata.sync().await?;
9481009
}
9491010

1011+
self.metrics
1012+
.update(inner.size, inner.pruning_boundary, self.items_per_blob);
9501013
Ok(())
9511014
}
9521015

@@ -1060,7 +1123,7 @@ mod tests {
10601123
use commonware_macros::test_traced;
10611124
use commonware_runtime::{
10621125
deterministic::{self, Context},
1063-
Blob, BufferPooler, Error as RuntimeError, Runner, Storage, Supervisor as _,
1126+
Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _,
10641127
};
10651128
use commonware_utils::{NZUsize, NZU16, NZU64};
10661129
use futures::{pin_mut, StreamExt};
@@ -3090,4 +3153,52 @@ mod tests {
30903153
journal.destroy().await.unwrap();
30913154
});
30923155
}
3156+
3157+
/// Drives one call of each instrumented journal operation and checks the encoded metrics.
#[test_traced]
3158+
fn test_fixed_journal_metrics() {
3159+
let executor = deterministic::Runner::default();
3160+
executor.start(|context| async move {
3161+
let cfg = test_cfg(&context, NZU64!(2));
3162+
let journal = Journal::<_, Digest>::init(context.child("fixed_metrics"), cfg.clone())
3163+
.await
3164+
.unwrap();
3165+
3166+
// Five items through one `append_many` call, then a sixth through `append`. Each call
// counter and duration histogram below is expected to show exactly one call.
let items: Vec<_> = (0..5).map(test_digest).collect();
3167+
journal.append_many(Many::Flat(&items)).await.unwrap();
3168+
journal.append(&test_digest(5)).await.unwrap();
3169+
journal.sync().await.unwrap();
3170+
// One item via `read`, one via `try_read_sync`, three via `read_many`: the assertions
// below expect `items_read_total` to be 5.
journal.reader().await.read(0).await.unwrap();
3171+
journal.reader().await.try_read_sync(0).unwrap();
3172+
journal.reader().await.read_many(&[1, 2, 4]).await.unwrap();
3173+
// The gauges asserted below reflect the state after this prune and rewind.
journal.prune(2).await.unwrap();
3174+
journal.rewind(4).await.unwrap();
3175+
3176+
let buffer = context.encode();
3177+
for expected in [
3178+
"fixed_metrics_size 4",
3179+
"fixed_metrics_pruning_boundary 2",
3180+
"fixed_metrics_retained 2",
3181+
"fixed_metrics_tail_items 2",
3182+
"fixed_metrics_append_calls_total 1",
3183+
"fixed_metrics_append_many_calls_total 1",
3184+
"fixed_metrics_read_calls_total 1",
3185+
"fixed_metrics_read_many_calls_total 1",
3186+
"fixed_metrics_try_read_sync_hits_total 1",
3187+
"fixed_metrics_items_read_total 5",
3188+
"fixed_metrics_sync_calls_total 1",
3189+
"fixed_metrics_append_duration_count 1",
3190+
"fixed_metrics_append_many_duration_count 1",
3191+
"fixed_metrics_read_duration_count 1",
3192+
"fixed_metrics_read_many_duration_count 1",
3193+
"fixed_metrics_sync_duration_count 1",
3194+
// The remaining entries carry no value: only the metric name's presence is checked.
"fixed_metrics_cache_hits_total",
3195+
"fixed_metrics_cache_misses_total",
3196+
"fixed_metrics_blobs_tracked",
3197+
] {
3198+
assert!(buffer.contains(expected), "{expected}\n{buffer}");
3199+
}
3200+
3201+
journal.destroy().await.unwrap();
3202+
});
3203+
}
30933204
}

0 commit comments

Comments
 (0)