Skip to content

Commit f1d89d3

Browse files
authored
feat(io): copy-bidirectional (#800)
* feat(io): copy-bidirectional * feat(io): flush & shutdown after successful copy
1 parent 1965206 commit f1d89d3

5 files changed

Lines changed: 142 additions & 40 deletions

File tree

compio-fs/src/file.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,4 +287,13 @@ impl Splittable for &File {
287287
}
288288
}
289289

290+
impl Splittable for &mut File {
291+
type ReadHalf = File;
292+
type WriteHalf = File;
293+
294+
fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
295+
(self.clone(), self.clone())
296+
}
297+
}
298+
290299
impl_raw_fd!(File, std::fs::File, inner, file);

compio-io/src/util/copy.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use futures_util::future::join;
2+
3+
use crate::{
4+
AsyncRead, AsyncWrite, AsyncWriteExt, IoResult,
5+
util::{DEFAULT_BUF_SIZE, Splittable},
6+
};
7+
8+
/// Asynchronously copies the entire contents of a reader into a writer.
9+
///
10+
/// This function returns a future that will continuously read data from
11+
/// `reader` and then write it into `writer` in a streaming fashion until
12+
/// `reader` returns EOF or fails.
13+
///
14+
/// On success, the total number of bytes that were copied from `reader` to
15+
/// `writer` is returned.
16+
///
17+
/// This is an asynchronous version of [`std::io::copy`][std].
18+
///
19+
/// A heap-allocated copy buffer with 8 KiB is created to take data from the
20+
/// reader to the writer.
21+
pub async fn copy<R: AsyncRead, W: AsyncWrite>(reader: &mut R, writer: &mut W) -> IoResult<u64> {
22+
copy_with_size(reader, writer, DEFAULT_BUF_SIZE).await
23+
}
24+
25+
/// Asynchronously copies the entire contents of a reader into a writer with
26+
/// specified buffer sizes.
27+
///
28+
/// This function returns a future that will continuously read data from
29+
/// `reader` and then write it into `writer` in a streaming fashion until
30+
/// `reader` returns EOF or fails.
31+
///
32+
/// On success, the total number of bytes that were copied from `reader` to
33+
/// `writer` is returned.
34+
///
35+
/// This is an asynchronous version of [`std::io::copy`][std].
36+
pub async fn copy_with_size<R: AsyncRead, W: AsyncWrite>(
37+
reader: &mut R,
38+
writer: &mut W,
39+
buf_size: usize,
40+
) -> IoResult<u64> {
41+
let mut buf = Vec::with_capacity(buf_size);
42+
let mut total = 0u64;
43+
44+
loop {
45+
let res;
46+
(res, buf) = reader.read(buf).await.into();
47+
match res {
48+
Ok(0) => break,
49+
Ok(read) => {
50+
total += read as u64;
51+
}
52+
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
53+
continue;
54+
}
55+
Err(e) => return Err(e),
56+
}
57+
let res;
58+
(res, buf) = writer.write_all(buf).await.into();
59+
res?;
60+
buf.clear();
61+
}
62+
63+
writer.flush().await?;
64+
writer.shutdown().await?;
65+
66+
Ok(total)
67+
}
68+
69+
/// Asynchronously copies data bidirectionally between two pairs of reader and
70+
/// writer.
71+
///
72+
/// This function takes two `Splittable` objects, `reader` and `writer`, and
73+
/// splits them into their respective read and write halves. It then
74+
/// concurrently copies data from the read half of `reader` to the write half of
75+
/// `writer`, and from the read half of `writer` to the write half of `reader`.
76+
/// The function returns a tuple containing the results of both copy operations,
77+
/// which indicate the total number of bytes copied in each direction or any
78+
/// errors that occurred during the copying process.
79+
pub async fn copy_bidirectional<A, B>(reader: A, writer: B) -> (IoResult<u64>, IoResult<u64>)
80+
where
81+
A: Splittable<ReadHalf: AsyncRead, WriteHalf: AsyncWrite>,
82+
B: Splittable<ReadHalf: AsyncRead, WriteHalf: AsyncWrite>,
83+
{
84+
let (mut ar, mut aw) = reader.split();
85+
let (mut br, mut bw) = writer.split();
86+
87+
join(copy(&mut ar, &mut bw), copy(&mut br, &mut aw)).await
88+
}
89+
90+
/// Asynchronously copies data bidirectionally between two pairs of reader and
91+
/// writer with specified buffer sizes.
92+
///
93+
/// This function is like `copy_bidirectional`, but allows you to specify the
94+
/// buffer sizes for each direction of copying.
95+
pub async fn copy_bidirectional_with_sizes<A, B>(
96+
reader: A,
97+
writer: B,
98+
a_to_b_size: usize,
99+
b_to_a_size: usize,
100+
) -> (IoResult<u64>, IoResult<u64>)
101+
where
102+
A: Splittable<ReadHalf: AsyncRead, WriteHalf: AsyncWrite>,
103+
B: Splittable<ReadHalf: AsyncRead, WriteHalf: AsyncWrite>,
104+
{
105+
let (mut ar, mut aw) = reader.split();
106+
let (mut br, mut bw) = writer.split();
107+
108+
join(
109+
copy_with_size(&mut ar, &mut bw, a_to_b_size),
110+
copy_with_size(&mut br, &mut aw, b_to_a_size),
111+
)
112+
.await
113+
}

compio-io/src/util/mod.rs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! IO related utilities functions for ease of use.
2-
use crate::{AsyncRead, AsyncWrite, AsyncWriteExt, IoResult};
2+
mod copy;
3+
pub use copy::*;
34

45
mod take;
56
pub use take::Take;
@@ -15,42 +16,3 @@ pub(crate) use internal::*;
1516

1617
pub mod split;
1718
pub use split::Splittable;
18-
19-
/// Asynchronously copies the entire contents of a reader into a writer.
20-
///
21-
/// This function returns a future that will continuously read data from
22-
/// `reader` and then write it into `writer` in a streaming fashion until
23-
/// `reader` returns EOF or fails.
24-
///
25-
/// On success, the total number of bytes that were copied from `reader` to
26-
/// `writer` is returned.
27-
///
28-
/// This is an asynchronous version of [`std::io::copy`][std].
29-
///
30-
/// A heap-allocated copy buffer with 8 KiB is created to take data from the
31-
/// reader to the writer.
32-
pub async fn copy<R: AsyncRead, W: AsyncWrite>(reader: &mut R, writer: &mut W) -> IoResult<u64> {
33-
let mut buf = Vec::with_capacity(DEFAULT_BUF_SIZE);
34-
let mut total = 0u64;
35-
36-
loop {
37-
let res;
38-
(res, buf) = reader.read(buf).await.into();
39-
match res {
40-
Ok(0) => break,
41-
Ok(read) => {
42-
total += read as u64;
43-
}
44-
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {
45-
continue;
46-
}
47-
Err(e) => return Err(e),
48-
}
49-
let res;
50-
(res, buf) = writer.write_all(buf).await.into();
51-
res?;
52-
buf.clear();
53-
}
54-
55-
Ok(total)
56-
}

compio-net/src/tcp.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,4 +605,13 @@ impl<'a> Splittable for &'a TcpStream {
605605
}
606606
}
607607

608+
impl<'a> Splittable for &'a mut TcpStream {
609+
type ReadHalf = ReadHalf<'a, TcpStream>;
610+
type WriteHalf = WriteHalf<'a, TcpStream>;
611+
612+
fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
613+
crate::split(self)
614+
}
615+
}
616+
608617
impl_raw_fd!(TcpStream, socket2::Socket, inner, socket);

compio-net/src/unix.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,15 @@ impl<'a> Splittable for &'a UnixStream {
524524
}
525525
}
526526

527+
impl<'a> Splittable for &'a mut UnixStream {
528+
type ReadHalf = ReadHalf<'a, UnixStream>;
529+
type WriteHalf = WriteHalf<'a, UnixStream>;
530+
531+
fn split(self) -> (Self::ReadHalf, Self::WriteHalf) {
532+
crate::split(self)
533+
}
534+
}
535+
527536
impl_raw_fd!(UnixStream, socket2::Socket, inner, socket);
528537

529538
#[cfg(windows)]

0 commit comments

Comments
 (0)