Skip to content

Commit 03c0661

Browse files
[marshal] Send Tip Updates (#2065)
1 parent 2b9aa00 commit 03c0661

5 files changed

Lines changed: 88 additions & 30 deletions

File tree

consensus/src/marshal/actor.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::{
1010
SchemeProvider,
1111
};
1212
use crate::{
13-
marshal::ingress::mailbox::Identifier as BlockID,
13+
marshal::{ingress::mailbox::Identifier as BlockID, Update},
1414
simplex::{
1515
signing_scheme::Scheme,
1616
types::{Finalization, Notarization},
@@ -35,7 +35,6 @@ use governor::clock::Clock as GClock;
3535
use prometheus_client::metrics::gauge::Gauge;
3636
use rand::{CryptoRng, Rng};
3737
use std::{
38-
cmp::max,
3938
collections::{btree_map::Entry, BTreeMap},
4039
time::Instant,
4140
};
@@ -95,6 +94,8 @@ pub struct Actor<
9594
// ---------- State ----------
9695
// Last view processed
9796
last_processed_round: Round,
97+
// Highest known finalized height
98+
tip: u64,
9899
// Outstanding subscriptions for blocks
99100
block_subscriptions: BTreeMap<B::Commitment, BlockSubscription<B>>,
100101

@@ -238,6 +239,7 @@ impl<
238239
block_codec_config: config.block_codec_config,
239240
partition_prefix: config.partition_prefix,
240241
last_processed_round: Round::new(0, 0),
242+
tip: 0,
241243
block_subscriptions: BTreeMap::new(),
242244
cache,
243245
finalizations_by_height,
@@ -252,7 +254,7 @@ impl<
252254
/// Start the actor.
253255
pub fn start<R, K>(
254256
mut self,
255-
application: impl Reporter<Activity = B>,
257+
application: impl Reporter<Activity = Update<B>>,
256258
buffer: buffered::Mailbox<K, B>,
257259
resolver: (mpsc::Receiver<handler::Message<B>>, R),
258260
) -> Handle<()>
@@ -266,7 +268,7 @@ impl<
266268
/// Run the application actor.
267269
async fn run<R, K>(
268270
mut self,
269-
application: impl Reporter<Activity = B>,
271+
mut application: impl Reporter<Activity = Update<B>>,
270272
mut buffer: buffered::Mailbox<K, B>,
271273
(mut resolver_rx, mut resolver): (mpsc::Receiver<handler::Message<B>>, R),
272274
) where
@@ -280,7 +282,7 @@ impl<
280282
let finalizer = Finalizer::new(
281283
self.context.with_label("finalizer"),
282284
format!("{}-finalizer", self.partition_prefix.clone()),
283-
application,
285+
application.clone(),
284286
orchestrator,
285287
notifier_rx,
286288
)
@@ -290,6 +292,14 @@ impl<
290292
// Create a local pool for waiter futures
291293
let mut waiters = AbortablePool::<(B::Commitment, B)>::default();
292294

295+
// Get tip and send to application
296+
let tip = self.get_latest().await;
297+
if let Some((height, commitment)) = tip {
298+
application.report(Update::Tip(height, commitment)).await;
299+
self.tip = height;
300+
self.finalized_height.set(height as i64);
301+
}
302+
293303
// Handle messages
294304
loop {
295305
// Remove any dropped subscribers. If all subscribers dropped, abort the waiter.
@@ -369,7 +379,7 @@ impl<
369379
if let Some(block) = self.find_block(&mut buffer, commitment).await {
370380
// If found, persist the block
371381
let height = block.height();
372-
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx).await;
382+
self.finalize(height, commitment, block, Some(finalization), &mut notifier_tx, &mut application).await;
373383
debug!(?round, height, "finalized block stored");
374384
} else {
375385
// Otherwise, fetch the block from the network.
@@ -504,7 +514,7 @@ impl<
504514
let commitment = cursor.parent();
505515
if let Some(block) = self.find_block(&mut buffer, commitment).await {
506516
let finalization = self.cache.get_finalization_for(commitment).await;
507-
self.finalize(block.height(), commitment, block.clone(), finalization, &mut notifier_tx).await;
517+
self.finalize(block.height(), commitment, block.clone(), finalization, &mut notifier_tx, &mut application).await;
508518
debug!(height = block.height(), "repaired block");
509519
cursor = block;
510520
} else {
@@ -595,7 +605,7 @@ impl<
595605
// Persist the block, also persisting the finalization if we have it
596606
let height = block.height();
597607
let finalization = self.cache.get_finalization_for(commitment).await;
598-
self.finalize(height, commitment, block, finalization, &mut notifier_tx).await;
608+
self.finalize(height, commitment, block, finalization, &mut notifier_tx, &mut application).await;
599609
debug!(?commitment, height, "received block");
600610
let _ = response.send(true);
601611
},
@@ -629,7 +639,7 @@ impl<
629639
// Valid finalization received
630640
debug!(height, "received finalization");
631641
let _ = response.send(true);
632-
self.finalize(height, block.commitment(), block, Some(finalization), &mut notifier_tx).await;
642+
self.finalize(height, block.commitment(), block, Some(finalization), &mut notifier_tx, &mut application).await;
633643
},
634644
Request::Notarized { round } => {
635645
let Some(scheme) = self.scheme_provider.scheme(round.epoch()) else {
@@ -670,7 +680,7 @@ impl<
670680
// the request for the block.
671681
let height = block.height();
672682
if let Some(finalization) = self.cache.get_finalization_for(commitment).await {
673-
self.finalize(height, commitment, block.clone(), Some(finalization), &mut notifier_tx).await;
683+
self.finalize(height, commitment, block.clone(), Some(finalization), &mut notifier_tx, &mut application).await;
674684
}
675685

676686
// Cache the notarization and block
@@ -746,6 +756,7 @@ impl<
746756
block: B,
747757
finalization: Option<Finalization<S, B::Commitment>>,
748758
notifier: &mut mpsc::Sender<()>,
759+
application: &mut impl Reporter<Activity = Update<B>>,
749760
) {
750761
self.notify_subscribers(commitment, &block).await;
751762

@@ -766,13 +777,15 @@ impl<
766777
panic!("failed to finalize: {e}");
767778
}
768779

769-
// Update metrics
770-
let new_value: i64 = height as i64;
771-
let old_value: i64 = self.finalized_height.get();
772-
self.finalized_height.set(max(new_value, old_value));
773-
774780
// Notify the finalizer
775781
let _ = notifier.try_send(());
782+
783+
// Update metrics and send tip update to application
784+
if height > self.tip {
785+
application.report(Update::Tip(height, commitment)).await;
786+
self.tip = height;
787+
self.finalized_height.set(height as i64);
788+
}
776789
}
777790

778791
/// Get the latest finalized block information (height and commitment tuple).

consensus/src/marshal/finalizer.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use crate::{marshal::ingress::orchestrator::Orchestrator, Block, Reporter};
1+
use crate::{
2+
marshal::{ingress::orchestrator::Orchestrator, Update},
3+
Block, Reporter,
4+
};
25
use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage};
36
use commonware_storage::metadata::{self, Metadata};
47
use commonware_utils::{fixed_bytes, sequence::FixedBytes};
@@ -13,7 +16,11 @@ const LATEST_KEY: FixedBytes<1> = fixed_bytes!("00");
1316
///
1417
/// Stores the highest height for which the application has processed. This allows resuming
1518
/// processing from the last processed height after a restart.
16-
pub struct Finalizer<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>> {
19+
pub struct Finalizer<
20+
B: Block,
21+
R: Spawner + Clock + Metrics + Storage,
22+
Z: Reporter<Activity = Update<B>>,
23+
> {
1724
context: ContextCell<R>,
1825

1926
// Application that processes the finalized blocks.
@@ -29,7 +36,7 @@ pub struct Finalizer<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Report
2936
metadata: Metadata<R, FixedBytes<1>, u64>,
3037
}
3138

32-
impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>>
39+
impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = Update<B>>>
3340
Finalizer<B, R, Z>
3441
{
3542
/// Initialize the finalizer.
@@ -89,7 +96,7 @@ impl<B: Block, R: Spawner + Clock + Metrics + Storage, Z: Reporter<Activity = B>
8996
// height is processed by the application), it is possible that the application may
9097
// be asked to process a block it has already seen (which it can simply ignore).
9198
let commitment = block.commitment();
92-
self.application.report(block).await;
99+
self.application.report(Update::Block(block)).await;
93100

94101
// Record that we have processed up through this height.
95102
latest = height;
Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Block, Reporter};
1+
use crate::{marshal::Update, Block, Reporter};
22
use std::{
33
collections::BTreeMap,
44
sync::{Arc, Mutex},
@@ -8,12 +8,15 @@ use std::{
88
#[derive(Clone)]
99
pub struct Application<B: Block> {
1010
blocks: Arc<Mutex<BTreeMap<u64, B>>>,
11+
#[allow(clippy::type_complexity)]
12+
tip: Arc<Mutex<Option<(u64, B::Commitment)>>>,
1113
}
1214

1315
impl<B: Block> Default for Application<B> {
1416
fn default() -> Self {
1517
Self {
1618
blocks: Default::default(),
19+
tip: Default::default(),
1720
}
1821
}
1922
}
@@ -23,15 +26,24 @@ impl<B: Block> Application<B> {
2326
pub fn blocks(&self) -> BTreeMap<u64, B> {
2427
self.blocks.lock().unwrap().clone()
2528
}
29+
30+
/// Returns the tip.
31+
pub fn tip(&self) -> Option<(u64, B::Commitment)> {
32+
*self.tip.lock().unwrap()
33+
}
2634
}
2735

2836
impl<B: Block> Reporter for Application<B> {
29-
type Activity = B;
37+
type Activity = Update<B>;
3038

3139
async fn report(&mut self, activity: Self::Activity) {
32-
self.blocks
33-
.lock()
34-
.unwrap()
35-
.insert(activity.height(), activity);
40+
match activity {
41+
Update::Block(block) => {
42+
self.blocks.lock().unwrap().insert(block.height(), block);
43+
}
44+
Update::Tip(height, commitment) => {
45+
*self.tip.lock().unwrap() = Some((height, commitment));
46+
}
47+
}
3648
}
3749
}

consensus/src/marshal/mod.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub mod ingress;
6464
pub use ingress::mailbox::Mailbox;
6565
pub mod resolver;
6666

67-
use crate::{simplex::signing_scheme::Scheme, types::Epoch};
67+
use crate::{simplex::signing_scheme::Scheme, types::Epoch, Block};
6868
use std::sync::Arc;
6969

7070
/// Supplies the signing scheme the marshal should use for a given epoch.
@@ -76,6 +76,18 @@ pub trait SchemeProvider: Clone + Send + Sync + 'static {
7676
fn scheme(&self, epoch: Epoch) -> Option<Arc<Self::Scheme>>;
7777
}
7878

79+
/// An update reported to the application, either a new finalized tip or a finalized block.
80+
///
81+
/// Finalized tips are reported as soon as known, whether or not we hold all blocks up to that height.
82+
/// Finalized blocks are reported to the application in monotonically increasing order (no gaps permitted).
83+
#[derive(Clone, Debug)]
84+
pub enum Update<B: Block> {
85+
/// A new finalized tip.
86+
Tip(u64, B::Commitment),
87+
/// A new finalized block.
88+
Block(B),
89+
}
90+
7991
#[cfg(test)]
8092
pub mod mocks;
8193

@@ -417,6 +429,14 @@ mod tests {
417429
finished = false;
418430
break;
419431
}
432+
let Some((height, _)) = app.tip() else {
433+
finished = false;
434+
break;
435+
};
436+
if height < NUM_BLOCKS {
437+
finished = false;
438+
break;
439+
}
420440
}
421441
}
422442

@@ -859,7 +879,7 @@ mod tests {
859879
} = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
860880

861881
let me = participants[0].clone();
862-
let (_application, mut actor) = setup_validator(
882+
let (application, mut actor) = setup_validator(
863883
context.with_label("validator-0"),
864884
&mut oracle,
865885
me,
@@ -870,6 +890,7 @@ mod tests {
870890
// Before any finalization, GetBlock::Latest should be None
871891
let latest_block = actor.get_block(Identifier::Latest).await;
872892
assert!(latest_block.is_none());
893+
assert!(application.tip().is_none());
873894

874895
// Finalize a block at height 1
875896
let parent = Sha256::hash(b"");
@@ -889,6 +910,7 @@ mod tests {
889910
let by_height = actor.get_block(1).await.expect("missing block by height");
890911
assert_eq!(by_height.height(), 1);
891912
assert_eq!(by_height.digest(), commitment);
913+
assert_eq!(application.tip(), Some((1, commitment)));
892914

893915
// Get by latest
894916
let by_latest = actor

examples/reshare/src/dkg/ingress.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! [Actor]: super::Actor
44
55
use crate::{application::Block, dkg::DealOutcome};
6-
use commonware_consensus::Reporter;
6+
use commonware_consensus::{marshal::Update, Reporter};
77
use commonware_cryptography::{bls12381::primitives::variant::Variant, Hasher, Signer};
88
use futures::{
99
channel::{mpsc, oneshot},
@@ -78,12 +78,16 @@ where
7878
C: Signer,
7979
V: Variant,
8080
{
81-
type Activity = Block<H, C, V>;
81+
type Activity = Update<Block<H, C, V>>;
8282

83-
async fn report(&mut self, block: Self::Activity) {
83+
async fn report(&mut self, update: Self::Activity) {
8484
let (sender, receiver) = oneshot::channel();
8585

8686
// Report the finalized block to the DKG actor on a best-effort basis.
87+
let Update::Block(block) = update else {
88+
// We ignore any other updates sent by marshal.
89+
return;
90+
};
8791
let _ = self
8892
.sender
8993
.send(Message::Finalized {

0 commit comments

Comments
 (0)