Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions compio-driver/src/iour/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl<
impl OpCode for OpenFile {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
.flags(self.flags)
.flags(self.flags | libc::O_CLOEXEC)
.mode(self.mode)
.build()
.into()
Expand Down Expand Up @@ -262,9 +262,13 @@ impl OpCode for HardLink {

impl OpCode for CreateSocket {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
opcode::Socket::new(self.domain, self.socket_type, self.protocol)
.build()
.into()
opcode::Socket::new(
self.domain,
self.socket_type | libc::SOCK_CLOEXEC,
self.protocol,
)
.build()
.into()
}

fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
Expand Down Expand Up @@ -294,6 +298,7 @@ impl<S: AsRawFd> OpCode for Accept<S> {
&mut this.buffer as *mut sockaddr_storage as *mut libc::sockaddr,
&mut this.addr_len,
)
.flags(libc::SOCK_CLOEXEC)
.build()
.into()
}
Expand Down
119 changes: 108 additions & 11 deletions compio-driver/src/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
ffi::CString,
io,
marker::PhantomPinned,
os::fd::{FromRawFd, OwnedFd},
os::fd::{FromRawFd, IntoRawFd, OwnedFd},
pin::Pin,
task::Poll,
};
Expand All @@ -20,7 +20,7 @@ use libc::open64 as open;
use libc::{pread, preadv, pwrite, pwritev};
#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
use socket2::SockAddr;
use socket2::{SockAddr, Socket as Socket2};

use super::{AsRawFd, Decision, OpCode, OpType, sockaddr_storage, socklen_t, syscall};
pub use crate::unix::op::*;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl OpCode for OpenFile {
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(syscall!(open(
self.path.as_ptr(),
self.flags,
self.flags | libc::O_CLOEXEC,
self.mode as libc::c_int
))? as _))
}
Expand Down Expand Up @@ -467,15 +467,73 @@ impl OpCode for HardLink {
}
}

impl CreateSocket {
unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
#[allow(unused_mut)]
let mut ty: i32 = self.socket_type;
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "cygwin",
))]
{
ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
}
let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
let socket = Socket2::from_raw_fd(fd);
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "espidf",
target_os = "vita",
target_os = "cygwin",
)))]
socket.set_cloexec(true)?;
#[cfg(any(
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
))]
socket.set_nosigpipe(true)?;
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "cygwin",
)))]
socket.set_nonblocking(true)?;
Ok(socket.into_raw_fd())
}
}

impl OpCode for CreateSocket {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}

fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(
syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _,
))
Poll::Ready(Ok(unsafe { self.call()? } as _))
}
}

Expand Down Expand Up @@ -504,11 +562,50 @@ impl OpCode for CloseSocket {
impl<S: AsRawFd> Accept<S> {
unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
let this = self.get_unchecked_mut();
libc::accept(
this.fd.as_raw_fd(),
&mut this.buffer as *mut _ as *mut _,
&mut this.addr_len,
)
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "cygwin",
))]
{
libc::accept4(
this.fd.as_raw_fd(),
&mut this.buffer as *mut _ as *mut _,
&mut this.addr_len,
libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
)
}
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "cygwin",
)))]
{
|| -> io::Result<libc::c_int> {
let fd = syscall!(libc::accept(
this.fd.as_raw_fd(),
&mut this.buffer as *mut _ as *mut _,
&mut this.addr_len,
))?;
let socket = Socket2::from_raw_fd(fd);
socket.set_cloexec(true)?;
socket.set_nonblocking(true)?;
Ok(socket.into_raw_fd())
}()
.unwrap_or(-1)
}
}
}

Expand Down
59 changes: 1 addition & 58 deletions compio-net/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,6 @@ pub struct Socket {

impl Socket {
pub fn from_socket2(socket: Socket2) -> io::Result<Self> {
#[cfg(unix)]
{
#[cfg(not(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "espidf",
target_os = "vita",
)))]
socket.set_cloexec(true)?;
#[cfg(any(
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
))]
socket.set_nosigpipe(true)?;
// On Linux we use blocking socket
// Newer kernels have the patch that allows to arm io_uring poll mechanism for
// non blocking socket when there is no connections in listen queue
//
// https://patchwork.kernel.org/project/linux-block/patch/f999615b-205c-49b7-b272-c4e42e45e09d@kernel.dk/#22949861
if cfg!(not(all(target_os = "linux", feature = "io-uring")))
|| compio_driver::DriverType::is_polling()
{
socket.set_nonblocking(true)?;
}
}

Ok(Self {
socket: Attacher::new(socket)?,
})
Expand Down Expand Up @@ -98,26 +63,9 @@ impl Socket {
pub async fn new(domain: Domain, ty: Type, protocol: Option<Protocol>) -> io::Result<Self> {
use std::os::fd::FromRawFd;

#[allow(unused_mut)]
let mut ty: i32 = ty.into();
#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "fuchsia",
target_os = "hurd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
))]
{
ty |= libc::SOCK_CLOEXEC;
}

let op = CreateSocket::new(
domain.into(),
ty,
ty.into(),
protocol.map(|p| p.into()).unwrap_or_default(),
);
let BufResult(res, _) = compio_runtime::submit(op).await;
Expand Down Expand Up @@ -163,11 +111,6 @@ impl Socket {
let BufResult(res, op) = compio_runtime::submit(op).await;
let addr = op.into_addr();
let accept_sock = unsafe { Socket2::from_raw_fd(res? as _) };
if cfg!(not(all(target_os = "linux", feature = "io-uring")))
|| compio_driver::DriverType::is_polling()
{
accept_sock.set_nonblocking(true)?;
}
let accept_sock = Self::from_socket2(accept_sock)?;
Ok((accept_sock, addr))
}
Expand Down
Loading