Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
777 changes: 402 additions & 375 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions src/consensus/parlia/gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ pub fn validate_gas_limit(
return Err("gas_limit below minimum");
}

let diff = if parent_gas_limit > gas_limit {
parent_gas_limit - gas_limit
} else {
gas_limit - parent_gas_limit
};
let diff = parent_gas_limit.abs_diff(gas_limit);

if diff >= delta {
return Err("gas_limit change exceeds bound");
Expand Down
4 changes: 3 additions & 1 deletion src/consensus/parlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ pub mod snapshot;
pub mod provider;
pub mod constants;
pub mod gas;
pub mod vote_pool;
pub mod consensus;
pub mod util;
pub mod error;
pub mod consensus;
pub mod validation;
pub mod db;
pub mod seal;
Expand All @@ -17,6 +18,7 @@ pub use constants::*;
pub use error::ParliaConsensusError;
pub use util::hash_with_chain_id;
pub use provider::SnapshotProvider;
pub use vote_pool as votes;
pub use consensus::Parlia;

/// Epoch length.
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/parlia/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl<DB: Database + 'static> SnapshotProvider for EnhancedDbSnapshotProvider<DB>
tracing::debug!("Start to apply headers to base snapshot, base_snapshot: {:?}, target_snapshot: {}, apply_length: {}",
working_snapshot.block_number, block_number, headers_to_apply.len());

for (_index, header) in headers_to_apply.iter().enumerate() {
for header in headers_to_apply.iter() {
let epoch_remainder = header.number % working_snapshot.epoch_num;
let miner_check_len = working_snapshot.miner_history_check_len();
let is_epoch_boundary = header.number > 0 && epoch_remainder == miner_check_len;
Expand Down
15 changes: 6 additions & 9 deletions src/consensus/parlia/seal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ where
.ok_or_else(|| ConsensusError::Other("Snapshot not found".into()))?;

if !snap.validators.contains(&val) {
return Err(ConsensusError::Other(format!("Unauthorized validator: {val}").into()));
return Err(ConsensusError::Other(format!("Unauthorized validator: {val}")));
}

if snap.sign_recently(val) {
tracing::info!("Signed recently, must wait for others");
return Err(ConsensusError::Other(
format!("Signed recently, must wait for others, validator: {val}").into(),
));
format!("Signed recently, must wait for others, validator: {val}")));
}

let delay = self.delay_for_ramanujan_fork(&snap, header);
Expand Down Expand Up @@ -167,7 +166,7 @@ where
let votes: Vec<VoteEnvelope> = Vec::new();

let (justified_block_number, justified_block_hash) =
self.get_justified_number_and_hash(&parent).map_err(|e| e)?;
self.get_justified_number_and_hash(&parent)?;

let mut attestation = VoteAttestation::new_with_vote_data(VoteData {
source_hash: justified_block_hash,
Expand All @@ -182,9 +181,7 @@ where
format!(
"vote check error, expected: {:?}, real: {:?}",
attestation.data, vote.data,
)
.into(),
));
)));
}
}

Expand All @@ -200,7 +197,7 @@ where
.iter()
.map(|raw| {
blsSignature::from_bytes(raw.as_slice()).map_err(|e| {
ConsensusError::Other(format!("BLS sig decode error: {:?}", e).into())
ConsensusError::Other(format!("BLS sig decode error: {e:?}"))
})
})
.collect::<Result<_, _>>()?;
Expand Down Expand Up @@ -259,7 +256,7 @@ pub fn default_sign_fn(_addr: Address, _: &str, data: &[u8]) -> Result<Vec<u8>,
let hash = keccak256(data);
let private_key = &[0u8; 40]; // TODO get private key by addr
let signing_key = SigningKey::from_slice(private_key)
.map_err(|e| ConsensusError::Other(format!("invalid private key, e:{e}").into()))?;
.map_err(|e| ConsensusError::Other(format!("invalid private key, e:{e}")))?;
let sig_result: Signature = signing_key.sign(hash.as_slice());
Ok(sig_result.to_bytes().to_vec())
}
8 changes: 4 additions & 4 deletions src/consensus/parlia/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Snapshot {
return None;
}
} else {
for (_, &v) in &snap.recent_proposers {
for &v in snap.recent_proposers.values() {
if v == validator {
tracing::warn!("Failed to apply block due to over-proposed, validator: {:?}, block_number: {:?}", validator, block_number);
return None;
Expand Down Expand Up @@ -211,11 +211,11 @@ impl Snapshot {
snap.recent_proposers = Default::default();
snap.recent_proposers.insert(epoch_key, Address::default());
} else {
let old_limit = (snap.validators.len() / 2 + 1) as usize;
let new_limit = (new_validators.len() / 2 + 1) as usize;
let old_limit = snap.validators.len() / 2 + 1;
let new_limit = new_validators.len() / 2 + 1;
if new_limit < old_limit {
for i in 0..(old_limit - new_limit) {
snap.recent_proposers.remove(&(block_number as u64 - new_limit as u64 - i as u64));
snap.recent_proposers.remove(&(block_number - new_limit as u64 - i as u64));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/parlia/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<ChainSpec: EthChainSpec + BscHardforks + std::fmt::Debug + Send + Sync + 's
}

// Check extra data
self.check_header_extra(header).map_err(|e| ConsensusError::Other(format!("Invalid header extra: {}", e)))?;
self.check_header_extra(header).map_err(|e| ConsensusError::Other(format!("Invalid header extra: {e}")))?;

// Ensure that the mix digest is zero as we don't have fork protection currently
// mix_hash is millisecond timestamp after Lorentz/Maxwell.
Expand Down
56 changes: 56 additions & 0 deletions src/consensus/parlia/vote_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use once_cell::sync::Lazy;
use std::{collections::HashSet, sync::Mutex};

use alloy_primitives::B256;

use super::vote::VoteEnvelope;

/// Global in-memory pool of incoming Parlia votes.
///
/// This mirrors the simple approach used by the slashing pool: keep votes in
/// memory until they're consumed by another component. Votes are de-duplicated
/// by their RLP hash.
struct VotePool {
/// Hashes of votes we've already seen in this window.
seen_hashes: HashSet<B256>,
/// Collected votes (deduplicated by `seen_hashes`).
votes: Vec<VoteEnvelope>,
}

impl VotePool {
fn new() -> Self {
Self { seen_hashes: HashSet::new(), votes: Vec::new() }
}

fn insert(&mut self, vote: VoteEnvelope) {
let vote_hash = vote.hash();
if self.seen_hashes.insert(vote_hash) {
self.votes.push(vote);
}
}

fn drain(&mut self) -> Vec<VoteEnvelope> {
self.seen_hashes.clear();
std::mem::take(&mut self.votes)
}

fn len(&self) -> usize { self.votes.len() }
}

/// Global singleton pool.
static VOTE_POOL: Lazy<Mutex<VotePool>> = Lazy::new(|| Mutex::new(VotePool::new()));

/// Insert a single vote into the pool (deduplicated by hash).
pub fn put_vote(vote: VoteEnvelope) {
VOTE_POOL.lock().expect("vote pool poisoned").insert(vote);
}

/// Drain all pending votes.
pub fn drain() -> Vec<VoteEnvelope> {
VOTE_POOL.lock().expect("vote pool poisoned").drain()
}

/// Current number of queued votes.
pub fn len() -> usize { VOTE_POOL.lock().expect("vote pool poisoned").len() }


4 changes: 2 additions & 2 deletions src/node/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ where
async fn build_consensus(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Consensus> {
let snapshot_provider = create_snapshot_provider(ctx)
.unwrap_or_else(|e| {
panic!("Failed to initialize snapshot provider, due to {}", e);
panic!("Failed to initialize snapshot provider, due to {e}");
});

crate::shared::set_snapshot_provider(
snapshot_provider as Arc<dyn crate::consensus::parlia::SnapshotProvider + Send + Sync>,
).unwrap_or_else(|_| panic!("Failed to set global snapshot provider"));

crate::shared::set_header_provider(Arc::new(ctx.provider().clone()))
.unwrap_or_else(|e| panic!("Failed to set global header provider: {}", e));
.unwrap_or_else(|e| panic!("Failed to set global header provider: {e}"));

Ok(Arc::new(BscConsensus::new(ctx.chain_spec())))
}
Expand Down
3 changes: 1 addition & 2 deletions src/node/engine_api/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::payload::BscPayloadTypes;
use crate::{chainspec::BscChainSpec, hardforks::BscHardforks, BscBlock, BscPrimitives};
use alloy_consensus::BlockHeader;
use alloy_eips::eip4895::Withdrawal;
use alloy_primitives::{Address, B256};
use alloy_primitives::{B256};
use alloy_rpc_types_engine::PayloadError;
use reth::{
api::{FullNodeComponents, NodeTypes},
Expand All @@ -14,7 +14,6 @@ use reth::{
},
consensus::ConsensusError,
};
use reth_chainspec::EthChainSpec;
use reth_engine_primitives::{ExecutionPayload, PayloadValidator};
use reth_payload_primitives::NewPayloadError;
use reth_primitives::{RecoveredBlock, SealedBlock};
Expand Down
4 changes: 2 additions & 2 deletions src/node/evm/post_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ where
as_ref().
unwrap().
get_header(target_number)
.ok_or_else(|| BlockExecutionError::msg(format!("Header not found for block number: {}", target_number)))?;
.ok_or_else(|| BlockExecutionError::msg(format!("Header not found for block number: {target_number}")))?;
let snap = self.snapshot_provider.
as_ref().
unwrap().
Expand Down Expand Up @@ -428,7 +428,7 @@ where
}
}

let quorum = (validators.len() * 2 + 2) / 3; // ceil div
let quorum = (validators.len() * 2).div_ceil(3); // ceil div
if valid_vote_count > quorum {
let reward =
((valid_vote_count - quorum) * COLLECT_ADDITIONAL_VOTES_REWARD_RATIO) / 100;
Expand Down
8 changes: 5 additions & 3 deletions src/node/evm/pre_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use bit_set::BitSet;

const BLST_DST: &[u8] = b"BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_";

static VALIDATOR_CACHE: LazyLock<Mutex<LruMap<u64, (Vec<Address>, Vec<VoteAddress>), ByLength>>> = LazyLock::new(|| {
type ValidatorCache = LruMap<u64, (Vec<Address>, Vec<VoteAddress>), ByLength>;

static VALIDATOR_CACHE: LazyLock<Mutex<ValidatorCache>> = LazyLock::new(|| {
Mutex::new(LruMap::new(ByLength::new(1024)))
});

Expand Down Expand Up @@ -232,7 +234,7 @@ where
is_system_transaction: false,
};

let result_and_state = self.evm.transact(tx_env).map_err(|err| BlockExecutionError::other(err))?;
let result_and_state = self.evm.transact(tx_env).map_err(BlockExecutionError::other)?;
if !result_and_state.result.is_success() {
tracing::error!("Failed to eth call, to: {:?}, data: {:?}", to, data);
return Err(BlockExecutionError::msg("ETH call failed"));
Expand Down Expand Up @@ -364,7 +366,7 @@ where
}

// check if voted validator count satisfied 2/3 + 1
let at_least_votes = (validators_count * 2 + 2) / 3; // ceil division
let at_least_votes = (validators_count * 2).div_ceil(3); // ceil division
if vote_addrs.len() < at_least_votes {
return Err(BscBlockExecutionError::InvalidAttestationVoteCount(GotExpected {
got: vote_addrs.len() as u64,
Expand Down
53 changes: 53 additions & 0 deletions src/node/network/bsc_protocol/protocol/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use reth_network_api::{PeerId, Direction};
use reth_network::protocol::{ConnectionHandler, OnNotSupported, ProtocolHandler};
use reth_eth_wire::{capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol};
use std::net::SocketAddr;
use tokio::sync::mpsc;

use super::proto::{BscProtoMessage};
use crate::node::network::bsc_protocol::stream::{BscProtocolConnection};

#[derive(Clone, Debug, Default)]
pub struct BscProtocolHandler;

#[derive(Clone, Debug)]
pub struct BscConnectionHandler;

impl ProtocolHandler for BscProtocolHandler {
type ConnectionHandler = BscConnectionHandler;

fn on_incoming(&self, _socket_addr: SocketAddr) -> Option<Self::ConnectionHandler> {
Some(BscConnectionHandler)
}

fn on_outgoing(&self, _socket_addr: SocketAddr, _peer_id: PeerId) -> Option<Self::ConnectionHandler> {
Some(BscConnectionHandler)
}
}

impl ConnectionHandler for BscConnectionHandler {
type Connection = BscProtocolConnection;

fn protocol(&self) -> Protocol { BscProtoMessage::protocol() }

fn on_unsupported_by_peer(
self,
_supported: &SharedCapabilities,
_direction: reth_network_api::Direction,
_peer_id: PeerId,
) -> OnNotSupported {
OnNotSupported::KeepAlive
}

fn into_connection(
self,
direction: Direction,
_peer_id: PeerId,
conn: ProtocolConnection,
) -> Self::Connection {
let (_tx, rx) = mpsc::unbounded_channel();
BscProtocolConnection::new(conn, rx, direction.is_outgoing())
}
}


Loading
Loading