Skip to content

Commit 3988524

Browse files
committed
reader-map: Report Values timings to prometheus
Record the `Values` timings data as prometheus histograms. This CL captures the duration between updates to a given key, as well as the lifetime duration of the key in the `reader-map` before eviction. Fixes: REA-4639 Change-Id: If3e322ffe98306352ac9fc941fc9cdd1ed4e2db6 Release-Note-Core: Add metrics for updates and lifetimes of entries in the reader cache. Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7883 Tested-by: Buildkite CI Reviewed-by: Sidney Cammeresi <[email protected]>
1 parent 235d632 commit 3988524

File tree

8 files changed

+94
-35
lines changed

8 files changed

+94
-35
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

reader-map/Cargo.toml

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,18 @@ authors = ["ReadySet Technology, Inc. <[email protected]>"]
66
edition = "2021"
77

88
[dependencies]
9-
triomphe = { workspace = true }
10-
smallvec = { workspace = true, features = ["union"] }
11-
rand = { workspace = true }
12-
left-right = { workspace = true }
139
itertools = { workspace = true }
10+
iter-enum = { workspace = true }
11+
left-right = { workspace = true }
12+
metrics = { workspace = true }
13+
rand = { workspace = true }
14+
smallvec = { workspace = true, features = ["union"] }
15+
thiserror = { workspace = true }
16+
triomphe = { workspace = true }
1417

18+
partial-map = { path = "../partial-map" }
1519
readyset-client = { path = "../readyset-client" }
1620
readyset-util = { path = "../readyset-util" }
17-
partial-map = { path = "../partial-map" }
18-
thiserror = { workspace = true }
19-
iter-enum = { workspace = true }
2021

2122
[dev-dependencies]
2223
proptest = { workspace = true }

reader-map/src/inner.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ use std::hash::{BuildHasher, Hash};
55

66
use iter_enum::{ExactSizeIterator, Iterator};
77
use itertools::Either;
8+
use metrics::{register_histogram, Histogram};
89
use partial_map::PartialMap;
910
use readyset_client::internal::IndexType;
1011
use readyset_util::ranges::{Bound, RangeBounds};
1112

1213
use crate::eviction::{EvictionMeta, EvictionStrategy};
13-
use crate::values::Values;
14+
use crate::recorded::{READER_MAP_LIFETIMES, READER_MAP_UPDATES};
15+
use crate::values::{Metrics, Values};
1416

1517
/// Represents a miss when looking up a range.
1618
///
@@ -155,18 +157,15 @@ impl<K, V, S> Data<K, V, S> {
155157
}
156158
}
157159

158-
pub(crate) fn remove_range<R>(&mut self, range: R)
160+
pub(crate) fn remove_range<R, F>(&mut self, range: R, f: F)
159161
where
160162
R: RangeBounds<K>,
161163
K: Ord + Clone,
164+
F: Fn(&Metrics),
162165
{
163166
match self {
164167
Self::BTreeMap(map, ..) => {
165-
// Returns an iterator, but we don't actually care about the elements. Dropping
166-
// the iterator does not automatically remove the elements from the BTreeMap (as of
167-
// nightly 2023-06-16), so we need to iterate by collect()'ing them
168-
// first.
169-
let _: Vec<_> = map.remove_range(range).collect();
168+
map.remove_range(range).for_each(|(_, v)| f(v.metrics()));
170169
}
171170
Self::HashMap(..) => panic!("remove_range called on a HashMap reader_map"),
172171
}
@@ -312,6 +311,39 @@ where
312311
}
313312
}
314313

314+
#[derive(Clone)]
315+
pub(crate) struct WriteMetrics {
316+
// Captures the duration between updates at a given key (entry).
317+
entry_updated: Histogram,
318+
// Captures the lifetime of an entry, from the time it was first added
319+
// until its eviction.
320+
lifetime_evict: Histogram,
321+
}
322+
323+
impl WriteMetrics {
324+
fn new() -> Self {
325+
let entry_updated = register_histogram!(READER_MAP_UPDATES);
326+
let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES);
327+
328+
Self {
329+
entry_updated,
330+
lifetime_evict,
331+
}
332+
}
333+
334+
pub(crate) fn record_updated(&self, values: &Metrics) {
335+
if let Some(duration) = values.last_update_interval() {
336+
self.entry_updated.record(duration.as_millis() as f64);
337+
}
338+
}
339+
340+
pub(crate) fn record_evicted(&self, values: &Metrics) {
341+
if let Some(lifetime) = values.lifetime() {
342+
self.lifetime_evict.record(lifetime.as_millis() as f64);
343+
}
344+
}
345+
}
346+
315347
pub(crate) struct Inner<K, V, M, T, S, I> {
316348
pub(crate) data: Data<K, V, S>,
317349
pub(crate) meta: M,
@@ -320,6 +352,7 @@ pub(crate) struct Inner<K, V, M, T, S, I> {
320352
pub(crate) hasher: S,
321353
pub(crate) eviction_strategy: EvictionStrategy,
322354
pub(crate) insertion_order: Option<I>,
355+
pub(crate) metrics: WriteMetrics,
323356
}
324357

325358
impl<K, V, M, T, S, I> fmt::Debug for Inner<K, V, M, T, S, I>
@@ -358,6 +391,7 @@ where
358391
hasher: self.hasher.clone(),
359392
eviction_strategy: self.eviction_strategy.clone(),
360393
insertion_order: self.insertion_order.clone(),
394+
metrics: self.metrics.clone(),
361395
}
362396
}
363397
}
@@ -384,6 +418,7 @@ where
384418
hasher,
385419
eviction_strategy,
386420
insertion_order,
421+
metrics: WriteMetrics::new(),
387422
}
388423
}
389424

reader-map/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ mod error;
241241
mod eviction;
242242
mod inner;
243243
mod read;
244+
mod recorded;
244245
mod values;
245246
mod write;
246247

reader-map/src/recorded.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
//! Documents the set of metrics that are currently being recorded within
2+
//! instances of a reader-map.
3+
4+
/// Histogram: The time interval in milliseconds between updates to a given key in the reader-map.
5+
/// Updates include inserts, updates, and deletions from the key's results set.
6+
///
7+
/// | Tag | Description |
8+
/// | --- | ----------- |
9+
/// | node_index | The node index of the reader node in the dataflow graph |
10+
pub const READER_MAP_UPDATES: &str = "readyset_reader.entry_updates_ms";
11+
12+
/// Histogram: The time in milliseconds that an entry was "live" in the `reader-map`, as measured
13+
/// from first insert to eviction.
14+
///
15+
/// | Tag | Description |
16+
/// | --- | ----------- |
17+
/// | node_index | The node index of the reader node in the dataflow graph |
18+
pub const READER_MAP_LIFETIMES: &str = "readyset_reader.entry_lifetimes_ms";

reader-map/src/values.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ impl<T> Values<T> {
191191
{
192192
Arc::make_mut(&mut self.values.0).retain(f)
193193
}
194+
195+
pub(crate) fn metrics(&self) -> &Metrics {
196+
&self.metrics
197+
}
194198
}
195199

196200
impl<'a, T: 'a> IntoIterator for &'a Values<T> {

reader-map/src/write.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ where
384384
insertion_index,
385385
timestamp,
386386
} => {
387+
let metrics = self.metrics.clone();
387388
let values = self.data_entry(key.clone(), eviction_meta);
388389
// Always insert values in sorted order, even if no ordering method is provided,
389390
// otherwise it will require a linear scan to remove a value
@@ -395,6 +396,7 @@ where
395396
.unwrap_or_else(|i| i);
396397
values.insert(insert_idx, value.clone(), *timestamp);
397398
*insertion_index = Some(insert_idx);
399+
metrics.record_updated(values.metrics());
398400
}
399401
Operation::RemoveValue {
400402
key,
@@ -414,18 +416,33 @@ where
414416
e.remove(remove_idx, *timestamp);
415417
*removal_index = Some(remove_idx);
416418
}
419+
// removing a value from a key is just "updating" that key
420+
self.metrics.record_updated(e.metrics());
417421
}
418422
}
419423
Operation::AddRange(range) => self.data.add_range(range.clone()),
420424
Operation::AddFullRange => self.data.add_full_range(),
421425
Operation::Clear(key, eviction_meta) => {
422426
self.data_entry(key.clone(), eviction_meta).clear()
427+
// `clear()` is invoked on replay when filling a hole, and will be followed
428+
// by a call to `add()`; hence we don't capture metrics on `clear()`.
423429
}
424430
Operation::RemoveEntry(key) => {
425-
self.data.remove(key);
431+
let v = self.data.remove(key);
432+
433+
// upstream deletes flow through `RemoveValue`, thus these removes are either
434+
// upstream evictions or a call to "mark a hole" (most likely an eviction).
435+
if let Some(values) = v {
436+
self.metrics.record_evicted(values.metrics());
437+
}
426438
}
427439
Operation::Purge => self.data.clear(),
428-
Operation::RemoveRange(range) => self.data.remove_range(range.clone()),
440+
Operation::RemoveRange(range) => {
441+
// RemoveRange is only called on evictions (via marking a hole).
442+
self.data.remove_range(range.clone(), |metrics| {
443+
self.metrics.record_evicted(metrics);
444+
});
445+
}
429446
Operation::Retain(key, predicate) => {
430447
if let Some(e) = self.data.get_mut(key) {
431448
let mut first = true;
@@ -503,7 +520,7 @@ where
503520
Operation::RemoveEntry(key) => {
504521
self.data.remove(&key);
505522
}
506-
Operation::RemoveRange(range) => self.data.remove_range(range),
523+
Operation::RemoveRange(range) => self.data.remove_range(range, |_| {}),
507524
Operation::Purge => self.data.clear(),
508525
Operation::Retain(key, mut predicate) => {
509526
if let Some(e) = self.data.get_mut(&key) {

reader-map/tests/lib.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -187,24 +187,6 @@ fn mapref() {
187187
}
188188
}
189189

190-
#[test]
191-
#[cfg_attr(miri, ignore)]
192-
// https://github.com/rust-lang/miri/issues/658
193-
fn panicked_reader_doesnt_block_writer() {
194-
let (mut w, r) = reader_map::new();
195-
w.insert(1, "a");
196-
w.publish();
197-
198-
// reader panics
199-
let r = std::panic::catch_unwind(move || r.get(&1).map(|_| panic!()));
200-
r.unwrap_err();
201-
202-
// writer should still be able to continue
203-
w.insert(1, "b");
204-
w.publish();
205-
w.publish();
206-
}
207-
208190
#[test]
209191
fn read_after_drop() {
210192
let x = ('x', 42);

0 commit comments

Comments
 (0)