Skip to content

Commit 6d64285

Browse files
committed
Fix send_blocking missing on_send()
1 parent 988029d commit 6d64285

File tree

2 files changed

+99
-77
lines changed

2 files changed

+99
-77
lines changed

src/rx.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ impl<T> Rx<T> {
8383

8484
/// Receiver that works in async context
8585
///
86-
/// **NOTE**: this is not clonable. If you need concurrent access, use [crate::MAsyncRx] instead.
86+
/// **NOTE: this is not clonable.**
87+
/// If you need concurrent access, use [MAsyncRx](crate::MAsyncRx) instead.
8788
pub struct AsyncRx<T> {
8889
pub(crate) recv: Receiver<T>,
8990
pub(crate) shared: Arc<ChannelShared>,
@@ -121,7 +122,8 @@ impl<T> AsyncRx<T> {
121122
///
122123
/// returns [RecvError] when all Tx dropped.
123124
///
124-
/// **NOTE**: Do not call concurrently. If you need concurrent access, use [crate::MAsyncRx::recv()] instead.
125+
/// **NOTE: Do not call concurrently.**
126+
/// If you need concurrent access, use [MAsyncRx](crate::MAsyncRx) instead.
125127
#[inline(always)]
126128
pub async fn recv(&self) -> Result<T, RecvError> {
127129
match self.try_recv() {
@@ -135,18 +137,6 @@ impl<T> AsyncRx<T> {
135137
}
136138
}
137139

138-
/// Receive a message while blocking the current thread. (If you know what you're doing)
139-
#[inline(always)]
140-
pub fn recv_blocking(&self) -> Result<T, RecvError> {
141-
match self.recv.recv() {
142-
Err(e) => return Err(e),
143-
Ok(i) => {
144-
self.shared.on_recv();
145-
return Ok(i);
146-
}
147-
}
148-
}
149-
150140
/// Try to receive message, non-blocking.
151141
///
152142
/// Returns Ok(T) on successful.
@@ -247,6 +237,24 @@ impl<T> AsyncRx<T> {
247237
pub fn get_waker_size(&self) -> (usize, usize) {
248238
return self.shared.get_waker_size();
249239
}
240+
241+
/// Receive a message while **blocking the current thread**. Be careful!
242+
///
243+
/// Returns `Ok(T)` on successful.
244+
///
245+
/// Returns Err([RecvError]) when all Tx dropped.
246+
///
247+
/// **NOTE: Do not use it in async context otherwise will block the runtime.**
248+
#[inline(always)]
249+
pub fn recv_blocking(&self) -> Result<T, RecvError> {
250+
match self.recv.recv() {
251+
Err(e) => return Err(e),
252+
Ok(i) => {
253+
self.shared.on_recv();
254+
return Ok(i);
255+
}
256+
}
257+
}
250258
}
251259

252260
/// A fixed-sized future object contructed by [AsyncRx::make_recv_future()]

src/tx.rs

Lines changed: 77 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::channel::*;
22
use async_trait::async_trait;
3-
pub use crossbeam::channel::{SendError, Sender, TrySendError};
3+
use crossbeam::channel::Sender;
4+
pub use crossbeam::channel::{SendError, TrySendError};
45
use std::fmt;
56
use std::future::Future;
67
use std::pin::Pin;
@@ -33,9 +34,9 @@ impl<T> Tx<T> {
3334

3435
/// Send message. Will block when channel is full.
3536
///
36-
/// Returns Ok(()) on successful.
37+
/// Returns `Ok(())` on successful.
3738
///
38-
/// Returns SendError(item) when all Rx is dropped.
39+
/// Returns Err([SendError]) when all Rx is dropped.
3940
///
4041
#[inline]
4142
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
@@ -50,11 +51,11 @@ impl<T> Tx<T> {
5051

5152
/// Try to send message, non-blocking
5253
///
53-
/// Returns Ok() when successful.
54+
/// Returns `Ok(())` when successful.
5455
///
55-
/// Returns [TrySendError::Full] on channel full for bounded channel.
56+
/// Returns Err([TrySendError::Full]) on channel full for bounded channel.
5657
///
57-
/// Returns [TrySendError::Disconnected] when all Rx dropped.
58+
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
5859
#[inline]
5960
pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
6061
match self.sender.try_send(item) {
@@ -81,7 +82,8 @@ impl<T> Tx<T> {
8182

8283
/// Sender that works in async context
8384
///
84-
/// **NOTE**: this is not clonable. If you need concurrent access, use [crate::MAsyncTx] instead.
85+
/// **NOTE: this is not clonable.**
86+
/// If you need concurrent access, use [MAsyncTx](crate::MAsyncTx) instead.
8587
pub struct AsyncTx<T> {
8688
pub(crate) sender: Sender<T>,
8789
pub(crate) shared: Arc<ChannelShared>,
@@ -99,58 +101,15 @@ impl<T> Drop for AsyncTx<T> {
99101
}
100102
}
101103

102-
impl<T> AsyncTx<T> {
103-
#[inline]
104-
pub(crate) fn new(sender: Sender<T>, shared: Arc<ChannelShared>) -> Self {
105-
Self { sender, shared }
106-
}
107-
108-
/// Send a message while blocking the current thread. (Used outside async context,
109-
/// if you know what you're doing)
110-
#[inline]
111-
pub fn send_blocking(&self, item: T) -> Result<(), SendError<T>> {
112-
self.sender.send(item)
113-
}
114-
115-
/// Try to send message, non-blocking
116-
///
117-
/// Returns Ok() when successful.
118-
///
119-
/// Returns [TrySendError::Full] on channel full for bounded channel.
120-
///
121-
/// Returns [TrySendError::Disconnected] when all Rx dropped.
122-
#[inline]
123-
pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
124-
match self.sender.try_send(item) {
125-
Err(e) => return Err(e),
126-
Ok(_) => {
127-
self.shared.on_send();
128-
return Ok(());
129-
}
130-
}
131-
}
132-
133-
/// Probe possible messages in the channel (not accurate)
134-
#[inline]
135-
pub fn len(&self) -> usize {
136-
self.sender.len()
137-
}
138-
139-
/// Whether there's message in the channel (not accurate)
140-
#[inline]
141-
pub fn is_empty(&self) -> bool {
142-
self.sender.is_empty()
143-
}
144-
}
145-
146104
impl<T: Unpin + Send + 'static> AsyncTx<T> {
147105
/// Send message. Will await when channel is full.
148106
///
149-
/// Returns Ok(()) on successful.
107+
/// Returns `Ok(())` on successful.
150108
///
151-
/// Returns SendError(item) when all Rx is dropped.
109+
/// Returns Err([SendError]) when all Rx is dropped.
152110
///
153-
/// **NOTE**: Do not call concurrently. If you need concurrent access, use [crate::MAsyncTx::send()] instead.
111+
/// **NOTE: Do not call concurrently.**
112+
/// If you need concurrent access, use [MAsyncTx](crate::MAsyncTx) instead.
154113
#[inline(always)]
155114
pub async fn send(&self, item: T) -> Result<(), SendError<T>> {
156115
match self.try_send(item) {
@@ -170,7 +129,7 @@ impl<T: Unpin + Send + 'static> AsyncTx<T> {
170129

171130
/// This is only useful when you're writing your own future.
172131
///
173-
/// Returns Ok(()) on message sent.
132+
/// Returns `Ok(())` on message sent.
174133
///
175134
/// Returns Err([TrySendError::Full]) for Poll::Pending case.
176135
///
@@ -230,6 +189,61 @@ impl<T: Unpin + Send + 'static> AsyncTx<T> {
230189
}
231190
}
232191

192+
impl<T> AsyncTx<T> {
193+
#[inline]
194+
pub(crate) fn new(sender: Sender<T>, shared: Arc<ChannelShared>) -> Self {
195+
Self { sender, shared }
196+
}
197+
198+
/// Try to send message, non-blocking
199+
///
200+
/// Returns `Ok(())` when successful.
201+
///
202+
/// Returns Err([TrySendError::Full]) on channel full for bounded channel.
203+
///
204+
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
205+
#[inline]
206+
pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
207+
match self.sender.try_send(item) {
208+
Err(e) => return Err(e),
209+
Ok(_) => {
210+
self.shared.on_send();
211+
return Ok(());
212+
}
213+
}
214+
}
215+
216+
/// Probe possible messages in the channel (not accurate)
217+
#[inline]
218+
pub fn len(&self) -> usize {
219+
self.sender.len()
220+
}
221+
222+
/// Whether there's message in the channel (not accurate)
223+
#[inline]
224+
pub fn is_empty(&self) -> bool {
225+
self.sender.is_empty()
226+
}
227+
228+
/// Send a message while **blocking the current thread**. Be careful!
229+
///
230+
/// Returns `Ok(())`on successful.
231+
///
232+
/// Returns Err([SendError]) when all Rx is dropped.
233+
///
234+
/// **NOTE: Do not use it in async context otherwise will block the runtime.**
235+
#[inline]
236+
pub fn send_blocking(&self, item: T) -> Result<(), SendError<T>> {
237+
match self.sender.send(item) {
238+
Ok(()) => {
239+
self.shared.on_send();
240+
return Ok(());
241+
}
242+
Err(e) => return Err(e),
243+
}
244+
}
245+
}
246+
233247
/// A fixed-sized future object construted by [AsyncTx::make_send_future()]
234248
pub struct SendFuture<'a, T: Unpin> {
235249
tx: &'a AsyncTx<T>,
@@ -278,18 +292,18 @@ impl<T: Unpin + Send + 'static> Future for SendFuture<'_, T> {
278292
pub trait BlockingTxTrait<T: Send + 'static>: Send + 'static {
279293
/// Send message. Will block when channel is full.
280294
///
281-
/// Returns Ok(()) on successful.
295+
/// Returns `Ok(())` on successful.
282296
///
283-
/// Returns SendError(item) when all Rx is dropped.
297+
/// Returns Err([SendError]) when all Rx is dropped.
284298
fn send(&self, _item: T) -> Result<(), SendError<T>>;
285299

286300
/// Try to send message, non-blocking
287301
///
288-
/// Returns Ok() when successful.
302+
/// Returns `Ok(())` when successful.
289303
///
290-
/// Returns [TrySendError::Full] on channel full for bounded channel.
304+
/// Returns Err([TrySendError::Full]) on channel full for bounded channel.
291305
///
292-
/// Returns [TrySendError::Disconnected] when all Rx dropped.
306+
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
293307
fn try_send(&self, _item: T) -> Result<(), TrySendError<T>>;
294308

295309
/// Probe possible messages in the channel (not accurate)
@@ -326,9 +340,9 @@ impl<T: Send + 'static> BlockingTxTrait<T> for Tx<T> {
326340
pub trait AsyncTxTrait<T: Unpin + Send + 'static>: Send + Sync + 'static {
327341
/// Send message. Will await when channel is full.
328342
///
329-
/// Returns Ok(()) on successful.
343+
/// Returns `Ok(())` on successful.
330344
///
331-
/// Returns SendError(item) when all Rx is dropped.
345+
/// Returns Err([SendError]) when all Rx is dropped.
332346
async fn send(&self, item: T) -> Result<(), SendError<T>>;
333347

334348
/// Just for debugging purpose, to monitor queue size
@@ -337,7 +351,7 @@ pub trait AsyncTxTrait<T: Unpin + Send + 'static>: Send + Sync + 'static {
337351

338352
/// Try to send message, non-blocking
339353
///
340-
/// Returns Ok() when successful.
354+
/// Returns `Ok(())` when successful.
341355
///
342356
/// Returns [TrySendError::Full] on channel full for bounded channel.
343357
///

0 commit comments

Comments
 (0)