Skip to content

Commit 1dcf9ec

Browse files
nits
1 parent 1d60e9e commit 1dcf9ec

6 files changed

Lines changed: 284 additions & 302 deletions

File tree

consensus/src/aggregation/engine.rs

Lines changed: 46 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -378,21 +378,18 @@ impl<
378378
}
379379

380380
// Validate that we need to process the ack
381-
let ack = match self.validate_ack(ack, &sender).await {
382-
Ok(ack) => ack,
383-
Err(err) => {
384-
if err.blockable() {
385-
commonware_p2p::block!(
386-
self.blocker,
387-
sender,
388-
?err,
389-
"ack validation failure"
390-
);
391-
} else {
392-
debug!(?sender, ?err, "ack validate failed");
393-
}
394-
continue;
381+
if let Err(err) = self.validate_ack(&ack, &sender) {
382+
if err.blockable() {
383+
commonware_p2p::block!(
384+
self.blocker,
385+
sender,
386+
?err,
387+
"ack validation failure"
388+
);
389+
} else {
390+
debug!(?sender, ?err, "ack validate failed");
395391
}
392+
continue;
396393
};
397394

398395
// Handle the ack
@@ -498,69 +495,40 @@ impl<
498495
let scheme = self.scheme(ack.epoch)?;
499496
let quorum = scheme.participants().quorum::<N3f1>();
500497

501-
let height = ack.item.height;
502-
let epoch = ack.epoch;
503-
let digest = ack.item.digest;
504-
505-
let acks = {
506-
// Get the acks and check digest consistency
507-
let acks_by_epoch = match self.pending.get_mut(&height) {
508-
None => {
509-
// If the height is not in the pending pool, it may be confirmed
510-
// (i.e. we have a certificate for it).
511-
return Err(Error::AckHeight(height));
512-
}
513-
Some(Pending::Unverified(acks)) => acks,
514-
Some(Pending::Verified(verified, acks)) => {
515-
// If we have a verified digest, ensure the ack matches it
516-
if digest != *verified {
517-
return Err(Error::AckDigest(height));
518-
}
519-
acks
520-
}
521-
};
522-
523-
// Add the attestation (if not already present)
524-
let acks = acks_by_epoch.entry(epoch).or_default();
525-
if acks.contains_key(&ack.attestation.signer) {
526-
return Ok(());
498+
// Get the acks and check digest consistency
499+
let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
500+
None => {
501+
// If the height is not in the pending pool, it may be confirmed
502+
// (i.e. we have a certificate for it).
503+
return Err(Error::AckHeight(ack.item.height));
527504
}
528-
acks.insert(ack.attestation.signer, ack.clone());
529-
530-
// If a quorum shares this digest, move the map into the task and return it after
531-
// construction.
532-
if acks.values().filter(|a| a.item.digest == digest).count() < quorum as usize {
533-
return Ok(());
505+
Some(Pending::Unverified(acks)) => acks,
506+
Some(Pending::Verified(digest, acks)) => {
507+
// If we have a verified digest, ensure the ack matches it
508+
if ack.item.digest != *digest {
509+
return Err(Error::AckDigest(ack.item.height));
510+
}
511+
acks
534512
}
535-
std::mem::take(acks)
536-
};
537-
538-
let strategy = self.strategy.clone();
539-
let handle = self
540-
.context
541-
.child("construct_certificate")
542-
.with_attribute("height", height)
543-
.with_attribute("epoch", epoch)
544-
.shared(true)
545-
.spawn(move |_| async move {
546-
let certificate = Certificate::from_acks(
547-
&*scheme,
548-
acks.values().filter(|ack| ack.item.digest == digest),
549-
&strategy,
550-
);
551-
(acks, certificate)
552-
});
553-
let (returned, certificate) = handle.await.expect("strategy task failed");
554-
let acks_by_epoch = match self.pending.get_mut(&height) {
555-
Some(Pending::Unverified(acks)) | Some(Pending::Verified(_, acks)) => acks,
556-
None => panic!("pending entry not found"),
557513
};
558-
let replaced = acks_by_epoch.insert(epoch, returned);
559-
assert!(replaced.is_some_and(|acks| acks.is_empty()));
560514

561-
if let Some(certificate) = certificate {
562-
self.metrics.certificates.inc();
563-
self.handle_certificate(certificate).await;
515+
// Add the attestation (if not already present)
516+
let acks = acks_by_epoch.entry(ack.epoch).or_default();
517+
if acks.contains_key(&ack.attestation.signer) {
518+
return Ok(());
519+
}
520+
acks.insert(ack.attestation.signer, ack.clone());
521+
522+
// If there exists a quorum of acks with the same digest (or for the verified digest if it exists), form a certificate
523+
let filtered = acks
524+
.values()
525+
.filter(|a| a.item.digest == ack.item.digest)
526+
.collect::<Vec<_>>();
527+
if filtered.len() >= quorum as usize {
528+
if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
529+
self.metrics.certificates.inc();
530+
self.handle_certificate(certificate).await;
531+
}
564532
}
565533

566534
Ok(())
@@ -640,11 +608,11 @@ impl<
640608
/// Takes a raw ack (from sender) from the p2p network and validates it.
641609
///
642610
/// Returns an error if the ack is invalid.
643-
async fn validate_ack(
611+
fn validate_ack(
644612
&mut self,
645-
ack: Ack<P::Scheme, D>,
613+
ack: &Ack<P::Scheme, D>,
646614
sender: &<P::Scheme as Scheme>::PublicKey,
647-
) -> Result<Ack<P::Scheme, D>, Error> {
615+
) -> Result<(), Error> {
648616
// Validate epoch
649617
{
650618
let (eb_lo, eb_hi) = self.epoch_bounds;
@@ -706,23 +674,11 @@ impl<
706674
}
707675

708676
// Validate signature
709-
let strategy = self.strategy.clone();
710-
let handle =
711-
self.context
712-
.child("verify_ack")
713-
.with_attribute("height", ack.item.height)
714-
.with_attribute("epoch", ack.epoch)
715-
.shared(true)
716-
.spawn(move |mut context| async move {
717-
let valid = ack.verify(&mut context, &*scheme, &strategy);
718-
(ack, valid)
719-
});
720-
let (ack, valid) = handle.await.expect("strategy task failed");
721-
if !valid {
677+
if !ack.verify(self.context.as_mut(), &*scheme, &self.strategy) {
722678
return Err(Error::InvalidAckSignature);
723679
}
724680

725-
Ok(ack)
681+
Ok(())
726682
}
727683

728684
// ---------- Helpers ----------

0 commit comments

Comments
 (0)