diff --git a/src/proxy.rs b/src/proxy.rs index 6c6810b..05c0f6b 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -6,6 +6,8 @@ use jsonrpsee::core::{BoxError, http_helpers}; use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse}; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tower::{Layer, Service}; use tracing::info; @@ -45,7 +47,13 @@ impl ProxyLayer { } } -impl Layer for ProxyLayer { +impl Layer for ProxyLayer +where + S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ type Service = ProxyService; fn layer(&self, inner: S) -> Self::Service { @@ -61,19 +69,55 @@ impl Layer for ProxyLayer { PayloadSource::Builder, ); - ProxyService { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let service = ProxyService { inner, l2_client, builder_client, - } + engine_tx: tx, + }; + + service.process_engine_queue(rx); + service } } +pub type EngineResponseTx = UnboundedSender<( + http::Request, + oneshot::Sender, BoxError>>, +)>; + +pub type EngineResponseRx = UnboundedReceiver<( + http::Request, + oneshot::Sender, BoxError>>, +)>; + #[derive(Clone, Debug)] pub struct ProxyService { inner: S, l2_client: HttpClient, builder_client: HttpClient, + engine_tx: EngineResponseTx, +} + +impl ProxyService +where + S: Service, Response = HttpResponse> + Send + Sync + Clone + 'static, + S::Response: 'static, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ + pub fn process_engine_queue(&self, mut rx: EngineResponseRx) { + let mut service = self.clone(); + tokio::spawn(async move { + while let Some((req, resp_tx)) = rx.recv().await { + let resp = service.inner.call(req).await.map_err(|e| e.into()); + // Note that we can unwrap here since the rx will only be dropped + // if the rollup boost server has been shut down + resp_tx.send(resp).unwrap(); + } + }); + } } // Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged @@ -104,6 +148,7 @@ where // for an explanation of this pattern let mut service = self.clone(); service.inner = std::mem::replace(&mut self.inner, service.inner); + let engine_tx = self.engine_tx.clone(); let fut = async move { let (parts, body) = req.into_parts(); @@ -118,7 +163,10 @@ where if method.starts_with(ENGINE_METHOD) { let req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes)); info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method); - service.inner.call(req).await.map_err(|e| e.into()) + + let (tx, rx) = oneshot::channel(); + engine_tx.send((req, tx)).unwrap(); + rx.await.map_err(Box::new)? } else if FORWARD_REQUESTS.contains(&method.as_str()) { // If the request should be forwarded, send to both the // default execution client and the builder