Skip to content

Commit 20466e8

Browse files
committed
Implement Stream for WsClient
1 parent 1fe8a99 commit 20466e8

File tree

1 file changed

+21
-1
lines changed

1 file changed

+21
-1
lines changed

src/test.rs

+21-1
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,13 @@ use std::future::Future;
8888
use std::net::SocketAddr;
8989
#[cfg(feature = "websocket")]
9090
use std::pin::Pin;
91+
#[cfg(feature = "websocket")]
9192
use std::task::Context;
9293
#[cfg(feature = "websocket")]
9394
use std::task::{self, Poll};
9495

9596
use bytes::Bytes;
97+
#[cfg(feature = "websocket")]
9698
use futures::channel::mpsc;
9799
#[cfg(feature = "websocket")]
98100
use futures::StreamExt;
@@ -107,11 +109,14 @@ use serde_json;
107109
use tokio::sync::oneshot;
108110

109111
use crate::filter::Filter;
112+
#[cfg(feature = "websocket")]
110113
use crate::filters::ws::Message;
111114
use crate::reject::IsReject;
112115
use crate::reply::Reply;
113116
use crate::route::{self, Route};
114-
use crate::{Request, Sink};
117+
use crate::Request;
118+
#[cfg(feature = "websocket")]
119+
use crate::{Sink, Stream};
115120

116121
use self::inner::OneOrTuple;
117122

@@ -646,6 +651,21 @@ impl Sink<crate::ws::Message> for WsClient {
646651
}
647652
}
648653

654+
#[cfg(feature = "websocket")]
655+
impl Stream for WsClient {
656+
type Item = Result<crate::ws::Message, WsError>;
657+
658+
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
659+
let this = Pin::into_inner(self);
660+
let rx = Pin::new(&mut this.rx);
661+
match rx.poll_next(context) {
662+
Poll::Ready(Some(result)) => Poll::Ready(Some(result.map_err(WsError::new))),
663+
Poll::Ready(None) => Poll::Ready(None),
664+
Poll::Pending => Poll::Pending,
665+
}
666+
}
667+
}
668+
649669
// ===== impl WsError =====
650670

651671
#[cfg(feature = "websocket")]

0 commit comments

Comments
 (0)