Skip to content
7 changes: 3 additions & 4 deletions tokio/src/runtime/io/driver/signal.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::signal::unix::pipe;

use super::{Driver, Handle, TOKEN_SIGNAL};

use std::io;

impl Handle {
pub(crate) fn register_signal_receiver(
&self,
receiver: &mut mio::net::UnixStream,
) -> io::Result<()> {
pub(crate) fn register_signal_receiver(&self, receiver: &mut pipe::Receiver) -> io::Result<()> {
self.registry
.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
Expand Down
39 changes: 11 additions & 28 deletions tokio/src/runtime/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::runtime::{driver, io};
use crate::signal::registry::globals;
use crate::signal::unix::pipe;

use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::io as std_io;
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand All @@ -21,7 +21,7 @@ pub(crate) struct Driver {
io: io::Driver,

/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,
receiver: pipe::Receiver,

/// Shared state. The driver keeps a strong ref and the handle keeps a weak
/// ref. The weak ref is used to check if the driver is still active before
Expand All @@ -41,9 +41,6 @@ pub(crate) struct Handle {
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

// NB: We give each driver a "fresh" receiver file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
Expand All @@ -63,14 +60,7 @@ impl Driver {
// safe as each dup is registered with separate reactors **and** we
// only expect at least one dup to receive the notification.

// Manually drop as we don't actually own this instance of UnixStream.
let receiver_fd = globals().receiver.as_raw_fd();

// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);

let mut receiver = globals()?.receiver()?;
io_handle.register_signal_receiver(&mut receiver)?;

Ok(Self {
Expand Down Expand Up @@ -109,21 +99,14 @@ impl Driver {
return;
}

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {e}"),
}
}
// consume value
let _ = self.receiver.read();

// Broadcast any signals which were received
globals().broadcast();
// We do a best-effort broadcast here
if let Ok(globals) = globals() {
// Broadcast any signals which were received.
globals.broadcast();
}
}
}

Expand Down
107 changes: 107 additions & 0 deletions tokio/src/signal/pipe/eventfd.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comparing with what mio does, this seems to not have a bunch of logic to reset the eventfd and so on... Is it missing here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A read on fd returned by eventfd resets eventfd. Here's libc::eventfd_read in Receiver::read() function.

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{
io,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
};

use mio::{event, unix::SourceFd};

#[derive(Debug)]
pub(crate) struct Sender {
fd: OwnedFd,
}

#[derive(Debug)]
pub(crate) struct Receiver {
fd: OwnedFd,
}

impl event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).deregister(registry)
}
}

impl Sender {
pub(crate) fn new() -> std::io::Result<Self> {
// SAFETY: it's ok to call libc API
let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
if fd == -1 {
return Err(io::Error::last_os_error());
}
Ok(Sender {
// SAFETY: fd just opened by the above libc::eventfd
fd: unsafe { OwnedFd::from_raw_fd(fd) },
})
}

pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
Ok(Receiver {
fd: self.fd.try_clone()?,
})
}
}

impl Sender {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be merged with the impl above.

pub(crate) fn write(&self) -> std::io::Result<usize> {
// SAFETY: it's ok to call libc API
let r = unsafe { libc::eventfd_write(self.fd.as_raw_fd(), 1) };
if r == 0 {
Ok(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small different between the eventfd and unixstream impls here.
The eventfd's Sender::write() returns 0 on success. The UnixStream returns the number of written bytes (1).
AFAIS the successful result is not used.
Maybe change eventfd to return 1 too or change both to return Result<()> ?!

} else {
Err(std::io::Error::last_os_error())
}
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> std::io::Result<libc::c_int> {
let fd = self.fd.as_raw_fd();
let mut value: libc::eventfd_t = 0;

// SAFETY: it's ok to call libc API
let r = unsafe { libc::eventfd_read(fd, &mut value as *mut libc::eventfd_t) };
if r == 0 {
Ok(0)
} else {
Err(std::io::Error::last_os_error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is non-blocking it may return EAGAIN (ErrorKind::WouldBlock) which is not an error.

    let err = std::io::Error::last_os_error();
    // On a non-blocking eventfd, it is expected to get an EAGAIN
    // when the counter is 0.
    if err.kind() == std::io::ErrorKind::WouldBlock {
        Ok(0)
    } else {
        Err(err)
    }

}
}
}

pub(crate) struct OsExtraData {
sender: Sender,
}

impl OsExtraData {
pub(crate) fn new() -> std::io::Result<Self> {
Sender::new().map(|sender| Self { sender })
}
}

impl OsExtraData {
pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
self.sender.receiver()
}

pub(crate) fn sender(&self) -> &Sender {
&self.sender
}
}
95 changes: 95 additions & 0 deletions tokio/src/signal/pipe/unixstream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use mio::net::UnixStream;
use std::io::{self, Read, Write};
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

pub(crate) struct Sender {
inner: UnixStream,
}

#[derive(Debug)]
pub(crate) struct Receiver {
inner: UnixStream,
}

impl mio::event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
self.inner.deregister(registry)
}
}

impl Sender {
pub(crate) fn write(&self) -> std::io::Result<usize> {
(&self.inner).write(&[1])
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> std::io::Result<libc::c_int> {
// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.inner.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => {
return Err(e);
}
}
}
Ok(0)
}
}

pub(crate) fn channel() -> std::io::Result<(Sender, Receiver)> {
let (sender, receiver) = UnixStream::pair()?;
Ok((Sender { inner: sender }, Receiver { inner: receiver }))
}

pub(crate) struct OsExtraData {
sender: Sender,
receiver: Receiver,
}

impl OsExtraData {
pub(crate) fn new() -> std::io::Result<Self> {
let (sender, receiver) = channel()?;
Ok(Self { sender, receiver })
}
}

impl OsExtraData {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This impl could be merged with the one above

pub(crate) fn receiver(&self) -> std::io::Result<Receiver> {
let receiver_fd = self.receiver.inner.as_raw_fd();
// SAFETY: fd owned by receiver is opened
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let inner = UnixStream::from_std(original.try_clone()?);
Ok(Receiver { inner })
}

pub(crate) fn sender(&self) -> &Sender {
&self.sender
}
}
21 changes: 12 additions & 9 deletions tokio/src/signal/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,25 +142,28 @@ impl Globals {
}
}

fn globals_init() -> Globals
fn globals_init() -> std::io::Result<Globals>
where
OsExtraData: 'static + Send + Sync + Default,
OsExtraData: 'static + Send + Sync,
OsStorage: 'static + Send + Sync + Default,
{
Globals {
extra: OsExtraData::default(),
Ok(Globals {
extra: OsExtraData::new()?,
registry: Registry::new(OsStorage::default()),
}
})
}

pub(crate) fn globals() -> &'static Globals
pub(crate) fn globals() -> std::io::Result<&'static Globals>
where
OsExtraData: 'static + Send + Sync + Default,
OsExtraData: 'static + Send + Sync,
OsStorage: 'static + Send + Sync + Default,
{
static GLOBALS: OnceLock<Globals> = OnceLock::new();
static GLOBALS: OnceLock<std::io::Result<Globals>> = OnceLock::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive by review: I think you should store Result<Globals, i32> like I did for the per-signal result storage. You can then convert the i32 to an error via Error::from_raw_os_error and avoid allocating.


GLOBALS.get_or_init(globals_init)
match GLOBALS.get_or_init(globals_init) {
Ok(globals) => Ok(globals),
Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
}
}

#[cfg(all(test, not(loom)))]
Expand Down
Loading