Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/substream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,15 @@ impl Sink<Bytes> for Substream {
delegate_poll_ready!(&mut self.substream, cx);

if self.pending_out_bytes >= BACKPRESSURE_BOUNDARY {
return poll_flush!(&mut self.substream, cx).map_err(From::from);
// This attempts to empty 'pending_out_frames' into the socket.
match futures::Sink::poll_flush(self.as_mut(), cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => {
// Still flushing. We cannot accept new data yet.
return Poll::Pending;
}
}
Comment on lines +680 to +688
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this different from poll_flush!?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The poll_flush! should call into the socket / TCP under the hood via AsyncWrite::poll_flush.

By calling the futures::Sink::poll_flush we ensure our impl is always called and try to write any pending frames before calling poll_flush

}

Poll::Ready(Ok(()))
Expand Down Expand Up @@ -745,6 +753,10 @@ impl Sink<Bytes> for Substream {
Poll::Ready(Ok(nwritten)) => {
pending_frame.advance(nwritten);

// The number of pending bytes is reduced by the number of bytes written
// to ensure that backpressure is properly handled.
self.pending_out_bytes = self.pending_out_bytes.saturating_sub(nwritten);

if !pending_frame.is_empty() {
self.pending_out_frame = Some(pending_frame);
}
Expand Down
Loading