diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 0603007a..bf39046c 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -677,7 +677,15 @@ impl Sink for Substream { delegate_poll_ready!(&mut self.substream, cx); if self.pending_out_bytes >= BACKPRESSURE_BOUNDARY { - return poll_flush!(&mut self.substream, cx).map_err(From::from); + // This attempts to empty 'pending_out_frames' into the socket. + match futures::Sink::poll_flush(self.as_mut(), cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => { + // Still flushing. We cannot accept new data yet. + return Poll::Pending; + } + } } Poll::Ready(Ok(())) @@ -745,6 +753,10 @@ impl Sink for Substream { Poll::Ready(Ok(nwritten)) => { pending_frame.advance(nwritten); + // The number of pending bytes is reduced by the number of bytes written + // to ensure that backpressure is properly handled. + self.pending_out_bytes = self.pending_out_bytes.saturating_sub(nwritten); + if !pending_frame.is_empty() { self.pending_out_frame = Some(pending_frame); }