Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
use async_broadcast::{broadcast, InactiveReceiver, Sender};
use async_lock::RwLock;
use async_trait::async_trait;
use futures::{join, select, FutureExt};
use futures::join;
#[cfg(feature = "hotshot-testing")]
use hotshot_types::traits::network::{
AsyncGenerator, NetworkReliability, TestableNetworkingImplementation,
Expand Down Expand Up @@ -440,12 +440,20 @@ impl<TYPES: NodeType> ConnectedNetwork<TYPES::SignatureKey> for CombinedNetworks
async fn recv_message(&self) -> Result<Vec<u8>, NetworkError> {
loop {
// Receive from both networks
let mut primary_fut = self.primary().recv_message().fuse();
let mut secondary_fut = self.secondary().recv_message().fuse();
let primary_fut = async move {
loop {
let p = self.primary().recv_message().await?;
if !p.is_empty() {
return Ok::<_, NetworkError>(p);
}
tracing::warn!("Received empty message from primary network");
}
};
let secondary_fut = self.secondary().recv_message();

// Wait for one to return a message
let (message, from_primary) = select! {
p = primary_fut => (p?, true),
let (message, from_primary) = tokio::select! {
Ok(p) = primary_fut => (p, true),
s = secondary_fut => (s?, false),
};

Expand Down
8 changes: 4 additions & 4 deletions crates/hotshot/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
encoded_transactions,
metadata,
view_number,
epoch_number,
..
} = packed_bundle;
let view_number = *view_number;
Expand All @@ -426,12 +427,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
TYPES::SignatureKey::sign(&self.private_key, &encoded_transactions_hash)
.wrap()?;

let epoch = self.cur_epoch;
let leader = self
.membership_coordinator
.membership_for_epoch(epoch)
.membership_for_epoch(*epoch_number)
.await
.context(warn!("No stake table for epoch"))?
.context(warn!("No stake table for epoch {epoch_number:?}"))?
.leader(view_number)
.await?;
if leader != self.public_key {
Expand Down Expand Up @@ -482,7 +482,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
metadata: metadata.clone(),
// Upon entering a new view we want to send a DA Proposal for the next view -> Is it always the case that this is cur_view + 1?
view_number,
epoch,
epoch: *epoch_number,
epoch_transition_indicator,
};

Expand Down
1 change: 1 addition & 0 deletions crates/hotshot/task-impls/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub enum HotShotEvent<TYPES: NodeType> {
BuilderCommitment,
<TYPES::BlockPayload as BlockPayload<TYPES>>::Metadata,
TYPES::View,
Option<TYPES::Epoch>,
Vec1<BuilderFee<TYPES>>,
),
/// Event when the transactions task has sequenced transactions. Contains the encoded transactions, the metadata, and the view number
Expand Down
24 changes: 12 additions & 12 deletions crates/hotshot/task-impls/src/quorum_proposal/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use hotshot_task::dependency_task::HandleDepOutput;
use hotshot_types::{
consensus::{CommitmentAndMetadata, OuterConsensus},
data::{Leaf2, QuorumProposal2, QuorumProposalWrapper, VidDisperse, ViewChangeEvidence2},
epoch_membership::EpochMembership,
epoch_membership::EpochMembershipCoordinator,
message::Proposal,
simple_certificate::{
LightClientStateUpdateCertificateV2, NextEpochQuorumCertificate2, QuorumCertificate2,
Expand Down Expand Up @@ -91,7 +91,7 @@ pub struct ProposalDependencyHandle<TYPES: NodeType, V: Versions> {
pub instance_state: Arc<TYPES::InstanceState>,

/// Membership for Quorum Certs/votes
pub membership: EpochMembership<TYPES>,
pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

/// Our public key
pub public_key: TYPES::SignatureKey,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
qc,
maybe_next_epoch_qc.as_ref(),
&self.consensus,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
self.epoch_height,
)
Expand All @@ -168,7 +168,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
if let Some(state_cert) = &maybe_state_cert {
if validate_light_client_state_update_certificate(
state_cert,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
)
.await
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
qc,
Some(next_epoch_qc),
&self.consensus,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
self.epoch_height,
)
Expand Down Expand Up @@ -278,7 +278,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
qc,
Some(next_epoch_qc),
&self.consensus,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
self.epoch_height,
)
Expand Down Expand Up @@ -360,7 +360,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
qc,
maybe_next_epoch_qc.as_ref(),
&self.consensus,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
self.epoch_height,
)
Expand All @@ -376,7 +376,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
if let Some(state_cert) = &maybe_state_cert {
if validate_light_client_state_update_certificate(
state_cert,
&self.membership.coordinator,
&self.membership_coordinator,
&self.upgrade_lock,
)
.await
Expand Down Expand Up @@ -457,7 +457,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
let (parent_leaf, state) = parent_leaf_and_state(
&self.sender,
&self.receiver,
self.membership.coordinator.clone(),
self.membership_coordinator.clone(),
self.public_key.clone(),
self.private_key.clone(),
OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus)),
Expand Down Expand Up @@ -567,8 +567,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
);

let epoch_membership = self
.membership
.coordinator
.membership_coordinator
.membership_for_epoch(epoch)
.await?;
// Make sure we are the leader for the view and epoch.
Expand Down Expand Up @@ -648,7 +647,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
signature,
_pd: PhantomData,
};
tracing::info!(
tracing::warn!(
"Sending proposal for view {}, height {}, justify_qc view: {}",
proposed_leaf.view_number(),
proposed_leaf.height(),
Expand Down Expand Up @@ -687,6 +686,7 @@ impl<TYPES: NodeType, V: Versions> ProposalDependencyHandle<TYPES, V> {
builder_commitment,
metadata,
view,
_epoch_number,
fees,
) => {
commit_and_metadata = Some(CommitmentAndMetadata {
Expand Down
Loading
Loading