Skip to content

Commit 4972221

Browse files
authored
Add FilterCtx::io() (#712)
1 parent 1a7493b commit 4972221

File tree

10 files changed

+78
-46
lines changed

10 files changed

+78
-46
lines changed

ntex-io/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.3.2] - 2026-01-08
4+
5+
* Add FilterCtx::io()
6+
37
## [3.3.1] - 2025-12-18
48

59
* Add IoTaskStatus::ready() helper method

ntex-io/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-io"
3-
version = "3.3.1"
3+
version = "3.3.2"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Utilities for abstracting io streams"
66
keywords = ["network", "framework", "async", "futures"]
@@ -27,5 +27,5 @@ log = { workspace = true }
2727
pin-project-lite = { workspace = true }
2828

2929
[dev-dependencies]
30-
ntex = "3.0.0-pre.8"
30+
ntex = "3.0.0-pre.10"
3131
rand = { workspace = true }

ntex-io/src/buf.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,19 @@ impl<'a> FilterCtx<'a> {
247247
}
248248

249249
#[inline]
250+
/// Get io
251+
pub fn io(&self) -> &IoRef {
252+
self.io
253+
}
254+
255+
#[inline]
256+
/// Get io tag
250257
pub fn tag(&self) -> &'static str {
251258
self.io.tag()
252259
}
253260

254261
#[inline]
262+
/// Get filter ctx for next filter in chain
255263
pub fn next(&self) -> Self {
256264
Self {
257265
io: self.io,
@@ -261,6 +269,7 @@ impl<'a> FilterCtx<'a> {
261269
}
262270

263271
#[inline]
272+
/// Get current read buffer
264273
pub fn read_buf<F, R>(&self, nbytes: usize, f: F) -> R
265274
where
266275
F: FnOnce(&ReadBuf<'_>) -> R,
@@ -278,6 +287,7 @@ impl<'a> FilterCtx<'a> {
278287
}
279288

280289
#[inline]
290+
/// Get current write buffer
281291
pub fn write_buf<F, R>(&self, f: F) -> R
282292
where
283293
F: FnOnce(&WriteBuf<'_>) -> R,

ntex-net/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.5.1] - 2026-01-08
4+
5+
* Remove changes queue checks for io-uring driver
6+
37
## [3.5.0] - 2026-01-03
48

59
* Refactor io driver

ntex-net/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-net"
3-
version = "3.5.0"
3+
version = "3.5.1"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntexwork utils for ntex framework"
66
keywords = ["network", "framework", "async", "futures"]

ntex-net/src/polling/connect.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,18 @@ impl Handler for ConnectOpsBatcher {
163163

164164
fn error(&mut self, id: usize, err: io::Error) {
165165
let mut connects = self.inner.connects.borrow_mut();
166-
log::trace!(
167-
"Connect {id:?} is failed {err:?}, has-con: {}",
168-
connects.contains(id)
169-
);
170166

171167
if connects.contains(id) {
172-
let Item { sock, sender, .. } = connects.remove(id);
168+
let Item {
169+
sock, sender, cfg, ..
170+
} = connects.remove(id);
171+
log::trace!("{}: Connect {id:?} is failed {err:?}", cfg.tag());
172+
173173
let _ = sender.send(Err(err));
174174
self.inner.api.detach(sock.as_raw_fd(), id as u32);
175175
crate::helpers::close_socket(sock);
176+
} else {
177+
log::error!("Connect {id:?} is failed {err:?}");
176178
}
177179
}
178180

ntex-net/src/uring/driver.rs

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1+
use std::cell::{Cell, UnsafeCell};
12
use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
2-
use std::{
3-
cell::Cell, cell::RefCell, cmp, collections::VecDeque, fmt, io, mem, net, rc::Rc,
4-
sync::Arc,
5-
};
3+
use std::{cmp, collections::VecDeque, fmt, io, mem, net, ptr, rc::Rc, sync::Arc};
64

75
#[cfg(unix)]
86
use std::os::unix::net::UnixStream as OsUnixStream;
@@ -49,14 +47,15 @@ impl DriverApi {
4947
where
5048
F: FnOnce(&mut SEntry),
5149
{
52-
let mut changes = self.inner.changes.borrow_mut();
53-
let sq = self.inner.ring.submission();
54-
if !changes.is_empty() || sq.is_full() {
55-
let mut entry = Default::default();
56-
f(&mut entry);
57-
changes.push_back(entry);
58-
} else {
59-
unsafe {
50+
unsafe {
51+
let changes = &mut *self.inner.changes.get();
52+
let sq = self.inner.ring.submission();
53+
if !changes.is_empty() || sq.is_full() {
54+
changes.push_back(mem::MaybeUninit::uninit());
55+
let entry = changes.back_mut().unwrap();
56+
ptr::write_bytes(entry.as_mut_ptr(), 0, 1);
57+
f(entry.assume_init_mut());
58+
} else {
6059
sq.push_inline(f).expect("Queue size is checked");
6160
}
6261
}
@@ -135,7 +134,7 @@ struct DriverInner {
135134
probe: Probe,
136135
flags: Cell<Flags>,
137136
ring: IoUring<SEntry, CEntry>,
138-
changes: RefCell<VecDeque<SEntry>>,
137+
changes: UnsafeCell<VecDeque<mem::MaybeUninit<SEntry>>>,
139138
}
140139

141140
impl Driver {
@@ -189,7 +188,7 @@ impl Driver {
189188
ring,
190189
probe,
191190
flags: Cell::new(if new { Flags::NEW } else { Flags::empty() }),
192-
changes: RefCell::new(VecDeque::with_capacity(32)),
191+
changes: UnsafeCell::new(VecDeque::with_capacity(32)),
193192
});
194193

195194
Ok(Self {
@@ -225,24 +224,35 @@ impl Driver {
225224
}
226225

227226
fn apply_changes(&self, sq: SubmissionQueue<'_, SEntry>) -> bool {
228-
let mut changes = self.inner.changes.borrow_mut();
229-
if changes.is_empty() {
230-
false
231-
} else {
232-
let num = cmp::min(changes.len(), sq.capacity() - sq.len());
233-
let (s1, s2) = changes.as_slices();
234-
let s1_num = cmp::min(s1.len(), num);
235-
if s1_num > 0 {
236-
unsafe { sq.push_multiple(&s1[0..s1_num]) }.unwrap();
237-
} else if !s2.is_empty() {
238-
let s2_num = cmp::min(s2.len(), num - s1_num);
239-
if s2_num > 0 {
240-
unsafe { sq.push_multiple(&s2[0..s2_num]) }.unwrap();
227+
unsafe {
228+
let changes = &mut *self.inner.changes.get();
229+
if changes.is_empty() {
230+
false
231+
} else {
232+
let num = cmp::min(changes.len(), sq.capacity() - sq.len());
233+
let (s1, s2) = changes.as_slices();
234+
let s1_num = cmp::min(s1.len(), num);
235+
if s1_num > 0 {
236+
// safety: "changes" contains only initialized entries
237+
sq.push_multiple(mem::transmute::<
238+
&[mem::MaybeUninit<SEntry>],
239+
&[SEntry],
240+
>(&s1[0..s1_num]))
241+
.unwrap();
242+
} else if !s2.is_empty() {
243+
let s2_num = cmp::min(s2.len(), num - s1_num);
244+
if s2_num > 0 {
245+
sq.push_multiple(mem::transmute::<
246+
&[mem::MaybeUninit<SEntry>],
247+
&[SEntry],
248+
>(&s2[0..s2_num]))
249+
.unwrap();
250+
}
241251
}
242-
}
243-
changes.drain(0..num);
252+
changes.drain(0..num);
244253

245-
!changes.is_empty()
254+
!changes.is_empty()
255+
}
246256
}
247257
}
248258

ntex-net/src/uring/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl StreamOps {
137137
}
138138

139139
pub(crate) fn register(
140-
&self,
140+
self,
141141
io: Socket,
142142
ctx: IoContext,
143143
zc: bool,
@@ -150,7 +150,7 @@ impl StreamOps {
150150
flags: if zc { self.0.default_flags } else { Flags::NO_ZC },
151151
};
152152

153-
let id = self.0.with(move |st| {
153+
let id = self.0.with(|st| {
154154
// handle RDHUP event
155155
let op = opcode::PollAdd::new(item.fd(), libc::POLLRDHUP as u32).build();
156156
let id = st.streams.insert(item);

ntex-rt/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-rt"
3-
version = "3.4.0"
3+
version = "3.4.1"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "ntex runtime"
66
keywords = ["network", "framework", "async", "futures"]

ntex-rt/src/rt.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,6 @@ impl RunnableQueue {
212212
}
213213

214214
fn run(&self) -> bool {
215-
self.idle.set(false);
216-
217215
for _ in 0..self.event_interval {
218216
let task = unsafe { (*self.local_queue.get()).pop_front() };
219217
if let Some(task) = task {
@@ -238,11 +236,15 @@ impl RunnableQueue {
238236
}
239237
break;
240238
}
241-
self.idle.set(true);
242239

243-
!unsafe { (*self.local_queue.get()).is_empty() }
240+
let more_tasks = !unsafe { (*self.local_queue.get()).is_empty() }
244241
|| !self.sync_fixed_queue.is_empty()
245-
|| !self.sync_queue.is_empty()
242+
|| !self.sync_queue.is_empty();
243+
244+
if !more_tasks {
245+
self.idle.set(true);
246+
}
247+
more_tasks
246248
}
247249

248250
fn clear(&self) {

0 commit comments

Comments
 (0)