From b28b8c29835cf1ac13d4835d69c433b564376513 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 29 Apr 2025 10:53:24 +0100 Subject: [PATCH 1/5] Move function to core engine logic and out of trait --- src/server.rs | 110 +++++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 50 deletions(-) diff --git a/src/server.rs b/src/server.rs index 59d83cd..9a5020a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -257,56 +257,8 @@ impl EngineApiServer for RollupBoostServer { payload_attributes: Option, ) -> RpcResult { info!("received fork_choice_updated_v3"); - // First get the local payload ID from L2 client - let l2_response = self - .l2_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) - .await?; - - let span = tracing::Span::current(); - if let Some(payload_id) = l2_response.payload_id { - span.record("payload_id", payload_id.to_string()); - } - - // TODO: Use _is_block_building_call to log the correct message during the async call to builder - let (should_send_to_builder, _is_block_building_call) = - if let Some(attr) = payload_attributes.as_ref() { - // payload attributes are present. It is a FCU call to start block building - // Do not send to builder if no_tx_pool is set, meaning that the CL node wants - // a deterministic block without txs. We let the fallback EL node compute those. - let use_tx_pool = !attr.no_tx_pool.unwrap_or_default(); - (use_tx_pool, true) - } else { - // no payload attributes. It is a FCU call to lock the head block - // previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled - (self.boost_sync, false) - }; - - let execution_mode = self.execution_mode(); - let trace_id = span.id(); - if let Some(payload_id) = l2_response.payload_id { - self.payload_trace_context.store( - payload_id, - fork_choice_state.head_block_hash, - payload_attributes.is_some(), - trace_id, - ); - } - - if execution_mode.is_disabled() { - debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash); - } else if should_send_to_builder { - let builder_client = self.builder_client.clone(); - tokio::spawn(async move { - let _ = builder_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) - .await; - }); - } else { - info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); - } - - Ok(l2_response) + self.fork_choice_updated_v3(fork_choice_state, payload_attributes) + .await } #[instrument( @@ -507,6 +459,64 @@ impl Version { } impl RollupBoostServer { + async fn fork_choice_updated_v3( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> RpcResult { + info!("received fork_choice_updated_v3"); + // First get the local payload ID from L2 client + let l2_response = self + .l2_client + .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) + .await?; + + let span = tracing::Span::current(); + if let Some(payload_id) = l2_response.payload_id { + span.record("payload_id", payload_id.to_string()); + } + + // TODO: Use _is_block_building_call to log the correct message during the async call to builder + let (should_send_to_builder, _is_block_building_call) = + if let Some(attr) = payload_attributes.as_ref() { + // payload attributes are present. It is a FCU call to start block building + // Do not send to builder if no_tx_pool is set, meaning that the CL node wants + // a deterministic block without txs. We let the fallback EL node compute those. + let use_tx_pool = !attr.no_tx_pool.unwrap_or_default(); + (use_tx_pool, true) + } else { + // no payload attributes. It is a FCU call to lock the head block + // previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled + (self.boost_sync, false) + }; + + let execution_mode = self.execution_mode(); + let trace_id = span.id(); + if let Some(payload_id) = l2_response.payload_id { + self.payload_trace_context.store( + payload_id, + fork_choice_state.head_block_hash, + payload_attributes.is_some(), + trace_id, + ); + } + + if execution_mode.is_disabled() { + debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash); + } else if should_send_to_builder { + let builder_client = self.builder_client.clone(); + tokio::spawn(async move { + let _ = builder_client + .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()) + .await; + }); + } else { + info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash); + } + + Ok(l2_response) + } + async fn new_payload(&self, new_payload: NewPayload) -> RpcResult { let execution_payload = ExecutionPayload::from(new_payload.clone()); let block_hash = execution_payload.block_hash(); From 5e949ea7511744688672ed457a19a3b0064add60 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 29 Apr 2025 11:53:19 +0100 Subject: [PATCH 2/5] Some stuff --- src/cli.rs | 12 ++- src/server.rs | 144 +++++++++++++++++++++++++++++++-- tests/common/mod.rs | 10 ++- tests/common/proxy.rs | 2 + tests/engine_calls_in_order.rs | 37 +++++++++ 5 files changed, 193 insertions(+), 12 deletions(-) create mode 100644 tests/engine_calls_in_order.rs diff --git a/src/cli.rs b/src/cli.rs index 54291fc..9f6ab1d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -5,10 +5,11 @@ use clap::{Parser, Subcommand}; use eyre::bail; use jsonrpsee::{RpcModule, server::Server}; use tokio::signal::unix::{SignalKind, signal as unix_signal}; +use tokio::sync::mpsc; use tracing::{Level, info}; use crate::{ - DebugClient, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient, + DebugClient, EngineHandler, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient, client::rpc::{BuilderArgs, L2ClientArgs}, init_metrics, init_tracing, server::ExecutionMode, @@ -165,7 +166,14 @@ impl Args { // Spawn the debug server rollup_boost.start_debug_server(debug_addr.as_str()).await?; - let module: RpcModule<()> = rollup_boost.try_into()?; + let (tx, rx) = mpsc::unbounded_channel(); + let engine_handler = EngineHandler::new(tx); + let module: RpcModule<()> = engine_handler.try_into()?; + + // Spawn the rollup boost server + tokio::spawn(async move { + rollup_boost.run(rx).await.unwrap(); + }); // Build and start the server info!("Starting server on :{}", self.rpc_port); diff --git a/src/server.rs b/src/server.rs index 9a5020a..8485145 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,6 +20,10 @@ use op_alloy_rpc_types_engine::{ OpPayloadAttributes, }; use serde::{Deserialize, Serialize}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; use tracing::{debug, info, instrument}; @@ -131,6 +135,23 @@ pub struct RollupBoostServer { execution_mode: Arc>, } +pub enum EngineMessage { + NewPayload { + new_payload: NewPayload, + tx: oneshot::Sender>, + }, + ForkchoiceUpdated { + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + tx: oneshot::Sender>, + }, + GetPayload { + payload_id: PayloadId, + version: Version, + tx: oneshot::Sender>, + }, +} + impl RollupBoostServer { pub fn new( l2_client: RpcClient, @@ -158,7 +179,7 @@ impl RollupBoostServer { } } -impl TryInto> for RollupBoostServer { +impl TryInto> for EngineHandler { type Error = RegisterMethodError; fn try_into(self) -> Result, Self::Error> { @@ -238,8 +259,77 @@ pub trait EngineApi { ) -> RpcResult; } +#[derive(Debug, Clone)] +pub struct EngineHandler { + to_engine: UnboundedSender, +} + +impl EngineHandler { + pub fn new(to_engine: UnboundedSender) -> Self { + Self { to_engine } + } + + async fn send_payload( + &self, + payload_id: PayloadId, + version: Version, + ) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(EngineMessage::GetPayload { + payload_id, + version, + tx, + }); + rx.await.map_err(|_| { + // handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } + + async fn send_new_payload(&self, new_payload: NewPayload) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self + .to_engine + .send(EngineMessage::NewPayload { new_payload, tx }); + rx.await.map_err(|_| { + // handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } + + async fn send_fork_choice_updated_v3( + &self, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> RpcResult { + let (tx, rx) = oneshot::channel(); + let _ = self.to_engine.send(EngineMessage::ForkchoiceUpdated { + fork_choice_state, + payload_attributes, + tx, + }); + + rx.await.map_err(|_| { + // TODO: handle this error + ErrorObject::owned( + INVALID_REQUEST_CODE, + "Engine server disconnected", + None::, + ) + })? + } +} + #[async_trait] -impl EngineApiServer for RollupBoostServer { +impl EngineApiServer for EngineHandler { #[instrument( skip_all, err, @@ -257,7 +347,7 @@ impl EngineApiServer for RollupBoostServer { payload_attributes: Option, ) -> RpcResult { info!("received fork_choice_updated_v3"); - self.fork_choice_updated_v3(fork_choice_state, payload_attributes) + self.send_fork_choice_updated_v3(fork_choice_state, payload_attributes) .await } @@ -276,7 +366,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received get_payload_v3"); - match self.get_payload(payload_id, Version::V3).await? { + match self.send_payload(payload_id, Version::V3).await? { OpExecutionPayloadEnvelope::V3(v3) => Ok(v3), OpExecutionPayloadEnvelope::V4(_) => Err(ErrorObject::owned( INVALID_REQUEST_CODE, @@ -301,7 +391,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received new_payload_v3"); - self.new_payload(NewPayload::V3(NewPayloadV3 { + self.send_new_payload(NewPayload::V3(NewPayloadV3 { payload, versioned_hashes, parent_beacon_block_root, @@ -324,7 +414,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received get_payload_v4"); - match self.get_payload(payload_id, Version::V4).await? { + match self.send_payload(payload_id, Version::V4).await? { OpExecutionPayloadEnvelope::V4(v4) => Ok(v4), OpExecutionPayloadEnvelope::V3(_) => Err(ErrorObject::owned( INVALID_REQUEST_CODE, @@ -350,7 +440,7 @@ impl EngineApiServer for RollupBoostServer { ) -> RpcResult { info!("received new_payload_v4"); - self.new_payload(NewPayload::V4(NewPayloadV4 { + self.send_new_payload(NewPayload::V4(NewPayloadV4 { payload, versioned_hashes, parent_beacon_block_root, @@ -459,6 +549,36 @@ impl Version { } impl RollupBoostServer { + pub async fn run(&self, mut rx: UnboundedReceiver) -> eyre::Result<()> { + while let Some(message) = rx.recv().await { + match message { + EngineMessage::ForkchoiceUpdated { + fork_choice_state, + payload_attributes, + tx, + } => { + let result = self + .fork_choice_updated_v3(fork_choice_state, payload_attributes) + .await; + let _ = tx.send(result); + } + EngineMessage::NewPayload { new_payload, tx } => { + let result = self.new_payload(new_payload).await; + let _ = tx.send(result); + } + EngineMessage::GetPayload { + payload_id, + version, + tx, + } => { + let result = self.get_payload(payload_id, version).await; + let _ = tx.send(result); + } + } + } + Ok(()) + } + async fn fork_choice_updated_v3( &self, fork_choice_state: ForkchoiceState, @@ -646,6 +766,7 @@ mod tests { use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; + use tokio::sync::mpsc; use tokio::time::sleep; const HOST: &str = "0.0.0.0"; @@ -746,7 +867,14 @@ mod tests { ExecutionMode::Enabled, ); - let module: RpcModule<()> = rollup_boost_client.try_into().unwrap(); + let (tx, rx) = mpsc::unbounded_channel(); + let engine_handler = EngineHandler::new(tx); + let module: RpcModule<()> = engine_handler.try_into().unwrap(); + + tokio::spawn(async move { + // TODO: Not sure if this is the correct way to spawn in test + rollup_boost_client.run(rx).await.unwrap(); + }); let proxy_server = ServerBuilder::default() .build("0.0.0.0:8556".parse::().unwrap()) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a7496f3..b5b52bb 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -310,6 +310,7 @@ pub struct SimpleBlockGenerator { latest_hash: B256, timestamp: u64, version: Version, + block_time_sec: u64, } impl SimpleBlockGenerator { @@ -320,9 +321,14 @@ impl SimpleBlockGenerator { latest_hash: B256::ZERO, // temporary value timestamp: 0, // temporary value version: Version::V3, + block_time_sec: 1, } } + pub fn set_block_time(&mut self, block_time_sec: u64) { + self.block_time_sec = block_time_sec; + } + /// Initialize the block generator by fetching the latest block pub async fn init(&mut self) -> eyre::Result<()> { let latest_block = self.engine_api.latest().await?.expect("block not found"); @@ -368,8 +374,8 @@ impl SimpleBlockGenerator { let payload_id = result.payload_id.expect("missing payload id"); - if !empty_blocks { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + if !empty_blocks && self.block_time_sec > 0 { + tokio::time::sleep(tokio::time::Duration::from_secs(self.block_time_sec)).await; } let payload = self diff --git a/tests/common/proxy.rs b/tests/common/proxy.rs index 356729b..7c5425d 100644 --- a/tests/common/proxy.rs +++ b/tests/common/proxy.rs @@ -86,6 +86,8 @@ async fn proxy( let json_rpc_response = serde_json::from_slice::(&bytes).unwrap(); let bytes = if let Some(result) = json_rpc_response.clone().result { + println!("Received response: {:?}", json_rpc_response.id); + let value = config .handler .handle(json_rpc_request.method, json_rpc_request.params, result) diff --git a/tests/engine_calls_in_order.rs b/tests/engine_calls_in_order.rs new file mode 100644 index 0000000..1ab4ddc --- /dev/null +++ b/tests/engine_calls_in_order.rs @@ -0,0 +1,37 @@ +mod common; + +use common::{RollupBoostTestHarnessBuilder, proxy::ProxyHandler}; +use futures::FutureExt as _; +use serde_json::Value; +use std::pin::Pin; +use std::sync::Arc; + +struct Handler; + +impl ProxyHandler for Handler { + fn handle( + &self, + _method: String, + _params: Value, + _result: Value, + ) -> Pin> + Send>> { + async move { None }.boxed() + } +} + +#[tokio::test] +async fn engine_calls_in_order() -> eyre::Result<()> { + let harness = RollupBoostTestHarnessBuilder::new("engine_calls_in_order") + .proxy_handler(Arc::new(Handler)) + .build() + .await?; + + let mut block_generator = harness.block_generator().await?; + block_generator.set_block_time(0); + + for _ in 0..100 { + let (_block, _block_creator) = block_generator.generate_block(false).await?; + } + + Ok(()) +} From cb8f87cb4b9f9329572edcb450a01def8c282eec Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 29 Apr 2025 12:00:37 +0100 Subject: [PATCH 3/5] Fix --- tests/common/proxy.rs | 2 -- tests/engine_calls_in_order.rs | 20 +------------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/tests/common/proxy.rs b/tests/common/proxy.rs index 7c5425d..356729b 100644 --- a/tests/common/proxy.rs +++ b/tests/common/proxy.rs @@ -86,8 +86,6 @@ async fn proxy( let json_rpc_response = serde_json::from_slice::(&bytes).unwrap(); let bytes = if let Some(result) = json_rpc_response.clone().result { - println!("Received response: {:?}", json_rpc_response.id); - let value = config .handler .handle(json_rpc_request.method, json_rpc_request.params, result) diff --git a/tests/engine_calls_in_order.rs b/tests/engine_calls_in_order.rs index 1ab4ddc..9a6ded2 100644 --- a/tests/engine_calls_in_order.rs +++ b/tests/engine_calls_in_order.rs @@ -1,28 +1,10 @@ mod common; -use common::{RollupBoostTestHarnessBuilder, proxy::ProxyHandler}; -use futures::FutureExt as _; -use serde_json::Value; -use std::pin::Pin; -use std::sync::Arc; - -struct Handler; - -impl ProxyHandler for Handler { - fn handle( - &self, - _method: String, - _params: Value, - _result: Value, - ) -> Pin> + Send>> { - async move { None }.boxed() - } -} +use common::RollupBoostTestHarnessBuilder; #[tokio::test] async fn engine_calls_in_order() -> eyre::Result<()> { let harness = RollupBoostTestHarnessBuilder::new("engine_calls_in_order") - .proxy_handler(Arc::new(Handler)) .build() .await?; From c2d83e7c5035b17cb891d6bbffcd6942d5100c3e Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 29 Apr 2025 12:00:54 +0100 Subject: [PATCH 4/5] Add comment --- tests/engine_calls_in_order.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/engine_calls_in_order.rs b/tests/engine_calls_in_order.rs index 9a6ded2..a6c03fb 100644 --- a/tests/engine_calls_in_order.rs +++ b/tests/engine_calls_in_order.rs @@ -15,5 +15,7 @@ async fn engine_calls_in_order() -> eyre::Result<()> { let (_block, _block_creator) = block_generator.generate_block(false).await?; } + // TODO: check condition + Ok(()) } From 5053f9eace687ea10cbefb3151e8de2d7851e497 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Tue, 29 Apr 2025 12:12:46 +0100 Subject: [PATCH 5/5] Fix lint --- src/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server.rs b/src/server.rs index 8485145..1d47729 100644 --- a/src/server.rs +++ b/src/server.rs @@ -142,7 +142,7 @@ pub enum EngineMessage { }, ForkchoiceUpdated { fork_choice_state: ForkchoiceState, - payload_attributes: Option, + payload_attributes: Box>, tx: oneshot::Sender>, }, GetPayload { @@ -313,7 +313,7 @@ impl EngineHandler { let (tx, rx) = oneshot::channel(); let _ = self.to_engine.send(EngineMessage::ForkchoiceUpdated { fork_choice_state, - payload_attributes, + payload_attributes: Box::new(payload_attributes), tx, }); @@ -558,7 +558,7 @@ impl RollupBoostServer { tx, } => { let result = self - .fork_choice_updated_v3(fork_choice_state, payload_attributes) + .fork_choice_updated_v3(fork_choice_state, *payload_attributes) .await; let _ = tx.send(result); }