Skip to content

Process engine requests sequentially #192

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use clap::{Parser, Subcommand};
use eyre::bail;
use jsonrpsee::{RpcModule, server::Server};
use tokio::signal::unix::{SignalKind, signal as unix_signal};
use tokio::sync::mpsc;
use tracing::{Level, info};

use crate::{
DebugClient, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient,
DebugClient, EngineHandler, PayloadSource, ProxyLayer, RollupBoostServer, RpcClient,
client::rpc::{BuilderArgs, L2ClientArgs},
init_metrics, init_tracing,
server::ExecutionMode,
Expand Down Expand Up @@ -165,7 +166,14 @@ impl Args {
// Spawn the debug server
rollup_boost.start_debug_server(debug_addr.as_str()).await?;

let module: RpcModule<()> = rollup_boost.try_into()?;
let (tx, rx) = mpsc::unbounded_channel();
let engine_handler = EngineHandler::new(tx);
let module: RpcModule<()> = engine_handler.try_into()?;

// Spawn the rollup boost server
tokio::spawn(async move {
rollup_boost.run(rx).await.unwrap();
});

// Build and start the server
info!("Starting server on :{}", self.rpc_port);
Expand Down
252 changes: 195 additions & 57 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ use op_alloy_rpc_types_engine::{
OpPayloadAttributes,
};
use serde::{Deserialize, Serialize};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};

use tracing::{debug, info, instrument};

Expand Down Expand Up @@ -131,6 +135,23 @@ pub struct RollupBoostServer {
execution_mode: Arc<Mutex<ExecutionMode>>,
}

pub enum EngineMessage {
NewPayload {
new_payload: NewPayload,
tx: oneshot::Sender<RpcResult<PayloadStatus>>,
},
ForkchoiceUpdated {
fork_choice_state: ForkchoiceState,
payload_attributes: Box<Option<OpPayloadAttributes>>,
tx: oneshot::Sender<RpcResult<ForkchoiceUpdated>>,
},
GetPayload {
payload_id: PayloadId,
version: Version,
tx: oneshot::Sender<RpcResult<OpExecutionPayloadEnvelope>>,
},
}

impl RollupBoostServer {
pub fn new(
l2_client: RpcClient,
Expand Down Expand Up @@ -158,7 +179,7 @@ impl RollupBoostServer {
}
}

impl TryInto<RpcModule<()>> for RollupBoostServer {
impl TryInto<RpcModule<()>> for EngineHandler {
type Error = RegisterMethodError;

fn try_into(self) -> Result<RpcModule<()>, Self::Error> {
Expand Down Expand Up @@ -238,8 +259,77 @@ pub trait EngineApi {
) -> RpcResult<PayloadStatus>;
}

#[derive(Debug, Clone)]
pub struct EngineHandler {
to_engine: UnboundedSender<EngineMessage>,
}

impl EngineHandler {
pub fn new(to_engine: UnboundedSender<EngineMessage>) -> Self {
Self { to_engine }
}

async fn send_payload(
&self,
payload_id: PayloadId,
version: Version,
) -> RpcResult<OpExecutionPayloadEnvelope> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(EngineMessage::GetPayload {
payload_id,
version,
tx,
});
rx.await.map_err(|_| {
// handle this error
ErrorObject::owned(
INVALID_REQUEST_CODE,
"Engine server disconnected",
None::<String>,
)
})?
}

async fn send_new_payload(&self, new_payload: NewPayload) -> RpcResult<PayloadStatus> {
let (tx, rx) = oneshot::channel();
let _ = self
.to_engine
.send(EngineMessage::NewPayload { new_payload, tx });
rx.await.map_err(|_| {
// handle this error
ErrorObject::owned(
INVALID_REQUEST_CODE,
"Engine server disconnected",
None::<String>,
)
})?
}

async fn send_fork_choice_updated_v3(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<OpPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(EngineMessage::ForkchoiceUpdated {
fork_choice_state,
payload_attributes: Box::new(payload_attributes),
tx,
});

rx.await.map_err(|_| {
// TODO: handle this error
ErrorObject::owned(
INVALID_REQUEST_CODE,
"Engine server disconnected",
None::<String>,
)
})?
}
}

#[async_trait]
impl EngineApiServer for RollupBoostServer {
impl EngineApiServer for EngineHandler {
#[instrument(
skip_all,
err,
Expand All @@ -257,56 +347,8 @@ impl EngineApiServer for RollupBoostServer {
payload_attributes: Option<OpPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
info!("received fork_choice_updated_v3");
// First get the local payload ID from L2 client
let l2_response = self
.l2_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone())
.await?;

let span = tracing::Span::current();
if let Some(payload_id) = l2_response.payload_id {
span.record("payload_id", payload_id.to_string());
}

// TODO: Use _is_block_building_call to log the correct message during the async call to builder
let (should_send_to_builder, _is_block_building_call) =
if let Some(attr) = payload_attributes.as_ref() {
// payload attributes are present. It is a FCU call to start block building
// Do not send to builder if no_tx_pool is set, meaning that the CL node wants
// a deterministic block without txs. We let the fallback EL node compute those.
let use_tx_pool = !attr.no_tx_pool.unwrap_or_default();
(use_tx_pool, true)
} else {
// no payload attributes. It is a FCU call to lock the head block
// previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled
(self.boost_sync, false)
};

let execution_mode = self.execution_mode();
let trace_id = span.id();
if let Some(payload_id) = l2_response.payload_id {
self.payload_trace_context.store(
payload_id,
fork_choice_state.head_block_hash,
payload_attributes.is_some(),
trace_id,
);
}

if execution_mode.is_disabled() {
debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash);
} else if should_send_to_builder {
let builder_client = self.builder_client.clone();
tokio::spawn(async move {
let _ = builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone())
.await;
});
} else {
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}

Ok(l2_response)
self.send_fork_choice_updated_v3(fork_choice_state, payload_attributes)
.await
}

#[instrument(
Expand All @@ -324,7 +366,7 @@ impl EngineApiServer for RollupBoostServer {
) -> RpcResult<OpExecutionPayloadEnvelopeV3> {
info!("received get_payload_v3");

match self.get_payload(payload_id, Version::V3).await? {
match self.send_payload(payload_id, Version::V3).await? {
OpExecutionPayloadEnvelope::V3(v3) => Ok(v3),
OpExecutionPayloadEnvelope::V4(_) => Err(ErrorObject::owned(
INVALID_REQUEST_CODE,
Expand All @@ -349,7 +391,7 @@ impl EngineApiServer for RollupBoostServer {
) -> RpcResult<PayloadStatus> {
info!("received new_payload_v3");

self.new_payload(NewPayload::V3(NewPayloadV3 {
self.send_new_payload(NewPayload::V3(NewPayloadV3 {
payload,
versioned_hashes,
parent_beacon_block_root,
Expand All @@ -372,7 +414,7 @@ impl EngineApiServer for RollupBoostServer {
) -> RpcResult<OpExecutionPayloadEnvelopeV4> {
info!("received get_payload_v4");

match self.get_payload(payload_id, Version::V4).await? {
match self.send_payload(payload_id, Version::V4).await? {
OpExecutionPayloadEnvelope::V4(v4) => Ok(v4),
OpExecutionPayloadEnvelope::V3(_) => Err(ErrorObject::owned(
INVALID_REQUEST_CODE,
Expand All @@ -398,7 +440,7 @@ impl EngineApiServer for RollupBoostServer {
) -> RpcResult<PayloadStatus> {
info!("received new_payload_v4");

self.new_payload(NewPayload::V4(NewPayloadV4 {
self.send_new_payload(NewPayload::V4(NewPayloadV4 {
payload,
versioned_hashes,
parent_beacon_block_root,
Expand Down Expand Up @@ -507,6 +549,94 @@ impl Version {
}

impl RollupBoostServer {
pub async fn run(&self, mut rx: UnboundedReceiver<EngineMessage>) -> eyre::Result<()> {
while let Some(message) = rx.recv().await {
match message {
EngineMessage::ForkchoiceUpdated {
fork_choice_state,
payload_attributes,
tx,
} => {
let result = self
.fork_choice_updated_v3(fork_choice_state, *payload_attributes)
.await;
let _ = tx.send(result);
}
EngineMessage::NewPayload { new_payload, tx } => {
let result = self.new_payload(new_payload).await;
let _ = tx.send(result);
}
EngineMessage::GetPayload {
payload_id,
version,
tx,
} => {
let result = self.get_payload(payload_id, version).await;
let _ = tx.send(result);
}
}
}
Ok(())
}

async fn fork_choice_updated_v3(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<OpPayloadAttributes>,
) -> RpcResult<ForkchoiceUpdated> {
info!("received fork_choice_updated_v3");
// First get the local payload ID from L2 client
let l2_response = self
.l2_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone())
.await?;

let span = tracing::Span::current();
if let Some(payload_id) = l2_response.payload_id {
span.record("payload_id", payload_id.to_string());
}

// TODO: Use _is_block_building_call to log the correct message during the async call to builder
let (should_send_to_builder, _is_block_building_call) =
if let Some(attr) = payload_attributes.as_ref() {
// payload attributes are present. It is a FCU call to start block building
// Do not send to builder if no_tx_pool is set, meaning that the CL node wants
// a deterministic block without txs. We let the fallback EL node compute those.
let use_tx_pool = !attr.no_tx_pool.unwrap_or_default();
(use_tx_pool, true)
} else {
// no payload attributes. It is a FCU call to lock the head block
// previously synced with the new_payload_v3 call. Only send to builder if boost_sync is enabled
(self.boost_sync, false)
};

let execution_mode = self.execution_mode();
let trace_id = span.id();
if let Some(payload_id) = l2_response.payload_id {
self.payload_trace_context.store(
payload_id,
fork_choice_state.head_block_hash,
payload_attributes.is_some(),
trace_id,
);
}

if execution_mode.is_disabled() {
debug!(message = "execution mode is disabled, skipping FCU call to builder", "head_block_hash" = %fork_choice_state.head_block_hash);
} else if should_send_to_builder {
let builder_client = self.builder_client.clone();
tokio::spawn(async move {
let _ = builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone())
.await;
});
} else {
info!(message = "no payload attributes provided or no_tx_pool is set", "head_block_hash" = %fork_choice_state.head_block_hash);
}

Ok(l2_response)
}

async fn new_payload(&self, new_payload: NewPayload) -> RpcResult<PayloadStatus> {
let execution_payload = ExecutionPayload::from(new_payload.clone());
let block_hash = execution_payload.block_hash();
Expand Down Expand Up @@ -636,6 +766,7 @@ mod tests {
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::sleep;

const HOST: &str = "0.0.0.0";
Expand Down Expand Up @@ -736,7 +867,14 @@ mod tests {
ExecutionMode::Enabled,
);

let module: RpcModule<()> = rollup_boost_client.try_into().unwrap();
let (tx, rx) = mpsc::unbounded_channel();
let engine_handler = EngineHandler::new(tx);
let module: RpcModule<()> = engine_handler.try_into().unwrap();

tokio::spawn(async move {
// TODO: Not sure if this is the correct way to spawn in test
rollup_boost_client.run(rx).await.unwrap();
});

let proxy_server = ServerBuilder::default()
.build("0.0.0.0:8556".parse::<SocketAddr>().unwrap())
Expand Down
10 changes: 8 additions & 2 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ pub struct SimpleBlockGenerator {
latest_hash: B256,
timestamp: u64,
version: Version,
block_time_sec: u64,
}

impl SimpleBlockGenerator {
Expand All @@ -320,9 +321,14 @@ impl SimpleBlockGenerator {
latest_hash: B256::ZERO, // temporary value
timestamp: 0, // temporary value
version: Version::V3,
block_time_sec: 1,
}
}

pub fn set_block_time(&mut self, block_time_sec: u64) {
self.block_time_sec = block_time_sec;
}

/// Initialize the block generator by fetching the latest block
pub async fn init(&mut self) -> eyre::Result<()> {
let latest_block = self.engine_api.latest().await?.expect("block not found");
Expand Down Expand Up @@ -368,8 +374,8 @@ impl SimpleBlockGenerator {

let payload_id = result.payload_id.expect("missing payload id");

if !empty_blocks {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
if !empty_blocks && self.block_time_sec > 0 {
tokio::time::sleep(tokio::time::Duration::from_secs(self.block_time_sec)).await;
}

let payload = self
Expand Down
Loading
Loading