Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ include = [
# By default Mio only provides a shell implementation.
default = ["log"]

# Enables the Event* hanle registration.
os-extended = ["os-ext"]
# Enables the `Poll` and `Registry` types.
os-poll = []
# Enables additional OS specific extensions, e.g. Unix `pipe(2)`.
os-ext = [
"os-poll",
"os-poll",
"windows-sys/Win32_System_Pipes",
"windows-sys/Win32_Security",
]
Expand Down Expand Up @@ -64,6 +66,7 @@ features = [
"Win32_Security", # Enables NtCreateFile
"Win32_System_IO", # IO types like OVERLAPPED etc
"Win32_System_WindowsProgramming", # General future used for various types/funcs
"Win32_System_Threading", # Needed for testing CreateWaitableTimerExW
]

[target.'cfg(target_os = "wasi")'.dependencies]
Expand Down Expand Up @@ -108,5 +111,9 @@ required-features = ["os-poll", "net"]
name = "udp_server"
required-features = ["os-poll", "net"]

[[example]]
name = "windows_event"
required-features = ["os-extended", "net"]

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(mio_unsupported_force_poll_poll)', 'cfg(mio_unsupported_force_waker_pipe)'] }
1 change: 1 addition & 0 deletions examples/tcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn main() -> io::Result<()> {
poll.registry()
.register(&mut server, SERVER, Interest::READABLE)?;


// Map of `Token` -> `TcpStream`.
let mut connections = HashMap::new();
// Unique token for each incoming connection.
Expand Down
200 changes: 200 additions & 0 deletions examples/windows_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// You can run this example from the root of the mio repo:
// cargo run --example udp_server --features="os-poll net"

use std::io;

#[cfg(target_os = "windows")]
pub mod os_spec
{

use std::{io, os::windows::io::{AsHandle, AsRawHandle, FromRawHandle, OwnedHandle}, ptr::null};
use mio::{Events, Interest, Poll, Token, net::UdpSocket, windows::SourceEventHndl};
use log::warn;
use windows_sys::Win32::System::Threading::{CreateWaitableTimerExW, EVENT_ALL_ACCESS, SetWaitableTimer};


#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct HANDLE(pub *mut core::ffi::c_void);
impl HANDLE
{
pub
fn try_into_owned(self) -> Result<OwnedHandle, String>
{
if self.0 == -1 as _ || self.0 == 0 as _
{
return Err(format!("invalid handle!"));
}
else
{
return Ok(unsafe { OwnedHandle::from_raw_handle(self.0) });
}
}
}

/// Our instace that we want to poll
#[derive(Debug)]
pub struct PrimitiveTimer
{
hndl_timer: OwnedHandle
}

impl AsHandle for PrimitiveTimer
{
fn as_handle(&self) -> std::os::windows::prelude::BorrowedHandle<'_>
{
return self.hndl_timer.as_handle();
}
}

impl PrimitiveTimer
{
fn new(name: &str) -> PrimitiveTimer
{
let mut label_cstr: Vec<u16> = name.encode_utf16().collect();
label_cstr.push(0);

let hndl_timer =
unsafe
{
HANDLE(
CreateWaitableTimerExW(
null(),
label_cstr.as_ptr(),
0,
EVENT_ALL_ACCESS
)
)
.try_into_owned()
.unwrap()
};

return Self{ hndl_timer: hndl_timer};
}

fn arm_relative(&self, timeout: i64)
{
let time: i64 = timeout / 100;
unsafe
{
SetWaitableTimer(
self.hndl_timer.as_raw_handle(),
&time as *const i64,
0,
None,
null(),
false.into()
)
};
}
}

// A token to allow us to identify which event is for the `UdpSocket`.
const UDP_SOCKET: Token = Token(0);
const TIMER_EVENT: Token = Token(1);

pub
fn main1() -> io::Result<()>
{
env_logger::init();

// Create a poll instance.
let mut poll = Poll::new()?;
// Create storage for events. Since we will only register a single socket, a
// capacity of 1 will do.
let mut events = Events::with_capacity(1);

// Setup the UDP socket.
let addr = "127.0.0.1:9000".parse().unwrap();

let socket = UdpSocket::bind(addr)?;

// Setup timer
let mut se_hndl_timer =
SourceEventHndl::new(PrimitiveTimer::new("timer_1")).unwrap();

// Register our socket with the token defined above and an interest in being
// `READABLE`.
poll
.registry()
.register(&mut se_hndl_timer, TIMER_EVENT, Interest::READABLE)?;

println!("You can connect to the server using `nc`:");
println!(" $ nc -u 127.0.0.1 9000");
println!("Anything you type will be echoed back to you.");

// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
// packet, which is the maximum value of a 16-bit integer (65536).
let mut buf = [0; 1 << 16];

// set connection timeout
se_hndl_timer.inner().arm_relative(-5_000_000_000); // 5 sec relative

// Our event loop.
loop {
// Poll to check if we have events waiting for us.
if let Err(err) = poll.poll(&mut events, None) {
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(err);
}

// Process each event.
for event in events.iter() {
// Validate the token we registered our socket with,
// in this example it will only ever be one but we
// make sure it's valid none the less.
match event.token()
{
TIMER_EVENT =>
{
eprintln!("timeout!");
return Ok(());
},
UDP_SOCKET => loop
{
// In this loop we receive all packets queued for the socket.
match socket.recv_from(&mut buf) {
Ok((packet_size, source_address)) => {
// Echo the data.
socket.send_to(&buf[..packet_size], source_address)?;
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// If we get a `WouldBlock` error we know our socket
// has no more packets queued, so we can return to
// polling and wait for some more.
break;
}
Err(e) => {
// If it was any other kind of error, something went
// wrong and we terminate with an error.
return Err(e);
}
}
},
_ => {
// This should never happen as we only registered our
// `UdpSocket` using the `UDP_SOCKET` token, but if it ever
// does we'll log it.
warn!("Got event for unexpected token: {event:?}");
}
}
}
}
}


}

#[cfg(target_os = "windows")]
fn main() -> io::Result<()>
{
return self::os_spec::main1();
}

#[cfg(not(target_os = "windows"))]
fn main() -> io::Result<()>
{
panic!("can't monitor event not on windows")
}
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
unused_imports,
dead_code
)]
#![allow(dead_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Disallow warnings when running tests.
#![cfg_attr(test, deny(warnings))]
Expand Down Expand Up @@ -102,6 +103,12 @@ pub mod windows {
//! Windows only extensions.

pub use crate::sys::named_pipe::NamedPipe;

#[cfg(feature = "os-extended")]
pub use crate::sys::source_hndl::SourceEventHndl;

#[cfg(feature = "os-extended")]
pub use crate::sys::source_hndl::SourceHndl;
}

pub mod features {
Expand Down
11 changes: 11 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ macro_rules! cfg_os_ext {
}
}

/// The `os-extended` feature is enabled.
macro_rules! cfg_os_extended {
($($item:item)*) => {
$(
#[cfg(feature = "os-extended")]
#[cfg_attr(docsrs, doc(cfg(feature = "os-extended")))]
$item
)*
}
}

/// The `net` feature is enabled.
macro_rules! cfg_net {
($($item:item)*) => {
Expand Down
18 changes: 14 additions & 4 deletions src/sys/windows/afd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use windows_sys::Win32::Foundation::{
};
use windows_sys::Win32::System::IO::{IO_STATUS_BLOCK, IO_STATUS_BLOCK_0};


const IOCTL_AFD_POLL: u32 = 0x00012024;

/// Winsock2 AFD driver instance.
Expand Down Expand Up @@ -117,7 +118,8 @@ cfg_io_source! {
use std::mem::zeroed;
use std::os::windows::io::{FromRawHandle, RawHandle};
use std::ptr::null_mut;
use std::sync::atomic::{AtomicUsize, Ordering};
//use std::sync::atomic::{AtomicUsize, Ordering};


use windows_sys::Wdk::Foundation::OBJECT_ATTRIBUTES;
use windows_sys::Wdk::Storage::FileSystem::{NtCreateFile, FILE_OPEN};
Expand All @@ -127,6 +129,8 @@ cfg_io_source! {
};
use windows_sys::Win32::System::WindowsProgramming::FILE_SKIP_SET_EVENT_ON_HANDLE;

use crate::sys::windows::tokens::{TokenGenerator, TokenAfd};

use super::iocp::CompletionPort;

const AFD_HELPER_ATTRIBUTES: OBJECT_ATTRIBUTES = OBJECT_ATTRIBUTES {
Expand Down Expand Up @@ -162,7 +166,8 @@ cfg_io_source! {
'o' as _
];

static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
//static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(0);
static NEXT_TOKEN: TokenGenerator<TokenAfd> = TokenGenerator::new();

impl AfdPollInfo {
pub fn zeroed() -> AfdPollInfo {
Expand Down Expand Up @@ -201,11 +206,16 @@ cfg_io_source! {
return Err(io::Error::new(raw_err.kind(), msg));
}
let fd = File::from_raw_handle(afd_helper_handle as RawHandle);
// Increment by 2 to reserve space for other types of handles.

/* // Increment by 2 to reserve space for other types of handles.
// Non-AFD types (currently only NamedPipe), use odd numbered
// tokens. This allows the selector to differentiate between them
// and dispatch events accordingly.
let token = NEXT_TOKEN.fetch_add(2, Ordering::Relaxed) + 2;
let token = NEXT_TOKEN.fetch_add(2, Ordering::Relaxed) + 2;*/

// Generate token.
let token = NEXT_TOKEN.next();

let afd = Afd { fd };
cp.add_handle(token, &afd.fd)?;
match SetFileCompletionNotificationModes(
Expand Down
19 changes: 10 additions & 9 deletions src/sys/windows/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::afd;
use super::iocp::CompletionStatus;
use crate::Token;


#[derive(Clone)]
pub struct Event {
pub flags: u32,
Expand Down Expand Up @@ -31,23 +32,22 @@ impl Event {
self.flags |= afd::POLL_SEND;
}


pub(super) fn from_completion_status(status: &CompletionStatus) -> Event {
Event {
flags: status.bytes_transferred(),
data: status.token() as u64,
}
}

pub(super) fn to_completion_status(&self) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, std::ptr::null_mut())
}

#[cfg(feature = "os-ext")]
pub(super) fn to_completion_status_with_overlapped(

#[cfg(feature = "os-poll")]
pub(super)
fn to_completion_status_with_overlapped(
&self,
overlapped: *mut super::Overlapped,
) -> CompletionStatus {
CompletionStatus::new(self.flags, self.data as usize, overlapped)
) -> CompletionStatus
{
CompletionStatus::from_event_overlapped(self, overlapped)
}
}

Expand All @@ -62,6 +62,7 @@ pub(crate) const READ_CLOSED_FLAGS: u32 =
afd::POLL_DISCONNECT | afd::POLL_ABORT | afd::POLL_CONNECT_FAIL;
pub(crate) const WRITE_CLOSED_FLAGS: u32 = afd::POLL_ABORT | afd::POLL_CONNECT_FAIL;


pub fn is_readable(event: &Event) -> bool {
event.flags & READABLE_FLAGS != 0
}
Expand Down
Loading