Skip to content

Commit 1831119

Browse files
authored
feat(io): add duplex forwarding for BufReader/BufWriter (#695)
... and optimize repeat::read_vectored * feat(io): add duplex forwarding for BufReader and BufWriter * perf(io): add read_vectored fast path for repeat
1 parent 21160a1 commit 1831119

4 files changed

Lines changed: 328 additions & 8 deletions

File tree

compio-io/src/read/buf.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut, buf_try};
1+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
22

3-
use crate::{AsyncRead, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
3+
use crate::{AsyncRead, AsyncWrite, IoResult, buffer::Buffer, util::DEFAULT_BUF_SIZE};
44
/// # AsyncBufRead
55
///
66
/// Async read with buffered content.
@@ -34,6 +34,10 @@ impl<A: AsyncBufRead + ?Sized> AsyncBufRead for &mut A {
3434
/// times. It also provides no advantage when reading from a source that is
3535
/// already in memory, like a `Vec<u8>`.
3636
///
37+
/// If the underlying reader also implements [`AsyncWrite`], `BufReader<R>`
38+
/// forwards write operations directly to the inner writer without touching the
39+
/// read buffer.
40+
///
3741
/// When the `BufReader<R>` is dropped, the contents of its buffer will be
3842
/// discarded. Reading from the underlying reader after unwrapping the
3943
/// `BufReader<R>` with [`BufReader::into_inner`] can cause data loss.
@@ -112,6 +116,24 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
112116
}
113117
}
114118

119+
impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
120+
async fn write<B: IoBuf>(&mut self, buf: B) -> BufResult<usize, B> {
121+
self.reader.write(buf).await
122+
}
123+
124+
async fn write_vectored<B: IoVectoredBuf>(&mut self, buf: B) -> BufResult<usize, B> {
125+
self.reader.write_vectored(buf).await
126+
}
127+
128+
async fn flush(&mut self) -> IoResult<()> {
129+
self.reader.flush().await
130+
}
131+
132+
async fn shutdown(&mut self) -> IoResult<()> {
133+
self.reader.shutdown().await
134+
}
135+
}
136+
115137
impl<R> IntoInner for BufReader<R> {
116138
type Inner = R;
117139

compio-io/src/util/repeat.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::mem::MaybeUninit;
22

3-
use compio_buf::BufResult;
3+
use compio_buf::{BufResult, IoVectoredBufMut};
44

55
use crate::{AsyncBufRead, AsyncRead, IoResult};
66

@@ -35,10 +35,26 @@ impl AsyncRead for Repeat {
3535

3636
let len = slice.len();
3737
slice.fill(MaybeUninit::new(self.0));
38+
// SAFETY: we just initialized exactly `len` bytes in `buf` from index 0.
3839
unsafe { buf.advance_to(len) };
3940

4041
BufResult(Ok(len), buf)
4142
}
43+
44+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, mut buf: V) -> BufResult<usize, V> {
45+
let mut len: usize = 0;
46+
for slice in buf.iter_uninit_slice() {
47+
len = len
48+
.checked_add(slice.len())
49+
.expect("total vectored buffer length overflow");
50+
slice.fill(MaybeUninit::new(self.0));
51+
}
52+
debug_assert_eq!(len, buf.total_capacity());
53+
// SAFETY: every byte counted in `len` is initialized in the loop above.
54+
unsafe { buf.advance_vec_to(len) };
55+
56+
BufResult(Ok(len), buf)
57+
}
4258
}
4359

4460
impl AsyncBufRead for Repeat {

compio-io/src/write/buf.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, buf_try};
1+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
22

33
use crate::{
4-
AsyncWrite, IoResult,
4+
AsyncBufRead, AsyncRead, AsyncWrite, IoResult,
55
buffer::Buffer,
66
util::{DEFAULT_BUF_SIZE, slice_to_buf},
77
};
@@ -18,6 +18,10 @@ use crate::{
1818
/// times. It also provides no advantage when writing to a destination that is
1919
/// in memory, like a `Vec<u8>`.
2020
///
21+
/// If the underlying writer also implements [`AsyncRead`] or [`AsyncBufRead`],
22+
/// `BufWriter<W>` forwards read operations directly to the inner reader without
23+
/// flushing the write buffer.
24+
///
2125
/// Dropping `BufWriter<W>` also discards any bytes left in the buffer, so it is
2226
/// critical to call [`flush`] before `BufWriter<W>` is dropped. Calling
2327
/// [`flush`] ensures that the buffer is empty and thus no data is lost.
@@ -116,6 +120,26 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
116120
}
117121
}
118122

123+
impl<W: AsyncRead + AsyncWrite> AsyncRead for BufWriter<W> {
124+
async fn read<B: IoBufMut>(&mut self, buf: B) -> BufResult<usize, B> {
125+
self.writer.read(buf).await
126+
}
127+
128+
async fn read_vectored<V: IoVectoredBufMut>(&mut self, buf: V) -> BufResult<usize, V> {
129+
self.writer.read_vectored(buf).await
130+
}
131+
}
132+
133+
impl<W: AsyncBufRead + AsyncWrite> AsyncBufRead for BufWriter<W> {
134+
async fn fill_buf(&mut self) -> IoResult<&'_ [u8]> {
135+
self.writer.fill_buf().await
136+
}
137+
138+
fn consume(&mut self, amount: usize) {
139+
self.writer.consume(amount)
140+
}
141+
}
142+
119143
impl<W> IntoInner for BufWriter<W> {
120144
type Inner = W;
121145

0 commit comments

Comments
 (0)