Skip to content

Commit 170a4cb

Browse files
authored
Cleanup and improvements for neon driver (#700)
1 parent c1a6a88 commit 170a4cb

File tree

11 files changed

+84
-106
lines changed

11 files changed

+84
-106
lines changed

ntex-io/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.3.1] - 2025-12-18
4+
5+
* Add IoTaskStatus::ready() helper method
6+
37
## [3.3.0] - 2025-12-17
48

59
* Upgrade to ntex-service v4

ntex-io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-io"
3-
version = "3.3.0"
3+
version = "3.3.1"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Utilities for abstracting io streams"
66
keywords = ["network", "framework", "async", "futures"]

ntex-io/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ pub enum IoTaskStatus {
122122
Stop,
123123
}
124124

125+
impl IoTaskStatus {
126+
#[inline]
127+
/// Ready for more io ops
128+
pub fn ready(self) -> bool {
129+
self == IoTaskStatus::Io
130+
}
131+
}
132+
125133
/// Io status
126134
#[derive(Debug)]
127135
pub enum IoStatusUpdate {

ntex-net/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.4.1] - 2025-12-18
4+
5+
* Cleanup and improvements for polling driver
6+
37
## [3.4.0] - 2025-12-17
48

59
* Upgrade to ntex-service v4

ntex-net/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-net"
3-
version = "3.4.0"
3+
version = "3.4.1"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntexwork utils for ntex framework"
66
keywords = ["network", "framework", "async", "futures"]
@@ -35,7 +35,7 @@ io-uring-compat = []
3535
ntex-service = "4"
3636
ntex-bytes = "1"
3737
ntex-http = "1"
38-
ntex-io = "3"
38+
ntex-io = "3.3.1"
3939
ntex-rt = "3.3"
4040
ntex-util = "3"
4141

ntex-net/src/helpers.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,7 @@
1-
use std::{error, io, net, net::SocketAddr, path::Path};
1+
use std::{io, net, net::SocketAddr, path::Path};
22

3-
use ntex_util::channel::oneshot::channel;
43
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
54

6-
pub(crate) fn pool_io_err<T, E>(result: std::result::Result<T, E>) -> io::Result<T>
7-
where
8-
E: error::Error + Send + Sync + 'static,
9-
{
10-
result.map_err(|e| io::Error::other(e))
11-
}
12-
135
pub(crate) async fn connect(addr: SocketAddr) -> io::Result<Socket> {
146
let addr = SockAddr::from(addr);
157
let domain = addr.domain();
@@ -28,17 +20,9 @@ async fn connect_inner(
2820
protocol: Option<Protocol>,
2921
) -> io::Result<Socket> {
3022
let sock = prep_socket(Socket::new(domain, ty, protocol)?)?;
31-
32-
let (sender, rx) = channel();
33-
34-
crate::rt_impl::connect::ConnectOps::current().connect(sock, addr, sender)?;
35-
36-
let sock = rx
23+
crate::rt_impl::connect::ConnectOps::current()
24+
.connect(sock, addr)
3725
.await
38-
.map_err(|_| io::Error::other("IO Driver is gone"))
39-
.and_then(|item| item)?;
40-
41-
Ok(sock)
4226
}
4327

4428
pub(crate) fn prep_socket(sock: Socket) -> io::Result<Socket> {

ntex-net/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! Utility for async runtime abstraction
22
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3-
#![allow(unused_variables, dead_code)]
43

54
mod compat;
65
pub mod connect;

ntex-net/src/rt_polling/connect.rs

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,13 @@ use std::{cell::RefCell, io, os::fd::AsRawFd, os::fd::RawFd, rc::Rc};
22

33
use ntex_neon::driver::{DriverApi, Event, Handler};
44
use ntex_neon::{Runtime, syscall};
5-
use ntex_util::channel::oneshot::Sender;
5+
use ntex_util::channel::oneshot::{Sender, channel};
66
use slab::Slab;
77
use socket2::{SockAddr, Socket};
88

99
#[derive(Clone)]
1010
pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);
1111

12-
#[derive(Debug)]
13-
enum Change {
14-
Event(Event),
15-
Error(io::Error),
16-
}
17-
1812
struct ConnectOpsBatcher {
1913
inner: Rc<ConnectOpsInner>,
2014
}
@@ -52,21 +46,25 @@ impl ConnectOps {
5246
})
5347
}
5448

55-
pub(crate) fn connect(
56-
&self,
57-
sock: Socket,
58-
addr: SockAddr,
59-
sender: Sender<io::Result<Socket>>,
60-
) -> io::Result<usize> {
49+
pub(crate) async fn connect(&self, sock: Socket, addr: SockAddr) -> io::Result<Socket> {
6150
let result = syscall!(
6251
break libc::connect(sock.as_raw_fd(), addr.as_ptr().cast(), addr.len())
6352
)?;
64-
let fd = sock.as_raw_fd();
65-
let item = Item { sock, sender };
66-
let id = self.0.connects.borrow_mut().insert(item);
67-
self.0.api.attach(fd, id as u32, Event::writable(0));
68-
69-
Ok(id)
53+
if result.is_ready() {
54+
// socket is connected
55+
Ok(sock)
56+
} else {
57+
// connect is async
58+
let (sender, rx) = channel();
59+
let fd = sock.as_raw_fd();
60+
let item = Item { sock, sender };
61+
let id = self.0.connects.borrow_mut().insert(item);
62+
self.0.api.attach(fd, id as u32, Event::writable(0));
63+
64+
rx.await
65+
.map_err(|_| io::Error::other("IO Driver is gone"))
66+
.and_then(|item| item)
67+
}
7068
}
7169
}
7270

ntex-net/src/rt_polling/driver.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ struct StreamOpsInner {
5050
delayed_drop: Cell<bool>,
5151
delayed_feed: Cell<Option<Vec<IdType>>>,
5252
streams: Cell<Option<Box<Slab<StreamItem>>>>,
53-
lw: usize,
54-
hw: usize,
5553
}
5654

5755
impl StreamOps {
@@ -65,8 +63,6 @@ impl StreamOps {
6563
delayed_drop: Cell::new(false),
6664
delayed_feed: Cell::new(Some(Vec::new())),
6765
streams: Cell::new(Some(Box::new(Slab::new()))),
68-
lw: 1024,
69-
hw: 1024 * 16,
7066
});
7167
inner = Some(ops.clone());
7268
Box::new(StreamOpsHandler { inner: ops })
@@ -131,21 +127,18 @@ impl Handler for StreamOpsHandler {
131127
log::trace!("{}: Event ({:?}): {ev:?} {:?}", io.tag(), io.fd(), io.flags);
132128

133129
if ev.readable {
134-
match io.read(id as u32, &self.inner.api, self.inner.lw, self.inner.hw) {
135-
IoTaskStatus::Io => {
136-
renew.readable = true;
137-
io.flags.insert(Flags::RD);
138-
}
139-
IoTaskStatus::Pause | IoTaskStatus::Stop => {
140-
io.flags.remove(Flags::RD);
141-
}
130+
if io.read().ready() {
131+
renew.readable = true;
132+
io.flags.insert(Flags::RD);
133+
} else {
134+
io.flags.remove(Flags::RD);
142135
}
143136
} else if io.flags.contains(Flags::RD) {
144137
renew.readable = true;
145138
}
146139

147140
if ev.writable {
148-
if io.write(id as u32, &self.inner.api) == IoTaskStatus::Io {
141+
if io.write().ready() {
149142
renew.writable = true;
150143
io.flags.insert(Flags::WR);
151144
} else {
@@ -248,7 +241,6 @@ impl StreamOpsInner {
248241
if let Some(mut streams) = self.streams.take() {
249242
let idx = id as usize;
250243
let item = &mut streams[idx];
251-
let fd = item.fd();
252244

253245
if item.flags.contains(Flags::DROPPED_PRI) {
254246
let item = streams.remove(idx);
@@ -298,7 +290,7 @@ impl StreamCtl {
298290
})
299291
.await
300292
.map_err(io::Error::other)
301-
.and_then(crate::helpers::pool_io_err)
293+
.and_then(|res| res.map_err(io::Error::other))
302294
}
303295

304296
/// Modify poll interest for the stream
@@ -318,9 +310,7 @@ impl StreamCtl {
318310
if io.flags.contains(Flags::RD) {
319311
event.readable = true;
320312
want_update_read = false;
321-
} else if io.read(self.id, &self.inner.api, self.inner.lw, self.inner.hw)
322-
== IoTaskStatus::Io
323-
{
313+
} else if io.read().ready() {
324314
event.readable = true;
325315
io.flags.insert(Flags::RD);
326316
} else {
@@ -337,7 +327,7 @@ impl StreamCtl {
337327
if io.flags.contains(Flags::WR) {
338328
event.writable = true;
339329
want_update_write = false;
340-
} else if io.write(self.id, &self.inner.api) == IoTaskStatus::Io {
330+
} else if io.write().ready() {
341331
event.writable = true;
342332
io.flags.insert(Flags::WR);
343333
} else {
@@ -393,7 +383,7 @@ impl StreamItem {
393383
self.ctx.tag()
394384
}
395385

396-
fn write(&mut self, id: u32, api: &DriverApi) -> IoTaskStatus {
386+
fn write(&mut self) -> IoTaskStatus {
397387
if let Some(buf) = self.ctx.get_write_buf() {
398388
let fd = self.fd();
399389
log::trace!("{}: Write ({fd:?}), buf: {:?}", self.ctx.tag(), buf.len());
@@ -403,7 +393,7 @@ impl StreamItem {
403393
IoTaskStatus::Pause
404394
}
405395

406-
fn read(&mut self, id: u32, api: &DriverApi, lw: usize, hw: usize) -> IoTaskStatus {
396+
fn read(&mut self) -> IoTaskStatus {
407397
let mut buf = self.ctx.get_read_buf();
408398

409399
let fd = self.fd();

ntex-net/src/rt_uring/connect.rs

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,13 @@ use std::{cell::RefCell, io, os::fd::AsRawFd, rc::Rc};
22

33
use ntex_neon::driver::io_uring::{opcode, types::Fd};
44
use ntex_neon::{Runtime, driver::DriverApi, driver::Handler};
5-
use ntex_util::channel::oneshot::Sender;
5+
use ntex_util::channel::oneshot::{Sender, channel};
66
use slab::Slab;
77
use socket2::{SockAddr, Socket};
88

99
#[derive(Clone)]
1010
pub(crate) struct ConnectOps(Rc<ConnectOpsInner>);
1111

12-
#[derive(Debug)]
13-
enum Change {
14-
Readable,
15-
Writable,
16-
Error(io::Error),
17-
}
18-
1912
struct ConnectOpsHandler {
2013
inner: Rc<ConnectOpsInner>,
2114
}
@@ -47,27 +40,26 @@ impl ConnectOps {
4740
})
4841
}
4942

50-
pub(crate) fn connect(
51-
&self,
52-
sock: Socket,
53-
addr: SockAddr,
54-
sender: Sender<io::Result<Socket>>,
55-
) -> io::Result<()> {
56-
let addr2 = addr.clone();
57-
let mut ops = self.0.ops.borrow_mut();
58-
59-
// addr must be stable, neon submits ops at the end of rt turn
60-
let addr = Box::new(addr);
61-
let (addr_ptr, addr_len) = (addr.as_ref().as_ptr().cast(), addr.len());
62-
63-
let fd = sock.as_raw_fd();
64-
let id = ops.insert((addr, sock, sender));
65-
self.0.api.submit(
66-
id as u32,
67-
opcode::Connect::new(Fd(fd), addr_ptr, addr_len).build(),
68-
);
69-
70-
Ok(())
43+
pub(crate) async fn connect(&self, sock: Socket, addr: SockAddr) -> io::Result<Socket> {
44+
{
45+
let mut ops = self.0.ops.borrow_mut();
46+
47+
// addr must be stable, neon submits ops at the end of rt turn
48+
let addr = Box::new(addr);
49+
let (addr_ptr, addr_len) = (addr.as_ref().as_ptr().cast(), addr.len());
50+
51+
let (sender, rx) = channel();
52+
let fd = sock.as_raw_fd();
53+
let id = ops.insert((addr, sock, sender));
54+
self.0.api.submit(
55+
id as u32,
56+
opcode::Connect::new(Fd(fd), addr_ptr, addr_len).build(),
57+
);
58+
rx
59+
}
60+
.await
61+
.map_err(|_| io::Error::other("IO Driver is gone"))
62+
.and_then(|item| item)
7163
}
7264
}
7365

@@ -77,7 +69,7 @@ impl Handler for ConnectOpsHandler {
7769
self.inner.ops.borrow_mut().remove(user_data);
7870
}
7971

80-
fn completed(&mut self, user_data: usize, flags: u32, result: io::Result<usize>) {
72+
fn completed(&mut self, user_data: usize, _: u32, result: io::Result<usize>) {
8173
let (addr, sock, tx) = self.inner.ops.borrow_mut().remove(user_data);
8274
log::trace!(
8375
"connect-op is completed {:?} result: {:?}, addr: {:?}",

0 commit comments

Comments
 (0)