Skip to content

Flashblocks #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
10efd2d
Initial for fb
ferranbt Feb 7, 2025
e6fc052
Merge main
ferranbt Feb 9, 2025
e7673c1
Rebase main
ferranbt Feb 12, 2025
46f5f2d
Rebase again
ferranbt Feb 12, 2025
bb8736e
Merge main
ferranbt Feb 17, 2025
00e3622
Merge
ferranbt Feb 17, 2025
99958e1
Use separated crate
ferranbt Feb 17, 2025
44390ba
Some moves
ferranbt Feb 17, 2025
32ab828
Add diff
ferranbt Feb 18, 2025
245b75e
Add partial block building
ferranbt Feb 18, 2025
0074fed
Add lib
ferranbt Feb 18, 2025
6ada749
Rebased main
ferranbt Feb 18, 2025
397628c
Prov update alloy deps
ferranbt Feb 18, 2025
9c26323
Fix imports and make fb service public
ferranbt Feb 18, 2025
0b089a3
Add log
ferranbt Feb 18, 2025
e1d6068
Add better print statements
ferranbt Feb 19, 2025
ada938c
Add outbound server
ferranbt Feb 19, 2025
691a11e
add wss support (#106)
cody-wang-cb Feb 20, 2025
ef4a70c
Add debug statement
ferranbt Feb 27, 2025
6a2a5f7
Merge develop
ferranbt Feb 27, 2025
e74a1fd
Bump rustTls version
ferranbt Feb 27, 2025
9bfd6f3
deps: alloy 0.12.5 (#137)
efbig Mar 18, 2025
e287ac8
Rebase
ferranbt Mar 18, 2025
67f5974
Fix lint
ferranbt Mar 18, 2025
b4de9bf
Rebase main
ferranbt Apr 3, 2025
e20dbc5
Fix lint
ferranbt Apr 4, 2025
dac67b0
reth 1.3.11
cody-wang-cb Apr 16, 2025
e224906
reth 1.3.9 and metrics
cody-wang-cb Apr 16, 2025
6088534
Merge pull request #176 from cody-wang-cb/cody/reth-1.3.9
avalonche Apr 16, 2025
62723d7
metrics updates
jowparks Apr 16, 2025
616df18
Update metrics and error handling
jowparks Apr 17, 2025
2c867b0
add code commenting, remove unneeded error metric, PR comments
jowparks Apr 21, 2025
73e7f02
Merge pull request #178 from jowparks/metrics-updates
avalonche Apr 21, 2025
7691533
Bump deps (#189)
SozinM Apr 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 223 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ version = "0.1.0"
edition = "2024"

[dependencies]
op-alloy-rpc-types-engine = "0.12.0"
alloy-rpc-types-engine = "0.13.0"
alloy-eips = { version = "0.13.0", features = ["serde"], optional = true }
alloy-primitives = { version = "0.8.10", features = ["rand"] }
op-alloy-rpc-types-engine = "0.15.0"
alloy-rpc-types-engine = "0.15.5"
alloy-rpc-types-eth = "0.15.5"
alloy-serde = "0.15.5"
alloy-eips = { version = "0.15.5", features = ["serde"], optional = true }
alloy-primitives = { version = "1.0.0", features = ["rand"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1.4"
tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] }
Expand Down Expand Up @@ -39,6 +41,8 @@ tracing-opentelemetry = "0.29.0"
futures = "0.3.31"
metrics = "0.24.0"
metrics-exporter-prometheus = "0.16.0"
tokio-tungstenite = { version = "0.26.2", features = ["native-tls"] }
url = "2.5"
metrics-util = "0.19.0"
eyre = "0.6.12"
paste = "1.0.15"
Expand All @@ -49,15 +53,14 @@ time = { version = "0.3.36", features = ["macros", "formatting", "parsing"], opt
lazy_static = {version = "1.5.0", optional = true }

[dev-dependencies]
op-alloy-consensus = "0.12.0"
alloy-rpc-types-eth = "0.13.0"
op-alloy-consensus = "0.15.0"
anyhow = "1.0"
assert_cmd = "2.0.10"
predicates = "3.1.2"
tokio-util = { version = "0.7.13" }
nix = "0.15.0"
bytes = "1.2"
reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", rev = "v1.3.7" }
reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth.git", rev = "cae744898775882fe99a24f4c3b2dbc27f88bd29" }
ctor = "0.4.1"

[features]
Expand Down
14 changes: 12 additions & 2 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
use ::tracing::info;
use clap::Parser;
use rollup_boost::{
Args, Commands, DebugClient, DebugCommands, PayloadSource, ProxyLayer, RollupBoostServer,
RpcClient, init_metrics, init_tracing,
Args, Commands, DebugClient, DebugCommands, Flashblocks, PayloadSource, ProxyLayer,
RollupBoostServer, RpcClient, init_metrics, init_tracing,
};
use std::net::SocketAddr;

Expand Down Expand Up @@ -90,11 +90,21 @@ async fn main() -> eyre::Result<()> {
info!("Boost sync enabled");
}

let flashblocks_client = if args.flashblocks.flashblocks {
let inbound_url = args.flashblocks.flashblocks_url;
let outbound_url = args.flashblocks.flashblocks_outbound_url;

Some(Flashblocks::run(inbound_url, outbound_url).unwrap())
} else {
None
};

let rollup_boost = RollupBoostServer::new(
l2_client,
builder_client,
boost_sync_enabled,
args.execution_mode,
flashblocks_client,
);

// Spawn the debug server
Expand Down
19 changes: 19 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,25 @@ pub struct Args {
/// Execution mode to start rollup boost with
#[arg(long, env, default_value = "enabled")]
pub execution_mode: ExecutionMode,

/// Enable Flashblocks client
#[clap(flatten)]
pub flashblocks: FlashblocksArgs,
}

#[derive(Parser, Debug)]
pub struct FlashblocksArgs {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add docs to README and .env.example

/// Enable Flashblocks client
#[arg(long, env, default_value = "false")]
pub flashblocks: bool,

/// Flashblocks WebSocket URL
#[arg(long, env, default_value = "ws://localhost:1111")]
pub flashblocks_url: String,

/// Flashblocks outbound WebSocket URL
#[arg(long, env, default_value = "127.0.0.1:1112")]
pub flashblocks_outbound_url: String,
}

#[derive(Clone, Debug)]
Expand Down
18 changes: 14 additions & 4 deletions src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use thiserror::Error;
use tracing::{error, info, instrument};

const INTERNAL_ERROR: i32 = 13;
const INVALID_PAYLOAD: i32 = 1337;
const IO_ERROR: i32 = 1338;
const JWT_ERROR: i32 = 1339;

pub(crate) type ClientResult<T> = Result<T, RpcClientError>;

Expand All @@ -43,7 +46,8 @@ trait Code: Sized {
fn code(&self) -> i32;

fn set_code(self) -> Self {
tracing::Span::current().record("code", self.code());
let code_value: i64 = self.code().into();
tracing::Span::current().record("code", code_value.to_string());
self
}
}
Expand All @@ -61,9 +65,10 @@ impl<T, E: Code> Code for Result<T, E> {
impl Code for RpcClientError {
fn code(&self) -> i32 {
match self {
RpcClientError::InvalidPayload(_) => INVALID_PAYLOAD,
RpcClientError::Io(_) => IO_ERROR,
RpcClientError::Jwt(_) => JWT_ERROR,
RpcClientError::Jsonrpsee(e) => e.code(),
// Status code 13 == internal error
_ => INTERNAL_ERROR,
}
}
}
Expand Down Expand Up @@ -153,6 +158,7 @@ impl RpcClient {
}

if res.is_invalid() {
error!("Invalid payload ({}): {:?}", self.payload_source, res);
return Err(RpcClientError::InvalidPayload(
res.payload_status.status.to_string(),
))
Expand All @@ -170,6 +176,7 @@ impl RpcClient {
target = self.payload_source.to_string(),
url = %self.auth_rpc,
%payload_id,
code,
)
)]
pub async fn get_payload_v3(
Expand Down Expand Up @@ -213,6 +220,7 @@ impl RpcClient {
.set_code()?;

if res.is_invalid() {
error!("Invalid payload ({}): {:?}", self.payload_source, res);
return Err(RpcClientError::InvalidPayload(res.status.to_string()).set_code());
}

Expand All @@ -227,6 +235,7 @@ impl RpcClient {
target = self.payload_source.to_string(),
url = %self.auth_rpc,
%payload_id,
code,
)
)]
pub async fn get_payload_v4(
Expand Down Expand Up @@ -291,6 +300,7 @@ impl RpcClient {
.set_code()?;

if res.is_invalid() {
error!("Invalid payload ({}): {:?}", self.payload_source, res);
return Err(RpcClientError::InvalidPayload(res.status.to_string()).set_code());
}

Expand Down Expand Up @@ -377,7 +387,7 @@ mod tests {
use super::*;

const AUTH_PORT: u32 = 8550;
const AUTH_ADDR: &str = "0.0.0.0";
const AUTH_ADDR: &str = "127.0.0.1";
const SECRET: &str = "f79ae8046bc11c9927afe911db7143c51a806c4a537cc08e0d37140b0192f430";

#[test]
Expand Down
61 changes: 61 additions & 0 deletions src/flashblocks/inbound.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use super::primitives::FlashblocksPayloadV1;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info};
use url::Url;

pub struct FlashblocksReceiverService {
url: Url,
sender: mpsc::Sender<FlashblocksPayloadV1>,
}

impl FlashblocksReceiverService {
pub fn new(
url: String,
sender: mpsc::Sender<FlashblocksPayloadV1>,
) -> Result<Self, url::ParseError> {
Ok(Self {
url: Url::parse(&url)?,
sender,
})
}

pub async fn run(self) {
loop {
match self.connect_and_handle().await {
Ok(()) => break,
Err(e) => {
error!(
message = "Flashblocks receiver connection error, retrying in 5 seconds",
error = %e
);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
}
}

async fn connect_and_handle(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (ws_stream, _) = connect_async(self.url.as_str()).await?;
let (_, mut read) = ws_stream.split();

info!("Connected to Flashblocks receiver at {}", self.url);

while let Some(msg) = read.next().await {
let msg = msg?;
match msg {
Message::Text(text) => {
if let Ok(flashblocks_msg) = serde_json::from_str::<FlashblocksPayloadV1>(&text)
// TODO: Version this
{
self.sender.send(flashblocks_msg).await?;
}
}
_ => continue,
}
}

Ok(())
}
}
45 changes: 45 additions & 0 deletions src/flashblocks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
mod inbound;
mod outbound;
pub mod primitives;
mod service;
use inbound::FlashblocksReceiverService;
use outbound::FlashblocksOutboundService;
use std::net::SocketAddr;
use tokio::sync::mpsc;

pub use service::FlashblocksService;

pub struct Flashblocks {}

impl Flashblocks {
pub fn run(builder_url: String, outbound_addr: String) -> eyre::Result<FlashblocksService> {
let (tx, rx) = mpsc::channel(100);
let (outbound_tx, outbound_rx) = mpsc::channel(100);

let receiver = FlashblocksReceiverService::new(builder_url, tx)?;

tokio::spawn(async move {
let _ = receiver.run().await;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a need for error handling here?

});

// Create and spawn the outbound WebSocket service
let outbound_service = FlashblocksOutboundService::new(outbound_rx);
let addr: SocketAddr = outbound_addr
.parse()
.map_err(|e| eyre::eyre!("Invalid outbound address {}: {}", outbound_addr, e))?;

tokio::spawn(async move {
if let Err(e) = outbound_service.run(addr).await {
tracing::error!("Outbound service error: {}", e);
}
});

let service = FlashblocksService::new(outbound_tx);
let mut service_handle = service.clone();
tokio::spawn(async move {
service_handle.run(rx).await;
});

Ok(service)
}
}
77 changes: 77 additions & 0 deletions src/flashblocks/outbound.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use super::primitives::FlashblocksPayloadV1;
use futures::{SinkExt, StreamExt};
use std::{net::SocketAddr, sync::Arc};
use tokio::{
net::TcpListener,
sync::{Mutex, mpsc},
};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use tracing::{debug, error, info};

type WebSocketSink =
futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>, Message>;

pub struct FlashblocksOutboundService {
clients: Arc<Mutex<Vec<WebSocketSink>>>,
receiver: mpsc::Receiver<FlashblocksPayloadV1>,
}

impl FlashblocksOutboundService {
pub fn new(receiver: mpsc::Receiver<FlashblocksPayloadV1>) -> Self {
Self {
clients: Arc::new(Mutex::new(Vec::new())),
receiver,
}
}

pub async fn run(mut self, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(&addr).await?;
info!("Outbound WebSocket server listening on: {}", addr);

let clients = self.clients.clone();

// Spawn a task to handle new WebSocket connections
tokio::spawn(async move {
while let Ok((stream, addr)) = listener.accept().await {
debug!("New WebSocket connection from: {}", addr);
let clients = clients.clone();

tokio::spawn(async move {
match accept_async(stream).await {
Ok(ws_stream) => {
let (sink, _) = ws_stream.split();
clients.lock().await.push(sink);
debug!("Client added: {}", addr);
}
Err(e) => error!("Error accepting WebSocket connection: {}", e),
}
});
}
});

// Handle incoming messages from the channel and broadcast them
while let Some(payload) = self.receiver.recv().await {
let message = match serde_json::to_string(&payload) {
Ok(msg) => Message::Text(msg.into()),
Err(e) => {
error!("Failed to serialize payload: {}", e);
continue;
}
};

let mut clients = self.clients.lock().await;
clients.retain_mut(|sink| {
let send_future = sink.send(message.clone());
match futures::executor::block_on(send_future) {
Ok(_) => true,
Err(e) => {
error!("Failed to send message to client: {}", e);
false // Remove failed client
}
}
});
}

Ok(())
}
}
Loading