Skip to content

Commit 3ee37ca

Browse files
authored
feat(net,win): make socket & shutdown sync (#789)
1 parent 5d31fd6 commit 3ee37ca

4 files changed

Lines changed: 32 additions & 54 deletions

File tree

compio-driver/src/op.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! The operation itself doesn't perform anything.
55
//! You need to pass them to [`crate::Proactor`], and poll the driver.
66
7-
use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown};
7+
use std::{io, marker::PhantomPinned, mem::ManuallyDrop};
88

99
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, SetLen};
1010
use pin_project_lite::pin_project;
@@ -19,8 +19,8 @@ pub use crate::sys::op::{
1919
#[cfg(unix)]
2020
pub use crate::sys::op::{
2121
AcceptMulti, CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile,
22-
PathStat, PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
23-
WriteVectored, WriteVectoredAt,
22+
PathStat, PollOnce, ReadVectored, ReadVectoredAt, Rename, ShutdownSocket, Stat, Symlink,
23+
TruncateFile, Unlink, WriteVectored, WriteVectoredAt,
2424
};
2525
#[cfg(windows)]
2626
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
@@ -446,19 +446,6 @@ impl<S> Sync<S> {
446446
}
447447
}
448448

449-
/// Shutdown a socket.
450-
pub struct ShutdownSocket<S> {
451-
pub(crate) fd: S,
452-
pub(crate) how: Shutdown,
453-
}
454-
455-
impl<S> ShutdownSocket<S> {
456-
/// Create [`ShutdownSocket`].
457-
pub fn new(fd: S, how: Shutdown) -> Self {
458-
Self { fd, how }
459-
}
460-
}
461-
462449
/// Close socket fd.
463450
pub struct CloseSocket {
464451
pub(crate) fd: ManuallyDrop<OwnedFd>,

compio-driver/src/sys/iocp/op.rs

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::sync::OnceLock;
33
use std::{
44
io,
55
marker::PhantomPinned,
6-
net::Shutdown,
76
os::windows::io::AsRawSocket,
87
pin::Pin,
98
ptr::{null, null_mut, read_unaligned},
@@ -24,11 +23,11 @@ use windows_sys::{
2423
},
2524
Networking::WinSock::{
2625
CMSGHDR, LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG,
27-
SD_BOTH, SD_RECEIVE, SD_SEND, SIO_GET_EXTENSION_FUNCTION_POINTER,
28-
SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE,
29-
SOL_SOCKET, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS,
30-
WSAID_WSARECVMSG, WSAIoctl, WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg,
31-
WSASendTo, closesocket, setsockopt, shutdown, socklen_t,
26+
SIO_GET_EXTENSION_FUNCTION_POINTER, SO_UPDATE_ACCEPT_CONTEXT,
27+
SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE, SOL_SOCKET, WSAID_ACCEPTEX,
28+
WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS, WSAID_WSARECVMSG, WSAIoctl, WSAMSG,
29+
WSARecv, WSARecvFrom, WSASend, WSASendMsg, WSASendTo, closesocket, setsockopt,
30+
socklen_t,
3231
},
3332
Storage::FileSystem::{FlushFileBuffers, ReadFile, WriteFile},
3433
System::{
@@ -320,23 +319,6 @@ unsafe impl<S: AsFd> OpCode for Sync<S> {
320319
}
321320
}
322321

323-
unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
324-
fn op_type(&self) -> OpType {
325-
OpType::Blocking
326-
}
327-
328-
unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
329-
let how = match self.how {
330-
Shutdown::Write => SD_SEND,
331-
Shutdown::Read => SD_RECEIVE,
332-
Shutdown::Both => SD_BOTH,
333-
};
334-
Poll::Ready(Ok(
335-
syscall!(SOCKET, shutdown(self.fd.as_fd().as_raw_fd() as _, how))? as _,
336-
))
337-
}
338-
}
339-
340322
unsafe impl OpCode for CloseSocket {
341323
fn op_type(&self) -> OpType {
342324
OpType::Blocking

compio-driver/src/sys/unix_op.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,19 @@ impl IntoInner for CreateSocket {
504504
}
505505
}
506506

507+
/// Shutdown a socket.
508+
pub struct ShutdownSocket<S> {
509+
pub(crate) fd: S,
510+
pub(crate) how: Shutdown,
511+
}
512+
513+
impl<S> ShutdownSocket<S> {
514+
/// Create [`ShutdownSocket`].
515+
pub fn new(fd: S, how: Shutdown) -> Self {
516+
Self { fd, how }
517+
}
518+
}
519+
507520
impl<S: AsFd> ShutdownSocket<S> {
508521
pub(crate) fn how(&self) -> i32 {
509522
match self.how {

compio-net/src/socket.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use std::{
66

77
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, buf_try};
88
#[cfg(unix)]
9-
use compio_driver::op::CreateSocket;
9+
use compio_driver::op::{CreateSocket, ShutdownSocket};
1010
use compio_driver::{
1111
AsRawFd, OpCode, ToSharedFd, impl_raw_fd,
1212
op::{
1313
Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromManaged,
1414
RecvFromVectored, RecvManaged, RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer,
1515
Send, SendMsg, SendMsgZc, SendTo, SendToVectored, SendToVectoredZc, SendToZc, SendVectored,
16-
SendVectoredZc, SendZc, ShutdownSocket, VecBufResultExt,
16+
SendVectoredZc, SendZc, VecBufResultExt,
1717
},
1818
syscall,
1919
};
@@ -68,12 +68,7 @@ impl Socket {
6868

6969
#[cfg(windows)]
7070
pub async fn new(domain: Domain, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
71-
use std::panic::resume_unwind;
72-
73-
let socket = compio_runtime::spawn_blocking(move || Socket2::new(domain, ty, protocol))
74-
.await
75-
.unwrap_or_else(|e| resume_unwind(e))?;
76-
Self::from_socket2(socket)
71+
Self::from_socket2(Socket2::new(domain, ty, protocol)?)
7772
}
7873

7974
#[cfg(unix)]
@@ -121,16 +116,10 @@ impl Socket {
121116

122117
#[cfg(windows)]
123118
pub async fn accept(&self) -> io::Result<(Self, SockAddr)> {
124-
use std::panic::resume_unwind;
125-
126119
let domain = self.local_addr()?.domain();
127-
// We should allow users sending this accepted socket to a new thread.
128120
let ty = self.socket.r#type()?;
129121
let protocol = self.socket.protocol()?;
130-
let accept_sock =
131-
compio_runtime::spawn_blocking(move || Socket2::new(domain, ty, protocol))
132-
.await
133-
.unwrap_or_else(|e| resume_unwind(e))?;
122+
let accept_sock = Socket2::new(domain, ty, protocol)?;
134123
let op = Accept::new(self.to_shared_fd(), accept_sock);
135124
let (_, op) = buf_try!(@try compio_runtime::submit(op).await);
136125
op.update_context()?;
@@ -161,12 +150,19 @@ impl Socket {
161150
}
162151
}
163152

153+
#[cfg(unix)]
164154
pub async fn shutdown(&self) -> io::Result<()> {
165155
let op = ShutdownSocket::new(self.to_shared_fd(), std::net::Shutdown::Write);
166156
compio_runtime::submit(op).await.0?;
167157
Ok(())
168158
}
169159

160+
#[cfg(windows)]
161+
pub async fn shutdown(&self) -> io::Result<()> {
162+
self.socket.shutdown(std::net::Shutdown::Write)?;
163+
Ok(())
164+
}
165+
170166
pub async fn recv<B: IoBufMut>(&self, buffer: B, flags: i32) -> BufResult<usize, B> {
171167
let fd = self.to_shared_fd();
172168
let op = Recv::new(fd, buffer, flags);

0 commit comments

Comments
 (0)