Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"crates/tagged",
"crates/tagged-debug-derive",
"crates/util",
"crates/validator-fetcher",
"crates/version",
]
exclude = [
Expand Down
23 changes: 7 additions & 16 deletions crates/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl<V: ValuePayload + 'static, A: ValidatorAddress + 'static> Consensus<V, A> {
pub fn recover<P: ValidatorSetProvider<A> + 'static>(
config: Config<A>,
validator_sets: Arc<P>,
) -> Self {
) -> anyhow::Result<Self> {
use crate::wal::recovery;

tracing::info!(
Expand Down Expand Up @@ -360,7 +360,7 @@ impl<V: ValuePayload + 'static, A: ValidatorAddress + 'static> Consensus<V, A> {
"Recovering height from WAL"
);

let validator_set = validator_sets.get_validator_set(height);
let validator_set = validator_sets.get_validator_set(height)?;
let mut internal_consensus = consensus.create_consensus(height, &validator_set);
internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
internal_consensus.recover_from_wal(entries);
Expand All @@ -373,7 +373,7 @@ impl<V: ValuePayload + 'static, A: ValidatorAddress + 'static> Consensus<V, A> {
"Completed consensus recovery"
);

consensus
Ok(consensus)
}

fn create_consensus(
Expand Down Expand Up @@ -958,17 +958,8 @@ impl<V: Debug, A: Debug> std::fmt::Debug for ConsensusEvent<V, A> {
/// that are eligible to participate in consensus at any given height.
///
/// This is useful for handling validator set changes across heights.
pub trait ValidatorSetProvider<A>: Send + Sync {
/// Get the validator set for the given height.
///
/// ## Arguments
///
/// - `height`: The blockchain height
///
/// ## Returns
///
/// Returns the validator set for the given height.
fn get_validator_set(&self, height: u64) -> ValidatorSet<A>;
pub trait ValidatorSetProvider<A> {
fn get_validator_set(&self, height: u64) -> Result<ValidatorSet<A>, anyhow::Error>;
}

/// A validator set provider that always returns the same validator set.
Expand All @@ -992,8 +983,8 @@ impl<A> StaticValidatorSetProvider<A> {
}

impl<A: Clone + Send + Sync> ValidatorSetProvider<A> for StaticValidatorSetProvider<A> {
fn get_validator_set(&self, _height: u64) -> ValidatorSet<A> {
self.validator_set.clone()
fn get_validator_set(&self, _height: u64) -> Result<ValidatorSet<A>, anyhow::Error> {
Ok(self.validator_set.clone())
}
}

Expand Down
9 changes: 6 additions & 3 deletions crates/consensus/tests/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,19 @@ async fn recover_from_wal_restores_and_continues() {
#[derive(Clone)]
struct StaticSet(ValidatorSet<NodeAddress>);
impl ValidatorSetProvider<NodeAddress> for StaticSet {
fn get_validator_set(&self, _height: u64) -> ValidatorSet<NodeAddress> {
self.0.clone()
fn get_validator_set(
&self,
_height: u64,
) -> Result<ValidatorSet<NodeAddress>, anyhow::Error> {
Ok(self.0.clone())
}
}

debug!("---------------------- Recovering from WAL ----------------------");

// Now recover from WAL
let mut consensus: Consensus<ConsensusValue, NodeAddress> =
Consensus::recover(config.clone(), Arc::new(StaticSet(validators)));
Consensus::recover(config.clone(), Arc::new(StaticSet(validators))).unwrap();

debug!("------------ Driving consensus post WAL recovery ----------------");

Expand Down
1 change: 1 addition & 0 deletions crates/pathfinder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ tracing-subscriber = { workspace = true, features = [
] }
url = { workspace = true }
util = { path = "../util" }
validator-fetcher = { path = "../validator-fetcher" }
zeroize = { workspace = true }
zstd = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions crates/pathfinder/src/consensus/inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod consensus_task;
mod fetch_validators;
mod p2p_task;

use std::path::PathBuf;
Expand Down
38 changes: 11 additions & 27 deletions crates/pathfinder/src/consensus/inner/consensus_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,15 @@ use pathfinder_consensus::{
ConsensusEvent,
NetworkMessage,
Proposal,
PublicKey,
Round,
SignedVote,
SigningKey,
StaticValidatorSetProvider,
Validator,
ValidatorSet,
ValidatorSetProvider,
};
use pathfinder_storage::Storage;
use tokio::sync::{mpsc, watch};

use super::fetch_validators::L2ValidatorSetProvider;
use super::{ConsensusTaskEvent, ConsensusValue, HeightExt, P2PTaskEvent};
use crate::config::ConsensusConfig;
use crate::validator::{FinalizedBlock, ValidatorBlockInfoStage};
Expand All @@ -51,27 +49,12 @@ pub fn spawn(
storage: Storage, // For dummy proposal creation
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
util::task::spawn(async move {
// Get the validator address and validator set provider
let validator_address = config.my_validator_address;

let validators = std::iter::once(validator_address)
.chain(config.validator_addresses)
.map(|address| {
let sk = SigningKey::new(rand::rngs::OsRng);
let vk = sk.verification_key();
let public_key = PublicKey::from_bytes(vk.to_bytes());

Validator {
address,
public_key,
voting_power: 1,
}
})
.collect::<Vec<Validator<_>>>();

let validator_set = ValidatorSet::new(validators);
let validator_set_provider = L2ValidatorSetProvider::new(storage.clone(), chain_id, config);

let mut consensus = Consensus::recover(
Config::new(config.my_validator_address)
Config::new(validator_address)
.with_wal_dir(wal_directory)
.with_history_depth(
// TODO: We don't support round certificates yet, and we want to limit
Expand All @@ -82,9 +65,10 @@ pub fn spawn(
),
// TODO use a dynamic validator set provider, once fetching the validator set from the
// staking contract is implemented. Related issue: https://github.com/eqlabs/pathfinder/issues/2936
Arc::new(StaticValidatorSetProvider::new(validator_set.clone())),
);
Arc::new(validator_set_provider.clone()),
)?;

// Get the current height
let mut current_height = consensus.current_height().unwrap_or_default();

// A validator that joins the consensus network and is lagging behind will vote
Expand All @@ -99,7 +83,7 @@ pub fn spawn(
&mut consensus,
&mut started_heights,
current_height,
validator_set.clone(),
validator_set_provider.get_validator_set(current_height)?,
);

loop {
Expand Down Expand Up @@ -255,7 +239,7 @@ pub fn spawn(
&mut consensus,
&mut started_heights,
current_height,
validator_set.clone(),
validator_set_provider.get_validator_set(current_height)?,
);
}
}
Expand Down Expand Up @@ -306,7 +290,7 @@ pub fn spawn(
&mut consensus,
&mut started_heights,
cmd_height,
validator_set.clone(),
validator_set_provider.get_validator_set(cmd_height)?,
);
}
}
Expand Down
110 changes: 110 additions & 0 deletions crates/pathfinder/src/consensus/inner/fetch_validators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use pathfinder_common::{ChainId, ContractAddress};
use pathfinder_consensus::{PublicKey, SigningKey, Validator, ValidatorSet};
use pathfinder_storage::Storage;
use rand::rngs::OsRng;

use crate::config::ConsensusConfig;

#[derive(Clone)]
pub struct L2ValidatorSetProvider {
storage: Storage,
chain_id: ChainId,
config: ConsensusConfig,
}

impl L2ValidatorSetProvider {
pub fn new(storage: Storage, chain_id: ChainId, config: ConsensusConfig) -> Self {
Self {
storage,
chain_id,
config,
}
}
}

impl pathfinder_consensus::ValidatorSetProvider<ContractAddress> for L2ValidatorSetProvider {
fn get_validator_set(
&self,
height: u64,
) -> Result<ValidatorSet<ContractAddress>, anyhow::Error> {
fetch_validators(&self.storage, self.chain_id, height, &self.config)
}
}

// TODO:
//
// Currently, the validator fetching functionality lives in its own crate
// (validator-fetcher) because we have a temporary internal RPC method that we
// use for convenient testing.
//
// This separation allows us to easily expose and test the functionality through
// the RPC while the specification for validator fetching is still being
// finalized.
//
// Once we have a final spec, the functionality from the validator-fetcher crate
// will be migrated into this file and the temporary crate (along with its RPC
// method) will be removed.

/// Fetches validators for a given height
///
/// Uses config-based validators if validator addresses are provided in config,
/// otherwise fetches validators from the contract.
pub fn fetch_validators(
storage: &Storage,
chain_id: ChainId,
height: u64,
config: &ConsensusConfig,
) -> Result<ValidatorSet<ContractAddress>, anyhow::Error> {
if config.validator_addresses.is_empty() {
fetch_validators_from_l2(storage, chain_id, height)
} else {
create_validators_from_config(config)
}
}

/// Creates validators from consensus config
///
/// This is the original logic that was in consensus_task.rs.
/// It creates validators with random keys and equal voting power.
fn create_validators_from_config(
config: &ConsensusConfig,
) -> Result<ValidatorSet<ContractAddress>, anyhow::Error> {
let validator_address = config.my_validator_address;

let validators = std::iter::once(validator_address)
.chain(config.validator_addresses.clone())
.map(|address| {
let sk = SigningKey::new(OsRng);
let vk = sk.verification_key();
let public_key = PublicKey::from_bytes(vk.to_bytes());

Validator {
address,
public_key,
voting_power: 1,
}
})
.collect::<Vec<Validator<ContractAddress>>>();

Ok(ValidatorSet::new(validators))
}

/// Fetches validators from the L2 contract
///
/// This logic is temporary until we have a final spec for validator fetching.
fn fetch_validators_from_l2(
storage: &Storage,
chain_id: ChainId,
height: u64,
) -> Result<ValidatorSet<ContractAddress>, anyhow::Error> {
let validators = validator_fetcher::get_validators_at_height(storage, chain_id, height)?;
let validators = validators
.into_iter()
.map(|validator| Validator {
address: validator.address,
public_key: validator.public_key,
voting_power: validator.voting_power,
})
.collect::<Vec<Validator<ContractAddress>>>();
Ok(ValidatorSet::new(validators))
}
2 changes: 2 additions & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mime = { workspace = true }
pathfinder-class-hash = { path = "../class-hash" }
pathfinder-common = { path = "../common" }
pathfinder-compiler = { path = "../compiler" }
pathfinder-consensus = { path = "../consensus" }
pathfinder-crypto = { path = "../crypto" }
pathfinder-ethereum = { path = "../ethereum" }
pathfinder-executor = { path = "../executor" }
Expand Down Expand Up @@ -55,6 +56,7 @@ tower-http = { workspace = true, features = [
] }
tracing = { workspace = true }
util = { path = "../util" }
validator-fetcher = { path = "../validator-fetcher" }
zstd = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/src/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod chain_id;
pub mod consensus_info;
pub mod estimate_fee;
pub mod estimate_message_fee;
pub mod fetch_validators;
pub mod get_block_transaction_count;
pub mod get_block_with_receipts;
pub mod get_block_with_tx_hashes;
Expand Down Expand Up @@ -47,6 +48,7 @@ pub use call::call;
pub use chain_id::chain_id;
pub use estimate_fee::estimate_fee;
pub use estimate_message_fee::estimate_message_fee;
pub use fetch_validators::fetch_validators;
pub use get_block_transaction_count::get_block_transaction_count;
pub use get_block_with_receipts::get_block_with_receipts;
pub use get_block_with_tx_hashes::get_block_with_tx_hashes;
Expand Down
Loading