diff --git a/consensus/src/marshal/ancestry.rs b/consensus/src/marshal/ancestry.rs index 0c5c35c5d1..053758b605 100644 --- a/consensus/src/marshal/ancestry.rs +++ b/consensus/src/marshal/ancestry.rs @@ -2,6 +2,7 @@ use crate::{types::Height, Block, Heightable}; use commonware_cryptography::{Digest, Digestible}; +use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock}; use futures::{ future::{BoxFuture, OptionFuture}, FutureExt, Stream, @@ -10,6 +11,7 @@ use pin_project::pin_project; use std::{ future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, }; @@ -72,23 +74,56 @@ impl ExpectedParent { } } +// Builds a pending parent fetch that records successful fetch latency and carries the +// expected relationship for validation when the parent is delivered. +fn timed_parent_fetch( + clock: &Arc, + marshal: &M, + child: &M::Block, + fetch_duration: &Timed, +) -> PendingFetch +where + C: Clock, + M: BlockProvider, +{ + let expected = ExpectedParent::from_child(child); + let timer = fetch_duration.timer(clock.as_ref()); + let clock = clock.clone(); + marshal + .subscribe_parent(child) + .map(move |parent| { + parent.map(|parent| { + timer.observe(clock.as_ref()); + (expected, parent) + }) + }) + .boxed() +} + /// Yields the ancestors of a block while prefetching parents, including the /// height-zero genesis block if it is available. #[pin_project] -pub struct AncestorStream { +pub struct AncestorStream { buffered: Vec, marshal: M, + fetch_duration: Timed, + clock: Arc, #[pin] pending: OptionFuture>, } -impl AncestorStream { +impl AncestorStream { /// Creates a new [AncestorStream] starting from the given ancestry. /// /// # Panics /// /// Panics if the initial blocks are not contiguous. - pub(crate) fn new(marshal: M, initial: impl IntoIterator) -> Self { + pub(crate) fn new( + clock: Arc, + marshal: M, + initial: impl IntoIterator, + fetch_duration: Timed, + ) -> Self { let mut buffered = initial.into_iter().collect::>(); buffered.sort_by_key(Heightable::height); @@ -109,6 +144,8 @@ impl AncestorStream { Self { marshal, buffered, + fetch_duration, + clock, pending: None.into(), } } @@ -120,18 +157,20 @@ impl AncestorStream { } } -impl Ancestry for AncestorStream +impl Ancestry for AncestorStream where M: BlockProvider, + C: Clock, { fn peek(&self) -> Option<&M::Block> { Self::peek(self) } } -impl Stream for AncestorStream +impl Stream for AncestorStream where M: BlockProvider, + C: Clock, { type Item = M::Block; @@ -146,12 +185,8 @@ where let should_walk_parent = height > END_BOUND; let end_of_buffered = this.buffered.is_empty(); if should_walk_parent && end_of_buffered { - let expected = ExpectedParent::from_child(&block); - let future = this - .marshal - .subscribe_parent(&block) - .map(move |parent| parent.map(|parent| (expected, parent))) - .boxed(); + let future = + timed_parent_fetch(this.clock, this.marshal, &block, this.fetch_duration); *this.pending.as_mut() = Some(future).into(); // Explicitly poll the next future to kick off the fetch. If it's already ready, @@ -185,12 +220,8 @@ where let height = block.height(); let should_walk_parent = height > END_BOUND; if should_walk_parent { - let expected = ExpectedParent::from_child(&block); - let future = this - .marshal - .subscribe_parent(&block) - .map(move |parent| parent.map(|parent| (expected, parent))) - .boxed(); + let future = + timed_parent_fetch(this.clock, this.marshal, &block, this.fetch_duration); *this.pending.as_mut() = Some(future).into(); // Explicitly poll the next future to kick off the fetch. If it's already ready, @@ -221,7 +252,14 @@ mod test { use super::*; use crate::marshal::mocks::block::Block; use commonware_cryptography::{sha256::Digest as Sha256Digest, Digest, Sha256}; - use commonware_macros::test_async; + use commonware_runtime::{ + deterministic, + telemetry::metrics::{ + histogram::{Buckets, Timed}, + MetricsExt as _, + }, + Runner as _, Supervisor as _, + }; use futures::StreamExt; #[derive(Default, Clone)] @@ -251,124 +289,170 @@ mod test { } } + fn timed(context: &deterministic::Context) -> Timed { + Timed::new(context.histogram( + "ancestor_fetch_duration", + "Histogram of time taken to fetch a block via the ancestry stream, in seconds", + Buckets::LOCAL, + )) + } + + fn stream( + context: &deterministic::Context, + marshal: M, + initial: impl IntoIterator, + ) -> AncestorStream + where + M: BlockProvider, + { + let stream_context = context.child("ancestor_stream"); + let fetch_duration = timed(&stream_context); + AncestorStream::new(Arc::new(stream_context), marshal, initial, fetch_duration) + } + #[test] #[should_panic = "initial blocks must be contiguous in height"] fn test_panics_on_non_contiguous_initial_blocks_height() { - AncestorStream::new( - MockProvider::default(), - vec![ - Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1), - Block::new::((), Sha256Digest::EMPTY, Height::new(3), 3), - ], - ); + deterministic::Runner::default().start(|context| async move { + stream( + &context, + MockProvider::default(), + vec![ + Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1), + Block::new::((), Sha256Digest::EMPTY, Height::new(3), 3), + ], + ); + }); } #[test] #[should_panic = "initial blocks must be contiguous in ancestry"] fn test_panics_on_non_contiguous_initial_blocks_digest() { - AncestorStream::new( - MockProvider::default(), - vec![ - Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1), - Block::new::((), Sha256Digest::EMPTY, Height::new(2), 2), - ], - ); + deterministic::Runner::default().start(|context| async move { + stream( + &context, + MockProvider::default(), + vec![ + Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1), + Block::new::((), Sha256Digest::EMPTY, Height::new(2), 2), + ], + ); + }); } #[test] #[should_panic = "fetched parent must be contiguous in height"] fn test_panics_on_non_contiguous_fetched_parent_height() { - let parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); - let child = Block::new::((), parent.digest(), Height::new(3), 3); - let stream = AncestorStream::new(MockProvider(vec![parent]), [child]); - futures::pin_mut!(stream); - - let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(waker); - let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx); + deterministic::Runner::default().start(|context| async move { + let parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); + let child = Block::new::((), parent.digest(), Height::new(3), 3); + let stream = stream(&context, MockProvider(vec![parent]), [child]); + futures::pin_mut!(stream); + + let waker = futures::task::noop_waker_ref(); + let mut cx = std::task::Context::from_waker(waker); + let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx); + }); } #[test] #[should_panic = "fetched parent must be contiguous in ancestry"] fn test_panics_on_non_contiguous_fetched_parent_digest() { - let expected_parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); - let fetched_parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 1); - let child = Block::new::((), expected_parent.digest(), Height::new(1), 2); - let stream = AncestorStream::new(WrongParentProvider(fetched_parent), [child]); - futures::pin_mut!(stream); - - let waker = futures::task::noop_waker_ref(); - let mut cx = std::task::Context::from_waker(waker); - let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx); + deterministic::Runner::default().start(|context| async move { + let expected_parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); + let fetched_parent = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 1); + let child = Block::new::((), expected_parent.digest(), Height::new(1), 2); + let stream = stream(&context, WrongParentProvider(fetched_parent), [child]); + futures::pin_mut!(stream); + + let waker = futures::task::noop_waker_ref(); + let mut cx = std::task::Context::from_waker(waker); + let _ = futures::Stream::poll_next(stream.as_mut(), &mut cx); + }); } #[test] fn test_peek_available_through_ancestry_trait() { - fn peek_height(ancestry: impl Ancestry>) -> Option { - ancestry.peek().map(Heightable::height) - } + deterministic::Runner::default().start(|context| async move { + fn peek_height(ancestry: impl Ancestry>) -> Option { + ancestry.peek().map(Heightable::height) + } - let block = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); - let stream = AncestorStream::new(MockProvider::default(), [block.clone()]); - assert_eq!(peek_height(stream), Some(block.height())); + let block = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let stream = stream(&context, MockProvider::default(), [block.clone()]); + assert_eq!(peek_height(stream), Some(block.height())); + }); } - #[test_async] - async fn test_yields_genesis_and_stops() { - let genesis = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); - let child = Block::new::((), genesis.digest(), Height::new(1), 1); + #[test] + fn test_yields_genesis_and_stops() { + deterministic::Runner::default().start(|context| async move { + let genesis = Block::new::((), Sha256Digest::EMPTY, Height::zero(), 0); + let child = Block::new::((), genesis.digest(), Height::new(1), 1); - let provider = MockProvider(vec![genesis.clone()]); - let stream = AncestorStream::new(provider, [child.clone()]); + let provider = MockProvider(vec![genesis.clone()]); + let stream = stream(&context, provider, [child.clone()]); - let results = stream.collect::>().await; - assert_eq!(results, vec![child, genesis]); + let results = stream.collect::>().await; + assert_eq!(results, vec![child, genesis]); + }); } - #[test_async] - async fn test_empty_yields_none() { - let mut stream: AncestorStream = - AncestorStream::new(MockProvider::default(), vec![]); - assert_eq!(stream.next().await, None); + #[test] + fn test_empty_yields_none() { + deterministic::Runner::default().start(|context| async move { + let mut stream: AncestorStream = + stream(&context, MockProvider::default(), vec![]); + assert_eq!(stream.next().await, None); + }); } - #[test_async] - async fn test_yields_ancestors() { - let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); - let block2 = Block::new::((), block1.digest(), Height::new(2), 2); - let block3 = Block::new::((), block2.digest(), Height::new(3), 3); + #[test] + fn test_yields_ancestors() { + deterministic::Runner::default().start(|context| async move { + let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let block2 = Block::new::((), block1.digest(), Height::new(2), 2); + let block3 = Block::new::((), block2.digest(), Height::new(3), 3); - let provider = MockProvider(vec![block1.clone(), block2.clone()]); - let stream = AncestorStream::new(provider, [block3.clone()]); + let provider = MockProvider(vec![block1.clone(), block2.clone()]); + let stream = stream(&context, provider, [block3.clone()]); - let results = stream.collect::>().await; - assert_eq!(results, vec![block3, block2, block1]); + let results = stream.collect::>().await; + assert_eq!(results, vec![block3, block2, block1]); + }); } - #[test_async] - async fn test_yields_ancestors_all_buffered() { - let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); - let block2 = Block::new::((), block1.digest(), Height::new(2), 2); - let block3 = Block::new::((), block2.digest(), Height::new(3), 3); - - let provider = MockProvider(vec![]); - let stream = - AncestorStream::new(provider, [block1.clone(), block2.clone(), block3.clone()]); + #[test] + fn test_yields_ancestors_all_buffered() { + deterministic::Runner::default().start(|context| async move { + let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let block2 = Block::new::((), block1.digest(), Height::new(2), 2); + let block3 = Block::new::((), block2.digest(), Height::new(3), 3); + + let provider = MockProvider(vec![]); + let stream = stream( + &context, + provider, + [block1.clone(), block2.clone(), block3.clone()], + ); - let results = stream.collect::>().await; - assert_eq!(results, vec![block3, block2, block1]); + let results = stream.collect::>().await; + assert_eq!(results, vec![block3, block2, block1]); + }); } - #[test_async] - async fn test_missing_parent_ends_stream() { - let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); - let block2 = Block::new::((), block1.digest(), Height::new(2), 2); - let block3 = Block::new::((), block2.digest(), Height::new(3), 3); + #[test] + fn test_missing_parent_ends_stream() { + deterministic::Runner::default().start(|context| async move { + let block1 = Block::new::((), Sha256Digest::EMPTY, Height::new(1), 1); + let block2 = Block::new::((), block1.digest(), Height::new(2), 2); + let block3 = Block::new::((), block2.digest(), Height::new(3), 3); - let provider = MockProvider(vec![block1]); - let stream = AncestorStream::new(provider, [block3.clone()]); + let provider = MockProvider(vec![block1]); + let stream = stream(&context, provider, [block3.clone()]); - let results = stream.collect::>().await; - assert_eq!(results, vec![block3]); + let results = stream.collect::>().await; + assert_eq!(results, vec![block3]); + }); } } diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index baa15c9d59..ffad8c58c5 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -176,6 +176,7 @@ where build_duration: Timed, verify_duration: Timed, proposal_parent_fetch_duration: Timed, + ancestor_fetch_duration: Timed, erasure_encode_duration: Timed, } @@ -203,6 +204,7 @@ where build_duration: self.build_duration.clone(), verify_duration: self.verify_duration.clone(), proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(), + ancestor_fetch_duration: self.ancestor_fetch_duration.clone(), erasure_encode_duration: self.erasure_encode_duration.clone(), } } @@ -260,6 +262,13 @@ where ); let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram); + let ancestor_fetch_histogram = context.histogram( + "ancestor_fetch_duration", + "Histogram of time taken to fetch a block via the ancestry stream, in seconds", + Buckets::LOCAL, + ); + let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram); + let erasure_histogram = context.histogram( "erasure_encode_duration", "Histogram of time taken to erasure encode a block, in seconds", @@ -280,6 +289,7 @@ where build_duration, verify_duration, proposal_parent_fetch_duration, + ancestor_fetch_duration, erasure_encode_duration, } } @@ -312,6 +322,7 @@ where let mut application = self.application.clone(); let epocher = self.epocher.clone(); let verify_duration = self.verify_duration.clone(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let (mut tx, rx) = oneshot::channel(); let context = self @@ -400,7 +411,11 @@ where return; } - let ancestry_stream = marshal.ancestor_stream([block.clone(), parent]); + let ancestry_stream = marshal.ancestor_stream( + Arc::new(runtime_context.child("ancestor_stream")), + [block.clone(), parent], + ancestor_fetch_duration, + ); let validity_request = application.verify( ( runtime_context.child("app_verify"), @@ -661,6 +676,7 @@ where // Metrics let build_duration = self.build_duration.clone(); let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let erasure_encode_duration = self.erasure_encode_duration.clone(); let (mut tx, rx) = oneshot::channel(); @@ -770,7 +786,11 @@ where return; } - let ancestor_stream = marshal.ancestor_stream([parent]); + let ancestor_stream = marshal.ancestor_stream( + Arc::new(runtime_context.child("ancestor_stream")), + [parent], + ancestor_fetch_duration, + ); let build_request = application.propose( ( runtime_context.child("app_propose"), diff --git a/consensus/src/marshal/core/mailbox.rs b/consensus/src/marshal/core/mailbox.rs index f24c8ed95c..f2e6fc3228 100644 --- a/consensus/src/marshal/core/mailbox.rs +++ b/consensus/src/marshal/core/mailbox.rs @@ -14,8 +14,12 @@ use commonware_actor::{ }; use commonware_cryptography::{certificate::Scheme, Digestible}; use commonware_p2p::Recipients; +use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock}; use commonware_utils::{channel::oneshot, vec::NonEmptyVec}; -use std::collections::{btree_map::Entry, BTreeMap, VecDeque}; +use std::{ + collections::{btree_map::Entry, BTreeMap, VecDeque}, + sync::Arc, +}; /// Messages sent to the marshal [Actor](super::Actor). /// @@ -535,15 +539,23 @@ impl Mailbox { /// parent's height from its child before issuing a height-bound request. /// /// Do not use this to wait for pending candidate proposal data. - pub(crate) fn ancestor_stream( + pub(crate) fn ancestor_stream( &self, + clock: Arc, initial: I, - ) -> impl Ancestry + use + fetch_duration: Timed, + ) -> impl Ancestry + use where Self: BlockProvider, I: IntoIterator, + C: Clock, { - AncestorStream::new(self.clone(), initial.into_iter().map(V::into_inner)) + AncestorStream::new( + clock, + self.clone(), + initial.into_iter().map(V::into_inner), + fetch_duration, + ) } /// Retrieve `(height, digest)` for a finalized block by height, digest, or latest. @@ -691,18 +703,21 @@ impl Mailbox { /// verify, certify, or repair from. It is not a candidate fetch path. /// /// If the starting block is not found, `None` is returned. - pub async fn ancestry( + pub async fn ancestry( &self, + clock: Arc, (fallback, start_digest): (DigestFallback, ::Digest), - ) -> Option + use> + fetch_duration: Timed, + ) -> Option + use> where Self: BlockProvider, + C: Clock, { let receiver = self.subscribe_by_digest(start_digest, fallback); receiver .await .ok() - .map(|block| self.ancestor_stream([block])) + .map(|block| self.ancestor_stream(clock, [block], fetch_duration)) } /// Returns the verified block previously persisted for `round`, if any. diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index e197c361f0..7c90ff4f73 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -38,7 +38,13 @@ use commonware_macros::select; use commonware_p2p::simulated::{self, Link, Network, Oracle}; use commonware_parallel::Sequential; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Clock, Quota, Runner, Supervisor as _, + buffer::paged::CacheRef, + deterministic, + telemetry::metrics::{ + histogram::{Buckets, Timed}, + MetricsExt as _, + }, + Clock, Quota, Runner, Supervisor as _, }; use commonware_storage::{ archive::{immutable, prunable}, @@ -54,6 +60,7 @@ use std::{ collections::BTreeMap, future::Future, num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize}, + sync::Arc, time::{Duration, Instant}, }; use tracing::info; @@ -4812,9 +4819,18 @@ where // Stream from latest -> height 1 let (_, commitment) = handle.mailbox.get_info(Identifier::Latest).await.unwrap(); + let fetch_duration = Timed::new(context.histogram( + "ancestor_fetch_duration", + "Histogram of time taken to fetch a block via the ancestry stream, in seconds", + Buckets::LOCAL, + )); let ancestry = handle .mailbox - .ancestry((DigestFallback::Wait, commitment)) + .ancestry( + Arc::new(context.child("ancestor_stream")), + (DigestFallback::Wait, commitment), + fetch_duration, + ) .await .unwrap(); let blocks = ancestry.collect::>().await; diff --git a/consensus/src/marshal/standard/deferred.rs b/consensus/src/marshal/standard/deferred.rs index 748e414515..5edc0caf69 100644 --- a/consensus/src/marshal/standard/deferred.rs +++ b/consensus/src/marshal/standard/deferred.rs @@ -150,6 +150,7 @@ where build_duration: Timed, proposal_parent_fetch_duration: Timed, + ancestor_fetch_duration: Timed, } impl Clone for Deferred @@ -169,6 +170,7 @@ where verification_tasks: self.verification_tasks.clone(), build_duration: self.build_duration.clone(), proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(), + ancestor_fetch_duration: self.ancestor_fetch_duration.clone(), } } } @@ -195,6 +197,12 @@ where Buckets::LOCAL, ); let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram); + let ancestor_fetch_histogram = context.histogram( + "ancestor_fetch_duration", + "Histogram of time taken to fetch a block via the ancestry stream, in seconds", + Buckets::LOCAL, + ); + let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram); Self { context: Arc::new(AsyncMutex::new(context)), @@ -205,6 +213,7 @@ where build_duration, proposal_parent_fetch_duration, + ancestor_fetch_duration, } } @@ -227,6 +236,7 @@ where let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); let (mut tx, rx) = oneshot::channel(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let runtime_context = self .context .lock() @@ -250,6 +260,7 @@ where &mut marshal, &mut tx, stage, + ancestor_fetch_duration, ) .await { @@ -449,6 +460,7 @@ where // Metrics let build_duration = self.build_duration.clone(); let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let (mut tx, rx) = oneshot::channel(); let context = self @@ -555,7 +567,11 @@ where return; } - let ancestor_stream = marshal.ancestor_stream([parent]); + let ancestor_stream = marshal.ancestor_stream( + Arc::new(runtime_context.child("ancestor_stream")), + [parent], + ancestor_fetch_duration, + ); let build_request = application.propose( ( runtime_context.child("app_propose"), diff --git a/consensus/src/marshal/standard/inline.rs b/consensus/src/marshal/standard/inline.rs index e00843ea1b..b2bcc9ad65 100644 --- a/consensus/src/marshal/standard/inline.rs +++ b/consensus/src/marshal/standard/inline.rs @@ -144,6 +144,7 @@ where build_duration: Timed, proposal_parent_fetch_duration: Timed, + ancestor_fetch_duration: Timed, } impl Clone for Inline @@ -163,6 +164,7 @@ where available_blocks: self.available_blocks.clone(), build_duration: self.build_duration.clone(), proposal_parent_fetch_duration: self.proposal_parent_fetch_duration.clone(), + ancestor_fetch_duration: self.ancestor_fetch_duration.clone(), } } } @@ -191,6 +193,12 @@ where Buckets::LOCAL, ); let proposal_parent_fetch_duration = Timed::new(parent_fetch_histogram); + let ancestor_fetch_histogram = context.histogram( + "ancestor_fetch_duration", + "Histogram of time taken to fetch a block via the ancestry stream, in seconds", + Buckets::LOCAL, + ); + let ancestor_fetch_duration = Timed::new(ancestor_fetch_histogram); Self { context: Arc::new(AsyncMutex::new(context)), @@ -200,6 +208,7 @@ where available_blocks: Arc::new(Mutex::new(BTreeSet::new())), build_duration, proposal_parent_fetch_duration, + ancestor_fetch_duration, } } } @@ -230,6 +239,7 @@ where let epocher = self.epocher.clone(); let build_duration = self.build_duration.clone(); let proposal_parent_fetch_duration = self.proposal_parent_fetch_duration.clone(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let (mut tx, rx) = oneshot::channel(); let context = self @@ -321,7 +331,11 @@ where return; } - let ancestor_stream = marshal.ancestor_stream([parent]); + let ancestor_stream = marshal.ancestor_stream( + Arc::new(runtime_context.child("ancestor_stream")), + [parent], + ancestor_fetch_duration, + ); let build_request = application.propose( ( runtime_context.child("app_propose"), @@ -389,6 +403,7 @@ where let mut application = self.application.clone(); let epocher = self.epocher.clone(); let available_blocks = self.available_blocks.clone(); + let ancestor_fetch_duration = self.ancestor_fetch_duration.clone(); let (mut tx, rx) = oneshot::channel(); let runtime_context = self @@ -443,6 +458,7 @@ where &mut marshal, &mut tx, Stage::Verified, + ancestor_fetch_duration, ) .await { diff --git a/consensus/src/marshal/standard/validation.rs b/consensus/src/marshal/standard/validation.rs index 59343d3693..4561eae730 100644 --- a/consensus/src/marshal/standard/validation.rs +++ b/consensus/src/marshal/standard/validation.rs @@ -12,9 +12,10 @@ use crate::{ }; use commonware_cryptography::certificate::Scheme; use commonware_macros::select; -use commonware_runtime::{Clock, Metrics, Spawner}; +use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock, Metrics, Spawner}; use commonware_utils::channel::oneshot; use rand::Rng; +use std::sync::Arc; use tracing::debug; /// Validation failures for standard verification. @@ -116,6 +117,7 @@ where /// - `Some(valid)` when a verification verdict is available. /// - `None` when work should stop early (e.g., receiver dropped or parent unavailable). #[inline] +#[allow(clippy::too_many_arguments)] pub(super) async fn verify_with_parent( runtime_context: E, context: Context, @@ -124,6 +126,7 @@ pub(super) async fn verify_with_parent( marshal: &mut Mailbox>, tx: &mut oneshot::Sender, stage: Stage, + ancestor_fetch_duration: Timed, ) -> Option where E: Rng + Spawner + Metrics + Clock, @@ -174,7 +177,11 @@ where } // Request verification from the application over the two-block ancestry prefix. - let ancestry_stream = marshal.ancestor_stream([block.clone(), parent]); + let ancestry_stream = marshal.ancestor_stream( + Arc::new(runtime_context.child("ancestor_stream")), + [block.clone(), parent], + ancestor_fetch_duration, + ); let validity_request = application.verify( (runtime_context.child("app_verify"), context.clone()), ancestry_stream,