diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 64b48cad35..3ebb959fda 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -80,7 +80,7 @@ pub struct BFT { leader_certificate_timer: Arc, /// The consensus sender. consensus_sender: Arc>>, - /// The spawned handles. + /// Handles for all spawned tasks. handles: Arc>>>, /// The BFT lock. lock: Arc>, @@ -109,6 +109,9 @@ impl BFT { } /// Run the BFT instance. + /// + /// This will return as soon as all required tasks are spawned. + /// The function must not be called more than once per instance. pub async fn run( &mut self, consensus_sender: Option>, diff --git a/node/bft/src/helpers/ready.rs b/node/bft/src/helpers/ready.rs index 3e4dfd7b06..92a74b5daf 100644 --- a/node/bft/src/helpers/ready.rs +++ b/node/bft/src/helpers/ready.rs @@ -25,6 +25,7 @@ use snarkvm::{ use indexmap::{IndexMap, IndexSet}; use std::collections::{HashMap, VecDeque, hash_map::Entry::Vacant}; +/// Maintains a queue of verified ("ready") transmissions. #[derive(Clone, Debug)] pub struct Ready { /// Maps each transmission ID to its logical index (physical index + offset) diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 80edaaee45..a6bb6a87ca 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -1133,6 +1133,13 @@ impl Primary { impl Primary { /// Starts the primary handlers. + /// + /// For each receiver in the `primary_receiver` struct, there will be a dedicated task + /// that awaits new data and handles it accordingly. + /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically + /// tries to move to the next round of batches. + /// + /// This function is called exactly once, in `Self::run()`. fn start_handlers(&self, primary_receiver: PrimaryReceiver) { let PrimaryReceiver { mut rx_batch_propose, @@ -1143,7 +1150,7 @@ impl Primary { mut rx_unconfirmed_transaction, } = primary_receiver; - // Start the primary ping. 
+ // Start the primary ping sender. let self_ = self.clone(); self.spawn(async move { loop { @@ -1277,7 +1284,7 @@ impl Primary { } }); - // Process the proposed batch. + // Start the proposed batch handler. let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await { @@ -1298,7 +1305,7 @@ impl Primary { } }); - // Process the batch signature. + // Start the batch signature handler. let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await { @@ -1319,7 +1326,7 @@ impl Primary { } }); - // Process the certified batch. + // Start the certified batch handler. let self_ = self.clone(); self.spawn(async move { while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await { @@ -1346,7 +1353,8 @@ impl Primary { } }); - // Periodically try to increment to the next round. + // This task periodically tries to move to the next round. + // // Note: This is necessary to ensure that the primary is not stuck on a previous round // despite having received enough certificates to advance to the next round. let self_ = self.clone(); @@ -1385,7 +1393,7 @@ impl Primary { } }); - // Process the unconfirmed solutions. + // Start a handler to process new unconfirmed solutions. let self_ = self.clone(); self.spawn(async move { while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await { @@ -1411,7 +1419,7 @@ impl Primary { } }); - // Process the unconfirmed transactions. + // Start a handler to process new unconfirmed transactions. 
let self_ = self.clone(); self.spawn(async move { while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await { diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index bc724c35e3..9e31fc4db3 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -42,6 +42,8 @@ use rand::seq::IteratorRandom; use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; +/// A worker's main role is maintaining a queue of verified ("ready") transmissions, +/// which will eventually be fetched by the primary when the primary generates a new batch. #[derive(Clone)] pub struct Worker { /// The worker ID. @@ -111,8 +113,6 @@ impl Worker { /// The maximum number of transmissions allowed in a worker ping. pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH / 10; - // transmissions - /// Returns the number of transmissions in the ready queue. pub fn num_transmissions(&self) -> usize { self.ready.read().num_transmissions() diff --git a/node/consensus/README.md b/node/consensus/README.md index 800d1ea643..fe3106241b 100644 --- a/node/consensus/README.md +++ b/node/consensus/README.md @@ -6,5 +6,5 @@ The `snarkos-node-consensus` crate provides the consensus layer for the snarkOS node. -It builds on top of the `snarkos-node-bft` crate, which provides an abstract implementationv of the AleoBFT. -More concretely, this crate provides the communication logic that allows multiple AleoBFT nodes to interact with each other. +The crate builds on top of the `snarkos-node-bft` crate, which implements AleoBFT. +It manages a rate limiter/mempool for incoming transmissions, and manages construction of blocks from batches that have been confirmed by the BFT layer. 
diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index bc139587b7..c152f6bb51 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -25,11 +25,11 @@ use snarkos_node_bft::{ Primary, helpers::{ ConsensusReceiver, - PrimaryReceiver, PrimarySender, Storage as NarwhalStorage, fmt_id, init_consensus_channels, + init_primary_channels, }, spawn_blocking, }; @@ -55,10 +55,7 @@ use lru::LruCache; #[cfg(not(feature = "locktick"))] use parking_lot::Mutex; use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration}; -use tokio::{ - sync::{OnceCell, oneshot}, - task::JoinHandle, -}; +use tokio::{sync::oneshot, task::JoinHandle}; #[cfg(feature = "metrics")] use std::collections::HashMap; @@ -91,6 +88,12 @@ impl Default for TransactionsQueue { } } +/// Wrapper around `BFT` that adds additional functionality, such as a mempool. +/// +/// Consensus acts as a rate limiter to prevent workers in BFT from being overloaded. +/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions +/// before enqueuing them. +/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full. #[derive(Clone)] pub struct Consensus { /// The ledger. @@ -98,7 +101,7 @@ pub struct Consensus { /// The BFT. bft: BFT, /// The primary sender. - primary_sender: Arc>>, + primary_sender: PrimarySender, /// The unconfirmed solutions queue. solutions_queue: Arc, Solution>>>, /// The unconfirmed transactions queue. @@ -109,13 +112,13 @@ impl Consensus { seen_transactions: Arc>>, #[cfg(feature = "metrics")] transmissions_queue_timestamps: Arc, i64>>>, - /// The spawned handles. + /// The handles of all spawned tasks. handles: Arc>>>, } impl Consensus { - /// Initializes a new instance of consensus. - pub fn new( + /// Initializes a new instance of consensus and spawns its background tasks. 
+ pub async fn new( account: Account, ledger: Arc>, block_sync: Arc>, @@ -123,17 +126,19 @@ impl Consensus { trusted_validators: &[SocketAddr], storage_mode: StorageMode, ) -> Result { + // Initialize the primary channels. + let (primary_sender, primary_receiver) = init_primary_channels::(); // Initialize the Narwhal transmissions. let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?); // Initialize the Narwhal storage. let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::::MAX_GC_ROUNDS as u64); // Initialize the BFT. let bft = BFT::new(account, storage, ledger.clone(), block_sync, ip, trusted_validators, storage_mode)?; - // Return the consensus. - Ok(Self { + // Create a new instance of Consensus. + let mut _self = Self { ledger, bft, - primary_sender: Default::default(), + primary_sender, solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))), transactions_queue: Default::default(), seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))), @@ -141,52 +146,38 @@ impl Consensus { #[cfg(feature = "metrics")] transmissions_queue_timestamps: Default::default(), handles: Default::default(), - }) - } + }; - /// Run the consensus instance. - pub async fn run(&mut self, primary_sender: PrimarySender, primary_receiver: PrimaryReceiver) -> Result<()> { info!("Starting the consensus instance..."); - // Set the primary sender. - self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set"); // First, initialize the consensus channels. let (consensus_sender, consensus_receiver) = init_consensus_channels(); // Then, start the consensus handlers. - self.start_handlers(consensus_receiver); - // Lastly, the consensus. - self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?; - Ok(()) - } + _self.start_handlers(consensus_receiver); + // Lastly, also start the BFT's handlers. 
+ _self.bft.run(Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?; - /// Returns the ledger. - pub const fn ledger(&self) -> &Arc> { - &self.ledger + Ok(_self) } - /// Returns the BFT. + /// Returns the underlying `BFT` struct. pub const fn bft(&self) -> &BFT { &self.bft } - - /// Returns the primary sender. - pub fn primary_sender(&self) -> &PrimarySender { - self.primary_sender.get().expect("Primary sender not set") - } } impl Consensus { - /// Returns the number of unconfirmed transmissions. + /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool). pub fn num_unconfirmed_transmissions(&self) -> usize { self.bft.num_unconfirmed_transmissions() } - /// Returns the number of unconfirmed ratifications. + /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool). pub fn num_unconfirmed_ratifications(&self) -> usize { self.bft.num_unconfirmed_ratifications() } - /// Returns the number of solutions. + /// Returns the number of unconfirmed solutions in the BFT's workers (not in the mempool). pub fn num_unconfirmed_solutions(&self) -> usize { self.bft.num_unconfirmed_solutions() } @@ -285,7 +276,8 @@ impl Consensus { } impl Consensus { - /// Adds the given unconfirmed solution to the memory pool. + /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed + /// to the BFT layer for inclusion in a batch. pub async fn add_unconfirmed_solution(&self, solution: Solution) -> Result<()> { // Calculate the transmission checksum. let checksum = Data::>::Buffer(solution.to_bytes_le()?.into()).to_checksum::()?; @@ -321,8 +313,9 @@ impl Consensus { self.process_unconfirmed_solutions().await } - /// Processes unconfirmed transactions in the memory pool. 
- pub async fn process_unconfirmed_solutions(&self) -> Result<()> { + /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer + /// (if sufficient space is available). + async fn process_unconfirmed_solutions(&self) -> Result<()> { // If the memory pool of this node is full, return early. let num_unconfirmed_solutions = self.num_unconfirmed_solutions(); let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); @@ -347,11 +340,11 @@ impl Consensus { let solution_id = solution.id(); trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id)); // Send the unconfirmed solution to the primary. - if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await { + if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await { // If the BFT is synced, then log the warning. if self.bft.is_synced() { // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore. - if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 { + if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 { warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id)) }; } @@ -360,7 +353,8 @@ impl Consensus { Ok(()) } - /// Adds the given unconfirmed transaction to the memory pool. + /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed + /// to the BFT layer for inclusion in a batch. pub async fn add_unconfirmed_transaction(&self, transaction: Transaction) -> Result<()> { // Calculate the transmission checksum. let checksum = Data::>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::()?; @@ -404,8 +398,9 @@ impl Consensus { } } - /// Processes unconfirmed transactions in the memory pool. 
- pub async fn process_unconfirmed_transactions(&self) -> Result<()> { + /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer + /// (if sufficient space is available). + async fn process_unconfirmed_transactions(&self) -> Result<()> { // If the memory pool of this node is full, return early. let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions(); if num_unconfirmed_transmissions >= Primary::::MAX_TRANSMISSIONS_TOLERANCE { @@ -441,7 +436,7 @@ impl Consensus { trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id)); // Send the unconfirmed transaction to the primary. if let Err(e) = - self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await + self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await { // If the BFT is synced, then log the warning. if self.bft.is_synced() { @@ -458,6 +453,8 @@ impl Consensus { impl Consensus { /// Starts the consensus handlers. + /// + /// This is only invoked once, in the constructor. fn start_handlers(&self, consensus_receiver: ConsensusReceiver) { let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver; @@ -470,6 +467,9 @@ impl Consensus { }); // Process the unconfirmed transactions in the memory pool. + // + // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted + // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions. let self_ = self.clone(); self.spawn(async move { loop { @@ -487,7 +487,7 @@ impl Consensus { }); } - /// Processes the committed subdag and transmissions from the BFT. + /// Attempts to build a new block from the given subDAG, and (tries to) advance the ledger to it. async fn process_bft_subdag( &self, subdag: Subdag, @@ -510,7 +510,7 @@ impl Consensus { callback.send(result).ok(); } - /// Attempts to advance to the next block. 
+ /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly. fn try_advance_to_next_block( &self, subdag: Subdag, @@ -538,6 +538,9 @@ impl Consensus { self.bft.primary().clear_worker_solutions(); } + // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool. + // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them. + #[cfg(feature = "metrics")] { let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64); @@ -587,11 +590,11 @@ impl Consensus { (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()), (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => { // Send the solution to the primary. - self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?; + self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?; } (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => { // Send the transaction to the primary. - self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?; + self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?; } _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"), } @@ -604,7 +607,7 @@ impl Consensus { self.handles.lock().push(tokio::spawn(future)); } - /// Shuts down the BFT. + /// Shuts down the consensus and BFT layers. pub async fn shut_down(&self) { info!("Shutting down consensus..."); // Shut down the BFT. 
diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index b010b3c5c4..bd829655a0 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -17,7 +17,7 @@ mod router; use crate::traits::NodeInterface; use snarkos_account::Account; -use snarkos_node_bft::{helpers::init_primary_channels, ledger_service::CoreLedgerService, spawn_blocking}; +use snarkos_node_bft::{ledger_service::CoreLedgerService, spawn_blocking}; use snarkos_node_consensus::Consensus; use snarkos_node_rest::Rest; use snarkos_node_router::{ @@ -131,18 +131,16 @@ impl> Validator { let sync = Arc::new(BlockSync::new(ledger_service.clone())); // Initialize the consensus layer. - let mut consensus = Consensus::new( + let consensus = Consensus::new( account.clone(), ledger_service.clone(), sync.clone(), bft_ip, trusted_validators, storage_mode.clone(), - )?; - // Initialize the primary channels. - let (primary_sender, primary_receiver) = init_primary_channels::(); - // Start the consensus. - consensus.run(primary_sender, primary_receiver).await?; + ) + .await?; + // Initialize the node. let mut node = Self { ledger: ledger.clone(),