Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
.idea
21 changes: 12 additions & 9 deletions src/transport/webrtc/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
use bytes::{Buf, BufMut, BytesMut};
use futures::{Future, Stream};
use parking_lot::Mutex;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{channel, error::TrySendError, Receiver, Sender};

use std::{
pin::Pin,
Expand Down Expand Up @@ -198,17 +198,20 @@ impl tokio::io::AsyncWrite for Substream {
return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
}

// TODO: try to coalesce multiple calls to `poll_write()` into single `Event::Message`

let num_bytes = std::cmp::min(MAX_FRAME_SIZE, buf.len());
let future = self.tx.reserve();
futures::pin_mut!(future);

let permit = match futures::ready!(future.poll(cx)) {
Err(_) => return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
let permit = match self.tx.try_reserve() {
Ok(permit) => permit,
Err(err) =>
return match err {
TrySendError::Full(_) => {
cx.waker().wake_by_ref();
Poll::Pending
},
TrySendError::Closed(_) =>
Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
},
};

let num_bytes = std::cmp::min(MAX_FRAME_SIZE, buf.len());
let frame = buf[..num_bytes].to_vec();
permit.send(Event::Message(frame));

Expand Down