From 8daa7d38f89ecdda9a378db73cc97440c3082f52 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 17:47:06 -0400 Subject: [PATCH 1/9] feat: engine queue --- src/proxy.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 6c6810b..4a07b4c 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,11 +1,14 @@ use crate::client::http::HttpClient; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; -use http::Uri; +use http::{Request, Uri}; use jsonrpsee::core::{BoxError, http_helpers}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; +use reqwest::Body; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; use tower::{Layer, Service}; use tracing::info; @@ -74,6 +77,7 @@ pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, + engine_tx: Sender<(Request, oneshot::Receiver)>, } // Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged @@ -104,6 +108,7 @@ where // for an explanation of this pattern let mut service = self.clone(); service.inner = std::mem::replace(&mut self.inner, service.inner); + let engine_tx = self.engine_tx.clone(); let fut = async move { let (parts, body) = req.into_parts(); @@ -118,7 +123,13 @@ where if method.starts_with(ENGINE_METHOD) { let req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes)); info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); - service.inner.call(req).await.map_err(|e| e.into()) + + let (rx, tx) = oneshot::channel(); + engine_tx.send((req, tx)); + + // TODO: await response + // service.inner.call(req).await.map_err(|e| e.into()) + todo!() } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the // default execution client and the builder From 0068138cb339a79664fffba1c0b0087980e9c1ea Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 18:23:23 -0400 Subject: [PATCH 2/9] feat: process engine queue --- src/proxy.rs | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 4a07b4c..d0e2df9 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -7,7 +7,7 @@ use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; use reqwest::Body; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tower::{Layer, Service}; use tracing::info; @@ -64,11 +64,16 @@ impl Layer for ProxyLayer { PayloadSource::Builder, ); - ProxyService { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let service = ProxyService { inner, l2_client, builder_client, - } + engine_tx: tx, + }; + service.process_engine_queue(); + + service } } @@ -77,7 +82,29 @@ pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, - engine_tx: Sender<(Request, oneshot::Receiver)>, + engine_tx: UnboundedSender<(Request, oneshot::Sender)>, +} + +impl ProxyService +where + S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ + pub fn process_engine_queue( + &self, + mut rx: UnboundedReceiver<(Request, oneshot::Sender)>, + ) { + let mut service = self.clone(); + + tokio::spawn(async move { + while let Some((req, resp_tx)) = rx.recv().await { + let resp = service.inner.call(req).await.expect("TODO: handle error"); + resp_tx.send(resp); + } + }); + } } // Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged @@ -124,7 +151,7 @@ where let req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes)); info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); - let (rx, tx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); engine_tx.send((req, tx)); // TODO: await response From cf5b92ed03df8e9fb9eeed1a075bbf4d4a3ca7f9 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 18:54:07 -0400 Subject: [PATCH 3/9] feat: enqueue engine call, handle response --- src/proxy.rs | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index d0e2df9..4904a68 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,10 +1,9 @@ use crate::client::http::HttpClient; use crate::server::PayloadSource; use alloy_rpc_types_engine::JwtSecret; -use http::{Request, Uri}; +use http::Uri; use jsonrpsee::core::{BoxError, http_helpers}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; -use reqwest::Body; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; @@ -48,7 +47,13 @@ impl ProxyLayer { } } -impl Layer for ProxyLayer { +impl Layer for ProxyLayer +where + S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ type Service = ProxyService; fn layer(&self, inner: S) -> Self::Service { @@ -71,7 +76,7 @@ impl Layer for ProxyLayer { builder_client, engine_tx: tx, }; - service.process_engine_queue(); + service.process_engine_queue(rx); service } @@ -82,7 +87,7 @@ pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, - engine_tx: UnboundedSender<(Request, oneshot::Sender)>, + engine_tx: UnboundedSender<(http::Request, oneshot::Sender)>, } impl ProxyService @@ -94,14 +99,23 @@ where { pub fn process_engine_queue( &self, - mut rx: UnboundedReceiver<(Request, oneshot::Sender)>, + mut rx: UnboundedReceiver<( + http::Request, + oneshot::Sender>, + )>, ) { let mut service = self.clone(); tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { - let resp = service.inner.call(req).await.expect("TODO: handle error"); - resp_tx.send(resp); + let resp = service + .inner + .call(req) + .await + .map_err(|e| e.into()) + .expect("TODO: handle error"); + + resp_tx.send(resp).expect("TODO: handle error"); } }); } @@ -152,11 +166,10 @@ where info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); let (tx, rx) = oneshot::channel(); - engine_tx.send((req, tx)); + engine_tx.send((req, tx)).unwrap(); - // TODO: await response - // service.inner.call(req).await.map_err(|e| e.into()) - todo!() + let resp = rx.await.expect("TODO: handle error"); + Ok(resp) } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the // default execution client and the builder From 8aa0f130d29d76ee8a2a1013c21998a4e2ff4270 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 18:54:46 -0400 Subject: [PATCH 4/9] chore: clippy --- src/proxy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/proxy.rs b/src/proxy.rs index 4904a68..0fbd403 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -6,7 +6,7 @@ use jsonrpsee::core::{BoxError, http_helpers}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; -use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tower::{Layer, Service}; use tracing::info; From 7eec301d8706e6b72f54875764ca45fdc3bd78fb Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 19:06:49 -0400 Subject: [PATCH 5/9] fix: fix oneshot channel type --- src/proxy.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 0fbd403..6ca8f1d 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -87,7 +87,10 @@ pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, - engine_tx: UnboundedSender<(http::Request, oneshot::Sender)>, + engine_tx: UnboundedSender<( + http::Request, + oneshot::Sender, BoxError>>, + )>, } impl ProxyService @@ -101,20 +104,14 @@ where &self, mut rx: UnboundedReceiver<( http::Request, - oneshot::Sender>, + oneshot::Sender, BoxError>>, )>, ) { let mut service = self.clone(); tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { - let resp = service - .inner - .call(req) - .await - .map_err(|e| e.into()) - .expect("TODO: handle error"); - + let resp = service.inner.call(req).await.map_err(|e| e.into()); resp_tx.send(resp).expect("TODO: handle error"); } }); @@ -168,8 +165,7 @@ where let (tx, rx) = oneshot::channel(); engine_tx.send((req, tx)).unwrap(); - let resp = rx.await.expect("TODO: handle error"); - Ok(resp) + rx.await.expect("TODO: handle error") } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the // default execution client and the builder From 54f0bff5050de210a9b066cb1ab19fa73ee8362c Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Wed, 30 Apr 2025 19:15:12 -0400 Subject: [PATCH 6/9] fmt: spacing --- src/proxy.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 6ca8f1d..4d75e05 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -76,8 +76,8 @@ where builder_client, engine_tx: tx, }; - service.process_engine_queue(rx); + service.process_engine_queue(rx); service } } @@ -108,7 +108,6 @@ where )>, ) { let mut service = self.clone(); - tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { let resp = service.inner.call(req).await.map_err(|e| e.into()); @@ -164,7 +163,6 @@ where let (tx, rx) = oneshot::channel(); engine_tx.send((req, tx)).unwrap(); - rx.await.expect("TODO: handle error") } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the From 6d300397ac2ca95f8ffcefb7af02dd68992442d9 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Thu, 1 May 2025 20:18:17 -0400 Subject: [PATCH 7/9] chore: update to use type alias --- src/proxy.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 4d75e05..8a28ca2 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -82,15 +82,22 @@ where } } +pub type EngineResponseTx = UnboundedSender<( + http::Request, + oneshot::Sender, BoxError>>, +)>; + +pub type EngineResponseRx = UnboundedReceiver<( + http::Request, + oneshot::Sender, BoxError>>, +)>; + #[derive(Clone, Debug)] pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, - engine_tx: UnboundedSender<( - http::Request, - oneshot::Sender, BoxError>>, - )>, + engine_tx: EngineResponseTx, } impl ProxyService @@ -100,13 +107,7 @@ where S::Error: Into + 'static, S::Future: Send + 'static, { - pub fn process_engine_queue( - &self, - mut rx: UnboundedReceiver<( - http::Request, - oneshot::Sender, BoxError>>, - )>, - ) { + pub fn process_engine_queue(&self, mut rx: EngineResponseRx) { let mut service = self.clone(); tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { From fb482d990fa9d4047288ed4d819e120327d438b0 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xKitsune@protonmail.com> Date: Sat, 3 May 2025 00:29:24 -0400 Subject: [PATCH 8/9] fix: error handling --- src/proxy.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index 8a28ca2..cbee889 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -112,7 +112,9 @@ where tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { let resp = service.inner.call(req).await.map_err(|e| e.into()); - resp_tx.send(resp).expect("TODO: handle error"); + // Note that we can unwrap here since the rx will only be dropped if the rollup + // boostserver has been shut down + resp_tx.send(resp).unwrap(); } }); } @@ -164,7 +166,7 @@ where let (tx, rx) = oneshot::channel(); engine_tx.send((req, tx)).unwrap(); - rx.await.expect("TODO: handle error") + rx.await.map_err(Box::new)? } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the // default execution client and the builder From 5d9e7bee4de7cffe9ce3322783523065a0cf8567 Mon Sep 17 00:00:00 2001 From: 0xKitsune <0xkitsune@protonmail.com> Date: Sun, 4 May 2025 12:53:49 -0400 Subject: [PATCH 9/9] docs: fix comment --- src/proxy.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/proxy.rs b/src/proxy.rs index cbee889..05c0f6b 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -112,8 +112,8 @@ where tokio::spawn(async move { while let Some((req, resp_tx)) = rx.recv().await { let resp = service.inner.call(req).await.map_err(|e| e.into()); - // Note that we can unwrap here since the rx will only be dropped if the rollup - // boostserver has been shut down + // Note that we can unwrap here since the rx will only be dropped + // if the rollup boost server has been shut down resp_tx.send(resp).unwrap(); } });