diff --git a/src/cli.rs b/src/cli.rs index 54291fc..9f6ab1d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,10 +5,11 @@ use clap::{Parser, Subcommand}; use eyre::bail; use jsonrpsee::{RpcModule, server::Server}; use tokio::signal::unix::{SignalKind, signal as unix_signal}; +use tokio::sync::mpsc; use tracing::{Level, info}; use crate::{ - DebugClient, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient, + DebugClient, EngineHandler, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient, client::rpc::{BuilderArgs, L2ClientArgs}, init_metrics, init_tracing, server::ExecutionMode, @@ -165,7 +166,14 @@ impl Args { // Spawn the debug server rollup_boost.start_debug_server(debug_addr.as_str()).await?; - let module: RpcModule<()> = rollup_boost.try_into()?; + let (tx, rx) = mpsc::unbounded_channel(); + let engine_handler = EngineHandler::new(tx); + let module: RpcModule<()> = engine_handler.try_into()?; + + // Spawn the rollup boost server + tokio::spawn(async move { + rollup_boost.run(rx).await.unwrap(); + }); // Build and start the server info!("Starting server on :{}", self.rpc_port); diff --git a/src/server.rs b/src/server.rs index 59d83cd..1d47729 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,6 +20,10 @@ use op_alloy_rpc_types_engine::{ OpPayloadAttributes, }; use serde::{Deserialize, Serialize}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; use tracing::{debug, info, instrument}; @@ -131,6 +135,23 @@ pub struct RollupBoostServer { execution_mode: Arc>, } +pub enum EngineMessage { + NewPayload { + new_payload: NewPayload, + tx: oneshot::Sender>, + }, + ForkchoiceUpdated { + fork_choice_state: ForkchoiceState, + payload_attributes: Box>, + tx: oneshot::Sender>, + }, + GetPayload { + payload_id: PayloadId, + version: Version, + tx: oneshot::Sender>, + }, +} + impl RollupBoostServer { pub fn new( l2_client: RpcClient, @@ -158,7 +179,7 @@ impl RollupBoostServer { } } -impl TryInto> for RollupBoostServer { +impl TryInto> for EngineHandler { type Error = RegisterMethodError; fn try_into(self) -> Result, Self::Error> { @@ -238,8 +259,77 @@ pub trait EngineApi { ) -> RpcResult; } +#[derive(Debug, Clone)] +pub struct EngineHandler { + to_engine: UnboundedSender, +} + +impl EngineHandler { + pub fn new(to_engine: UnboundedSender) -> Self { + Self { to_engine } + } + + async fn send_payload( + &self, + payload_id: PayloadId, + version: Version, + ) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(EngineMessage::GetPayload { + payload_id, + version, + tx, + }); + rx.await.map_err(|_| { + // handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } + + async fn send_new_payload(&self, new_payload: NewPayload) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self + .to_engine + .send(EngineMessage::NewPayload { new_payload, tx }); + rx.await.map_err(|_| { + // handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } + + async fn send_fork_choice_updated_v3( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(EngineMessage::ForkchoiceUpdated { + fork_choice_state, + payload_attributes: Box::new(payload_attributes), + tx, + }); + + rx.await.map_err(|_| { + // TODO: handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } +} + #[async_trait] -impl EngineApiServer for RollupBoostServer { +impl EngineApiServer for EngineHandler { #[instrument( skip_all, err, @@ -257,56 +347,8 @@ impl EngineApiServer for RollupBoostServer { payload_attributes: Option, ) -> RpcResult { info!("received fork_choice_updated_v3"); - // First get the local payload ID from L2 client - let l2_response = self - .l2_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) - .await?; - - let span = tracing::Span::current(); - if let Some(payload_id) = l2_response.payload_id { - span.record("payload_id", payload_id.to_string()); - } - - // TODO: Use _is_block_building_call to log the correct message during the async call to builder - let (should_send_to_builder, _is_block_building_call) = - if let Some(attr) = payload_attributes.as_ref() { - // payload attributes are present. It is a FCU call to start block building - // Do not send to builder if no_tx_pool is set, meaning that the CL node wants - // a deterministic block without txs. We let the fallback EL node compute those. - let use_tx_pool = !attr.no_tx_pool.unwrap_or_default(); - (use_tx_pool, true) - } else { - // no payload attributes. It is a FCU call to lock the head block - // previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled - (self.boost_sync, false) - }; - - let execution_mode = self.execution_mode(); - let trace_id = span.id(); - if let Some(payload_id) = l2_response.payload_id { - self.payload_trace_context.store( - payload_id, - fork_choice_state.head_block_hash, - payload_attributes.is_some(), - trace_id, - ); - } - - if execution_mode.is_disabled() { - debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash); - } else if should_send_to_builder { - let builder_client = self.builder_client.clone(); - tokio::spawn(async move { - let _ = builder_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) - .await; - }); - } else { - info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); - } - - Ok(l2_response) + self.send_fork_choice_updated_v3(fork_choice_state, payload_attributes) + .await } #[instrument( @@ -324,7 +366,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received get_payload_v3"); - match self.get_payload(payload_id, Version::V3).await? { + match self.send_payload(payload_id, Version::V3).await? { OpExecutionPayloadEnvelope::V3(v3) => Ok(v3), OpExecutionPayloadEnvelope::V4(_) => Err(ErrorObject::owned( INVALID_REQUEST_CODE, @@ -349,7 +391,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received new_payload_v3"); - self.new_payload(NewPayload::V3(NewPayloadV3 { + self.send_new_payload(NewPayload::V3(NewPayloadV3 { payload, versioned_hashes, parent_beacon_block_root, @@ -372,7 +414,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received get_payload_v4"); - match self.get_payload(payload_id, Version::V4).await? { + match self.send_payload(payload_id, Version::V4).await? { OpExecutionPayloadEnvelope::V4(v4) => Ok(v4), OpExecutionPayloadEnvelope::V3(_) => Err(ErrorObject::owned( INVALID_REQUEST_CODE, @@ -398,7 +440,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received new_payload_v4"); - self.new_payload(NewPayload::V4(NewPayloadV4 { + self.send_new_payload(NewPayload::V4(NewPayloadV4 { payload, versioned_hashes, parent_beacon_block_root, @@ -507,6 +549,94 @@ impl Version { } impl RollupBoostServer { + pub async fn run(&self, mut rx: UnboundedReceiver) -> eyre::Result<()> { + while let Some(message) = rx.recv().await { + match message { + EngineMessage::ForkchoiceUpdated { + fork_choice_state, + payload_attributes, + tx, + } => { + let result = self + .fork_choice_updated_v3(fork_choice_state, *payload_attributes) + .await; + let _ = tx.send(result); + } + EngineMessage::NewPayload { new_payload, tx } => { + let result = self.new_payload(new_payload).await; + let _ = tx.send(result); + } + EngineMessage::GetPayload { + payload_id, + version, + tx, + } => { + let result = self.get_payload(payload_id, version).await; + let _ = tx.send(result); + } + } + } + Ok(()) + } + + async fn fork_choice_updated_v3( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> RpcResult { + info!("received fork_choice_updated_v3"); + // First get the local payload ID from L2 client + let l2_response = self + .l2_client + .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) + .await?; + + let span = tracing::Span::current(); + if let Some(payload_id) = l2_response.payload_id { + span.record("payload_id", payload_id.to_string()); + } + + // TODO: Use _is_block_building_call to log the correct message during the async call to builder + let (should_send_to_builder, _is_block_building_call) = + if let Some(attr) = payload_attributes.as_ref() { + // payload attributes are present. It is a FCU call to start block building + // Do not send to builder if no_tx_pool is set, meaning that the CL node wants + // a deterministic block without txs. We let the fallback EL node compute those. + let use_tx_pool = !attr.no_tx_pool.unwrap_or_default(); + (use_tx_pool, true) + } else { + // no payload attributes. It is a FCU call to lock the head block + // previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled + (self.boost_sync, false) + }; + + let execution_mode = self.execution_mode(); + let trace_id = span.id(); + if let Some(payload_id) = l2_response.payload_id { + self.payload_trace_context.store( + payload_id, + fork_choice_state.head_block_hash, + payload_attributes.is_some(), + trace_id, + ); + } + + if execution_mode.is_disabled() { + debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash); + } else if should_send_to_builder { + let builder_client = self.builder_client.clone(); + tokio::spawn(async move { + let _ = builder_client + .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) + .await; + }); + } else { + info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); + } + + Ok(l2_response) + } + async fn new_payload(&self, new_payload: NewPayload) -> RpcResult { let execution_payload = ExecutionPayload::from(new_payload.clone()); let block_hash = execution_payload.block_hash(); @@ -636,6 +766,7 @@ mod tests { use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; + use tokio::sync::mpsc; use tokio::time::sleep; const HOST: &str = "0.0.0.0"; @@ -736,7 +867,14 @@ mod tests { ExecutionMode::Enabled, ); - let module: RpcModule<()> = rollup_boost_client.try_into().unwrap(); + let (tx, rx) = mpsc::unbounded_channel(); + let engine_handler = EngineHandler::new(tx); + let module: RpcModule<()> = engine_handler.try_into().unwrap(); + + tokio::spawn(async move { + // TODO: Not sure if this is the correct way to spawn in test + rollup_boost_client.run(rx).await.unwrap(); + }); let proxy_server = ServerBuilder::default() .build("0.0.0.0:8556".parse::().unwrap()) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a7496f3..b5b52bb 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -310,6 +310,7 @@ pub struct SimpleBlockGenerator { latest_hash: B256, timestamp: u64, version: Version, + block_time_sec: u64, } impl SimpleBlockGenerator { @@ -320,9 +321,14 @@ impl SimpleBlockGenerator { latest_hash: B256::ZERO, // temporary value timestamp: 0, // temporary value version: Version::V3, + block_time_sec: 1, } } + pub fn set_block_time(&mut self, block_time_sec: u64) { + self.block_time_sec = block_time_sec; + } + /// Initialize the block generator by fetching the latest block pub async fn init(&mut self) -> eyre::Result<()> { let latest_block = self.engine_api.latest().await?.expect("block not found"); @@ -368,8 +374,8 @@ impl SimpleBlockGenerator { let payload_id = result.payload_id.expect("missing payload id"); - if !empty_blocks { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + if !empty_blocks && self.block_time_sec > 0 { + tokio::time::sleep(tokio::time::Duration::from_secs(self.block_time_sec)).await; } let payload = self diff --git a/tests/engine_calls_in_order.rs b/tests/engine_calls_in_order.rs new file mode 100644 index 0000000..a6c03fb --- /dev/null +++ b/tests/engine_calls_in_order.rs @@ -0,0 +1,21 @@ +mod common; + +use common::RollupBoostTestHarnessBuilder; + +#[tokio::test] +async fn engine_calls_in_order() -> eyre::Result<()> { + let harness = RollupBoostTestHarnessBuilder::new("engine_calls_in_order") + .build() + .await?; + + let mut block_generator = harness.block_generator().await?; + block_generator.set_block_time(0); + + for _ in 0..100 { + let (_block, _block_creator) = block_generator.generate_block(false).await?; + } + + // TODO: check condition + + Ok(()) +}