From f949d974e346eee46bbf1aa9eb3deae349771600 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 22 Apr 2026 18:13:06 +0800 Subject: [PATCH 1/4] feat(driver,net)!: accept with provided socket --- compio-driver/src/sys/op/socket/iocp.rs | 20 +++++++++----------- compio-net/src/incoming/windows.rs | 2 +- compio-net/src/socket/mod.rs | 15 ++++++++++----- compio-net/src/tcp.rs | 9 +++++++++ compio-net/src/unix.rs | 9 +++++++++ 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/compio-driver/src/sys/op/socket/iocp.rs b/compio-driver/src/sys/op/socket/iocp.rs index 2ed9662b..c480bd69 100644 --- a/compio-driver/src/sys/op/socket/iocp.rs +++ b/compio-driver/src/sys/op/socket/iocp.rs @@ -1,5 +1,3 @@ -use std::os::windows::io::AsRawSocket; - use rustix::net::RecvFlags; use windows_sys::Win32::{ Networking::WinSock::{ @@ -35,15 +33,15 @@ unsafe impl OpCode for CloseSocket { } /// Accept a connection. -pub struct Accept { +pub struct Accept { pub(crate) fd: S, - pub(crate) accept_fd: socket2::Socket, + pub(crate) accept_fd: SA, pub(crate) buffer: [u8; ACCEPT_BUFFER_SIZE], } -impl Accept { +impl Accept { /// Create [`Accept`]. `accept_fd` should not be bound. - pub fn new(fd: S, accept_fd: socket2::Socket) -> Self { + pub fn new(fd: S, accept_fd: SA) -> Self { Self { fd, accept_fd, @@ -52,14 +50,14 @@ impl Accept { } } -impl Accept { +impl Accept { /// Update accept context. pub fn update_context(&self) -> io::Result<()> { let fd = self.fd.as_fd().as_raw_fd(); syscall!( SOCKET, setsockopt( - self.accept_fd.as_raw_socket() as _, + self.accept_fd.as_fd().as_raw_fd() as _, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, &fd as *const _ as _, @@ -70,7 +68,7 @@ impl Accept { } /// Get the remote address from the inner buffer. - pub fn into_addr(self) -> io::Result<(socket2::Socket, SockAddr)> { + pub fn into_addr(self) -> io::Result<(SA, SockAddr)> { let get_addrs_fn = GET_ADDRS .get_or_try_init(|| { get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_GETACCEPTEXSOCKADDRS) @@ -109,7 +107,7 @@ impl Accept { } } -unsafe impl OpCode for Accept { +unsafe impl OpCode for Accept { type Control = (); unsafe fn operate(&mut self, _: &mut (), optr: *mut OVERLAPPED) -> Poll> { @@ -122,7 +120,7 @@ unsafe impl OpCode for Accept { let res = unsafe { accept_fn( self.fd.as_fd().as_raw_fd() as _, - self.accept_fd.as_raw_socket() as _, + self.accept_fd.as_fd().as_raw_fd() as _, self.buffer.sys_slice_mut().ptr() as _, 0, ACCEPT_ADDR_BUFFER_SIZE as _, diff --git a/compio-net/src/incoming/windows.rs b/compio-net/src/incoming/windows.rs index 510d8df7..d760480f 100644 --- a/compio-net/src/incoming/windows.rs +++ b/compio-net/src/incoming/windows.rs @@ -14,7 +14,7 @@ use crate::Socket; pub struct Incoming<'a> { listener: &'a Socket, - state: Option>>>, + state: Option, Socket2>>>, } impl<'a> Incoming<'a> { diff --git a/compio-net/src/socket/mod.rs b/compio-net/src/socket/mod.rs index 861f9ba9..1238a325 100644 --- a/compio-net/src/socket/mod.rs +++ b/compio-net/src/socket/mod.rs @@ -12,7 +12,8 @@ use compio_buf::{ #[cfg(unix)] use compio_driver::op::{Bind, CreateSocket, Listen, ShutdownSocket}; use compio_driver::{ - AsRawFd, BufferRef, OpCode, RawFd, ResultTakeBuffer, SharedFd, TakeBuffer, ToSharedFd, + AsFd, AsRawFd, BorrowedFd, BufferRef, OpCode, RawFd, ResultTakeBuffer, SharedFd, TakeBuffer, + ToSharedFd, op::{ Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFlags, RecvFrom, RecvFromManaged, RecvFromMulti, RecvFromMultiResult, RecvFromVectored, RecvManaged, RecvMsg, RecvMsgManaged, @@ -168,11 +169,16 @@ impl Socket { let ty = self.socket.r#type()?; let protocol = self.socket.protocol()?; let accept_sock = Socket2::new(domain, ty, protocol)?; + self.accept_with(Self::from_socket2(accept_sock)?).await + } + + #[cfg(windows)] + pub async fn accept_with(&self, accept_sock: Self) -> io::Result<(Self, SockAddr)> { let op = Accept::new(self.to_shared_fd(), accept_sock); let (_, op) = buf_try!(@try compio_runtime::submit(op).await); op.update_context()?; let (accept_sock, addr) = op.into_addr()?; - Ok((Self::from_socket2(accept_sock)?, addr)) + Ok((accept_sock, addr)) } pub fn incoming(&self) -> Incoming<'_> { @@ -642,9 +648,8 @@ impl AsRawFd for Socket { } } -#[cfg(unix)] -impl std::os::fd::AsFd for Socket { - fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> { +impl AsFd for Socket { + fn as_fd(&self) -> BorrowedFd<'_> { self.socket.as_fd() } } diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index 7222f364..d82fb349 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -119,6 +119,15 @@ impl TcpListener { Ok((stream, addr.as_socket().expect("should be SocketAddr"))) } + /// Accepts a new incoming connection from this listener using the provided + /// socket. + #[cfg(windows)] + pub async fn accept_with(&self, sock: TcpSocket) -> io::Result<(TcpStream, SocketAddr)> { + let (socket, addr) = self.inner.accept_with(sock.inner).await?; + let stream = TcpStream { inner: socket }; + Ok((stream, addr.as_socket().expect("should be SocketAddr"))) + } + /// Returns a stream of incoming connections to this listener. /// /// ## Platform specific diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 4c9b9ca5..ba12eafa 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -116,6 +116,15 @@ impl UnixListener { Ok((stream, addr)) } + /// Accepts a new incoming connection from this listener using the provided + /// socket. + #[cfg(windows)] + pub async fn accept_with(&self, sock: UnixSocket) -> io::Result<(UnixStream, SockAddr)> { + let (socket, addr) = self.inner.accept_with(sock.inner).await?; + let stream = UnixStream { inner: socket }; + Ok((stream, addr)) + } + /// Returns a stream of incoming connections to this listener. /// /// ## Platform specific From 85e01e9d162af91e24aa9154ff57d2f68c26e5e6 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 22 Apr 2026 21:14:32 +0800 Subject: [PATCH 2/4] feat(driver,net,iocp): disconnect socket and reuse --- compio-driver/src/sys/op/socket/iocp.rs | 43 +++++++++++++++++++++---- compio-net/src/socket/mod.rs | 8 +++++ compio-net/src/tcp.rs | 8 +++++ compio-net/src/unix.rs | 8 +++++ compio-net/tests/tcp_disconnect.rs | 28 ++++++++++++++++ 5 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 compio-net/tests/tcp_disconnect.rs diff --git a/compio-driver/src/sys/op/socket/iocp.rs b/compio-driver/src/sys/op/socket/iocp.rs index c480bd69..96add8d5 100644 --- a/compio-driver/src/sys/op/socket/iocp.rs +++ b/compio-driver/src/sys/op/socket/iocp.rs @@ -1,11 +1,11 @@ use rustix::net::RecvFlags; use windows_sys::Win32::{ Networking::WinSock::{ - LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, LPFN_WSARECVMSG, - SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, SOCKADDR_STORAGE, - SOL_SOCKET, WSAID_ACCEPTEX, WSAID_CONNECTEX, WSAID_GETACCEPTEXSOCKADDRS, WSAID_WSARECVMSG, - WSAMSG, WSARecv, WSARecvFrom, WSASend, WSASendMsg, WSASendTo, closesocket, setsockopt, - socklen_t, + LPFN_ACCEPTEX, LPFN_CONNECTEX, LPFN_DISCONNECTEX, LPFN_GETACCEPTEXSOCKADDRS, + LPFN_WSARECVMSG, SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, SOCKADDR, + SOCKADDR_STORAGE, SOL_SOCKET, TF_REUSE_SOCKET, WSAID_ACCEPTEX, WSAID_CONNECTEX, + WSAID_DISCONNECTEX, WSAID_GETACCEPTEXSOCKADDRS, WSAID_WSARECVMSG, WSAMSG, WSARecv, + WSARecvFrom, WSASend, WSASendMsg, WSASendTo, closesocket, setsockopt, socklen_t, }, System::IO::OVERLAPPED, }; @@ -185,7 +185,38 @@ unsafe impl OpCode for Connect { } } -/// Receive data from remote. +/// Disconnect a connected socket and reuse it for another connection. +pub struct Disconnect { + pub(crate) fd: S, +} + +impl Disconnect { + /// Create [`Disconnect`]. + pub fn new(fd: S) -> Self { + Self { fd } + } +} + +static DISCONNECT_EX: OnceLock = OnceLock::new(); + +unsafe impl OpCode for Disconnect { + type Control = (); + + unsafe fn operate(&mut self, _: &mut (), optr: *mut OVERLAPPED) -> Poll> { + let disconnect_fn = DISCONNECT_EX + .get_or_try_init(|| get_wsa_fn(self.fd.as_fd().as_raw_fd(), WSAID_DISCONNECTEX))? + .ok_or_else(|| { + io::Error::new(io::ErrorKind::Unsupported, "cannot retrieve DisconnectEx") + })?; + let res = + unsafe { disconnect_fn(self.fd.as_fd().as_raw_fd() as _, optr, TF_REUSE_SOCKET, 0) }; + win32_result(res, 0) + } + + fn cancel(&mut self, _: &mut (), optr: *mut OVERLAPPED) -> io::Result<()> { + cancel(self.fd.as_fd().as_raw_fd(), optr) + } +} #[derive(Default)] #[doc(hidden)] diff --git a/compio-net/src/socket/mod.rs b/compio-net/src/socket/mod.rs index 1238a325..5f65d02e 100644 --- a/compio-net/src/socket/mod.rs +++ b/compio-net/src/socket/mod.rs @@ -9,6 +9,8 @@ use std::{ use compio_buf::{ BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut, SetLen, buf_try, }; +#[cfg(windows)] +use compio_driver::op::Disconnect; #[cfg(unix)] use compio_driver::op::{Bind, CreateSocket, Listen, ShutdownSocket}; use compio_driver::{ @@ -154,6 +156,12 @@ impl Socket { Ok(()) } + #[cfg(windows)] + pub async fn disconnect(&self) -> io::Result<()> { + let op = Disconnect::new(self.to_shared_fd()); + compio_runtime::submit(op).await.0.map(|_| ()) + } + #[cfg(unix)] pub async fn accept(&self) -> io::Result<(Self, SockAddr)> { let op = Accept::new(self.to_shared_fd()); diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index d82fb349..cfc96d5b 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -343,6 +343,14 @@ impl TcpStream { self.inner.into_poll_fd() } + /// Close the connection of the socket, and reuse it to create a new + /// connection. + #[cfg(windows)] + pub async fn disconnect(self) -> io::Result { + self.inner.disconnect().await?; + Ok(TcpSocket { inner: self.inner }) + } + /// Gets the value of the `TCP_NODELAY` option on this socket. /// /// For more information about this option, see diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index ba12eafa..0a405686 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -297,6 +297,14 @@ impl UnixStream { self.inner.into_poll_fd() } + /// Close the connection of the socket, and reuse it to create a new + /// connection. + #[cfg(windows)] + pub async fn disconnect(self) -> io::Result { + self.inner.disconnect().await?; + Ok(UnixSocket { inner: self.inner }) + } + /// Signifies whether the underlying socket was non-empty after the last /// receive operation. /// diff --git a/compio-net/tests/tcp_disconnect.rs b/compio-net/tests/tcp_disconnect.rs new file mode 100644 index 00000000..091fe88f --- /dev/null +++ b/compio-net/tests/tcp_disconnect.rs @@ -0,0 +1,28 @@ +#![cfg(windows)] + +use compio_io::AsyncWrite; +use compio_net::{TcpListener, TcpStream}; +use compio_runtime::ResumeUnwind; + +#[test] +fn disconnect() { + compio_runtime::Runtime::new().unwrap().block_on(async { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let task = compio_runtime::spawn(async move { + let (socket, _) = listener.accept().await.unwrap(); + let socket = socket.disconnect().await.unwrap(); + let (mut socket, _) = listener.accept_with(socket).await.unwrap(); + socket.shutdown().await.unwrap(); + socket.close().await.unwrap(); + }); + + for _i in 0..2 { + let mut client = TcpStream::connect(addr).await.unwrap(); + client.shutdown().await.unwrap(); + client.close().await.unwrap(); + } + + task.await.resume_unwind().expect("shouldn't be cancelled"); + }) +} From 33ec12a9c25277a9762e515fecefdcc651aba83c Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 22 Apr 2026 21:33:29 +0800 Subject: [PATCH 3/4] docs(net): disconnect --- compio-net/src/tcp.rs | 6 +++++- compio-net/src/unix.rs | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index cfc96d5b..2a09434f 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -344,7 +344,9 @@ impl TcpStream { } /// Close the connection of the socket, and reuse it to create a new - /// connection. + /// connection. This method is useful when the socket is created by + /// [`TcpListener::accept`], and will be reused in + /// [`TcpListener::accept_with`] to accept a new connection. #[cfg(windows)] pub async fn disconnect(self) -> io::Result { self.inner.disconnect().await?; @@ -1082,6 +1084,8 @@ impl TcpSocket { /// The [`TcpSocket`] is consumed. Once the connection is established, a /// connected [`TcpStream`] is returned. If the connection fails, the /// encountered error is returned. + /// + /// On Windows, the socket should be bound to an address before connecting. pub async fn connect(self, addr: SocketAddr) -> io::Result { self.inner.connect_async(&addr.into()).await?; Ok(TcpStream { inner: self.inner }) diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 0a405686..0a588a48 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -298,7 +298,9 @@ impl UnixStream { } /// Close the connection of the socket, and reuse it to create a new - /// connection. + /// connection. This method is useful when the socket is created by + /// [`UnixListener::accept`], and will be reused in + /// [`UnixListener::accept_with`] to accept a new connection. #[cfg(windows)] pub async fn disconnect(self) -> io::Result { self.inner.disconnect().await?; From 264082ab0dd7bff9cb2b18e61c515477d8cc6d00 Mon Sep 17 00:00:00 2001 From: Yuyi Wang Date: Wed, 22 Apr 2026 22:12:52 +0800 Subject: [PATCH 4/4] test(net): connect reuse --- compio-net/tests/tcp_disconnect.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/compio-net/tests/tcp_disconnect.rs b/compio-net/tests/tcp_disconnect.rs index 091fe88f..89245654 100644 --- a/compio-net/tests/tcp_disconnect.rs +++ b/compio-net/tests/tcp_disconnect.rs @@ -26,3 +26,29 @@ fn disconnect() { task.await.resume_unwind().expect("shouldn't be cancelled"); }) } + +#[test] +fn reuse() { + compio_runtime::Runtime::new().unwrap().block_on(async { + let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr1 = listener1.local_addr().unwrap(); + let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr2 = listener2.local_addr().unwrap(); + + let task = compio_runtime::spawn(async move { + let (socket, _) = listener1.accept().await.unwrap(); + let socket = socket.disconnect().await.unwrap(); + let (mut socket, _) = listener2.accept_with(socket).await.unwrap(); + socket.shutdown().await.unwrap(); + socket.close().await.unwrap(); + }); + + let client = TcpStream::connect(addr1).await.unwrap(); + let client = client.disconnect().await.unwrap(); + let mut client = client.connect(addr2).await.unwrap(); + client.shutdown().await.unwrap(); + client.close().await.unwrap(); + + task.await.resume_unwind().expect("shouldn't be cancelled"); + }) +}