Skip to content

Commit 2761535

Browse files
authored
Use ntex_dispatcher::Dispatcher instead of ntex-io (#727)
1 parent 67b97b4 commit 2761535

File tree

10 files changed

+60
-59
lines changed

10 files changed

+60
-59
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ ntex-macros = { path = "ntex-macros" }
4242
ntex-util = { path = "ntex-util" }
4343

4444
[workspace.dependencies]
45-
ntex = "3.0.0-pre.14"
45+
ntex = "3.0.0-pre.15"
4646
ntex-bytes = "1.4.1"
4747
ntex-codec = "1.1.0"
48-
ntex-io = "3.5.0"
48+
ntex-io = "3.6.0"
4949
ntex-dispatcher = "3.0.0"
5050
ntex-net = "3.5.1"
5151
ntex-http = "1.0.0"
@@ -56,7 +56,7 @@ ntex-service = "4.0.0"
5656
ntex-tls = "3.2.0"
5757
ntex-macros = "3.1.0"
5858
ntex-util = "3.4.0"
59-
ntex-h2 = "3.4.0"
59+
ntex-h2 = "3.5.0"
6060

6161
ntex-polling = "3.10.0"
6262
ntex-io-uring = "0.7.120"

ntex-io/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.6.0] - 2026-01-29
4+
5+
* Deprecated Dispatcher
6+
37
## [3.5.0] - 2026-01-21
48

59
* Set IoConfig ref lifetime

ntex-io/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-io"
3-
version = "3.5.0"
3+
version = "3.6.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Utilities for abstracting io streams"
66
keywords = ["network", "framework", "async", "futures"]

ntex-io/src/dispatcher.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! Framed transport dispatcher
2-
#![allow(clippy::let_underscore_future)]
2+
#![allow(deprecated, clippy::let_underscore_future)]
33
use std::task::{Context, Poll, ready};
44
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc};
55

@@ -14,6 +14,7 @@ type Response<U> = <U as Encoder>::Item;
1414
pin_project_lite::pin_project! {
1515
/// Dispatcher - is a future that reads frames from bytes stream
1616
/// and pass then to the service.
17+
#[deprecated]
1718
pub struct Dispatcher<S, U>
1819
where
1920
S: Service<DispatchItem<U>, Response = Option<Response<U>>>,

ntex-io/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Utilities for abstructing io streams
22
#![deny(rust_2018_idioms, unreachable_pub, missing_debug_implementations)]
3-
#![allow(async_fn_in_trait)]
3+
#![allow(deprecated, async_fn_in_trait)]
44

55
use std::io::{Error as IoError, Result as IoResult};
66
use std::{any::Any, any::TypeId, fmt, task::Context, task::Poll};
@@ -176,6 +176,7 @@ where
176176
}
177177
}
178178

179+
#[deprecated]
179180
/// Dispatcher item
180181
pub enum DispatchItem<U: Encoder + Decoder> {
181182
Item(<U as Decoder>::Item),

ntex/CHANGES.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Changes
22

3-
## [3.0.0-pre.14] 2026-01-xx
3+
## [3.0.0-pre.15] 2026-01-29
4+
5+
* Use ntex_dispatcher::Dispatcher instead of ntex-io
6+
7+
## [3.0.0-pre.14] 2026-01-28
48

59
* MSRV 1.88
610

ntex/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex"
3-
version = "3.0.0-pre.14"
3+
version = "3.0.0-pre.15"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Framework for composable network services"
66
readme = "README.md"
@@ -72,6 +72,7 @@ no-test-logging = []
7272
[dependencies]
7373
ntex-bytes = { workspace = true }
7474
ntex-codec = { workspace = true }
75+
ntex-dispatcher = { workspace = true }
7576
ntex-http = { workspace = true }
7677
ntex-router = { workspace = true }
7778
ntex-service = { workspace = true }

ntex/src/lib.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,14 @@ pub mod time {
104104

105105
pub mod io {
106106
//! IO streaming utilities.
107-
pub use ntex_io::*;
107+
pub use ntex_dispatcher::*;
108+
109+
pub use ntex_io::{
110+
Base, Decoded, Filter, FilterCtx, FilterLayer, FilterReadStatus, Flags, Framed,
111+
Handle, Io, IoBoxed, IoConfig, IoContext, IoRef, IoStatusUpdate, IoStream,
112+
IoTaskStatus, Layer, OnDisconnect, ReadBuf, Readiness, RecvError, Sealed,
113+
TimerHandle, WriteBuf, cfg, seal, testing, types,
114+
};
108115
}
109116

110117
pub mod testing {

ntex/src/web/ws.rs

Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ use std::{fmt, rc::Rc};
44
pub use crate::ws::{CloseCode, CloseReason, Frame, Message, WsSink};
55

66
use crate::http::{StatusCode, body::BodySize, h1};
7+
use crate::io::{DispatchItem, IoConfig, Reason};
78
use crate::service::{
8-
IntoServiceFactory, ServiceFactory, apply_fn, chain_factory, fn_factory_with_config,
9+
IntoServiceFactory, ServiceFactory, chain_factory, fn_factory_with_config, fn_service,
910
};
1011
use crate::web::{HttpRequest, HttpResponse};
1112
use crate::ws::{self, error::HandshakeError, error::WsError, handshake};
12-
use crate::{
13-
SharedCfg, io::DispatchItem, io::IoConfig, rt, time::Seconds, util::Either, util::Ready,
14-
};
13+
use crate::{SharedCfg, rt, time::Seconds};
1514

1615
thread_local! {
1716
static CFG: SharedCfg = SharedCfg::new("WS")
@@ -29,44 +28,30 @@ where
2928
{
3029
let inner_factory = Rc::new(chain_factory(factory).map_err(WsError::Service));
3130

32-
let factory = fn_factory_with_config(move |sink: WsSink| {
33-
let factory = inner_factory.clone();
34-
35-
async move {
36-
let srv = factory.create(sink.clone()).await?;
37-
let sink = sink.clone();
38-
39-
Ok::<_, T::InitError>(apply_fn(srv, move |req, srv| match req {
40-
DispatchItem::<ws::Codec>::Item(item) => {
41-
let s = if matches!(item, Frame::Close(_)) {
42-
Some(sink.clone())
43-
} else {
44-
None
45-
};
46-
Either::Left(async move {
47-
let result = srv.call(item).await;
48-
if let Some(s) = s {
49-
let _ = rt::spawn(async move { s.io().close() });
50-
}
51-
result
52-
})
53-
}
54-
DispatchItem::WBackPressureEnabled
55-
| DispatchItem::WBackPressureDisabled => Either::Right(Ready::Ok(None)),
56-
DispatchItem::KeepAliveTimeout => {
57-
Either::Right(Ready::Err(WsError::KeepAlive))
58-
}
59-
DispatchItem::ReadTimeout => {
60-
Either::Right(Ready::Err(WsError::ReadTimeout))
61-
}
62-
DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => {
63-
Either::Right(Ready::Err(WsError::Protocol(e)))
64-
}
65-
DispatchItem::Disconnect(e) => {
66-
Either::Right(Ready::Err(WsError::Disconnected(e)))
31+
let factory = fn_factory_with_config(async move |sink: WsSink| {
32+
let srv = inner_factory.pipeline(sink.clone()).await?;
33+
let sink = sink.clone();
34+
35+
Ok::<_, T::InitError>(fn_service(async move |req| match req {
36+
DispatchItem::<ws::Codec>::Item(item) => {
37+
let s = if matches!(item, Frame::Close(_)) {
38+
Some(sink.clone())
39+
} else {
40+
None
41+
};
42+
let result = srv.call(item).await;
43+
if let Some(s) = s {
44+
let _ = rt::spawn(async move { s.io().close() });
6745
}
68-
}))
69-
}
46+
result
47+
}
48+
DispatchItem::Control(_) => Ok(None),
49+
DispatchItem::Stop(Reason::KeepAliveTimeout) => Err(WsError::KeepAlive),
50+
DispatchItem::Stop(Reason::ReadTimeout) => Err(WsError::ReadTimeout),
51+
DispatchItem::Stop(Reason::Decoder(e))
52+
| DispatchItem::Stop(Reason::Encoder(e)) => Err(WsError::Protocol(e)),
53+
DispatchItem::Stop(Reason::Io(e)) => Err(WsError::Disconnected(e)),
54+
}))
7055
});
7156

7257
start_with(req, factory).await

ntex/src/ws/client.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::connect::{Connect, ConnectError, Connector};
1616
use crate::http::header::{self, AUTHORIZATION, HeaderMap, HeaderName, HeaderValue};
1717
use crate::http::{ConnectionType, RequestHead, RequestHeadType, StatusCode, Uri};
1818
use crate::http::{body::BodySize, error::HttpError, h1};
19-
use crate::io::{Base, DispatchItem, Dispatcher, Filter, Io, Layer, Sealed};
19+
use crate::io::{Base, DispatchItem, Dispatcher, Filter, Io, Layer, Reason, Sealed};
2020
use crate::service::{IntoService, Pipeline, apply_fn, fn_service};
2121
use crate::time::{Millis, timeout};
2222
use crate::{Service, ServiceFactory, SharedCfg, channel::mpsc, rt, util::Ready, ws};
@@ -743,14 +743,12 @@ impl WsConnection<Sealed> {
743743
|req, svc| async move {
744744
match req {
745745
DispatchItem::<ws::Codec>::Item(item) => svc.call(item).await,
746-
DispatchItem::WBackPressureEnabled
747-
| DispatchItem::WBackPressureDisabled => Ok(None),
748-
DispatchItem::KeepAliveTimeout => Err(WsError::KeepAlive),
749-
DispatchItem::ReadTimeout => Err(WsError::ReadTimeout),
750-
DispatchItem::DecoderError(e) | DispatchItem::EncoderError(e) => {
751-
Err(WsError::Protocol(e))
752-
}
753-
DispatchItem::Disconnect(e) => Err(WsError::Disconnected(e)),
746+
DispatchItem::Control(_) => Ok(None),
747+
DispatchItem::Stop(Reason::KeepAliveTimeout) => Err(WsError::KeepAlive),
748+
DispatchItem::Stop(Reason::ReadTimeout) => Err(WsError::ReadTimeout),
749+
DispatchItem::Stop(Reason::Decoder(e))
750+
| DispatchItem::Stop(Reason::Encoder(e)) => Err(WsError::Protocol(e)),
751+
DispatchItem::Stop(Reason::Io(e)) => Err(WsError::Disconnected(e)),
754752
}
755753
},
756754
);

0 commit comments

Comments
 (0)