Skip to content

Commit c7070ae

Browse files
authored
Refactor Io filter management (#742)
1 parent 308fa11 commit c7070ae

File tree

5 files changed

+246
-269
lines changed

5 files changed

+246
-269
lines changed

ntex-io/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.8.0] - 2026-02-03
4+
5+
* Refactor Io filter management
6+
37
## [3.7.0] - 2026-01-31
48

59
* Fix BufferCfg::resize_min()

ntex-io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-io"
3-
version = "3.7.1"
3+
version = "3.8.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Utilities for abstracting io streams"
66
keywords = ["network", "framework", "async", "futures"]

ntex-io/src/filterptr.rs

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
use std::{cell::UnsafeCell, mem, ptr};
2+
3+
use crate::{Filter, FilterLayer, Layer, Sealed, filter::NullFilter};
4+
5+
enum Repr {
6+
Filter(*const u8, *const dyn Filter),
7+
Sealed(Box<dyn Filter>),
8+
}
9+
10+
impl Default for Repr {
11+
fn default() -> Self {
12+
Repr::Filter(ptr::null(), NullFilter::get())
13+
}
14+
}
15+
16+
pub(crate) struct FilterPtr(UnsafeCell<Repr>);
17+
18+
impl FilterPtr {
19+
pub(crate) const fn null() -> Self {
20+
Self(UnsafeCell::new(Repr::Filter(
21+
ptr::null(),
22+
NullFilter::get(),
23+
)))
24+
}
25+
26+
pub(crate) fn get(&self) -> &dyn Filter {
27+
match self.as_ref() {
28+
Repr::Filter(_, filter) => unsafe { filter.as_ref().unwrap() },
29+
Repr::Sealed(b) => b.as_ref(),
30+
}
31+
}
32+
33+
pub(crate) fn set<F: Filter>(&self, filter: F) {
34+
let filter = Box::new(filter);
35+
let filter_ref = {
36+
let f: &dyn Filter = filter.as_ref();
37+
f as *const dyn Filter
38+
};
39+
*self.as_mut() = Repr::Filter(Box::into_raw(filter).cast(), filter_ref);
40+
}
41+
42+
/// Get filter, panic if it is not filter
43+
pub(crate) fn filter<F: Filter>(&self) -> &F {
44+
if let Repr::Filter(ptr, _) = self.as_ref() {
45+
assert!(!ptr.is_null(), "Filter is not set");
46+
unsafe { (*ptr).cast::<F>().as_ref().unwrap() }
47+
} else {
48+
panic!("Filter is sealed")
49+
}
50+
}
51+
52+
#[allow(clippy::mut_from_ref)]
53+
fn as_mut(&self) -> &mut Repr {
54+
unsafe { &mut *self.0.get() }
55+
}
56+
57+
fn as_ref(&self) -> &Repr {
58+
unsafe { &*self.0.get() }
59+
}
60+
61+
#[allow(clippy::unnecessary_box_returns)]
62+
/// Get filter, panic if it is not set
63+
pub(crate) fn take_filter<F>(&self) -> Box<F> {
64+
if let Repr::Filter(ptr, _) = mem::take(self.as_mut()) {
65+
assert!(!ptr.is_null(), "Filter is not set");
66+
unsafe { Box::from_raw(ptr as *mut F) }
67+
} else {
68+
panic!("Filter is sealed")
69+
}
70+
}
71+
72+
/// Get sealed, panic if it is already sealed
73+
fn take_sealed(&self) -> Sealed {
74+
if let Repr::Sealed(sealed) = mem::take(self.as_mut()) {
75+
Sealed(sealed)
76+
} else {
77+
panic!("Filter is not sealed")
78+
}
79+
}
80+
81+
pub(crate) fn is_set(&self) -> bool {
82+
match self.as_ref() {
83+
Repr::Filter(ptr, _) => !ptr.is_null(),
84+
Repr::Sealed(_) => true,
85+
}
86+
}
87+
88+
pub(crate) fn add_filter<F: Filter, T: FilterLayer>(&self, new: T) {
89+
assert!(self.is_set(), "Filter is not set");
90+
91+
let repr = match self.as_ref() {
92+
Repr::Filter(..) => {
93+
let filter = Box::new(Layer::new(new, *self.take_filter::<F>()));
94+
let filter_ref = {
95+
let f: &dyn Filter = filter.as_ref();
96+
f as *const dyn Filter
97+
};
98+
Repr::Filter(Box::into_raw(filter).cast(), filter_ref)
99+
}
100+
Repr::Sealed(..) => Repr::Sealed(Box::new(Layer::new(new, self.take_sealed()))),
101+
};
102+
*self.as_mut() = repr;
103+
}
104+
105+
pub(crate) fn map_filter<F: Filter, U, R>(&self, f: U)
106+
where
107+
U: FnOnce(F) -> R,
108+
R: Filter,
109+
{
110+
let filter = Box::new(f(*self.take_filter::<F>()));
111+
let filter_ref = {
112+
let f: &dyn Filter = filter.as_ref();
113+
f as *const dyn Filter
114+
};
115+
*self.as_mut() = Repr::Filter(Box::into_raw(filter).cast(), filter_ref);
116+
}
117+
118+
pub(crate) fn seal<F: Filter>(&self) {
119+
assert!(self.is_set(), "Filter is not set");
120+
121+
if matches!(self.as_ref(), Repr::Filter(..)) {
122+
*self.as_mut() = Repr::Sealed(Box::new(*self.take_filter::<F>()));
123+
}
124+
}
125+
126+
pub(crate) fn drop_filter<F>(&self) {
127+
if let Repr::Filter(ptr, _) = self.as_ref() {
128+
if !ptr.is_null() {
129+
self.take_filter::<F>();
130+
}
131+
} else {
132+
self.take_sealed();
133+
}
134+
}
135+
}
136+
137+
#[cfg(test)]
138+
mod tests {
139+
use std::{cell::Cell, io, rc::Rc};
140+
141+
use ntex_bytes::Bytes;
142+
use ntex_codec::BytesCodec;
143+
144+
use super::*;
145+
use crate::{
146+
Base, Handle, Io, IoContext, IoStream, ReadBuf, WriteBuf, testing::IoTest,
147+
};
148+
149+
const BIN: &[u8] = b"GET /test HTTP/1\r\n\r\n";
150+
const TEXT: &str = "GET /test HTTP/1\r\n\r\n";
151+
152+
#[derive(Debug)]
153+
struct DropFilter {
154+
p: Rc<Cell<usize>>,
155+
}
156+
157+
impl Drop for DropFilter {
158+
fn drop(&mut self) {
159+
self.p.set(self.p.get() + 1);
160+
}
161+
}
162+
163+
impl FilterLayer for DropFilter {
164+
fn process_read_buf(&self, buf: &ReadBuf<'_>) -> io::Result<usize> {
165+
if let Some(src) = buf.take_src() {
166+
let len = src.len();
167+
buf.set_dst(Some(src));
168+
Ok(len)
169+
} else {
170+
Ok(0)
171+
}
172+
}
173+
fn process_write_buf(&self, buf: &WriteBuf<'_>) -> io::Result<()> {
174+
if let Some(src) = buf.take_src() {
175+
buf.set_dst(Some(src));
176+
}
177+
Ok(())
178+
}
179+
}
180+
181+
struct IoTestWrapper;
182+
183+
impl IoStream for IoTestWrapper {
184+
fn start(self, _: IoContext) -> Option<Box<dyn Handle>> {
185+
None
186+
}
187+
}
188+
189+
#[ntex::test]
190+
async fn drop_filter() {
191+
let p = Rc::new(Cell::new(0));
192+
193+
let (client, server) = IoTest::create();
194+
let f = DropFilter { p: p.clone() };
195+
let _ = format!("{f:?}");
196+
let io = Io::from(server).add_filter(f);
197+
198+
client.remote_buffer_cap(1024);
199+
client.write(TEXT);
200+
let msg = io.recv(&BytesCodec).await.unwrap().unwrap();
201+
assert_eq!(msg, Bytes::from_static(BIN));
202+
203+
io.send(Bytes::from_static(b"test"), &BytesCodec)
204+
.await
205+
.unwrap();
206+
let buf = client.read().await.unwrap();
207+
assert_eq!(buf, Bytes::from_static(b"test"));
208+
209+
let io2 = io.take();
210+
let mut io3: crate::IoBoxed = io2.into();
211+
let io4 = io3.take();
212+
213+
drop(io);
214+
drop(io3);
215+
drop(io4);
216+
217+
assert_eq!(p.get(), 1);
218+
}
219+
220+
#[test]
221+
fn take_sealed_filter() {
222+
let p = Rc::new(Cell::new(0));
223+
let f = DropFilter { p: p.clone() };
224+
225+
let io = Io::from(IoTestWrapper).seal();
226+
let _io: Io<Layer<DropFilter, Sealed>> = io.add_filter(f);
227+
}
228+
229+
#[test]
230+
#[should_panic(expected = "Filter is not set")]
231+
fn take_filter_access() {
232+
let fptr = FilterPtr::null();
233+
fptr.filter::<Base>();
234+
}
235+
}

0 commit comments

Comments
 (0)