forked from compio-rs/compio
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsplit.rs
More file actions
75 lines (59 loc) · 2.27 KB
/
split.rs
File metadata and controls
75 lines (59 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use std::io::{Read, Write};
use compio_buf::BufResult;
use compio_io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use compio_net::{TcpStream, UnixListener, UnixStream};
use compio_runtime::ResumeUnwind;
/// Checks that the two halves produced by `TcpStream::into_split` can be used
/// independently: the read half receives a message from the peer and the
/// write half sends the same message back.
#[compio_macros::test]
async fn tcp_split() {
    const MSG: &[u8] = b"split";

    // Bind to port 0 so the OS picks a free port, then remember where we landed.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // The peer uses blocking std I/O on a separate blocking task: it sends
    // MSG first and then expects to read the very same bytes back.
    let peer = compio_runtime::spawn_blocking(move || {
        let (mut conn, _) = listener.accept().unwrap();
        conn.write_all(MSG).unwrap();

        let mut echoed = [0u8; 32];
        let echoed_len = conn.read(&mut echoed).unwrap();
        assert_eq!(&echoed[..echoed_len], MSG);
    });

    let client = TcpStream::connect(&addr).await.unwrap();
    let (mut read_half, mut write_half) = client.into_split();

    // Read half: receive what the peer wrote.
    let (received_len, received) = read_half.read([0u8; 32]).await.unwrap();
    assert_eq!(received_len, MSG.len());
    assert_eq!(&received[..MSG.len()], MSG);

    // Write half: send the message back so the peer's assertion can run.
    write_half.write_all(MSG).await.unwrap();

    // Surface any panic (i.e. failed assertion) raised inside the peer task.
    peer.await.resume_unwind();
}
/// Checks that both ends of a Unix domain socket connection can be split and
/// that each end receives exactly what the other end sent.
#[compio_macros::test]
async fn unix_split() {
    // The socket file lives in a temporary directory; `tmp_dir` must stay
    // bound until the end of the test so the directory is not removed early.
    let tmp_dir = tempfile::Builder::new()
        .prefix("compio-uds-split-tests")
        .tempdir()
        .unwrap();
    let socket_path = tmp_dir.path().join("connect.sock");

    let listener = UnixListener::bind(&socket_path).await.unwrap();

    // Connect and accept concurrently so neither side waits on the other.
    let (client, (server, _)) =
        futures_util::try_join!(UnixStream::connect(&socket_path), listener.accept()).unwrap();

    let (mut server_rx, mut server_tx) = server.into_split();
    let (mut client_rx, mut client_tx) = client.into_split();

    // Each side sends one byte and then drains its read half; both exchanges
    // are driven together.
    let server_exchange = send_recv_all(&mut server_rx, &mut server_tx, b"A");
    let client_exchange = send_recv_all(&mut client_rx, &mut client_tx, b"B");
    let (server_got, client_got) = futures_util::future::try_join(server_exchange, client_exchange)
        .await
        .unwrap();

    // The server sent "A" and must have received "B", and vice versa.
    assert_eq!(server_got, b"B");
    assert_eq!(client_got, b"A");
}
/// Writes `input` to `write`, shuts the writer down, then reads from `read`
/// until end-of-stream and returns the bytes that were received.
///
/// The read deliberately asks for more than the peer is expected to send
/// (a buffer with capacity 2): the `read_exact` call is required to fail with
/// `UnexpectedEof`, and the function panics if it succeeds or fails with any
/// other error kind.
///
/// # Errors
///
/// Returns the I/O error from writing `input` or from shutting down `write`.
async fn send_recv_all<R: AsyncRead, W: AsyncWrite>(
    read: &mut R,
    write: &mut W,
    input: &'static [u8],
) -> std::io::Result<Vec<u8>> {
    // Send everything, then shut down the writer.
    let BufResult(sent, _) = write.write_all(input).await;
    sent?;
    write.shutdown().await?;

    // Hitting EOF before the buffer is full is the expected outcome here.
    let BufResult(outcome, received) = read.read_exact(Vec::<u8>::with_capacity(2)).await;
    let eof = outcome.unwrap_err();
    assert_eq!(eof.kind(), std::io::ErrorKind::UnexpectedEof);

    Ok(received)
}