Skip to content

Commit 77fa91a

Browse files
committed
Switch WsClient from tokio's unbounded channel to futures unbounded channel
1 parent 9e74ff9 commit 77fa91a

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

src/test.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ use std::pin::Pin;
9292
use std::task::{self, Poll};
9393

9494
use bytes::Bytes;
95+
use futures::channel::mpsc;
9596
#[cfg(feature = "websocket")]
9697
use futures::StreamExt;
9798
use futures::{future, FutureExt, TryFutureExt};
@@ -102,9 +103,7 @@ use http::{
102103
use serde::Serialize;
103104
use serde_json;
104105
#[cfg(feature = "websocket")]
105-
use tokio::sync::{mpsc, oneshot};
106-
#[cfg(feature = "websocket")]
107-
use tokio_stream::wrappers::UnboundedReceiverStream;
106+
use tokio::sync::oneshot;
108107

109108
use crate::filter::Filter;
110109
use crate::reject::IsReject;
@@ -484,9 +483,8 @@ impl WsBuilder {
484483
F::Error: IsReject + Send,
485484
{
486485
let (upgraded_tx, upgraded_rx) = oneshot::channel();
487-
let (wr_tx, wr_rx) = mpsc::unbounded_channel();
488-
let wr_rx = UnboundedReceiverStream::new(wr_rx);
489-
let (rd_tx, rd_rx) = mpsc::unbounded_channel();
486+
let (wr_tx, wr_rx) = mpsc::unbounded();
487+
let (rd_tx, rd_rx) = mpsc::unbounded();
490488

491489
tokio::spawn(async move {
492490
use tokio_tungstenite::tungstenite::protocol;
@@ -546,7 +544,7 @@ impl WsBuilder {
546544
Ok(m) => future::ready(!m.is_close()),
547545
})
548546
.for_each(move |item| {
549-
rd_tx.send(item).expect("ws receive error");
547+
rd_tx.unbounded_send(item).expect("ws receive error");
550548
future::ready(())
551549
});
552550

@@ -573,13 +571,13 @@ impl WsClient {
573571

574572
/// Send a websocket message to the server.
575573
pub async fn send(&mut self, msg: crate::ws::Message) {
576-
self.tx.send(msg).unwrap();
574+
self.tx.unbounded_send(msg).unwrap();
577575
}
578576

579577
/// Receive a websocket message from the server.
580578
pub async fn recv(&mut self) -> Result<crate::filters::ws::Message, WsError> {
581579
self.rx
582-
.recv()
580+
.next()
583581
.await
584582
.map(|result| result.map_err(WsError::new))
585583
.unwrap_or_else(|| {
@@ -591,7 +589,7 @@ impl WsClient {
591589
/// Assert the server has closed the connection.
592590
pub async fn recv_closed(&mut self) -> Result<(), WsError> {
593591
self.rx
594-
.recv()
592+
.next()
595593
.await
596594
.map(|result| match result {
597595
Ok(msg) => Err(WsError::new(format!("received message: {:?}", msg))),

0 commit comments

Comments
 (0)