Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use jsonrpsee::core::{BoxError, http_helpers};
use jsonrpsee::http_client::{HttpBody, HttpRequest, HttpResponse};
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tower::{Layer, Service};
use tracing::info;

Expand Down Expand Up @@ -45,7 +47,13 @@ impl ProxyLayer {
}
}

impl<S> Layer<S> for ProxyLayer {
impl<S> Layer<S> for ProxyLayer
where
S: Service<HttpRequest<HttpBody>, Response = HttpResponse> + Send + Sync + Clone + 'static,
S::Response: 'static,
S::Error: Into<BoxError> + 'static,
S::Future: Send + 'static,
{
type Service = ProxyService<S>;

fn layer(&self, inner: S) -> Self::Service {
Expand All @@ -61,19 +69,55 @@ impl<S> Layer<S> for ProxyLayer {
PayloadSource::Builder,
);

ProxyService {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let service = ProxyService {
inner,
l2_client,
builder_client,
}
engine_tx: tx,
};

service.process_engine_queue(rx);
service
}
}

pub type EngineResponseTx = UnboundedSender<(
http::Request<HttpBody>,
oneshot::Sender<Result<http::Response<HttpBody>, BoxError>>,
)>;

pub type EngineResponseRx = UnboundedReceiver<(
http::Request<HttpBody>,
oneshot::Sender<Result<http::Response<HttpBody>, BoxError>>,
)>;

#[derive(Clone, Debug)]
pub struct ProxyService<S> {
inner: S,
l2_client: HttpClient,
builder_client: HttpClient,
engine_tx: EngineResponseTx,
}

impl<S> ProxyService<S>
where
S: Service<HttpRequest<HttpBody>, Response = HttpResponse> + Send + Sync + Clone + 'static,
S::Response: 'static,
S::Error: Into<BoxError> + 'static,
S::Future: Send + 'static,
{
pub fn process_engine_queue(&self, mut rx: EngineResponseRx) {
let mut service = self.clone();
tokio::spawn(async move {
while let Some((req, resp_tx)) = rx.recv().await {
let resp = service.inner.call(req).await.map_err(|e| e.into());
// Note that we can unwrap here since the rx will only be dropped
// if the rollup boost server has been shut down
resp_tx.send(resp).unwrap();
}
});
}
}

// Consider using `RpcServiceT` when https://github.com/paritytech/jsonrpsee/pull/1521 is merged
Expand Down Expand Up @@ -104,6 +148,7 @@ where
// for an explanation of this pattern
let mut service = self.clone();
service.inner = std::mem::replace(&mut self.inner, service.inner);
let engine_tx = self.engine_tx.clone();

let fut = async move {
let (parts, body) = req.into_parts();
Expand All @@ -118,7 +163,10 @@ where
if method.starts_with(ENGINE_METHOD) {
let req = HttpRequest::from_parts(parts, HttpBody::from(body_bytes));
info!(target: "proxy::call", message = "proxying request to rollup-boost server", ?method);
service.inner.call(req).await.map_err(|e| e.into())

let (tx, rx) = oneshot::channel();
engine_tx.send((req, tx)).unwrap();
rx.await.map_err(Box::new)?
} else if FORWARD_REQUESTS.contains(&method.as_str()) {
// If the request should be forwarded, send to both the
// default execution client and the builder
Expand Down
Loading