Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ pub use crate::sys::op::{
Accept, Recv, RecvFrom, RecvFromVectored, RecvMsg, RecvVectored, Send, SendMsg, SendTo,
SendToVectored, SendVectored,
};
#[cfg(windows)]
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
#[cfg(unix)]
pub use crate::sys::op::{
CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile, PathStat,
PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
AcceptMulti, CreateDir, CreateSocket, CurrentDir, FileStat, HardLink, Interest, OpenFile,
PathStat, PollOnce, ReadVectored, ReadVectoredAt, Rename, Stat, Symlink, TruncateFile, Unlink,
WriteVectored, WriteVectoredAt,
};
#[cfg(windows)]
pub use crate::sys::op::{ConnectNamedPipe, DeviceIoControl};
#[cfg(io_uring)]
pub use crate::sys::op::{
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
Expand Down Expand Up @@ -665,7 +665,9 @@ pub(crate) mod managed {
}

#[cfg(not(io_uring))]
pub use managed::*;
pub use managed::{
ReadManaged, ReadManagedAt, ReadMulti, ReadMultiAt, RecvFromManaged, RecvManaged, RecvMulti,
};

bitflags::bitflags! {
/// Flags for operations.
Expand Down
1 change: 1 addition & 0 deletions compio-driver/src/sys/fusion/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ mod iour { pub use crate::sys::iour::{op::*, OpCode}; }
#[rustfmt::skip]
mod poll { pub use crate::sys::poll::{op::*, OpCode}; }

op!(<S: AsFd> AcceptMulti(fd: S));
op!(<T: IoBufMut, S: AsFd> RecvFrom(fd: S, buffer: T, flags: i32));
op!(<T: IoBuf, S: AsFd> SendTo(fd: S, buffer: T, addr: SockAddr, flags: i32));
op!(<T: IoVectoredBufMut, S: AsFd> RecvFromVectored(fd: S, buffer: T, flags: i32));
Expand Down
79 changes: 78 additions & 1 deletion compio-driver/src/sys/iour/op.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{
collections::VecDeque,
ffi::CString,
io,
marker::PhantomPinned,
os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
os::fd::{AsFd, AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
pin::Pin,
};

Expand Down Expand Up @@ -557,6 +558,82 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
}
}

struct AcceptMultishotResult {
res: io::Result<Socket2>,
extra: crate::Extra,
}

impl AcceptMultishotResult {
pub unsafe fn new(res: io::Result<usize>, extra: crate::Extra) -> Self {
Self {
res: res.map(|fd| unsafe { Socket2::from_raw_fd(fd as _) }),
extra,
}
}

pub fn into_result(self) -> BufResult<usize, crate::Extra> {
BufResult(self.res.map(|fd| fd.into_raw_fd() as _), self.extra)
}
}

pin_project! {
/// Accept multiple connections.
pub struct AcceptMulti<S> {
#[pin]
pub(crate) op: Accept<S>,
multishots: VecDeque<AcceptMultishotResult>
}
}

impl<S> AcceptMulti<S> {
/// Create [`AcceptMulti`].
pub fn new(fd: S) -> Self {
Self {
op: Accept::new(fd),
multishots: VecDeque::new(),
}
}
}

unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
let this = self.project();
opcode::AcceptMulti::new(Fd(this.op.fd.as_fd().as_raw_fd()))
.flags(libc::SOCK_CLOEXEC)
.build()
.into()
Comment thread
Berrysoft marked this conversation as resolved.
}

fn create_entry_fallback(self: Pin<&mut Self>) -> OpEntry {
self.project().op.create_entry()
}

unsafe fn set_result(self: Pin<&mut Self>, res: &io::Result<usize>, extra: &crate::Extra) {
unsafe { self.project().op.set_result(res, extra) }
}

unsafe fn push_multishot(self: Pin<&mut Self>, res: io::Result<usize>, extra: crate::Extra) {
self.project()
.multishots
.push_back(unsafe { AcceptMultishotResult::new(res, extra) });
}

fn pop_multishot(self: Pin<&mut Self>) -> Option<BufResult<usize, crate::sys::Extra>> {
self.project()
.multishots
.pop_front()
.map(|res| res.into_result())
}
}

impl<S> IntoInner for AcceptMulti<S> {
type Inner = Socket2;

fn into_inner(self) -> Self::Inner {
self.op.into_inner().0
}
}

unsafe impl<S: AsFd> OpCode for Connect<S> {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
opcode::Connect::new(
Expand Down
38 changes: 38 additions & 0 deletions compio-driver/src/sys/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,44 @@ unsafe impl<S: AsFd> OpCode for Accept<S> {
}
}

pin_project! {
/// Accept multiple connections.
pub struct AcceptMulti<S> {
Comment thread
Berrysoft marked this conversation as resolved.
#[pin]
pub(crate) op: Accept<S>,
}
}

impl<S> AcceptMulti<S> {
/// Create [`AcceptMulti`].
pub fn new(fd: S) -> Self {
Self {
op: Accept::new(fd),
}
}
}

unsafe impl<S: AsFd> OpCode for AcceptMulti<S> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
self.project().op.pre_submit()
}

fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
self.project().op.op_type()
}

fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
self.project().op.operate()
}
Comment thread
Berrysoft marked this conversation as resolved.
}

impl<S> IntoInner for AcceptMulti<S> {
type Inner = Socket2;

fn into_inner(self) -> Self::Inner {
self.op.into_inner().0
}
}
unsafe impl<S: AsFd> OpCode for Connect<S> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
syscall!(
Expand Down
24 changes: 23 additions & 1 deletion compio-driver/src/sys/stub/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::ffi::CString;

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use socket2::{SockAddr, SockAddrStorage, socklen_t};
use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};

use super::{OpCode, stub_unimpl};
pub use crate::sys::unix_op::*;
Expand Down Expand Up @@ -125,6 +125,28 @@ impl OpCode for CloseSocket {}

impl<S: AsFd> OpCode for Accept<S> {}

/// Accept multiple connections.
pub struct AcceptMulti<S> {
fd: S,
}

impl<S> AcceptMulti<S> {
/// Create [`AcceptMulti`].
pub fn new(fd: S) -> Self {
Self { fd }
}
}

impl<S> IntoInner for AcceptMulti<S> {
type Inner = Socket2;

fn into_inner(self) -> Self::Inner {
stub_unimpl()
}
}

impl<S: AsFd> OpCode for AcceptMulti<S> {}

impl<S: AsFd> OpCode for Connect<S> {}

impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {}
Expand Down
7 changes: 5 additions & 2 deletions compio-driver/src/sys/unix_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,12 @@ impl<S> Accept<S> {
_p: PhantomPinned,
}
}
}

impl<S> IntoInner for Accept<S> {
type Inner = (Socket2, SockAddr);

/// Get the remote address from the inner buffer.
pub fn into_addr(mut self) -> (Socket2, SockAddr) {
fn into_inner(mut self) -> Self::Inner {
let socket = self.accepted_fd.take().expect("socket not accepted");
(socket, unsafe { SockAddr::new(self.buffer, self.addr_len) })
}
Comment thread
Berrysoft marked this conversation as resolved.
Expand Down
Loading