From 8aadeeba1bac4787dc4c0f7d453b89729ea46dcf Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Tue, 15 Jul 2025 15:59:29 +0000 Subject: [PATCH] add new transaction task for building in process add the file properly create metadata and send event tie everything together fix a deadlock and check if we are leader before making block add metric for proposal recv to send time add task for keeping track of lots of stats fixes fix build don't broadcast txns --- crates/hotshot/hotshot/src/tasks/mod.rs | 5 +- .../hotshot/hotshot/src/tasks/task_state.rs | 41 +- .../hotshot/task-impls/src/block_builder.rs | 389 ++++++++++++++++++ crates/hotshot/task-impls/src/events.rs | 15 + crates/hotshot/task-impls/src/lib.rs | 3 + .../task-impls/src/quorum_vote/handlers.rs | 14 + crates/hotshot/task-impls/src/stats.rs | 5 +- crates/hotshot/task-impls/src/transactions.rs | 139 ++++--- 8 files changed, 543 insertions(+), 68 deletions(-) create mode 100644 crates/hotshot/task-impls/src/block_builder.rs diff --git a/crates/hotshot/hotshot/src/tasks/mod.rs b/crates/hotshot/hotshot/src/tasks/mod.rs index 7a5d323ae85..2d92873f2b7 100644 --- a/crates/hotshot/hotshot/src/tasks/mod.rs +++ b/crates/hotshot/hotshot/src/tasks/mod.rs @@ -21,13 +21,13 @@ use hotshot_task::task::Task; #[cfg(feature = "rewind")] use hotshot_task_impls::rewind::RewindTaskState; use hotshot_task_impls::{ + block_builder::BlockBuilderTaskState, da::DaTaskState, events::HotShotEvent, network::{NetworkEventTaskState, NetworkMessageTaskState}, request::NetworkRequestState, response::{run_response_task, NetworkResponseState}, stats::StatsTaskState, - transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, view_sync::ViewSyncTaskState, @@ -232,7 +232,8 @@ pub async fn add_consensus_tasks, handle.add_task(ViewSyncTaskState::::create_from(handle).await); handle.add_task(VidTaskState::::create_from(handle).await); handle.add_task(DaTaskState::::create_from(handle).await); - handle.add_task(TransactionTaskState::::create_from(handle).await); + // handle.add_task(TransactionTaskState::::create_from(handle).await); + handle.add_task(BlockBuilderTaskState::::create_from(handle).await); { let mut upgrade_certificate_lock = handle diff --git a/crates/hotshot/hotshot/src/tasks/task_state.rs b/crates/hotshot/hotshot/src/tasks/task_state.rs index 3c6433e4105..d9953ec4a14 100644 --- a/crates/hotshot/hotshot/src/tasks/task_state.rs +++ b/crates/hotshot/hotshot/src/tasks/task_state.rs @@ -5,7 +5,8 @@ // along with the HotShot repository. If not, see . use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, + num::NonZero, sync::{atomic::AtomicBool, Arc}, time::Instant, }; @@ -13,19 +14,22 @@ use std::{ use async_trait::async_trait; use chrono::Utc; use hotshot_task_impls::{ - builder::BuilderClient, consensus::ConsensusTaskState, da::DaTaskState, - quorum_proposal::QuorumProposalTaskState, quorum_proposal_recv::QuorumProposalRecvTaskState, - quorum_vote::QuorumVoteTaskState, request::NetworkRequestState, rewind::RewindTaskState, - stats::StatsTaskState, transactions::TransactionTaskState, upgrade::UpgradeTaskState, - vid::VidTaskState, view_sync::ViewSyncTaskState, + block_builder::BlockBuilderTaskState, builder::BuilderClient, consensus::ConsensusTaskState, + da::DaTaskState, quorum_proposal::QuorumProposalTaskState, + quorum_proposal_recv::QuorumProposalRecvTaskState, quorum_vote::QuorumVoteTaskState, + request::NetworkRequestState, rewind::RewindTaskState, stats::StatsTaskState, + transactions::TransactionTaskState, upgrade::UpgradeTaskState, vid::VidTaskState, + view_sync::ViewSyncTaskState, }; use hotshot_types::{ consensus::OuterConsensus, traits::{ consensus_api::ConsensusApi, node_implementation::{ConsensusTime, NodeImplementation, NodeType}, + signature_key::BuilderSignatureKey, }, }; +use lru::LruCache; use tokio::spawn; use crate::{types::SystemContextHandle, Versions}; @@ -219,6 +223,31 @@ impl, V: Versions> CreateTaskState } } +#[async_trait] +impl, V: Versions> CreateTaskState + for BlockBuilderTaskState +{ + async fn create_from(handle: &SystemContextHandle) -> Self { + let (builder_key, builder_private_key) = + TYPES::BuilderSignatureKey::generated_from_seed_indexed([0; 32], handle.hotshot.id); + Self { + cur_view: handle.cur_view().await, + cur_epoch: handle.cur_epoch().await, + membership_coordinator: handle.hotshot.membership_coordinator.clone(), + upgrade_lock: handle.hotshot.upgrade_lock.clone(), + epoch_height: handle.epoch_height, + consensus: OuterConsensus::new(handle.hotshot.consensus()), + transactions: LruCache::new(NonZero::new(10000).unwrap()), + instance_state: handle.hotshot.instance_state(), + base_fee: 1, + public_key: handle.public_key().clone(), + builder_public_key: builder_key, + builder_private_key, + decided_not_seen_txns: HashSet::new(), + } + } +} + #[async_trait] impl, V: Versions> CreateTaskState for QuorumVoteTaskState diff --git a/crates/hotshot/task-impls/src/block_builder.rs b/crates/hotshot/task-impls/src/block_builder.rs new file mode 100644 index 00000000000..1d23feaba60 --- /dev/null +++ b/crates/hotshot/task-impls/src/block_builder.rs @@ -0,0 +1,389 @@ +use std::{collections::HashMap, sync::Arc}; + +use alloy::primitives::map::HashSet; +use async_broadcast::{Receiver, Sender}; +use async_trait::async_trait; +use committable::{Commitment, Committable}; +use hotshot_task::task::TaskState; +use hotshot_types::{ + consensus::{Consensus, OuterConsensus}, + data::{Leaf2, PackedBundle}, + epoch_membership::EpochMembershipCoordinator, + message::UpgradeLock, + traits::{ + block_contents::{BlockHeader, BuilderFee}, + node_implementation::{ConsensusTime, NodeType, Versions}, + signature_key::BuilderSignatureKey, + BlockPayload, EncodeBytes, ValidatedState, + }, + utils::{is_epoch_transition, is_last_block}, +}; +use hotshot_utils::anytrace::*; +use lru::LruCache; +use vbs::version::{StaticVersionType, Version}; + +use crate::{ + events::{HotShotEvent, HotShotTaskCompleted}, + helpers::broadcast_event, + transactions::send_empty_block, +}; + +pub struct BlockBuilderTaskState { + /// View number this view is executing in. + pub cur_view: TYPES::View, + + /// Epoch number this node is executing in. + pub cur_epoch: Option, + + /// Membership for the quorum + pub membership_coordinator: EpochMembershipCoordinator, + + /// Lock for a decided upgrade + pub upgrade_lock: UpgradeLock, + + /// Number of blocks in an epoch, zero means there are no epochs + pub epoch_height: u64, + + /// The consensus state + pub consensus: OuterConsensus, + + pub transactions: LruCache::Transaction>, TYPES::Transaction>, + + /// Instance state + pub instance_state: Arc, + + /// Base fee + pub base_fee: u64, + + /// This Nodes Public Key + pub public_key: TYPES::SignatureKey, + + /// This Nodes Public Key + pub builder_public_key: TYPES::BuilderSignatureKey, + + /// Our Private Key + pub builder_private_key: ::BuilderPrivateKey, + + /// Transactions that were decided but not seen in the block builder + pub decided_not_seen_txns: HashSet::Transaction>>, +} + +async fn collect_txns( + proposed_leaf: &Leaf2, + consensus: &Consensus, +) -> HashMap::Transaction>, TYPES::Transaction> { + let mut txns = HashMap::new(); + + // We've reached decide, now get the leaf chain all the way back to the last decided view, not including it. + let old_anchor_view = consensus.last_decided_view(); + let mut current_leaf = Some(proposed_leaf.clone()); + while current_leaf + .as_ref() + .is_some_and(|leaf| leaf.view_number() > old_anchor_view) + { + // unwrap is safe, we just checked that he option is some + let leaf = &mut current_leaf.unwrap(); + + // If the block payload is available for this leaf add the transactions to the set + if let Some(payload) = consensus.saved_payloads().get(&leaf.view_number()) { + for txn in payload.payload.transactions(leaf.block_header().metadata()) { + txns.insert(txn.commit(), txn.clone()); + } + } + + current_leaf = consensus + .saved_leaves() + .get(&leaf.justify_qc().data.leaf_commit) + .cloned(); + } + + txns +} + +impl BlockBuilderTaskState { + pub async fn build_block( + &mut self, + view: TYPES::View, + epoch: Option, + event_stream: Sender>>, + version: Version, + ) -> Option { + let Some(proposal) = self + .consensus + .read() + .await + .last_proposals() + .get(&view) + .cloned() + else { + tracing::error!("No proposal found for view {view}, sending empty block"); + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + &event_stream, + view, + epoch, + version, + ) + .await; + return None; + }; + + let leaf = Leaf2::from_quorum_proposal(&proposal.data); + let consensus_reader = self.consensus.read().await; + let in_flight_txns = collect_txns(&leaf, &*consensus_reader).await; + + let mut block = vec![]; + for (txn_hash, txn) in self.transactions.iter().rev() { + if !in_flight_txns.contains_key(txn_hash) { + block.push(txn.clone()); + } + } + + let consensus_reader = self.consensus.read().await; + + let maybe_validated_state = match consensus_reader.validated_state_map().get(&view) { + Some(view) => view.state().cloned(), + None => None, + }; + + let validated_state = maybe_validated_state + .unwrap_or_else(|| Arc::new(TYPES::ValidatedState::from_header(leaf.block_header()))); + + let Some((payload, metadata)) = + >::from_transactions( + block.into_iter(), + &validated_state, + &self.instance_state, + ) + .await + .ok() + else { + tracing::error!("Failed to build block payload, sending empty block"); + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + &event_stream, + view, + epoch, + version, + ) + .await; + return None; + }; + + let encoded_payload = payload.encode(); + let encoded_txns: Vec = encoded_payload.to_vec(); + let block_size: u64 = encoded_txns.len() as u64; + let offered_fee: u64 = self.base_fee * block_size; + + let Some(signature_over_fee_info) = + TYPES::BuilderSignatureKey::sign_fee(&self.builder_private_key, offered_fee, &metadata) + .ok() + else { + tracing::error!("Failed to sign fee, sending empty block"); + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + &event_stream, + view, + epoch, + version, + ) + .await; + return None; + }; + let builder_fee = BuilderFee { + fee_amount: offered_fee, + fee_account: self.builder_public_key.clone(), + fee_signature: signature_over_fee_info, + }; + + broadcast_event( + Arc::new(HotShotEvent::BlockRecv(PackedBundle::new( + encoded_payload, + metadata, + view, + epoch, + vec1::vec1![builder_fee], + ))), + &event_stream, + ) + .await; + + None + } + pub async fn handle_view_change( + &mut self, + view: TYPES::View, + epoch: Option, + event_stream: Sender>>, + ) -> Option { + let version = match self.upgrade_lock.version(view).await { + Ok(v) => v, + Err(err) => { + tracing::error!( + "Upgrade certificate requires unsupported version, refusing to request \ + blocks: {err}" + ); + return None; + }, + }; + + // Short circuit if we are in epochs and we are likely proposing a transition block + // If it's the first view of the upgrade, we don't need to check for transition blocks + if version >= V::Epochs::VERSION { + let Some(epoch) = epoch else { + tracing::error!("Epoch is required for epoch-based view change"); + return None; + }; + let high_qc = self.consensus.read().await.high_qc().clone(); + let mut high_qc_block_number = if let Some(bn) = high_qc.data.block_number { + bn + } else { + // If it's the first view after the upgrade the high QC won't have a block number + // So just use the highest_block number we've stored + if view + > self + .upgrade_lock + .upgrade_view() + .await + .unwrap_or(TYPES::View::new(0)) + + 1 + { + tracing::warn!("High QC in epoch version and not the first QC after upgrade"); + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + &event_stream, + view, + Some(epoch), + version, + ) + .await; + return None; + } + // 0 here so we use the highest block number in the calculation below + 0 + }; + high_qc_block_number = std::cmp::max( + high_qc_block_number, + self.consensus.read().await.highest_block, + ); + if self + .consensus + .read() + .await + .transition_qc() + .is_some_and(|qc| { + let Some(e) = qc.0.data.epoch else { + return false; + }; + e == epoch + }) + || is_epoch_transition(high_qc_block_number, self.epoch_height) + { + // We are proposing a transition block it should be empty + if !is_last_block(high_qc_block_number, self.epoch_height) { + tracing::info!( + "Sending empty block event. View number: {view}. Parent Block number: \ + {high_qc_block_number}" + ); + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + &event_stream, + view, + Some(epoch), + version, + ) + .await; + return None; + } + } + } + self.build_block(view, epoch, event_stream, version).await; + None + } + + async fn handle_transactions( + &mut self, + transactions: &Vec, + ) -> Option { + for txn in transactions { + // ignore decided txns + if self.decided_not_seen_txns.remove(&txn.commit()) { + continue; + } + self.transactions.push(txn.commit(), txn.clone()); + } + None + } + + pub async fn handle( + &mut self, + event: Arc>, + sender: Sender>>, + ) -> Result<()> { + match event.as_ref() { + HotShotEvent::TransactionsRecv(transactions) => { + self.handle_transactions(transactions).await; + // broadcast_event( + // Arc::new(HotShotEvent::TransactionsRecv(transactions.clone())), + // &sender, + // ) + // .await; + }, + HotShotEvent::ViewChange(view, epoch) => { + let view = TYPES::View::new(std::cmp::max(1, **view)); + ensure!( + *view > *self.cur_view && *epoch >= self.cur_epoch, + debug!( + "Received a view change to an older view and epoch: tried to change view \ + to {view}and epoch {epoch:?} though we are at view {} and epoch {:?}", + self.cur_view, self.cur_epoch + ) + ); + self.cur_view = view; + self.cur_epoch = *epoch; + + let leader = self + .membership_coordinator + .membership_for_epoch(*epoch) + .await? + .leader(view) + .await?; + if leader == self.public_key { + self.handle_view_change(view, *epoch, sender.clone()).await; + return Ok(()); + } + }, + HotShotEvent::ViewDecided(_, txns) => { + for txn in txns { + // Remove the txn from our mempool if it's in there, else store it to prevent a later duplicate + if self.transactions.pop(txn).is_none() { + self.decided_not_seen_txns.insert(*txn); + } + } + }, + _ => {}, + } + Ok(()) + } +} + +#[async_trait] +impl TaskState for BlockBuilderTaskState { + type Event = HotShotEvent; + + async fn handle_event( + &mut self, + event: Arc, + sender: &Sender>, + _receiver: &Receiver>, + ) -> Result<()> { + self.handle(event, sender.clone()).await + } + + fn cancel_subtasks(&mut self) {} +} diff --git a/crates/hotshot/task-impls/src/events.rs b/crates/hotshot/task-impls/src/events.rs index 287299c9f48..8c4ec85a94d 100644 --- a/crates/hotshot/task-impls/src/events.rs +++ b/crates/hotshot/task-impls/src/events.rs @@ -7,6 +7,7 @@ use std::fmt::Display; use async_broadcast::Sender; +use committable::Commitment; use either::Either; use hotshot_task::task::TaskEvent; use hotshot_types::{ @@ -297,6 +298,12 @@ pub enum HotShotEvent { EpochRootQcRecv(EpochRootQuorumCertificate, TYPES::SignatureKey), /// We decided the given leaves LeavesDecided(Vec>), + + /// A view has been decided + ViewDecided( + Vec>, + Vec::Transaction>>, + ), } impl HotShotEvent { @@ -393,6 +400,7 @@ impl HotShotEvent { }, HotShotEvent::SetFirstEpoch(..) => None, HotShotEvent::LeavesDecided(..) => None, + HotShotEvent::ViewDecided(leaves, _) => leaves.first().map(|leaf| leaf.view_number()), } } } @@ -713,6 +721,13 @@ impl Display for HotShotEvent { HotShotEvent::LeavesDecided(leaf) => { write!(f, "LeavesDecided(leaf={leaf:?})") }, + HotShotEvent::ViewDecided(leaves, _) => { + write!( + f, + "ViewDecided(view_number={:?}", + leaves.first().map(|leaf| leaf.view_number()) + ) + }, } } } diff --git a/crates/hotshot/task-impls/src/lib.rs b/crates/hotshot/task-impls/src/lib.rs index 313772fbd47..2e807200c2d 100644 --- a/crates/hotshot/task-impls/src/lib.rs +++ b/crates/hotshot/task-impls/src/lib.rs @@ -10,6 +10,9 @@ /// The task which implements the core state logic of consensus. pub mod consensus; +/// The task which implements the block builder +pub mod block_builder; + /// The task which handles the logic for the quorum vote. pub mod quorum_vote; diff --git a/crates/hotshot/task-impls/src/quorum_vote/handlers.rs b/crates/hotshot/task-impls/src/quorum_vote/handlers.rs index 24fdc6791bf..6d1c0b4573e 100644 --- a/crates/hotshot/task-impls/src/quorum_vote/handlers.rs +++ b/crates/hotshot/task-impls/src/quorum_vote/handlers.rs @@ -213,12 +213,14 @@ pub(crate) async fn handle_quorum_proposal_validated< // We don't need to hold this while we broadcast drop(consensus_writer); + let mut decided_leaves = vec![]; for leaf_info in &leaf_views { tracing::info!( "Sending decide for view {:?} at height {:?}", leaf_info.leaf.view_number(), leaf_info.leaf.block_header().block_number(), ); + decided_leaves.push(leaf_info.leaf.clone()); } broadcast_event( @@ -232,6 +234,18 @@ pub(crate) async fn handle_quorum_proposal_validated< ) .await; + let decided_txns = if let Some(txns) = &included_txns { + txns.iter().cloned().collect() + } else { + vec![] + }; + + broadcast_event( + Arc::new(HotShotEvent::ViewDecided(decided_leaves, decided_txns)), + event_sender, + ) + .await; + // Send an update to everyone saying that we've reached a decide broadcast_event( Event { diff --git a/crates/hotshot/task-impls/src/stats.rs b/crates/hotshot/task-impls/src/stats.rs index b045427bb2f..1cecabe1bec 100644 --- a/crates/hotshot/task-impls/src/stats.rs +++ b/crates/hotshot/task-impls/src/stats.rs @@ -185,7 +185,9 @@ impl TaskState for StatsTaskState { self.replica_entry(proposal.data.view_number()) .proposal_recv = Some(now); }, - HotShotEvent::QuorumVoteRecv(_vote) => {}, + HotShotEvent::QuorumVoteRecv(_vote) => { + // self.leader_entry(vote.view_number()).vote_recv = Some(now); + }, HotShotEvent::TimeoutVoteRecv(_vote) => {}, HotShotEvent::TimeoutVoteSend(vote) => { self.replica_entry(vote.view_number()).timeout_vote_send = Some(now); @@ -218,6 +220,7 @@ impl TaskState for StatsTaskState { // calculate the elapsed time as milliseconds (from nanoseconds) let elapsed_time = (now - previous_proposal_time) / 1_000_000; + if elapsed_time > 0 { self.consensus .read() diff --git a/crates/hotshot/task-impls/src/transactions.rs b/crates/hotshot/task-impls/src/transactions.rs index bd05abb1ebe..0f4ccd79b55 100644 --- a/crates/hotshot/task-impls/src/transactions.rs +++ b/crates/hotshot/task-impls/src/transactions.rs @@ -55,6 +55,59 @@ const BUILDER_MINIMUM_QUERY_TIME: Duration = Duration::from_millis(300); /// Delay between re-tries on unsuccessful calls const RETRY_DELAY: Duration = Duration::from_millis(100); +/// Send the event to the event stream that we are proposing an empty block +pub async fn send_empty_block( + consensus: &OuterConsensus, + membership_coordinator: &EpochMembershipCoordinator, + event_stream: &Sender>>, + block_view: TYPES::View, + block_epoch: Option, + version: Version, +) { + // If we couldn't get a block, send an empty block + tracing::info!("Failed to get a block for view {block_view}, proposing empty block"); + + // Increment the metric for number of empty blocks proposed + consensus + .write() + .await + .metrics + .number_of_empty_blocks_proposed + .add(1); + + let num_storage_nodes = match membership_coordinator + .stake_table_for_epoch(block_epoch) + .await + { + Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await, + Err(e) => { + tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}"); + return; + }, + }; + + let Some(null_fee) = null_block::builder_fee::(num_storage_nodes, version) else { + tracing::error!("Failed to get null fee"); + return; + }; + + // Create an empty block payload and metadata + let (_, metadata) = ::BlockPayload::empty(); + + // Broadcast the empty block + broadcast_event( + Arc::new(HotShotEvent::BlockRecv(PackedBundle::new( + vec![].into(), + metadata, + block_view, + block_epoch, + vec1::vec1![null_fee], + ))), + event_stream, + ) + .await; +} + /// Builder Provided Responses pub struct BuilderResponse { /// Fee information @@ -170,8 +223,15 @@ impl TransactionTaskState { + 1 { tracing::warn!("High QC in epoch version and not the first QC after upgrade"); - self.send_empty_block(event_stream, block_view, block_epoch, version) - .await; + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + event_stream, + block_view, + block_epoch, + version, + ) + .await; return None; } // 0 here so we use the highest block number in the calculation below @@ -200,8 +260,15 @@ impl TransactionTaskState { "Sending empty block event. View number: {block_view}. Parent Block \ number: {high_qc_block_number}" ); - self.send_empty_block(event_stream, block_view, block_epoch, version) - .await; + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + event_stream, + block_view, + block_epoch, + version, + ) + .await; return None; } } @@ -241,66 +308,20 @@ impl TransactionTaskState { ) .await; } else { - self.send_empty_block(event_stream, block_view, block_epoch, version) - .await; + send_empty_block::( + &self.consensus, + &self.membership_coordinator, + event_stream, + block_view, + block_epoch, + version, + ) + .await; }; return None; } - /// Send the event to the event stream that we are proposing an empty block - async fn send_empty_block( - &self, - event_stream: &Sender>>, - block_view: TYPES::View, - block_epoch: Option, - version: Version, - ) { - // If we couldn't get a block, send an empty block - tracing::info!("Failed to get a block for view {block_view}, proposing empty block"); - - // Increment the metric for number of empty blocks proposed - self.consensus - .write() - .await - .metrics - .number_of_empty_blocks_proposed - .add(1); - - let num_storage_nodes = match self - .membership_coordinator - .stake_table_for_epoch(block_epoch) - .await - { - Ok(epoch_stake_table) => epoch_stake_table.total_nodes().await, - Err(e) => { - tracing::warn!("Failed to get num_storage_nodes for epoch {block_epoch:?}: {e}"); - return; - }, - }; - - let Some(null_fee) = null_block::builder_fee::(num_storage_nodes, version) else { - tracing::error!("Failed to get null fee"); - return; - }; - - // Create an empty block payload and metadata - let (_, metadata) = ::BlockPayload::empty(); - - // Broadcast the empty block - broadcast_event( - Arc::new(HotShotEvent::BlockRecv(PackedBundle::new( - vec![].into(), - metadata, - block_view, - block_epoch, - vec1::vec1![null_fee], - ))), - event_stream, - ) - .await; - } - /// Produce a null block pub async fn null_block( &self,