Skip to content
7 changes: 3 additions & 4 deletions tokio/src/runtime/io/driver/signal.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::signal::unix::pipe;

use super::{Driver, Handle, TOKEN_SIGNAL};

use std::io;

impl Handle {
pub(crate) fn register_signal_receiver(
&self,
receiver: &mut mio::net::UnixStream,
) -> io::Result<()> {
pub(crate) fn register_signal_receiver(&self, receiver: &mut pipe::Receiver) -> io::Result<()> {
self.registry
.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
Expand Down
32 changes: 6 additions & 26 deletions tokio/src/runtime/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::runtime::{driver, io};
use crate::signal::registry::globals;
use crate::signal::unix::pipe;

use mio::net::UnixStream;
use std::io::{self as std_io, Read};
use std::io::{self as std_io};
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand All @@ -21,7 +21,7 @@ pub(crate) struct Driver {
io: io::Driver,

/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,
receiver: pipe::Receiver,

/// Shared state. The driver keeps a strong ref and the handle keeps a weak
/// ref. The weak ref is used to check if the driver is still active before
Expand All @@ -41,9 +41,6 @@ pub(crate) struct Handle {
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

// NB: We give each driver a "fresh" receiver file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
Expand All @@ -63,14 +60,7 @@ impl Driver {
// safe as each dup is registered with separate reactors **and** we
// only expect at least one dup to receive the notification.

// Manually drop as we don't actually own this instance of UnixStream.
let receiver_fd = globals().receiver.as_raw_fd();

// safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
let original =
ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
let mut receiver = UnixStream::from_std(original.try_clone()?);

let mut receiver = globals().receiver();
io_handle.register_signal_receiver(&mut receiver)?;

Ok(Self {
Expand Down Expand Up @@ -109,18 +99,8 @@ impl Driver {
return;
}

// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.receiver.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {e}"),
}
}
// consume value
self.receiver.read();

// Broadcast any signals which were received
globals().broadcast();
Expand Down
203 changes: 196 additions & 7 deletions tokio/src/signal/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Storage};
use crate::signal::RxFuture;
use crate::sync::watch;

use mio::net::UnixStream;
use std::io::{self, Error, ErrorKind, Write};
use std::io::{self, Error, ErrorKind};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -61,20 +60,211 @@ impl Storage for OsStorage {
}
}

#[cfg(any(target_os = "linux", target_os = "illumos"))]
pub(crate) mod pipe {
use std::{
io,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
};

use mio::{event, unix::SourceFd};

#[derive(Debug)]
pub(crate) struct Sender {
fd: OwnedFd,
}

#[derive(Debug)]
pub(crate) struct Receiver {
fd: OwnedFd,
}

impl event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
SourceFd(&self.fd.as_raw_fd()).deregister(registry)
}
}

impl Sender {
pub(crate) fn new() -> Self {
let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) };
if fd < 0 {
panic!("eventfd failed: {}", io::Error::last_os_error());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of panicking can we return an error here?

Sender {
fd: unsafe { OwnedFd::from_raw_fd(fd) },
}
}

pub(crate) fn receiver(&self) -> Receiver {
Receiver {
fd: self.fd.try_clone().unwrap(),
}
}
}

impl Sender {
pub(crate) fn write(&self) {
unsafe {
libc::eventfd_write(self.fd.as_raw_fd(), 1);
}
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> libc::c_int {
let fd = &self.fd;
let mut value: libc::eventfd_t = 0;

unsafe { libc::eventfd_read(fd.as_raw_fd(), &mut value as *mut libc::eventfd_t) }
}
}
}

#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
pub(crate) mod pipe {
use mio::net::UnixStream;
use std::io::{self, Read, Write};
use std::mem::ManuallyDrop;
use std::os::unix::io::{AsRawFd, FromRawFd};

#[derive(Debug)]
pub(crate) struct Sender {
inner: UnixStream,
}

#[derive(Debug)]
pub(crate) struct Receiver {
inner: UnixStream,
}

impl Clone for Receiver {
fn clone(&self) -> Self {
let receiver_fd = self.inner.as_raw_fd();
let original = ManuallyDrop::new(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(receiver_fd)
});
let inner =
UnixStream::from_std(original.try_clone().expect("failed to clone UnixStream"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment, this is a fallible operation, we should return an error here, not panic

Self { inner }
}
}

impl mio::event::Source for Receiver {
fn register(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.register(registry, token, interests)
}

fn reregister(
&mut self,
registry: &mio::Registry,
token: mio::Token,
interests: mio::Interest,
) -> io::Result<()> {
self.inner.reregister(registry, token, interests)
}

fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
self.inner.deregister(registry)
}
}

impl Sender {
pub(crate) fn write(&self) {
let _ = (&self.inner).write(&[1]);
}
}

impl Receiver {
pub(crate) fn read(&mut self) -> libc::c_int {
// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
#[allow(clippy::unused_io_amount)]
loop {
match self.inner.read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {e}"),
}
}
0
}
}

pub(crate) fn channel() -> (Sender, Receiver) {
let (sender, receiver) = UnixStream::pair().expect("failed to create UnixStream");
(Sender { inner: sender }, Receiver { inner: receiver })
}
}

#[cfg(any(target_os = "linux", target_os = "illumos"))]
#[derive(Debug)]
pub(crate) struct OsExtraData {
sender: UnixStream,
pub(crate) receiver: UnixStream,
sender: pipe::Sender,
}

#[cfg(any(target_os = "linux", target_os = "illumos"))]
impl Default for OsExtraData {
fn default() -> Self {
let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream");
let sender = pipe::Sender::new();
Self { sender }
}
}

#[cfg(any(target_os = "linux", target_os = "illumos"))]
impl OsExtraData {
pub(crate) fn receiver(&self) -> pipe::Receiver {
self.sender.receiver()
}
}

#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
#[derive(Debug)]
pub(crate) struct OsExtraData {
sender: pipe::Sender,
receiver: pipe::Receiver,
}

#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
impl Default for OsExtraData {
fn default() -> Self {
let (sender, receiver) = pipe::channel();
Self { sender, receiver }
}
}

#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
impl OsExtraData {
pub(crate) fn receiver(&self) -> pipe::Receiver {
self.receiver.clone()
}
}

/// Represents the specific kind of signal to listen for.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct SignalKind(libc::c_int);
Expand Down Expand Up @@ -268,8 +458,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) {

// Send a wakeup, ignore any errors (anything reasonably possible is
// full pipe and then it will wake up anyway).
let mut sender = &globals.sender;
drop(sender.write(&[1]));
globals.sender.write();
}

/// Enables this module to receive signal notifications for the `signal`
Expand Down