Skip to content

Commit c7232f5

Browse files
authored
Merge pull request #3543 from kaimast/deduplicate-blocksync
Deduplicate blocksync
2 parents d1b54a9 + 3c75ef7 commit c7232f5

File tree

21 files changed

+487
-453
lines changed

21 files changed

+487
-453
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/bft/examples/simple_node.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use snarkos_node_bft::{
2626
};
2727
use snarkos_node_bft_ledger_service::TranslucentLedgerService;
2828
use snarkos_node_bft_storage_service::BFTMemoryService;
29+
use snarkos_node_sync::BlockSync;
2930
use snarkvm::{
3031
console::{account::PrivateKey, algorithms::BHP256, types::Address},
3132
ledger::{
@@ -142,7 +143,9 @@ pub async fn start_bft(
142143
// Initialize the consensus receiver handler.
143144
consensus_handler(consensus_receiver);
144145
// Initialize the BFT instance.
145-
let mut bft = BFT::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
146+
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
147+
let mut bft =
148+
BFT::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
146149
// Run the BFT instance.
147150
bft.run(Some(consensus_sender), sender.clone(), receiver).await?;
148151
// Retrieve the BFT's primary.
@@ -180,7 +183,9 @@ pub async fn start_primary(
180183
// Initialize the trusted validators.
181184
let trusted_validators = trusted_validators(node_id, num_nodes, peers);
182185
// Initialize the primary instance.
183-
let mut primary = Primary::<CurrentNetwork>::new(account, storage, ledger, ip, &trusted_validators, storage_mode)?;
186+
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
187+
let mut primary =
188+
Primary::<CurrentNetwork>::new(account, storage, ledger, block_sync, ip, &trusted_validators, storage_mode)?;
184189
// Run the primary instance.
185190
primary.run(None, sender.clone(), receiver).await?;
186191
// Handle OS signals.

node/bft/src/bft.rs

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{
3030
};
3131
use snarkos_account::Account;
3232
use snarkos_node_bft_ledger_service::LedgerService;
33+
use snarkos_node_sync::BlockSync;
3334
use snarkvm::{
3435
console::account::Address,
3536
ledger::{
@@ -69,9 +70,9 @@ use tokio::{
6970

7071
#[derive(Clone)]
7172
pub struct BFT<N: Network> {
72-
/// The primary.
73+
/// The primary for this node.
7374
primary: Primary<N>,
74-
/// The DAG.
75+
/// The DAG of batches from which we build the blockchain.
7576
dag: Arc<RwLock<DAG<N>>>,
7677
/// The batch certificate of the leader from the current even round, if one was present.
7778
leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
@@ -91,12 +92,13 @@ impl<N: Network> BFT<N> {
9192
account: Account<N>,
9293
storage: Storage<N>,
9394
ledger: Arc<dyn LedgerService<N>>,
95+
block_sync: Arc<BlockSync<N>>,
9496
ip: Option<SocketAddr>,
9597
trusted_validators: &[SocketAddr],
9698
storage_mode: StorageMode,
9799
) -> Result<Self> {
98100
Ok(Self {
99-
primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
101+
primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode)?,
100102
dag: Default::default(),
101103
leader_certificate: Default::default(),
102104
leader_certificate_timer: Default::default(),
@@ -931,6 +933,7 @@ mod tests {
931933
use snarkos_account::Account;
932934
use snarkos_node_bft_ledger_service::MockLedgerService;
933935
use snarkos_node_bft_storage_service::BFTMemoryService;
936+
use snarkos_node_sync::BlockSync;
934937
use snarkvm::{
935938
console::account::{Address, PrivateKey},
936939
ledger::{
@@ -970,6 +973,18 @@ mod tests {
970973
(committee, account, ledger, storage)
971974
}
972975

976+
// Helper function to set up BFT for testing.
977+
fn initialize_bft(
978+
account: Account<CurrentNetwork>,
979+
storage: Storage<CurrentNetwork>,
980+
ledger: Arc<MockLedgerService<CurrentNetwork>>,
981+
) -> anyhow::Result<BFT<CurrentNetwork>> {
982+
// Create the block synchronization logic.
983+
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
984+
// Initialize the BFT.
985+
BFT::new(account.clone(), storage.clone(), ledger.clone(), block_sync, None, &[], StorageMode::new_test(None))
986+
}
987+
973988
#[test]
974989
#[tracing_test::traced_test]
975990
fn test_is_leader_quorum_odd() -> Result<()> {
@@ -1001,7 +1016,7 @@ mod tests {
10011016
// Initialize the account.
10021017
let account = Account::new(rng)?;
10031018
// Initialize the BFT.
1004-
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1019+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
10051020
assert!(bft.is_timer_expired());
10061021
// Ensure this call succeeds on an odd round.
10071022
let result = bft.is_leader_quorum_or_nonleaders_available(1);
@@ -1035,8 +1050,8 @@ mod tests {
10351050
assert_eq!(storage.current_round(), 1);
10361051
assert_eq!(storage.max_gc_rounds(), 10);
10371052

1038-
// Initialize the BFT.
1039-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1053+
// Set up the BFT logic.
1054+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
10401055
assert!(bft.is_timer_expired()); // 0 + 5 < now()
10411056

10421057
// Store is at round 1, and we are checking for round 2.
@@ -1057,8 +1072,8 @@ mod tests {
10571072
assert_eq!(storage.current_round(), 2);
10581073
assert_eq!(storage.max_gc_rounds(), 10);
10591074

1060-
// Initialize the BFT.
1061-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1075+
// Set up the BFT logic.
1076+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
10621077
assert!(bft.is_timer_expired()); // 0 + 5 < now()
10631078

10641079
// Ensure this call fails on an even round.
@@ -1097,8 +1112,11 @@ mod tests {
10971112
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
10981113
// Initialize the account.
10991114
let account = Account::new(rng)?;
1100-
// Initialize the BFT.
1101-
let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1115+
1116+
// Set up the BFT logic.
1117+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1118+
assert!(bft.is_timer_expired()); // 0 + 5 < now()
1119+
11021120
// Set the leader certificate.
11031121
let leader_certificate = sample_batch_certificate_for_round(2, rng);
11041122
*bft.leader_certificate.write() = Some(leader_certificate);
@@ -1110,8 +1128,7 @@ mod tests {
11101128
assert!(result);
11111129

11121130
// Initialize a new BFT.
1113-
let bft_timer =
1114-
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1131+
let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
11151132
// If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
11161133
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
11171134
if !bft_timer.is_timer_expired() {
@@ -1142,7 +1159,8 @@ mod tests {
11421159
assert_eq!(storage.max_gc_rounds(), 10);
11431160

11441161
// Initialize the BFT.
1145-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1162+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1163+
assert!(bft.is_timer_expired()); // 0 + 5 < now()
11461164

11471165
// Ensure this call fails on an odd round.
11481166
let result = bft.update_leader_certificate_to_even_round(1);
@@ -1160,7 +1178,7 @@ mod tests {
11601178
assert_eq!(storage.max_gc_rounds(), 10);
11611179

11621180
// Initialize the BFT.
1163-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1181+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
11641182

11651183
// Ensure this call succeeds on an even round.
11661184
let result = bft.update_leader_certificate_to_even_round(6);
@@ -1212,7 +1230,7 @@ mod tests {
12121230

12131231
// Initialize the BFT.
12141232
let account = Account::new(rng)?;
1215-
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1233+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
12161234

12171235
// Set the leader certificate.
12181236
*bft.leader_certificate.write() = Some(leader_certificate);
@@ -1250,7 +1268,7 @@ mod tests {
12501268
// Initialize the storage.
12511269
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
12521270
// Initialize the BFT.
1253-
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1271+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
12541272

12551273
// Insert a mock DAG in the BFT.
12561274
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
@@ -1280,7 +1298,7 @@ mod tests {
12801298
// Initialize the storage.
12811299
let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
12821300
// Initialize the BFT.
1283-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1301+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
12841302

12851303
// Insert a mock DAG in the BFT.
12861304
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
@@ -1338,7 +1356,7 @@ mod tests {
13381356
/* Test missing previous certificate. */
13391357

13401358
// Initialize the BFT.
1341-
let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1359+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
13421360

13431361
// The expected error message.
13441362
let error_msg = format!(
@@ -1399,8 +1417,8 @@ mod tests {
13991417

14001418
// Initialize the BFT.
14011419
let account = Account::new(rng)?;
1402-
let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1403-
// Insert a mock DAG in the BFT.
1420+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1421+
14041422
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
14051423

14061424
// Ensure that the `gc_round` has not been updated yet.
@@ -1465,7 +1483,7 @@ mod tests {
14651483

14661484
// Initialize the BFT.
14671485
let account = Account::new(rng)?;
1468-
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1486+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
14691487

14701488
// Insert a mock DAG in the BFT.
14711489
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
@@ -1483,7 +1501,7 @@ mod tests {
14831501
// Initialize a new instance of storage.
14841502
let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
14851503
// Initialize a new instance of BFT.
1486-
let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;
1504+
let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
14871505

14881506
// Sync the BFT DAG at bootup.
14891507
bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
@@ -1637,7 +1655,7 @@ mod tests {
16371655

16381656
// Initialize the BFT without bootup.
16391657
let account = Account::new(rng)?;
1640-
let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1658+
let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
16411659

16421660
// Insert a mock DAG in the BFT without bootup.
16431661
*bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
@@ -1662,8 +1680,7 @@ mod tests {
16621680
let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
16631681

16641682
// Initialize a new instance of BFT with bootup.
1665-
let bootup_bft =
1666-
BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1683+
let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
16671684

16681685
// Sync the BFT DAG at bootup.
16691686
bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
@@ -1841,8 +1858,8 @@ mod tests {
18411858
}
18421859
// Initialize the bootup BFT.
18431860
let account = Account::new(rng)?;
1844-
let bootup_bft =
1845-
BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1861+
let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1862+
18461863
// Insert a mock DAG in the BFT without bootup.
18471864
*bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
18481865
// Sync the BFT DAG at bootup.

node/bft/src/gateway.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ pub trait Transport<N: Network>: Send + Sync {
110110
fn broadcast(&self, event: Event<N>);
111111
}
112112

113+
/// The gateway maintains connections to other validators.
114+
/// For connections with clients and provers, the Router logic is used.
113115
#[derive(Clone)]
114116
pub struct Gateway<N: Network> {
115117
/// The account of the node.
@@ -514,9 +516,8 @@ impl<N: Network> Gateway<N> {
514516
self.update_metrics();
515517
}
516518

517-
/// Inserts the given peer into the connected peers.
519+
/// Inserts the given peer into the connected peers. This is only used in testing.
518520
#[cfg(test)]
519-
// For unit tests, we need to make this public so we can inject peers.
520521
pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
521522
// Adds a bidirectional map between the listener address and (ambiguous) peer address.
522523
self.resolver.insert_peer(peer_ip, peer_addr, address);

node/bft/src/helpers/channels.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusRe
6060
(sender, receiver)
6161
}
6262

63+
/// "Interface" that enables, for example, sending data from storage to the the BFT logic.
6364
#[derive(Clone, Debug)]
6465
pub struct BFTSender<N: Network> {
6566
pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
@@ -100,6 +101,7 @@ impl<N: Network> BFTSender<N> {
100101
}
101102
}
102103

104+
/// Receiving counterpart to `BFTSender`
103105
#[derive(Debug)]
104106
pub struct BFTReceiver<N: Network> {
105107
pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
@@ -108,7 +110,7 @@ pub struct BFTReceiver<N: Network> {
108110
pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
109111
}
110112

111-
/// Initializes the BFT channels.
113+
/// Initializes the BFT channels, and returns the sending and receiving ends.
112114
pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
113115
let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
114116
let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);

node/bft/src/helpers/dag.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ use snarkvm::{
2222
use indexmap::IndexSet;
2323
use std::collections::{BTreeMap, HashMap};
2424

25+
/// Maintains an directed acyclic graph (DAG) of batches, from which we build a totally-ordered blockchain.
26+
/// The DAG is updated in rounds, where each validator adds at most one new batch.
2527
#[derive(Debug)]
2628
pub struct DAG<N: Network> {
2729
/// The in-memory collection of certificates that comprise the DAG.
30+
/// For each round, there is a mapping from node address to batch.
2831
graph: BTreeMap<u64, HashMap<Address<N>, BatchCertificate<N>>>,
2932
/// The in-memory collection of recently committed certificate IDs (up to GC).
3033
recent_committed_ids: BTreeMap<u64, IndexSet<Field<N>>>,

node/bft/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
6161
/// The maximum number of workers that can be spawned.
6262
pub const MAX_WORKERS: u8 = 1; // worker(s)
6363

64-
/// The frequency at which each primary broadcasts a ping to every other node.
64+
/// The interval at which each primary broadcasts a ping to every other node.
6565
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
6666
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
67-
/// The frequency at which each worker broadcasts a ping to every other node.
67+
/// The interval at which each worker broadcasts a ping to every other node.
6868
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms
6969

7070
/// A helper macro to spawn a blocking task.

0 commit comments

Comments
 (0)