Skip to content

Commit d7699b7

Browse files
committed
simplify framed dispatcher states
1 parent f52f7e6 commit d7699b7

File tree

3 files changed

+32
-28
lines changed

3 files changed

+32
-28
lines changed

ntex-codec/src/framed.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,13 @@ impl<T, U> Framed<T, U> {
126126

127127
#[inline]
128128
/// Get read buffer.
129-
pub fn read_buf_mut(&mut self) -> &mut BytesMut {
129+
pub fn read_buf(&mut self) -> &mut BytesMut {
130130
&mut self.read_buf
131131
}
132132

133133
#[inline]
134134
/// Get write buffer.
135-
pub fn write_buf_mut(&mut self) -> &mut BytesMut {
135+
pub fn write_buf(&mut self) -> &mut BytesMut {
136136
&mut self.write_buf
137137
}
138138

@@ -556,6 +556,8 @@ mod tests {
556556
let data = Bytes::from_static(b"GET /test HTTP/1.1\r\n\r\n");
557557
Pin::new(&mut server).start_send(data).unwrap();
558558
assert_eq!(client.read_any(), b"".as_ref());
559+
assert_eq!(server.read_buf(), b"".as_ref());
560+
assert_eq!(server.write_buf(), b"GET /test HTTP/1.1\r\n\r\n".as_ref());
559561

560562
assert!(lazy(|cx| Pin::new(&mut server).poll_flush(cx))
561563
.await

ntex/CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Changes
22

3-
## [0.1.9] - 2020-04-xx
3+
## [0.1.9] - 2020-04-13
44

55
* ntex::util: Refcator framed dispatcher
66

ntex/src/util/framed.rs

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,7 @@ where
180180

181181
enum FramedState<S: Service, U: Encoder + Decoder> {
182182
Processing,
183-
Error(DispatcherError<S::Error, U>),
184-
FlushAndStop,
183+
FlushAndStop(Option<DispatcherError<S::Error, U>>),
185184
Shutdown(Option<DispatcherError<S::Error, U>>),
186185
ShutdownIo(Delay, Option<Result<(), DispatcherError<S::Error, U>>>),
187186
}
@@ -192,15 +191,6 @@ enum PollResult {
192191
Pending,
193192
}
194193

195-
impl<S: Service, U: Encoder + Decoder> FramedState<S, U> {
196-
fn take_error(&mut self) -> DispatcherError<S::Error, U> {
197-
match std::mem::replace(self, FramedState::Processing) {
198-
FramedState::Error(err) => err,
199-
_ => panic!(),
200-
}
201-
}
202-
}
203-
204194
struct InnerDispatcher<S, T, U, Out>
205195
where
206196
S: Service<Request = Request<U>, Response = Option<Response<U>>>,
@@ -263,7 +253,8 @@ where
263253
}
264254
Poll::Pending => return PollResult::Pending,
265255
Poll::Ready(Err(err)) => {
266-
self.state = FramedState::Error(DispatcherError::Service(err));
256+
self.state =
257+
FramedState::FlushAndStop(Some(DispatcherError::Service(err)));
267258
return PollResult::Continue;
268259
}
269260
}
@@ -285,7 +276,9 @@ where
285276
continue;
286277
}
287278
Poll::Ready(Some(Err(err))) => {
288-
self.state = FramedState::Error(DispatcherError::Service(err));
279+
self.state = FramedState::FlushAndStop(Some(
280+
DispatcherError::Service(err),
281+
));
289282
return PollResult::Continue;
290283
}
291284
Poll::Ready(None) | Poll::Pending => {}
@@ -304,7 +297,7 @@ where
304297
}
305298
Poll::Ready(None) => {
306299
let _ = self.sink.take();
307-
self.state = FramedState::FlushAndStop;
300+
self.state = FramedState::FlushAndStop(None);
308301
return PollResult::Continue;
309302
}
310303
Poll::Pending => (),
@@ -346,16 +339,7 @@ where
346339
return Poll::Pending;
347340
}
348341
}
349-
FramedState::Error(_) => {
350-
// flush write buffer
351-
if !self.framed.is_write_buf_empty() {
352-
if let Poll::Pending = self.framed.flush(cx) {
353-
return Poll::Pending;
354-
}
355-
}
356-
self.state = FramedState::Shutdown(Some(self.state.take_error()));
357-
}
358-
FramedState::FlushAndStop => {
342+
FramedState::FlushAndStop(ref mut err) => {
359343
// drain service responses
360344
match Pin::new(&mut self.rx).poll_next(cx) {
361345
Poll::Ready(Some(Ok(msg))) => {
@@ -385,7 +369,7 @@ where
385369
Poll::Ready(_) => (),
386370
}
387371
};
388-
self.state = FramedState::Shutdown(None);
372+
self.state = FramedState::Shutdown(err.take());
389373
}
390374
FramedState::Shutdown(ref mut err) => {
391375
return if self.service.poll_shutdown(cx, err.is_some()).is_ready() {
@@ -440,14 +424,32 @@ where
440424
#[cfg(test)]
441425
mod tests {
442426
use bytes::{Bytes, BytesMut};
427+
use derive_more::Display;
443428
use futures::future::ok;
429+
use std::io;
444430

445431
use super::*;
446432
use crate::channel::mpsc;
447433
use crate::codec::{BytesCodec, Framed};
448434
use crate::rt::time::delay_for;
449435
use crate::testing::Io;
450436

437+
#[test]
438+
fn test_err() {
439+
#[derive(Debug, Display)]
440+
struct TestError;
441+
type T = DispatcherError<TestError, BytesCodec>;
442+
let err = T::Encoder(io::Error::new(io::ErrorKind::Other, "err"));
443+
assert!(format!("{:?}", err).contains("DispatcherError::Encoder"));
444+
assert!(format!("{}", err).contains("Custom"));
445+
let err = T::Decoder(io::Error::new(io::ErrorKind::Other, "err"));
446+
assert!(format!("{:?}", err).contains("DispatcherError::Decoder"));
447+
assert!(format!("{}", err).contains("Custom"));
448+
let err = T::from(TestError);
449+
assert!(format!("{:?}", err).contains("DispatcherError::Service"));
450+
assert_eq!(format!("{}", err), "TestError");
451+
}
452+
451453
#[ntex_rt::test]
452454
async fn test_basic() {
453455
let (client, server) = Io::create();

0 commit comments

Comments
 (0)