From 88f9bd695ff63bc00257e25d751f3639ef13425b Mon Sep 17 00:00:00 2001 From: Henry Gomersall Date: Tue, 23 Feb 2021 14:32:26 +0000 Subject: [PATCH] [WIP] First pass implementation of returning the inner types from the server --- src/pipeline/server.rs | 219 +++++++++++++++++++++++++++++++++++------ 1 file changed, 190 insertions(+), 29 deletions(-) diff --git a/src/pipeline/server.rs b/src/pipeline/server.rs index d3e4954..cd7d2ad 100644 --- a/src/pipeline/server.rs +++ b/src/pipeline/server.rs @@ -8,6 +8,22 @@ use std::task::{Context, Poll}; use std::{error, fmt}; use tower_service::Service; +#[derive(Debug)] +struct Inner { + transport: Option, + service: Option, +} + +impl Inner { + fn take(&mut self) -> (Option, Option) { + (self.transport.take(), self.service.take()) + } + + fn is_populated(&self) -> bool { + self.transport.is_some() & self.service.is_some() + } +} + /// This type provides an implementation of a Tower /// [`Service`](https://docs.rs/tokio-service/0.1/tokio_service/trait.Service.html) on top of a /// request-at-a-time protocol transport. In particular, it wraps a transport that implements @@ -17,14 +33,12 @@ use tower_service::Service; #[derive(Debug)] pub struct Server where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, { #[pin] pending: FuturesOrdered, - #[pin] - transport: T, - service: S, + inner: Inner, in_flight: usize, finish: bool, @@ -33,7 +47,7 @@ where /// An error that occurred while servicing a request. pub enum Error where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, { /// The underlying transport failed to produce a request. @@ -44,11 +58,14 @@ where /// The underlying service failed to process a request. Service(S::Error), + + /// The future has completed or errored and should now be discarded. + CompletedOrErrored, } impl fmt::Display for Error where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, >::Error: fmt::Display, ::Error: fmt::Display, @@ -59,13 +76,14 @@ where Error::BrokenTransportRecv(ref se) => fmt::Display::fmt(se, f), Error::BrokenTransportSend(ref se) => fmt::Display::fmt(se, f), Error::Service(ref se) => fmt::Display::fmt(se, f), + Error::CompletedOrErrored => write!(f, "Completed or errored future"), } } } impl fmt::Debug for Error where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, >::Error: fmt::Debug, ::Error: fmt::Debug, @@ -76,13 +94,14 @@ where Error::BrokenTransportRecv(ref se) => write!(f, "BrokenTransportRecv({:?})", se), Error::BrokenTransportSend(ref se) => write!(f, "BrokenTransportSend({:?})", se), Error::Service(ref se) => write!(f, "Service({:?})", se), + Error::CompletedOrErrored => write!(f, "Completed or errored future"), } } } impl error::Error for Error where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, >::Error: error::Error, ::Error: error::Error, @@ -93,6 +112,7 @@ where Error::BrokenTransportSend(ref se) => Some(se), Error::BrokenTransportRecv(ref se) => Some(se), Error::Service(ref se) => Some(se), + Error::CompletedOrErrored => None, } } @@ -102,13 +122,14 @@ where Error::BrokenTransportSend(ref se) => se.description(), Error::BrokenTransportRecv(ref se) => se.description(), Error::Service(ref se) => se.description(), + Error::CompletedOrErrored => "Completed or errored future", } } } impl Error where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, { fn from_sink_error(e: >::Error) -> Self { @@ -124,9 +145,112 @@ where } } +impl From>> for Error +where + T: Sink + TryStream + Unpin, + S: Service<::Ok>, + >::Error: error::Error, + ::Error: error::Error, + S::Error: error::Error, +{ + fn from(source: ErrorWithInner>) -> Self { + let ErrorWithInner { error, .. } = source; + error + } +} + +trait MapErrWithInner +where E: fmt::Display + fmt::Debug + error::Error, +{ + fn map_err_with_inner(self, inner: &mut Inner) + -> Result>; +} + +impl MapErrWithInner for Result +where E: fmt::Display + fmt::Debug + error::Error, +{ + fn map_err_with_inner(self, inner: &mut Inner) + -> Result> + { + match self { + Ok(t) => Ok(t), + Err(e) => { + let (transport, service) = inner.take(); + Err(ErrorWithInner { error: e, transport, service }) + } + } + } +} + +trait MapPollErrWithInner +where E: fmt::Display + fmt::Debug + error::Error, +{ + fn map_err_with_inner(self, inner: &mut Inner) + -> Poll>>; +} + +impl MapPollErrWithInner for Poll> +where E: fmt::Display + fmt::Debug + error::Error, +{ + fn map_err_with_inner(self, inner: &mut Inner) + -> Poll>> + { + match self { + Poll::Ready(res) => Poll::Ready(res.map_err_with_inner(inner)), + Poll::Pending => Poll::Pending, + } + } +} + +/// Error type encapsulates the inner transport and service as well as the +/// error. +pub struct ErrorWithInner +{ + /// Wrapped error + pub error: E, + /// Inner transport + pub transport: Option, + /// Inner service + pub service: Option, +} + +impl fmt::Display for ErrorWithInner +where + E: fmt::Display + error::Error, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&self.error, f) + } +} + +impl fmt::Debug for ErrorWithInner +where + E: fmt::Debug + error::Error, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&self.error, f) + } +} + +impl error::Error for ErrorWithInner +where + E: error::Error, +{ + #[allow(deprecated)] + fn cause(&self) -> Option<&dyn error::Error> { + ::cause(&self.error) + } + + #[allow(deprecated)] + fn description(&self) -> &str { + ::description(&self.error) + } +} + + impl Server where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, { /// Construct a new [`Server`] over the given `transport` that services requests using the @@ -139,8 +263,7 @@ where pub fn new(transport: T, service: S) -> Self { Server { pending: FuturesOrdered::new(), - transport, - service, + inner: Inner { transport: Some(transport), service: Some(service) }, in_flight: 0, finish: false, } @@ -178,10 +301,13 @@ where impl Future for Server where - T: Sink + TryStream, + T: Sink + TryStream + Unpin, S: Service<::Ok>, + >::Error: error::Error, + ::Error: error::Error, + ::Ok>>::Error: error::Error, { - type Output = Result<(), Error>; + type Output = Result<(Option, Option), ErrorWithInner>>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let span = tracing::trace_span!("poll"); @@ -191,19 +317,31 @@ where // go through the deref so we can do partial borrows let this = self.project(); - // we never move transport or pending, nor do we ever hand out &mut to it - let mut transport: Pin<_> = this.transport; + // we never move pending, nor do we ever hand out &mut to it let mut pending: Pin<_> = this.pending; + let inner: &mut Inner = this.inner; + + if !inner.is_populated() { + return Poll::Ready( + Err(Error::CompletedOrErrored) + .map_err_with_inner(inner) + ) + } + // track how many times we have iterated let mut i = 0; loop { // first, poll pending futures to see if any have produced responses // note that we only poll for completed service futures if we can send the response - while let Poll::Ready(r) = transport.as_mut().poll_ready(cx) { + while let Poll::Ready(r) = Pin::new(inner.transport.as_mut().unwrap()).poll_ready(cx) { if let Err(e) = r { - return Poll::Ready(Err(Error::from_sink_error(e))); + return Poll::Ready( + Err(Error::from_sink_error(e)) + .map_err_with_inner(inner) + ); + //return Poll::Ready(Err(Error::from_sink_error(e))); } tracing::trace!( @@ -213,15 +351,18 @@ where ); match pending.as_mut().try_poll_next(cx) { Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Err(Error::from_service_error(e))); + return Poll::Ready( + Err(Error::from_service_error(e)) + .map_err_with_inner(inner) + ); } Poll::Ready(Some(Ok(rsp))) => { tracing::trace!("transport.start_send"); // try to send the response! - transport - .as_mut() + Pin::new(inner.transport.as_mut().unwrap()) .start_send(rsp) - .map_err(Error::from_sink_error)?; + .map_err(Error::from_sink_error) + .map_err_with_inner(inner)?; *this.in_flight -= 1; } _ => { @@ -233,15 +374,16 @@ where // also try to make progress on sending tracing::trace!(finish = *this.finish, "transport.poll_flush"); - if let Poll::Ready(()) = transport + if let Poll::Ready(()) = Pin::new(inner.transport.as_mut().unwrap()) .as_mut() .poll_flush(cx) - .map_err(Error::from_sink_error)? + .map_err(Error::from_sink_error) + .map_err_with_inner(inner)? { if *this.finish && pending.as_mut().is_empty() { // there are no more requests // and we've finished all the work! - return Poll::Ready(Ok(())); + return Poll::Ready(Ok(inner.take())); } } @@ -262,16 +404,35 @@ where // is the service ready? tracing::trace!("service.poll_ready"); - ready!(this.service.poll_ready(cx)).map_err(Error::from_service_error)?; + ready!(inner + .service + .as_mut() + .unwrap() + .poll_ready(cx) + ).map_err(Error::from_service_error).map_err_with_inner(inner)?; tracing::trace!("transport.poll_next"); - let rq = ready!(transport.as_mut().try_poll_next(cx)) + let rq = ready!( + Pin::new( + inner + .transport + .as_mut() + .unwrap() + ).try_poll_next(cx)) .transpose() - .map_err(Error::from_stream_error)?; + .map_err(Error::from_stream_error) + .map_err_with_inner(inner)?; + if let Some(rq) = rq { // the service is ready, and we have another request! // you know what that means: - pending.push(this.service.call(rq)); + pending.push( + inner + .service + .as_mut() + .unwrap() + .call(rq) + ); *this.in_flight += 1; } else { // there are no more requests coming