Skip to content

Add Anchor Processor #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Dec 9, 2024
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
members = [
"anchor",
"anchor/client",
"anchor/common/version",
"anchor/http_api",
"anchor/http_metrics",
"anchor/qbft",
"anchor/network",
"anchor/common/version"
"anchor/processor",
"anchor/qbft",
]
resolver = "2"

Expand All @@ -21,6 +22,7 @@ http_api = { path = "anchor/http_api" }
http_metrics = { path = "anchor/http_metrics" }
network = { path ="anchor/network"}
version = { path ="anchor/common/version"}
processor = { path = "anchor/processor" }
lighthouse_network = { git = "https://github.com/sigp/lighthouse", branch = "unstable"}
task_executor = { git = "https://github.com/sigp/lighthouse", branch = "unstable", default-features = false, features = [ "tracing", ] }
metrics = { git = "https://github.com/agemanning/lighthouse", branch = "modularize-vc" }
Expand All @@ -38,6 +40,7 @@ either = "1.13.0"
futures = "0.3.30"
tower-http = {version = "0.6", features = ["cors"] }
hyper = "1.4"
num_cpus = "1"
parking_lot = "0.12"
serde = { version = "1.0.208", features = ["derive"] }
strum = { version = "0.24", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions anchor/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ network = { workspace = true }
unused_port = { workspace = true }
tokio = { workspace = true }
parking_lot = { workspace = true }
processor = { workspace = true }
# Local dependencies
fdlimit = "0.3"
ethereum_hashing = "0.7.0"
3 changes: 3 additions & 0 deletions anchor/client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct Config {
/// A list of custom certificates that the validator client will additionally use when
/// connecting to an execution node over SSL/TLS.
pub execution_nodes_tls_certs: Option<Vec<PathBuf>>,
/// Configuration for the processor
pub processor: processor::Config,
}

impl Default for Config {
Expand Down Expand Up @@ -74,6 +76,7 @@ impl Default for Config {
network: <_>::default(),
beacon_nodes_tls_certs: None,
execution_nodes_tls_certs: None,
processor: <_>::default(),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ impl Client {
"Starting the Anchor client"
);

// Start the processor
let processor_senders = processor::spawn(config.processor, executor.clone()).await;

// Optionally start the metrics server.
let _http_metrics_shared_state = if config.http_metrics.enabled {
let shared_state = Arc::new(RwLock::new(http_metrics::Shared { genesis_time: None }));
Expand Down
12 changes: 12 additions & 0 deletions anchor/processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "processor"
version = "0.1.0"
authors = ["Sigma Prime <[email protected]"]
edition = { workspace = true }

[dependencies]
tokio = { workspace = true, features = ["sync"] }
tracing = { workspace = true }
task_executor = { workspace = true }
serde = { workspace = true }
num_cpus = { workspace = true }
145 changes: 145 additions & 0 deletions anchor/processor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::select;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, Semaphore};
use tracing::{error, warn};

#[derive(Clone, Serialize, Deserialize)]
pub struct Config {
pub max_workers: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
max_workers: num_cpus::get(),
}
}
}

pub struct Sender {
name: &'static str,
tx: mpsc::Sender<WorkItem>,
}

impl Sender {
fn new(name: &'static str, tx: mpsc::Sender<WorkItem>) -> Self {
Self { name, tx }
}

pub fn send_async(&mut self, future: AsyncFn) {
self.send_work_item(WorkItem::new_async(self.name, future));
}

pub fn send_blocking(&mut self, func: BlockingFn) {
self.send_work_item(WorkItem::new_blocking(self.name, func));
}

pub fn send_work_item(&mut self, item: WorkItem) {
if let Err(err) = self.tx.try_send(item) {
match err {
TrySendError::Full(item) => {
warn!(task = item.name, "Processor queue full")
}
TrySendError::Closed(_) => {
error!("Processor queue closed unexpectedly")
}
}
}
}
}

pub struct Senders {
example_tx: Sender,
// todo add all the needed queues here
}

struct Receivers {
example_rx: mpsc::Receiver<WorkItem>,
// todo add all the needed queues here
}

pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;

enum AsyncOrBlocking {
Async(AsyncFn),
Blocking(BlockingFn),
}
pub struct WorkItem {
name: &'static str,
func: AsyncOrBlocking,
}

impl WorkItem {
pub fn new_async(name: &'static str, func: AsyncFn) -> Self {
Self {
name,
func: AsyncOrBlocking::Async(func),
}
}

pub fn new_blocking(name: &'static str, func: BlockingFn) -> Self {
Self {
name,
func: AsyncOrBlocking::Blocking(func),
}
}
}

pub async fn spawn(config: Config, executor: TaskExecutor) -> Senders {
// todo macro? just specifying name and capacity?
let (example_tx, example_rx) = mpsc::channel(1000);

let senders = Senders {
example_tx: Sender::new("example", example_tx),
};
let receivers = Receivers { example_rx };

executor.spawn(processor(config, receivers, executor.clone()), "processor");
senders
}

async fn processor(config: Config, mut receivers: Receivers, executor: TaskExecutor) {
// TODO: consider having separate limits for blocking and async?
let semaphore = Arc::new(Semaphore::new(config.max_workers));

loop {
let Ok(permit) = semaphore.clone().acquire_owned().await else {
error!("Processor semaphore closed unexpectedly");
break;
};

let work_item = select! {
biased;
Some(w) = receivers.example_rx.recv() => w,
else => {
error!("Processor queues closed unexpectedly");
break;
}
};

match work_item.func {
AsyncOrBlocking::Async(async_fn) => executor.spawn(
async move {
async_fn.await;
drop(permit);
},
work_item.name,
),
AsyncOrBlocking::Blocking(blocking_fn) => {
executor.spawn_blocking(
move || {
blocking_fn();
drop(permit);
},
work_item.name,
);
}
}
}
}
Loading