Skip to content

Commit b37b54b

Browse files
authored
Readiness handling in io Dispatcher (#693)
1 parent 9201bfb commit b37b54b

File tree

20 files changed

+123
-123
lines changed

20 files changed

+123
-123
lines changed

.github/workflows/cov.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@ jobs:
1414
- name: Install cargo-llvm-cov
1515
uses: taiki-e/install-action@cargo-llvm-cov
1616

17-
- uses: actions-rust-lang/setup-rust-toolchain@v1
1817
- name: Generate Cargo.lock
19-
run: cargo generate-lockfile
18+
uses: actions-rs/cargo@v1
19+
with:
20+
command: generate-lockfile
21+
22+
- name: Cache Dependencies
23+
uses: Swatinem/rust-cache@v1.0.1
2024

2125
- name: Clean coverage results
2226
run: cargo llvm-cov clean --workspace

.github/workflows/linux.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ jobs:
2323
with:
2424
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
2525

26+
- name: Generate Cargo.lock
27+
uses: actions-rs/cargo@v1
28+
with:
29+
command: generate-lockfile
30+
31+
- name: Cache Dependencies
32+
uses: Swatinem/rust-cache@v1.0.1
33+
2634
- name: Run tests (neon)
2735
timeout-minutes: 15
2836
run: |

.github/workflows/osx.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ jobs:
2323
toolchain: ${{ matrix.version }}-aarch64-apple-darwin
2424

2525
- name: Generate Cargo.lock
26-
run: cargo generate-lockfile
26+
uses: actions-rs/cargo@v1
27+
with:
28+
command: generate-lockfile
29+
30+
- name: Cache Dependencies
31+
uses: Swatinem/rust-cache@v1.0.1
2732

2833
- name: Run tests (neon)
2934
timeout-minutes: 40

.github/workflows/windows.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ jobs:
2626
toolchain: ${{ matrix.version }}-x86_64-pc-windows-msvc
2727

2828
- name: Generate Cargo.lock
29-
run: cargo generate-lockfile
29+
uses: actions-rs/cargo@v1
30+
with:
31+
command: generate-lockfile
32+
33+
- name: Cache Dependencies
34+
uses: Swatinem/rust-cache@v1.0.1
3035

3136
- name: Cache vcpkg
3237
uses: actions/cache@v4
3338
with:
34-
path: C:\vcpkg\installed\x64-windows\
39+
path: C:\vcpkg\installed\
3540
key: x86_64-pc-windows-msvc-openssl
3641

3742
- name: Cache cargo registry

ntex-bytes/src/storage.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -451,11 +451,7 @@ impl Storage {
451451

452452
#[inline]
453453
pub(crate) fn capacity(&self) -> usize {
454-
if self.is_inline() {
455-
INLINE_CAP
456-
} else {
457-
self.cap
458-
}
454+
if self.is_inline() { INLINE_CAP } else { self.cap }
459455
}
460456

461457
pub(crate) fn split_off(&mut self, at: usize, create_inline: bool) -> Storage {

ntex-io/CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changes
22

3+
## [3.2.0] - 2025-12-16
4+
5+
* Shutdown service if readiness fails
6+
37
## [3.1.1] - 2025-12-14
48

59
* Add fmt::Debug impl for RecvError

ntex-io/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "ntex-io"
3-
version = "3.1.1"
3+
version = "3.2.0"
44
authors = ["ntex contributors <team@ntex.rs>"]
55
description = "Utilities for abstracting io streams"
66
keywords = ["network", "framework", "async", "futures"]
@@ -19,8 +19,8 @@ path = "src/lib.rs"
1919
[dependencies]
2020
ntex-codec = "1"
2121
ntex-bytes = "1"
22-
ntex-util = "3.1"
23-
ntex-service = "3.6.3"
22+
ntex-util = "3.3"
23+
ntex-service = "3.7.3"
2424

2525
bitflags = { workspace = true }
2626
log = { workspace = true }

ntex-io/src/dispatcher.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ enum DispatcherError<S, U> {
8080

8181
enum PollService<U: Encoder + Decoder> {
8282
Item(DispatchItem<U>),
83+
ItemWait(DispatchItem<U>),
8384
Continue,
8485
Ready,
8586
}
@@ -203,30 +204,30 @@ where
203204
loop {
204205
match slf.st {
205206
DispatcherState::Processing => {
206-
let item = match ready!(slf.poll_service(cx)) {
207+
let (item, nowait) = match ready!(slf.poll_service(cx)) {
207208
PollService::Ready => {
208209
// decode incoming bytes if buffer is ready
209210
match slf.shared.io.poll_recv_decode(&slf.shared.codec, cx) {
210211
Ok(decoded) => {
211212
slf.update_timer(&decoded);
212213
if let Some(el) = decoded.item {
213-
DispatchItem::Item(el)
214+
(DispatchItem::Item(el), true)
214215
} else {
215216
return Poll::Pending;
216217
}
217218
}
218219
Err(RecvError::KeepAlive) => {
219220
if let Err(err) = slf.handle_timeout() {
220221
slf.st = DispatcherState::Stop;
221-
err
222+
(err, true)
222223
} else {
223224
continue;
224225
}
225226
}
226227
Err(RecvError::WriteBackpressure) => {
227228
// instruct write task to notify dispatcher when data is flushed
228229
slf.st = DispatcherState::Backpressure;
229-
DispatchItem::WBackPressureEnabled
230+
(DispatchItem::WBackPressureEnabled, true)
230231
}
231232
Err(RecvError::Decoder(err)) => {
232233
log::trace!(
@@ -235,7 +236,7 @@ where
235236
err
236237
);
237238
slf.st = DispatcherState::Stop;
238-
DispatchItem::DecoderError(err)
239+
(DispatchItem::DecoderError(err), true)
239240
}
240241
Err(RecvError::PeerGone(err)) => {
241242
log::trace!(
@@ -244,21 +245,23 @@ where
244245
err
245246
);
246247
slf.st = DispatcherState::Stop;
247-
DispatchItem::Disconnect(err)
248+
(DispatchItem::Disconnect(err), true)
248249
}
249250
}
250251
}
251-
PollService::Item(item) => item,
252+
PollService::Item(item) => (item, true),
253+
PollService::ItemWait(item) => (item, false),
252254
PollService::Continue => continue,
253255
};
254256

255-
slf.call_service(cx, item);
257+
slf.call_service(cx, item, nowait);
256258
}
257259
// handle write back-pressure
258260
DispatcherState::Backpressure => {
259261
match ready!(slf.poll_service(cx)) {
260262
PollService::Ready => (),
261-
PollService::Item(item) => slf.call_service(cx, item),
263+
PollService::Item(item) => slf.call_service(cx, item, true),
264+
PollService::ItemWait(item) => slf.call_service(cx, item, false),
262265
PollService::Continue => continue,
263266
};
264267

@@ -270,7 +273,7 @@ where
270273
slf.st = DispatcherState::Processing;
271274
DispatchItem::WBackPressureDisabled
272275
};
273-
slf.call_service(cx, item);
276+
slf.call_service(cx, item, false);
274277
}
275278
// drain service responses and shutdown io
276279
DispatcherState::Stop => {
@@ -335,8 +338,12 @@ where
335338
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
336339
U: Decoder + Encoder + 'static,
337340
{
338-
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>) {
339-
let mut fut = self.shared.service.call_nowait(item);
341+
fn call_service(&mut self, cx: &mut Context<'_>, item: DispatchItem<U>, nowait: bool) {
342+
let mut fut = if nowait {
343+
self.shared.service.call_nowait(item)
344+
} else {
345+
self.shared.service.call(item)
346+
};
340347
let inflight = self.shared.inflight.get() + 1;
341348
self.shared.inflight.set(inflight);
342349
if inflight == 1 {
@@ -405,7 +412,7 @@ where
405412
self.shared.io.tag()
406413
);
407414
self.st = DispatcherState::Stop;
408-
Poll::Ready(PollService::Item(DispatchItem::KeepAliveTimeout))
415+
Poll::Ready(PollService::ItemWait(DispatchItem::KeepAliveTimeout))
409416
}
410417
IoStatusUpdate::PeerGone(err) => {
411418
log::trace!(
@@ -414,11 +421,13 @@ where
414421
err
415422
);
416423
self.st = DispatcherState::Stop;
417-
Poll::Ready(PollService::Item(DispatchItem::Disconnect(err)))
424+
Poll::Ready(PollService::ItemWait(DispatchItem::Disconnect(err)))
418425
}
419426
IoStatusUpdate::WriteBackpressure => {
420427
self.st = DispatcherState::Backpressure;
421-
Poll::Ready(PollService::Item(DispatchItem::WBackPressureEnabled))
428+
Poll::Ready(PollService::ItemWait(
429+
DispatchItem::WBackPressureEnabled,
430+
))
422431
}
423432
}
424433
}
@@ -431,7 +440,7 @@ where
431440
self.st = DispatcherState::Stop;
432441
self.error = Some(err);
433442
self.shared.insert_flags(Flags::READY_ERR);
434-
Poll::Ready(PollService::Item(DispatchItem::Disconnect(None)))
443+
Poll::Ready(PollService::Continue)
435444
}
436445
}
437446
}
@@ -618,7 +627,7 @@ mod tests {
618627
}
619628

620629
#[ntex::test]
621-
async fn test_basic() {
630+
async fn basics() {
622631
let (client, server) = IoTest::create();
623632
client.remote_buffer_cap(1024);
624633
client.write("GET /test HTTP/1\r\n\r\n");
@@ -654,7 +663,7 @@ mod tests {
654663
}
655664

656665
#[ntex::test]
657-
async fn test_sink() {
666+
async fn sink() {
658667
let (client, server) = IoTest::create();
659668
client.remote_buffer_cap(1024);
660669
client.write("GET /test HTTP/1\r\n\r\n");
@@ -693,7 +702,7 @@ mod tests {
693702
}
694703

695704
#[ntex::test]
696-
async fn test_err_in_service() {
705+
async fn err_in_service() {
697706
let (client, server) = IoTest::create();
698707
client.remote_buffer_cap(0);
699708
client.write("GET /test HTTP/1\r\n\r\n");
@@ -730,7 +739,7 @@ mod tests {
730739
}
731740

732741
#[ntex::test]
733-
async fn test_err_in_service_ready() {
742+
async fn err_in_service_ready() {
734743
let (client, server) = IoTest::create();
735744
client.remote_buffer_cap(0);
736745
client.write("GET /test HTTP/1\r\n\r\n");
@@ -787,7 +796,7 @@ mod tests {
787796
}
788797

789798
#[ntex::test]
790-
async fn test_write_backpressure() {
799+
async fn write_backpressure() {
791800
let (client, server) = IoTest::create();
792801
// do not allow to write to socket
793802
client.remote_buffer_cap(0);
@@ -862,7 +871,7 @@ mod tests {
862871
}
863872

864873
#[ntex::test]
865-
async fn test_disconnect_during_read_backpressure() {
874+
async fn disconnect_during_read_backpressure() {
866875
let (client, server) = IoTest::create();
867876
client.remote_buffer_cap(0);
868877

@@ -911,7 +920,7 @@ mod tests {
911920
}
912921

913922
#[ntex::test]
914-
async fn test_keepalive() {
923+
async fn keepalive() {
915924
let (client, server) = IoTest::create();
916925
client.remote_buffer_cap(1024);
917926
client.write("GET /test HTTP/1\r\n\r\n");
@@ -961,7 +970,7 @@ mod tests {
961970
}
962971

963972
#[ntex::test]
964-
async fn test_keepalive2() {
973+
async fn keepalive2() {
965974
let (client, server) = IoTest::create();
966975
client.remote_buffer_cap(1024);
967976

@@ -1012,7 +1021,7 @@ mod tests {
10121021

10131022
/// Update keep-alive timer after receiving frame
10141023
#[ntex::test]
1015-
async fn test_keepalive3() {
1024+
async fn keepalive3() {
10161025
let (client, server) = IoTest::create();
10171026
client.remote_buffer_cap(1024);
10181027

@@ -1069,7 +1078,7 @@ mod tests {
10691078
}
10701079

10711080
#[ntex::test]
1072-
async fn test_read_timeout() {
1081+
async fn read_timeout() {
10731082
let (client, server) = IoTest::create();
10741083
client.remote_buffer_cap(1024);
10751084

@@ -1129,7 +1138,7 @@ mod tests {
11291138
}
11301139

11311140
#[ntex::test]
1132-
async fn test_idle_timeout() {
1141+
async fn idle_timeout() {
11331142
let (client, server) = IoTest::create();
11341143
client.remote_buffer_cap(1024);
11351144

@@ -1181,7 +1190,7 @@ mod tests {
11811190
}
11821191

11831192
#[ntex::test]
1184-
async fn test_unhandled_data() {
1193+
async fn unhandled_data() {
11851194
let handled = Arc::new(AtomicBool::new(false));
11861195
let handled2 = handled.clone();
11871196

ntex-net/src/rt_uring/driver.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,7 @@ impl StreamOps {
134134
rd_op: None,
135135
wr_op: None,
136136
ref_count: 1,
137-
flags: if zc {
138-
self.0.default_flags
139-
} else {
140-
Flags::NO_ZC
141-
},
137+
flags: if zc { self.0.default_flags } else { Flags::NO_ZC },
142138
context: Some(context),
143139
};
144140

ntex-router/src/path.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,7 @@ impl<T: ResourcePath> Path<T> {
6868
pub fn path(&self) -> &str {
6969
let skip = self.skip as usize;
7070
let path = self.path.path();
71-
if skip <= path.len() {
72-
&path[skip..]
73-
} else {
74-
""
75-
}
71+
if skip <= path.len() { &path[skip..] } else { "" }
7672
}
7773

7874
#[inline]

0 commit comments

Comments
 (0)