Skip to content

Commit 22c45c6

Browse files
committed
preserve commitment-based walkback for coding mailbox BlockProvider
1 parent 4c470f7 commit 22c45c6

2 files changed

Lines changed: 65 additions & 6 deletions

File tree

consensus/src/marshal/coding/mod.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ mod tests {
104104
use commonware_parallel::Sequential;
105105
use commonware_resolver::{Delivery, Fetch, Resolver};
106106
use commonware_runtime::{
107-
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Supervisor as _,
107+
buffer::paged::CacheRef, deterministic, Clock, Metrics, Runner, Spawner, Supervisor as _,
108108
};
109109
use commonware_storage::archive::immutable;
110110
use commonware_utils::{
@@ -125,13 +125,18 @@ mod tests {
125125
/// A coding buffer that records subscriptions and never resolves them.
126126
#[derive(Clone, Default)]
127127
struct RecordingCodingBuffer {
128-
subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
128+
digest_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
129+
commitment_subscriptions: Arc<Mutex<Vec<oneshot::Sender<TestCodedBlock>>>>,
129130
sends: Arc<Mutex<Vec<CodingSendRecord>>>,
130131
}
131132

132133
impl RecordingCodingBuffer {
133134
fn subscription_count(&self) -> usize {
134-
self.subscriptions.lock().len()
135+
self.digest_subscriptions.lock().len() + self.commitment_subscriptions.lock().len()
136+
}
137+
138+
fn commitment_subscription_count(&self) -> usize {
139+
self.commitment_subscriptions.lock().len()
135140
}
136141
}
137142

@@ -148,7 +153,7 @@ mod tests {
148153

149154
fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver<TestCodedBlock> {
150155
let (sender, receiver) = oneshot::channel();
151-
self.subscriptions.lock().push(sender);
156+
self.digest_subscriptions.lock().push(sender);
152157
receiver
153158
}
154159

@@ -157,7 +162,7 @@ mod tests {
157162
_commitment: Commitment,
158163
) -> oneshot::Receiver<TestCodedBlock> {
159164
let (sender, receiver) = oneshot::channel();
160-
self.subscriptions.lock().push(sender);
165+
self.commitment_subscriptions.lock().push(sender);
161166
receiver
162167
}
163168

@@ -459,6 +464,46 @@ mod tests {
459464
(candidate_ctx, coded_candidate)
460465
}
461466

467+
#[test_traced("WARN")]
468+
fn test_coding_block_provider_parent_fetches_by_commitment() {
469+
let runner = deterministic::Runner::timed(Duration::from_secs(30));
470+
runner.start(|mut context| async move {
471+
let Fixture {
472+
participants,
473+
schemes,
474+
..
475+
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
476+
let provider = ConstantProvider::new(schemes[0].clone());
477+
let buffer = RecordingCodingBuffer::default();
478+
let (marshal, _resolver, _actor_handle) = start_coding_actor_with_recording(
479+
context.child("actor_stack"),
480+
"coding-provider-parent-commitment",
481+
provider,
482+
buffer.clone(),
483+
)
484+
.await;
485+
486+
let (parent_ctx, parent) = missing_candidate(participants[0].clone());
487+
let child_ctx = CodingCtx {
488+
round: Round::new(Epoch::zero(), View::new(2)),
489+
leader: participants[0].clone(),
490+
parent: (parent_ctx.round.view(), parent.commitment()),
491+
};
492+
let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200);
493+
let subscription = context
494+
.child("subscribe_parent")
495+
.spawn(move |_| BlockProvider::subscribe_parent(marshal, child));
496+
497+
context.sleep(Duration::from_millis(100)).await;
498+
assert_eq!(
499+
buffer.commitment_subscription_count(),
500+
1,
501+
"parent walkback should use the coding parent commitment"
502+
);
503+
drop(subscription);
504+
});
505+
}
506+
462507
#[test_traced("WARN")]
463508
fn test_coding_verify_missing_candidate_waits_without_fetching() {
464509
let runner = deterministic::Runner::timed(Duration::from_secs(30));

consensus/src/marshal/coding/variant.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
shards,
66
types::{CodedBlock, CodedBlockCfg, StoredCodedBlock},
77
},
8-
core::{Buffer, DigestFallback, Mailbox, Variant},
8+
core::{Buffer, CommitmentFallback, DigestFallback, Mailbox, Variant},
99
},
1010
simplex::types::Context,
1111
types::{coding::Commitment, Round},
@@ -131,4 +131,18 @@ where
131131
.ok()
132132
.map(<Coding<B, C, H, P> as Variant>::into_inner)
133133
}
134+
135+
async fn subscribe_parent(self, block: Self::Block) -> Option<Self::Block> {
136+
let parent_height = block.height().previous()?;
137+
let commitment = block.context().parent.1;
138+
self.subscribe_by_commitment(
139+
commitment,
140+
CommitmentFallback::FetchByCommitment {
141+
height: parent_height,
142+
},
143+
)
144+
.await
145+
.ok()
146+
.map(<Coding<B, C, H, P> as Variant>::into_inner)
147+
}
134148
}

0 commit comments

Comments
 (0)