Skip to content

Commit e4fb1c6

Browse files
nits
1 parent ce762fb commit e4fb1c6

2 files changed

Lines changed: 137 additions & 33 deletions

File tree

consensus/src/marshal/core/actor.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,7 @@ use commonware_runtime::{
3838
telemetry::metrics::{Gauge, GaugeExt, MetricsExt as _},
3939
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
4040
};
41-
use commonware_storage::{
42-
archive::Identifier as ArchiveID,
43-
};
41+
use commonware_storage::archive::Identifier as ArchiveID;
4442
use commonware_utils::{
4543
acknowledgement::Exact,
4644
channel::{fallible::OneshotExt, oneshot},
@@ -184,9 +182,7 @@ where
184182
&config.partition_prefix,
185183
)
186184
.await;
187-
let last_processed_height = stream
188-
.processed_height()
189-
.unwrap_or_else(Height::zero);
185+
let last_processed_height = stream.processed_height().unwrap_or_else(Height::zero);
190186

191187
// Genesis is a local anchor. A floor finalization is verified and
192188
// resolved after `run` receives the resolver and buffer.
@@ -203,14 +199,8 @@ where
203199
}
204200
Start::Floor(finalization) => Some(finalization),
205201
};
206-
// A floor transition to height H is persisted as H - 1 until the
207-
// application acknowledges H. If that persisted cursor exists, looking
208-
// one height ahead restores the anchor's round across the crash window.
209-
let last_processed_round = Self::latest_processed_round(
210-
&finalizations_by_height,
211-
stream.next_height(),
212-
)
213-
.await;
202+
let last_processed_round =
203+
Self::latest_processed_round(&finalizations_by_height, last_processed_height).await;
214204

215205
// Create metrics
216206
let finalized_height = context.gauge("finalized_height", "Finalized height of application");

consensus/src/marshal/standard/mod.rs

Lines changed: 133 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ mod tests {
9696
acknowledgement::Exact,
9797
channel::{fallible::OneshotExt, oneshot, oneshot::error::TryRecvError},
9898
ordered::Set,
99+
sequence::U64,
99100
sync::Mutex,
100101
vec::NonEmptyVec,
101102
Acknowledgement as _, NZUsize, NZU16, NZU64,
@@ -442,6 +443,27 @@ mod tests {
442443
.expect("failed to sync seeded finalizations");
443444
}
444445

446+
async fn seed_processed_height(
447+
context: deterministic::Context,
448+
partition_prefix: &str,
449+
height: Height,
450+
) {
451+
let mut metadata: Metadata<deterministic::Context, U64, Height> = Metadata::init(
452+
context.child("seed_application_metadata"),
453+
metadata::Config {
454+
partition: format!("{partition_prefix}-application-metadata"),
455+
codec_config: (),
456+
},
457+
)
458+
.await
459+
.expect("failed to initialize application metadata for seeded restart state");
460+
metadata.put(U64::new(0xFF), height);
461+
metadata
462+
.sync()
463+
.await
464+
.expect("failed to sync seeded application metadata");
465+
}
466+
445467
// Writes a block directly into the cache's per-epoch notarized storage,
446468
// simulating a block that was notarized but never finalized before a crash.
447469
async fn seed_cache_block(
@@ -4911,12 +4933,101 @@ mod tests {
49114933
}
49124934

49134935
#[test_traced("WARN")]
4914-
fn test_standard_floor_round_restored_before_anchor_ack() {
4936+
fn test_standard_round_floor_does_not_restore_missing_next_block() {
49154937
let runner = deterministic::Runner::timed(Duration::from_secs(30));
49164938
runner.start(|mut context| async move {
49174939
let Fixture { schemes, .. } =
49184940
bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
4919-
let partition_prefix = "floor-round-before-anchor-ack";
4941+
let partition_prefix = "round-floor-missing-next-block";
4942+
4943+
let round = Round::new(Epoch::zero(), View::new(1));
4944+
let block = make_raw_block(Sha256::hash(b""), Height::new(1), 100);
4945+
let proposal = Proposal::new(round, View::zero(), StandardHarness::commitment(&block));
4946+
let notarization =
4947+
StandardHarness::make_notarization(proposal.clone(), &schemes, QUORUM);
4948+
let finalization = StandardHarness::make_finalization(proposal, &schemes, QUORUM);
4949+
4950+
seed_processed_height(context.child("metadata"), partition_prefix, Height::zero())
4951+
.await;
4952+
seed_inconsistent_restart_state(
4953+
context.child("storage"),
4954+
partition_prefix,
4955+
&[],
4956+
&[(Height::new(1), finalization)],
4957+
)
4958+
.await;
4959+
4960+
let (mailbox, _buffer, resolver, _actor_handle) = start_standard_actor(
4961+
context.child("validator"),
4962+
partition_prefix,
4963+
ConstantProvider::new(schemes[0].clone()),
4964+
Application::<B>::manual_ack(),
4965+
Some(RecordingBuffer::default()),
4966+
Start::Genesis(StandardHarness::genesis_block(NUM_VALIDATORS as u16)),
4967+
)
4968+
.await;
4969+
4970+
let mut subscription = mailbox.subscribe_by_commitment(
4971+
StandardHarness::commitment(&block),
4972+
CommitmentFallback::FetchByRound { round },
4973+
);
4974+
context.sleep(Duration::from_millis(100)).await;
4975+
assert!(
4976+
matches!(subscription.try_recv(), Err(TryRecvError::Empty)),
4977+
"missing next block must not advance the restored round floor"
4978+
);
4979+
wait_until(
4980+
&context,
4981+
Duration::from_secs(5),
4982+
"fetch-by-round after missing-next restart",
4983+
|| {
4984+
resolver.fetches().iter().any(|fetch| {
4985+
matches!(
4986+
(&fetch.key, &fetch.subscriber),
4987+
(
4988+
handler::Key::Notarized { round: request_round },
4989+
handler::Annotation::Notarization { round: subscriber_round },
4990+
) if *request_round == round && *subscriber_round == round
4991+
)
4992+
})
4993+
},
4994+
)
4995+
.await;
4996+
4997+
let (response, response_rx) = oneshot::channel();
4998+
assert!(resolver
4999+
.enqueue(handler::Message::Deliver {
5000+
delivery: Delivery {
5001+
key: handler::Key::Notarized { round },
5002+
subscribers: NonEmptyVec::new(handler::Annotation::Notarization { round }),
5003+
},
5004+
value: (notarization, block.clone()).encode(),
5005+
response,
5006+
})
5007+
.accepted());
5008+
assert!(
5009+
response_rx.await.expect("delivery response missing"),
5010+
"notarized delivery should validate"
5011+
);
5012+
select! {
5013+
result = subscription => {
5014+
let delivered = result.expect("block subscription should resolve");
5015+
assert_eq!(delivered.digest(), block.digest());
5016+
},
5017+
_ = context.sleep(Duration::from_secs(5)) => {
5018+
panic!("round-floor subscription did not resolve");
5019+
},
5020+
}
5021+
});
5022+
}
5023+
5024+
#[test_traced("WARN")]
5025+
fn test_standard_round_floor_does_not_restore_unacknowledged_anchor() {
5026+
let runner = deterministic::Runner::timed(Duration::from_secs(30));
5027+
runner.start(|mut context| async move {
5028+
let Fixture { schemes, .. } =
5029+
bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
5030+
let partition_prefix = "round-floor-before-anchor-ack";
49205031

49215032
let floor_round = Round::new(Epoch::zero(), View::new(5));
49225033
let floor_block = make_raw_block(Sha256::hash(b"floor-parent"), Height::new(5), 500);
@@ -4975,8 +5086,8 @@ mod tests {
49755086
.await;
49765087

49775088
let fetches_before = resolver.fetches().len();
4978-
let subscription = mailbox.subscribe_by_commitment(
4979-
Sha256::hash(b"missing-at-restored-floor-round"),
5089+
let mut subscription = mailbox.subscribe_by_commitment(
5090+
Sha256::hash(b"missing-before-anchor-ack"),
49805091
CommitmentFallback::FetchByRound { round: floor_round },
49815092
);
49825093
let barrier = make_raw_block(floor_block.digest(), Height::new(6), 600);
@@ -4986,22 +5097,25 @@ mod tests {
49865097
.await,
49875098
"barrier verification should be processed"
49885099
);
4989-
assert_eq!(
4990-
resolver.fetches().len(),
4991-
fetches_before,
4992-
"restart must restore the floor round before the anchor is acknowledged"
4993-
);
4994-
select! {
4995-
result = subscription => {
4996-
assert!(
4997-
result.is_err(),
4998-
"floor-round subscription should be canceled after restart"
4999-
);
5000-
},
5001-
_ = context.sleep(Duration::from_secs(5)) => {
5002-
panic!("floor-round subscription remained open after restart");
5100+
wait_until(
5101+
&context,
5102+
Duration::from_secs(5),
5103+
"round-bound fetch before anchor ack",
5104+
|| {
5105+
resolver.fetches().len() > fetches_before
5106+
&& resolver.fetches().iter().any(|fetch| {
5107+
matches!(
5108+
fetch.key,
5109+
handler::Key::Notarized { round } if round == floor_round
5110+
)
5111+
})
50035112
},
5004-
}
5113+
)
5114+
.await;
5115+
assert!(
5116+
matches!(subscription.try_recv(), Err(TryRecvError::Empty)),
5117+
"unacknowledged anchor round must remain subscribable after restart"
5118+
);
50055119
});
50065120
}
50075121

0 commit comments

Comments
 (0)