Skip to content

Commit d10b000

Browse files
committed
Split async and blocking code
1 parent 3fe9835 commit d10b000

File tree

11 files changed

+649
-646
lines changed

11 files changed

+649
-646
lines changed

src/rx.rs renamed to src/async_rx.rs

Lines changed: 85 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,14 @@
11
use crate::channel::*;
2-
use crate::m_rx::*;
32
use crate::stream::AsyncStream;
43
use async_trait::async_trait;
54
use crossbeam::channel::Receiver;
6-
pub use crossbeam::channel::{RecvError, RecvTimeoutError, TryRecvError};
5+
pub use crossbeam::channel::{RecvError, TryRecvError};
76
use std::fmt;
87
use std::future::Future;
8+
use std::ops::{Deref, DerefMut};
99
use std::pin::Pin;
1010
use std::sync::Arc;
1111
use std::task::{Context, Poll};
12-
use std::time::Duration;
13-
14-
/// Receiver that works in blocking context
15-
pub struct Rx<T> {
16-
pub(crate) recv: Receiver<T>,
17-
pub(crate) shared: Arc<ChannelShared>,
18-
}
19-
20-
impl<T> fmt::Debug for Rx<T> {
21-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
22-
write!(f, "Rx")
23-
}
24-
}
25-
26-
impl<T> Drop for Rx<T> {
27-
fn drop(&mut self) {
28-
self.shared.close_rx();
29-
}
30-
}
31-
32-
impl<T> Rx<T> {
33-
#[inline]
34-
pub(crate) fn new(recv: Receiver<T>, shared: Arc<ChannelShared>) -> Self {
35-
Self { recv, shared }
36-
}
37-
38-
/// Receive message, will block when channel is empty.
39-
///
40-
/// Returns Ok(T) when successful.
41-
///
42-
/// Returns Err([RecvError]) when all Tx dropped.
43-
#[inline]
44-
pub fn recv<'a>(&'a self) -> Result<T, RecvError> {
45-
match self.recv.recv() {
46-
Err(e) => return Err(e),
47-
Ok(i) => {
48-
self.shared.on_recv();
49-
return Ok(i);
50-
}
51-
}
52-
}
53-
54-
/// Try to receive message, non-blocking.
55-
///
56-
/// Returns Ok(T) when successful.
57-
///
58-
/// Returns Err([TryRecvError::Empty]) when channel is empty.
59-
///
60-
/// returns Err([TryRecvError::Disconnected]) when all Tx dropped.
61-
#[inline]
62-
pub fn try_recv(&self) -> Result<T, TryRecvError> {
63-
match self.recv.try_recv() {
64-
Err(e) => return Err(e),
65-
Ok(i) => {
66-
self.shared.on_recv();
67-
return Ok(i);
68-
}
69-
}
70-
}
71-
72-
/// Waits for a message to be received from the channel, but only for a limited time.
73-
/// Will block when channel is empty.
74-
///
75-
/// Returns Ok(T) when successful.
76-
///
77-
/// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
78-
///
79-
/// returns Err([RecvTimeoutError::Disconnected]) when all Tx dropped.
80-
#[inline]
81-
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
82-
match self.recv.recv_timeout(timeout) {
83-
Err(e) => return Err(e),
84-
Ok(i) => {
85-
self.shared.on_recv();
86-
return Ok(i);
87-
}
88-
}
89-
}
90-
91-
/// Probe possible messages in the channel (not accurate)
92-
#[inline]
93-
pub fn len(&self) -> usize {
94-
self.recv.len()
95-
}
96-
97-
/// Whether there's message in the channel (not accurate)
98-
#[inline]
99-
pub fn is_empty(&self) -> bool {
100-
self.recv.is_empty()
101-
}
102-
}
10312

10413
/// Receiver that works in async context
10514
///
@@ -317,143 +226,158 @@ impl<T> Future for ReceiveFuture<'_, T> {
317226
}
318227
}
319228

320-
impl<T> From<MRx<T>> for Rx<T> {
321-
fn from(rx: MRx<T>) -> Self {
322-
rx.0
323-
}
324-
}
325-
326-
impl<T> From<MAsyncRx<T>> for AsyncRx<T> {
327-
fn from(rx: MAsyncRx<T>) -> Self {
328-
rx.0
329-
}
330-
}
331-
332-
/// For writing generic code with MRx & Rx
333-
pub trait BlockingRxTrait<T: Send + 'static>: Send + 'static {
334-
/// Receive message, will block when channel is empty.
229+
/// For writing generic code with MAsyncRx & AsyncRx
230+
#[async_trait]
231+
pub trait AsyncRxTrait<T: Unpin + Send + 'static>: Send + Sync + 'static {
232+
/// Receive message, will await when channel is empty.
335233
///
336234
/// Returns `Ok(T)` when successful.
337235
///
338-
/// Returns Err([RecvError]) when all Tx dropped.
339-
fn recv<'a>(&'a self) -> Result<T, RecvError>;
236+
/// returns Err([RecvError]) when all Tx dropped.
237+
async fn recv(&self) -> Result<T, RecvError>;
340238

341239
/// Try to receive message, non-blocking.
342240
///
343-
/// Returns `Ok(T)` when successful.
241+
/// Returns Ok(T) when successful.
344242
///
345243
/// Returns Err([TryRecvError::Empty]) when channel is empty.
346244
///
347245
/// Returns Err([TryRecvError::Disconnected]) when all Tx dropped.
348246
fn try_recv(&self) -> Result<T, TryRecvError>;
349247

350-
/// Waits for a message to be received from the channel, but only for a limited time.
351-
/// Will block when channel is empty.
352-
///
353-
/// Returns Ok(T) when successful.
354-
///
355-
/// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
356-
///
357-
/// returns Err([RecvTimeoutError::Disconnected]) when all Tx dropped.
358-
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>;
248+
/// Generate a fixed Sized future object that receive a message
249+
fn make_recv_future<'a>(&'a self) -> ReceiveFuture<'a, T>;
359250

360251
/// Probe possible messages in the channel (not accurate)
361252
fn len(&self) -> usize;
362253

363254
/// Whether there's message in the channel (not accurate)
364255
fn is_empty(&self) -> bool;
256+
257+
/// Returns count of tx / rx wakers stored in channel for debug purpose
258+
#[cfg(test)]
259+
fn get_waker_size(&self) -> (usize, usize);
365260
}
366261

367-
impl<T: Send + 'static> BlockingRxTrait<T> for Rx<T> {
262+
#[async_trait]
263+
impl<T: Unpin + Send + 'static> AsyncRxTrait<T> for AsyncRx<T> {
368264
#[inline(always)]
369-
fn recv<'a>(&'a self) -> Result<T, RecvError> {
370-
Rx::recv(self)
265+
async fn recv(&self) -> Result<T, RecvError> {
266+
AsyncRx::recv(self).await
371267
}
372268

373269
#[inline(always)]
374270
fn try_recv(&self) -> Result<T, TryRecvError> {
375-
Rx::try_recv(self)
271+
AsyncRx::try_recv(self)
376272
}
377273

378274
#[inline(always)]
379-
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
380-
Rx::recv_timeout(self, timeout)
275+
fn make_recv_future<'a>(&'a self) -> ReceiveFuture<'a, T> {
276+
AsyncRx::make_recv_future(self)
381277
}
382278

383279
#[inline(always)]
384280
fn len(&self) -> usize {
385-
Rx::len(self)
281+
AsyncRx::len(self)
386282
}
387283

388284
#[inline(always)]
389285
fn is_empty(&self) -> bool {
390-
Rx::is_empty(self)
286+
AsyncRx::is_empty(self)
287+
}
288+
289+
#[inline(always)]
290+
#[cfg(test)]
291+
fn get_waker_size(&self) -> (usize, usize) {
292+
AsyncRx::get_waker_size(self)
391293
}
392294
}
393295

394-
/// For writing generic code with MAsyncRx & AsyncRx
395-
#[async_trait]
396-
pub trait AsyncRxTrait<T: Unpin + Send + 'static>: Send + Sync + 'static {
397-
/// Receive message, will await when channel is empty.
398-
///
399-
/// Returns `Ok(T)` when successful.
400-
///
401-
/// returns Err([RecvError]) when all Tx dropped.
402-
async fn recv(&self) -> Result<T, RecvError>;
296+
/// Receiver that works in async context. MC version of [`AsyncRx<T>`] implements [Clone].
297+
///
298+
/// You can use `into()` to convert it to `AsyncRx<T>`.
299+
pub struct MAsyncRx<T>(pub(crate) AsyncRx<T>);
403300

404-
/// Try to receive message, non-blocking.
405-
///
406-
/// Returns Ok(T) when successful.
407-
///
408-
/// Returns Err([TryRecvError::Empty]) when channel is empty.
409-
///
410-
/// Returns Err([TryRecvError::Disconnected]) when all Tx dropped.
411-
fn try_recv(&self) -> Result<T, TryRecvError>;
301+
impl<T> Clone for MAsyncRx<T> {
302+
#[inline]
303+
fn clone(&self) -> Self {
304+
let inner = &self.0;
305+
inner.shared.add_rx();
306+
Self(AsyncRx { recv: inner.recv.clone(), shared: inner.shared.clone() })
307+
}
308+
}
412309

413-
/// Generate a fixed Sized future object that receive a message
414-
fn make_recv_future<'a>(&'a self) -> ReceiveFuture<'a, T>;
310+
impl<T> From<MAsyncRx<T>> for AsyncRx<T> {
311+
fn from(rx: MAsyncRx<T>) -> Self {
312+
rx.0
313+
}
314+
}
415315

416-
/// Probe possible messages in the channel (not accurate)
417-
fn len(&self) -> usize;
316+
impl<T> MAsyncRx<T> {
317+
#[inline]
318+
pub(crate) fn new(recv: Receiver<T>, shared: Arc<ChannelShared>) -> Self {
319+
Self(AsyncRx::new(recv, shared))
320+
}
418321

419-
/// Whether there's message in the channel (not accurate)
420-
fn is_empty(&self) -> bool;
322+
#[inline]
323+
pub fn into_stream(self) -> AsyncStream<T>
324+
where
325+
T: Sync + Send + Unpin + 'static,
326+
{
327+
AsyncStream::new(self.0)
328+
}
329+
}
421330

422-
/// Returns count of tx / rx wakers stored in channel for debug purpose
423-
#[cfg(test)]
424-
fn get_waker_size(&self) -> (usize, usize);
331+
impl<T> Deref for MAsyncRx<T> {
332+
type Target = AsyncRx<T>;
333+
334+
/// inherit all the functions of [AsyncRx]
335+
fn deref(&self) -> &Self::Target {
336+
&self.0
337+
}
338+
}
339+
340+
impl<T> DerefMut for MAsyncRx<T> {
341+
/// inherit all the functions of [AsyncRx]
342+
fn deref_mut(&mut self) -> &mut Self::Target {
343+
&mut self.0
344+
}
425345
}
426346

427347
#[async_trait]
428-
impl<T: Unpin + Send + 'static> AsyncRxTrait<T> for AsyncRx<T> {
348+
impl<T: Unpin + Send + 'static> AsyncRxTrait<T> for MAsyncRx<T> {
429349
#[inline(always)]
430350
async fn recv(&self) -> Result<T, RecvError> {
431-
AsyncRx::recv(self).await
351+
self.0.recv().await
432352
}
433353

434354
#[inline(always)]
435355
fn try_recv(&self) -> Result<T, TryRecvError> {
436-
AsyncRx::try_recv(self)
356+
self.0.try_recv()
437357
}
438358

359+
/// Generate a fixed Sized future object that receive a message
439360
#[inline(always)]
440361
fn make_recv_future<'a>(&'a self) -> ReceiveFuture<'a, T> {
441-
AsyncRx::make_recv_future(self)
362+
self.0.make_recv_future()
442363
}
443364

365+
/// Probe possible messages in the channel (not accurate)
444366
#[inline(always)]
445367
fn len(&self) -> usize {
446-
AsyncRx::len(self)
368+
self.0.len()
447369
}
448370

371+
/// Whether there's message in the channel (not accurate)
449372
#[inline(always)]
450373
fn is_empty(&self) -> bool {
451-
AsyncRx::is_empty(self)
374+
self.0.is_empty()
452375
}
453376

377+
/// Returns count of tx / rx wakers stored in channel for debug purpose
454378
#[inline(always)]
455379
#[cfg(test)]
456380
fn get_waker_size(&self) -> (usize, usize) {
457-
AsyncRx::get_waker_size(self)
381+
self.0.get_waker_size()
458382
}
459383
}

0 commit comments

Comments
 (0)