Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions quinn/src/send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,29 @@ impl SendStream {
}
}

/// Write bytes to the stream
/// Write a buffer into this stream, returning how many bytes were written
///
/// Yields the number of bytes written on success. Congestion and flow control may cause this to
/// be shorter than `buf.len()`, indicating that only a prefix of `buf` was written.
/// Unless this method errors, it waits until some amount of `buf` can be written into this
/// stream, and then writes as much as it can without waiting again. Due to congestion and flow
/// control, this may be shorter than `buf.len()`. On success this yields the length of the
/// prefix that was written.
///
/// This operation is cancel-safe.
/// # Cancel safety
///
/// This method is cancellation safe. If this does not resolve, no bytes were written.
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
poll_fn(|cx| self.execute_poll(cx, |s| s.write(buf))).await
}

/// Convenience method to write an entire buffer to the stream
/// Write a buffer into this stream in its entirety
///
/// This method repeatedly calls [`write`](Self::write) until all bytes are written, or an
/// error occurs.
///
/// # Cancel safety
///
/// This operation is *not* cancel-safe.
/// This method is *not* cancellation safe. Even if this does not resolve, some prefix of `buf`
/// may have been written when previously polled.
pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), WriteError> {
while !buf.is_empty() {
let written = self.write(buf).await?;
Expand All @@ -67,28 +77,55 @@ impl SendStream {
Ok(())
}

/// Write chunks to the stream
/// Write a slice of [`Bytes`] into this stream, returning how much was written
///
/// Bytes to try to write are provided to this method as an array of cheaply cloneable chunks.
/// Unless this method errors, it waits until some amount of those bytes can be written into
/// this stream, and then writes as much as it can without waiting again. Due to congestion and
/// flow control, this may be less than the total number of bytes.
///
/// On success, this method both mutates `bufs` and yields an informative [`Written`] struct
/// indicating how much was written:
///
/// Yields the number of bytes and chunks written on success.
/// Congestion and flow control may cause this to be shorter than `buf.len()`,
/// indicating that only a prefix of `bufs` was written
/// - [`Bytes`] chunks that were fully written are mutated to be [empty](Bytes::is_empty).
/// - If a [`Bytes`] chunk was partially written, it is [split to](Bytes::split_to) contain
/// only the suffix of bytes that were not written.
/// - The yielded [`Written`] struct indicates how many chunks were fully written as well as
/// how many bytes were written.
///
/// This operation is cancel-safe.
/// # Cancel safety
///
/// This method is cancellation safe. If this does not resolve, no bytes were written.
pub async fn write_chunks(&mut self, bufs: &mut [Bytes]) -> Result<Written, WriteError> {
poll_fn(|cx| self.execute_poll(cx, |s| s.write_chunks(bufs))).await
}

/// Convenience method to write a single chunk in its entirety to the stream
/// Write a single [`Bytes`] into this stream in its entirety
///
/// Bytes to write are provided to this method as an single cheaply cloneable chunk. This
/// method repeatedly calls [`write_chunks`](Self::write_chunks) until all bytes are written,
/// or an error occurs.
///
/// # Cancel safety
///
/// This operation is *not* cancel-safe.
/// This method is *not* cancellation safe. Even if this does not resolve, some bytes may have
/// been written when previously polled.
pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
self.write_all_chunks(&mut [buf]).await?;
Ok(())
}

/// Convenience method to write an entire list of chunks to the stream
/// Write a slice of [`Bytes`] into this stream in its entirety
///
/// Bytes to write are provided to this method as an array of cheaply cloneable chunks. This
/// method repeatedly calls [`write_chunks`](Self::write_chunks) until all bytes are written,
/// or an error occurs. This method mutates `bufs` by mutating all chunks to be
/// [empty](Bytes::is_empty).
///
/// # Cancel safety
///
/// This operation is *not* cancel-safe.
/// This method is *not* cancellation safe. Even if this does not resolve, some bytes may have
/// been written when previously polled.
pub async fn write_all_chunks(&mut self, mut bufs: &mut [Bytes]) -> Result<(), WriteError> {
while !bufs.is_empty() {
let written = self.write_chunks(bufs).await?;
Expand Down
Loading