Skip to content

Commit 7c63128

Browse files
committed
reader-map: Add unique identifier to metrics
Plumb through the global node index of the reader down to the reader-map metrics, and add it as a label to the histograms. This is the only identifier available when creating the reader node during migration/recovery. Change-Id: Ic6052036dc92d76d87f797c5af8b5aa42fbfa02c Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7884 Tested-by: Buildkite CI Reviewed-by: Sidney Cammeresi <[email protected]>
1 parent 3988524 commit 7c63128

File tree

6 files changed

+97
-14
lines changed

6 files changed

+97
-14
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

reader-map/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ itertools = { workspace = true }
1010
iter-enum = { workspace = true }
1111
left-right = { workspace = true }
1212
metrics = { workspace = true }
13+
petgraph = { workspace = true }
1314
rand = { workspace = true }
1415
smallvec = { workspace = true, features = ["union"] }
1516
thiserror = { workspace = true }

reader-map/src/inner.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use iter_enum::{ExactSizeIterator, Iterator};
77
use itertools::Either;
88
use metrics::{register_histogram, Histogram};
99
use partial_map::PartialMap;
10+
use petgraph::graph::NodeIndex;
1011
use readyset_client::internal::IndexType;
1112
use readyset_util::ranges::{Bound, RangeBounds};
1213

@@ -321,9 +322,13 @@ pub(crate) struct WriteMetrics {
321322
}
322323

323324
impl WriteMetrics {
324-
fn new() -> Self {
325-
let entry_updated = register_histogram!(READER_MAP_UPDATES);
326-
let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES);
325+
fn new(node_index: Option<NodeIndex>) -> Self {
326+
let idx = match node_index {
327+
Some(idx) => idx.index().to_string(),
328+
None => "-1".to_string(),
329+
};
330+
let entry_updated = register_histogram!(READER_MAP_UPDATES, "node_idx" => idx.clone());
331+
let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES, "node_idx" => idx);
327332

328333
Self {
329334
entry_updated,
@@ -409,6 +414,7 @@ where
409414
hasher: S,
410415
eviction_strategy: EvictionStrategy,
411416
insertion_order: Option<I>,
417+
node_index: Option<NodeIndex>,
412418
) -> Self {
413419
Inner {
414420
data: Data::with_index_type_and_hasher(index_type, hasher.clone()),
@@ -418,7 +424,7 @@ where
418424
hasher,
419425
eviction_strategy,
420426
insertion_order,
421-
metrics: WriteMetrics::new(),
427+
metrics: WriteMetrics::new(node_index),
422428
}
423429
}
424430

reader-map/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ use std::hash::{BuildHasher, Hash};
230230

231231
pub use eviction::EvictionStrategy;
232232
use partial_map::InsertionOrder;
233+
use petgraph::graph::NodeIndex;
233234
use readyset_client::internal::IndexType;
234235

235236
use crate::inner::Inner;
@@ -277,6 +278,9 @@ pub struct Options<M, T, S, I> {
277278
capacity: Option<usize>,
278279
eviction_strategy: EvictionStrategy,
279280
insertion_order: Option<I>,
281+
282+
// The global index of the reader node that will hold this `reader-map`.
283+
node_index: Option<NodeIndex>,
280284
}
281285

282286
impl<M, T, S, I> fmt::Debug for Options<M, T, S, I>
@@ -291,6 +295,7 @@ where
291295
.field("timestamp", &self.timestamp)
292296
.field("capacity", &self.capacity)
293297
.field("order", &self.insertion_order)
298+
.field("node_index", &self.node_index)
294299
.finish_non_exhaustive()
295300
}
296301
}
@@ -305,6 +310,7 @@ impl Default for Options<(), (), RandomState, DefaultInsertionOrder> {
305310
capacity: None,
306311
eviction_strategy: Default::default(),
307312
insertion_order: None,
313+
node_index: None,
308314
}
309315
}
310316
}
@@ -320,6 +326,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
320326
capacity: self.capacity,
321327
eviction_strategy: self.eviction_strategy,
322328
insertion_order: self.insertion_order,
329+
node_index: self.node_index,
323330
}
324331
}
325332

@@ -333,6 +340,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
333340
capacity: self.capacity,
334341
eviction_strategy: self.eviction_strategy,
335342
insertion_order: self.insertion_order,
343+
node_index: self.node_index,
336344
}
337345
}
338346

@@ -346,6 +354,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
346354
capacity: Some(capacity),
347355
eviction_strategy: self.eviction_strategy,
348356
insertion_order: self.insertion_order,
357+
node_index: self.node_index,
349358
}
350359
}
351360

@@ -359,6 +368,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
359368
capacity: self.capacity,
360369
eviction_strategy: self.eviction_strategy,
361370
insertion_order: self.insertion_order,
371+
node_index: self.node_index,
362372
}
363373
}
364374

@@ -372,6 +382,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
372382
capacity: self.capacity,
373383
eviction_strategy: self.eviction_strategy,
374384
insertion_order,
385+
node_index: self.node_index,
375386
}
376387
}
377388

@@ -387,6 +398,12 @@ impl<M, T, S, I> Options<M, T, S, I> {
387398
self
388399
}
389400

401+
/// Sets the eviction strategy for the map.
402+
pub fn with_node_index(mut self, node_index: NodeIndex) -> Self {
403+
self.node_index = Some(node_index);
404+
self
405+
}
406+
390407
/// Create the map, and construct the read and write handles used to access it.
391408
#[allow(clippy::type_complexity)]
392409
pub fn construct<K, V>(self) -> (WriteHandle<K, V, I, M, T, S>, ReadHandle<K, V, I, M, T, S>)
@@ -405,6 +422,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
405422
self.hasher,
406423
self.eviction_strategy,
407424
self.insertion_order,
425+
self.node_index,
408426
);
409427

410428
let (mut w, r) = left_right::new_from_empty(inner);

readyset-dataflow/src/backlog/mod.rs

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,16 @@ pub(crate) fn new(
3232
cols: usize,
3333
index: Index,
3434
reader_processing: ReaderProcessing,
35+
node_index: NodeIndex,
3536
) -> (SingleReadHandle, WriteHandle) {
36-
new_inner(cols, index, None, EvictionKind::Random, reader_processing)
37+
new_inner(
38+
cols,
39+
index,
40+
None,
41+
EvictionKind::Random,
42+
reader_processing,
43+
node_index,
44+
)
3745
}
3846

3947
/// Allocate a new partially materialized end-user facing result table.
@@ -55,6 +63,7 @@ pub(crate) fn new_partial<F>(
5563
trigger: F,
5664
eviction_kind: EvictionKind,
5765
reader_processing: ReaderProcessing,
66+
node_index: NodeIndex,
5867
) -> (SingleReadHandle, WriteHandle)
5968
where
6069
F: Fn(&mut dyn Iterator<Item = KeyComparison>, Relation) -> bool + 'static + Send + Sync,
@@ -65,6 +74,7 @@ where
6574
Some(Arc::new(trigger)),
6675
eviction_kind,
6776
reader_processing,
77+
node_index,
6878
)
6979
}
7080

@@ -85,6 +95,7 @@ fn new_inner(
8595
>,
8696
eviction_kind: EvictionKind,
8797
reader_processing: ReaderProcessing,
98+
node_index: NodeIndex,
8899
) -> (SingleReadHandle, WriteHandle) {
89100
let contiguous = {
90101
let mut contiguous = true;
@@ -117,6 +128,7 @@ fn new_inner(
117128
use reader_map;
118129
let (mut w, r) = reader_map::Options::default()
119130
.with_meta(-1)
131+
.with_node_index(node_index)
120132
.with_timestamp(Timestamp::default())
121133
.with_hasher(RandomState::default())
122134
.with_index_type(index.index_type)
@@ -615,7 +627,12 @@ mod tests {
615627
fn store_works() {
616628
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
617629

618-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
630+
let (r, mut w) = new(
631+
2,
632+
Index::hash_map(vec![0]),
633+
ReaderProcessing::default(),
634+
Default::default(),
635+
);
619636

620637
w.swap();
621638

@@ -639,7 +656,12 @@ mod tests {
639656
use std::thread;
640657

641658
let n = 1_000;
642-
let (r, mut w) = new(1, Index::hash_map(vec![0]), ReaderProcessing::default());
659+
let (r, mut w) = new(
660+
1,
661+
Index::hash_map(vec![0]),
662+
ReaderProcessing::default(),
663+
Default::default(),
664+
);
643665
let jh = thread::spawn(move || {
644666
for i in 0..n {
645667
w.add(vec![Record::Positive(vec![i.into()])]);
@@ -668,7 +690,12 @@ mod tests {
668690
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
669691
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
670692

671-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
693+
let (r, mut w) = new(
694+
2,
695+
Index::hash_map(vec![0]),
696+
ReaderProcessing::default(),
697+
Default::default(),
698+
);
672699
w.add(vec![Record::Positive(a.to_vec())]);
673700
w.swap();
674701
w.add(vec![Record::Positive(b.to_vec())]);
@@ -683,7 +710,12 @@ mod tests {
683710
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
684711
let c = vec![1i32.into(), "c".into()].into_boxed_slice();
685712

686-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
713+
let (r, mut w) = new(
714+
2,
715+
Index::hash_map(vec![0]),
716+
ReaderProcessing::default(),
717+
Default::default(),
718+
);
687719
w.add(vec![Record::Positive(a.to_vec())]);
688720
w.add(vec![Record::Positive(b.to_vec())]);
689721
w.swap();
@@ -699,7 +731,12 @@ mod tests {
699731
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
700732
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
701733

702-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
734+
let (r, mut w) = new(
735+
2,
736+
Index::hash_map(vec![0]),
737+
ReaderProcessing::default(),
738+
Default::default(),
739+
);
703740
w.add(vec![Record::Positive(a.to_vec())]);
704741
w.add(vec![Record::Positive(b.to_vec())]);
705742
w.add(vec![Record::Negative(a.to_vec())]);
@@ -714,7 +751,12 @@ mod tests {
714751
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
715752
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
716753

717-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
754+
let (r, mut w) = new(
755+
2,
756+
Index::hash_map(vec![0]),
757+
ReaderProcessing::default(),
758+
Default::default(),
759+
);
718760
w.add(vec![Record::Positive(a.to_vec())]);
719761
w.add(vec![Record::Positive(b.to_vec())]);
720762
w.swap();
@@ -731,7 +773,12 @@ mod tests {
731773
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
732774
let c = vec![1i32.into(), "c".into()].into_boxed_slice();
733775

734-
let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
776+
let (r, mut w) = new(
777+
2,
778+
Index::hash_map(vec![0]),
779+
ReaderProcessing::default(),
780+
Default::default(),
781+
);
735782
w.add(vec![
736783
Record::Positive(a.to_vec()),
737784
Record::Positive(b.to_vec()),
@@ -761,6 +808,7 @@ mod tests {
761808
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
762809
EvictionKind::Random,
763810
ReaderProcessing::default(),
811+
Default::default(),
764812
);
765813
w.swap();
766814

@@ -786,6 +834,7 @@ mod tests {
786834
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
787835
EvictionKind::Random,
788836
ReaderProcessing::default(),
837+
Default::default(),
789838
);
790839
w.swap();
791840

@@ -805,6 +854,7 @@ mod tests {
805854
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
806855
EvictionKind::Random,
807856
ReaderProcessing::default(),
857+
Default::default(),
808858
);
809859
w.swap();
810860

@@ -835,6 +885,7 @@ mod tests {
835885
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
836886
EvictionKind::Random,
837887
ReaderProcessing::default(),
888+
Default::default(),
838889
);
839890
w.swap();
840891

@@ -856,6 +907,7 @@ mod tests {
856907
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
857908
EvictionKind::Random,
858909
ReaderProcessing::default(),
910+
Default::default(),
859911
);
860912
w.swap();
861913

readyset-dataflow/src/domain/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,7 @@ impl Domain {
18461846
},
18471847
self.eviction_kind,
18481848
r.reader_processing().clone(),
1849+
node_index,
18491850
);
18501851

18511852
let shard = *self.shard.as_ref().unwrap_or(&0);
@@ -1895,8 +1896,12 @@ impl Domain {
18951896
expected_type: NodeType::Reader,
18961897
})?;
18971898

1898-
let (r_part, w_part) =
1899-
backlog::new(num_columns, index, r.reader_processing().clone());
1899+
let (r_part, w_part) = backlog::new(
1900+
num_columns,
1901+
index,
1902+
r.reader_processing().clone(),
1903+
node_index,
1904+
);
19001905

19011906
let shard = *self.shard.as_ref().unwrap_or(&0);
19021907
// TODO(ENG-838): Don't recreate every single node on leader failure.

0 commit comments

Comments
 (0)