Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
.idea
44 changes: 31 additions & 13 deletions src/transport/webrtc/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use crate::{
use bytes::{Buf, BufMut, BytesMut};
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{channel, OwnedPermit, Receiver, Sender};

use futures::future::BoxFuture;
use std::{
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -72,6 +73,11 @@ pub struct Substream {

/// RX channel for receiving messages from `peer`.
rx: Receiver<Event>,

/// Future that resolves when a permit to send on `tx` is available.
permit_fut: Option<
BoxFuture<'static, Result<OwnedPermit<Event>, tokio::sync::mpsc::error::SendError<()>>>,
>,
}

impl Substream {
Expand All @@ -92,6 +98,7 @@ impl Substream {
tx: outbound_tx,
rx: inbound_rx,
read_buffer: BytesMut::new(),
permit_fut: None,
},
handle,
)
Expand Down Expand Up @@ -190,29 +197,40 @@ impl tokio::io::AsyncRead for Substream {

impl tokio::io::AsyncWrite for Substream {
fn poll_write(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
if let State::SendClosed = *self.state.lock() {
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
}

// TODO: try to coalesce multiple calls to `poll_write()` into single `Event::Message`
loop {
if self.permit_fut.is_none() {
let tx = self.tx.clone();
// We use reserve_owned so the future owns the handle and doesn't borrow `self`
self.permit_fut = Some(Box::pin(async move { tx.reserve_owned().await }));
}

let num_bytes = std::cmp::min(MAX_FRAME_SIZE, buf.len());
let future = self.tx.reserve();
futures::pin_mut!(future);
match self.permit_fut.as_mut().unwrap().as_mut().poll(cx) {
Poll::Pending => return Poll::Pending, // Future stays alive in `self`!
Poll::Ready(result) => {
self.permit_fut = None;

let permit = match futures::ready!(future.poll(cx)) {
Err(_) => return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
Ok(permit) => permit,
};
let permit = match result {
Ok(p) => p,
Err(_) => return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
};

let num_bytes = std::cmp::min(MAX_FRAME_SIZE, buf.len());
let frame = buf[..num_bytes].to_vec();

let frame = buf[..num_bytes].to_vec();
permit.send(Event::Message(frame));
permit.send(Event::Message(frame));

Poll::Ready(Ok(num_bytes))
return Poll::Ready(Ok(num_bytes));
}
}
}
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
Expand Down