Skip to content

Commit 1a5ab87

Browse files
committed
Merge remote-tracking branch 'origin/main' into danlaine/immutable-inactivity-floor
2 parents a112f07 + 598246c commit 1a5ab87

51 files changed

Lines changed: 9860 additions & 2132 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

consensus/fuzz/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ where
375375
propose_latency: (10.0, 5.0),
376376
verify_latency: (10.0, 5.0),
377377
certify_latency: (10.0, 5.0),
378-
should_certify: application::Certifier::Sometimes,
378+
should_certify: application::Certifier::Always,
379379
};
380380
let (actor, application) =
381381
application::Application::new(context.with_label("application"), app_cfg);
@@ -609,7 +609,7 @@ fn run_with_twin_mutator<P: simplex::Simplex>(input: FuzzInput) {
609609
propose_latency: (10.0, 5.0),
610610
verify_latency: (10.0, 5.0),
611611
certify_latency: (10.0, 5.0),
612-
should_certify: application::Certifier::Sometimes,
612+
should_certify: application::Certifier::Always,
613613
};
614614
let (actor, application) =
615615
application::Application::new(primary_context.with_label("application"), app_cfg);

consensus/src/lib.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,13 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
9999
/// If it is possible to generate a payload, the Digest should be returned over the provided
100100
/// channel. If it is not possible to generate a payload, the channel can be dropped. If construction
101101
/// takes too long, the consensus engine may drop the provided proposal.
102+
///
103+
/// Returning a payload from `propose` commits the local proposer to verifying
104+
/// the same `(context, payload)`.
105+
///
106+
/// For [`CertifiableAutomaton`] implementations, returning a payload from
107+
/// `propose` also commits the local proposer to certifying that same
108+
/// `(round, payload)` if it later becomes notarized.
102109
fn propose(
103110
&mut self,
104111
context: Self::Context,
@@ -134,18 +141,12 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
134141
/// Determine whether a verified payload is safe to commit.
135142
///
136143
/// The round parameter identifies which consensus round is being certified, allowing
137-
/// applications to associate certification with the correct verification context.
138-
///
139-
/// Note: In applications where payloads incorporate the round number (recommended),
140-
/// each round will have a unique payload digest. However, the same payload may appear
141-
/// in multiple rounds when re-proposing notarized blocks at epoch boundaries or in
142-
/// integrations where payloads are round-agnostic.
143-
///
144-
/// This is particularly useful for applications that employ erasure coding, which
145-
/// can override this method to delay or prevent finalization until they have
146-
/// reconstructed and validated the full block (e.g., after receiving enough shards).
144+
/// applications to associate certification with the correct verification context. The
145+
/// same payload may appear in multiple rounds, so implementations must key any state
146+
/// on `(round, payload)` rather than `payload` alone.
147147
///
148-
/// Like [`Automaton::verify`], certification is single-shot for the given
148+
/// Like [`Automaton::verify`], payloads produced by [`Automaton::propose`] are certifiable-by-construction.
149+
/// Also like [`Automaton::verify`], certification is single-shot for the given
149150
/// `(round, payload)`. Once the returned channel resolves or closes, consensus treats
150151
/// certification as concluded and will not retry the same request.
151152
///
@@ -193,7 +194,7 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
193194
/// treat every broadcast identically can set this to `()`.
194195
type Plan: Send;
195196

196-
/// Broadcast a payload to the given recipients.
197+
/// Broadcast a payload according to the given plan.
197198
fn broadcast(
198199
&mut self,
199200
payload: Self::Digest,

consensus/src/marshal/application/validation.rs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,37 @@
33
//! This module centralizes pure invariant checks shared across marshal verification
44
//! and certification flows.
55
6-
use crate::types::{Epoch, Epocher, Height, Round};
7-
use commonware_utils::sync::Mutex;
8-
use std::sync::Arc;
6+
use crate::{
7+
marshal::core::{Mailbox, Variant},
8+
types::{Epoch, Epocher, Height, Round},
9+
};
10+
use commonware_cryptography::certificate::Scheme;
911

10-
/// Cache for the last block built during proposal, shared between the
11-
/// proposer task and the broadcast path.
12-
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;
12+
/// Which stage of verification a block has reached.
13+
///
14+
/// This is used to determine which marshal cache a block should be stored in.
15+
#[derive(Clone, Copy, Debug)]
16+
pub(crate) enum Stage {
17+
/// The block has been verified (store in `verified_blocks`).
18+
Verified,
19+
/// The block has been certified (store in `notarized_blocks`).
20+
Certified,
21+
}
22+
23+
impl Stage {
24+
/// Store `block` in the marshal cache for the provided stage.
25+
pub(crate) async fn store<S: Scheme, V: Variant>(
26+
self,
27+
marshal: &mut Mailbox<S, V>,
28+
round: Round,
29+
block: V::Block,
30+
) -> bool {
31+
match self {
32+
Self::Verified => marshal.verified(round, block).await,
33+
Self::Certified => marshal.certified(round, block).await,
34+
}
35+
}
36+
}
1337

1438
/// Returns true if the block is at an epoch boundary (last block in its epoch).
1539
#[inline]

consensus/src/marshal/application/verification_tasks.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,114 @@ where
5959
.retain(|(task_round, _), _| task_round > finalized_round);
6060
}
6161
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use super::*;
66+
use crate::types::{Epoch, View};
67+
use commonware_cryptography::{sha256::Digest as Sha256Digest, Hasher, Sha256};
68+
69+
type D = Sha256Digest;
70+
71+
fn round(view: u64) -> Round {
72+
Round::new(Epoch::zero(), View::new(view))
73+
}
74+
75+
fn pending_task() -> oneshot::Receiver<bool> {
76+
let (_tx, rx) = oneshot::channel();
77+
rx
78+
}
79+
80+
#[test]
81+
fn test_insert_and_take_returns_task() {
82+
let tasks = VerificationTasks::<D>::new();
83+
let digest = Sha256::hash(b"block");
84+
tasks.insert(round(1), digest, pending_task());
85+
86+
assert!(tasks.take(round(1), digest).is_some());
87+
assert!(
88+
tasks.take(round(1), digest).is_none(),
89+
"taking twice should yield None"
90+
);
91+
}
92+
93+
#[test]
94+
fn test_take_absent_key_is_none() {
95+
let tasks = VerificationTasks::<D>::new();
96+
assert!(tasks.take(round(1), Sha256::hash(b"missing")).is_none());
97+
}
98+
99+
#[test]
100+
fn test_take_distinguishes_rounds_and_digests() {
101+
let tasks = VerificationTasks::<D>::new();
102+
let digest_a = Sha256::hash(b"a");
103+
let digest_b = Sha256::hash(b"b");
104+
tasks.insert(round(1), digest_a, pending_task());
105+
tasks.insert(round(2), digest_a, pending_task());
106+
tasks.insert(round(1), digest_b, pending_task());
107+
108+
assert!(tasks.take(round(1), digest_a).is_some());
109+
assert!(tasks.take(round(2), digest_a).is_some());
110+
assert!(tasks.take(round(1), digest_b).is_some());
111+
}
112+
113+
#[test]
114+
fn test_retain_after_drops_at_and_below_boundary() {
115+
let tasks = VerificationTasks::<D>::new();
116+
let digest = Sha256::hash(b"block");
117+
tasks.insert(round(1), digest, pending_task());
118+
tasks.insert(round(2), digest, pending_task());
119+
tasks.insert(round(3), digest, pending_task());
120+
121+
tasks.retain_after(&round(2));
122+
123+
assert!(
124+
tasks.take(round(1), digest).is_none(),
125+
"tasks strictly below boundary should be dropped"
126+
);
127+
assert!(
128+
tasks.take(round(2), digest).is_none(),
129+
"tasks at boundary should be dropped"
130+
);
131+
assert!(
132+
tasks.take(round(3), digest).is_some(),
133+
"tasks strictly above boundary should be retained"
134+
);
135+
}
136+
137+
#[test]
138+
fn test_retain_after_spans_epochs() {
139+
let tasks = VerificationTasks::<D>::new();
140+
let digest = Sha256::hash(b"block");
141+
let early = Round::new(Epoch::zero(), View::new(100));
142+
let late = Round::new(Epoch::new(1), View::zero());
143+
tasks.insert(early, digest, pending_task());
144+
tasks.insert(late, digest, pending_task());
145+
146+
tasks.retain_after(&early);
147+
148+
assert!(
149+
tasks.take(early, digest).is_none(),
150+
"task at boundary must be dropped"
151+
);
152+
assert!(
153+
tasks.take(late, digest).is_some(),
154+
"task in later epoch must outlive an earlier boundary"
155+
);
156+
}
157+
158+
#[test]
159+
fn test_retain_after_empty_map_is_noop() {
160+
let tasks = VerificationTasks::<D>::new();
161+
tasks.retain_after(&round(5));
162+
assert!(tasks.take(round(5), Sha256::hash(b"x")).is_none());
163+
}
164+
165+
#[test]
166+
fn test_default_matches_new() {
167+
let default = <VerificationTasks<D> as Default>::default();
168+
let digest = Sha256::hash(b"block");
169+
default.insert(round(1), digest, pending_task());
170+
assert!(default.take(round(1), digest).is_some());
171+
}
172+
}

0 commit comments

Comments
 (0)