Skip to content

Commit e5df9f5

Browse files
committed
Add channel for blocking context
(Currently exactly like crossbeam)
1 parent 0594848 commit e5df9f5

File tree

3 files changed

+91
-0
lines changed

3 files changed

+91
-0
lines changed

src/mpmc.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ use crate::blocking_tx::*;
55
/// Multi producers, multi consumers
66
use crate::channel::*;
77

8+
/// Initiate a unbounded channel for blocking context.
9+
///
10+
/// Sender will never block, so we use the same TxBlocking for threads
11+
pub fn unbounded_blocking<T: Unpin>() -> (MTx<T>, MRx<T>) {
12+
let (tx, rx) = crossbeam::channel::unbounded();
13+
let send_wakers = SendWakersBlocking::new();
14+
let recv_wakers = RecvWakersBlocking::new();
15+
let shared = ChannelShared::new(send_wakers, recv_wakers);
16+
let tx = MTx::new(tx, shared.clone());
17+
let rx = MRx::new(rx, shared);
18+
(tx, rx)
19+
}
20+
821
/// Initiate a unbounded channel.
922
///
1023
/// Although sender type is MTx, will never block.
@@ -19,6 +32,23 @@ pub fn unbounded_async<T: Unpin>() -> (MTx<T>, MAsyncRx<T>) {
1932
(tx, rx)
2033
}
2134

35+
/// Initiate a bounded channel for blocking context
36+
///
37+
/// Special case: 0 size is not supported yet, threat it as 1 size for now.
38+
pub fn bounded_blocking<T: Unpin>(mut size: usize) -> (MTx<T>, MRx<T>) {
39+
if size == 0 {
40+
size = 1;
41+
}
42+
let (tx, rx) = crossbeam::channel::bounded(size);
43+
let send_wakers = SendWakersBlocking::new();
44+
let recv_wakers = RecvWakersBlocking::new();
45+
let shared = ChannelShared::new(send_wakers, recv_wakers);
46+
47+
let tx = MTx::new(tx, shared.clone());
48+
let rx = MRx::new(rx, shared);
49+
(tx, rx)
50+
}
51+
2252
/// Initiate a bounded channel that sender and receiver are async.
2353
///
2454
/// Special case: 0 size is not supported yet, threat it as 1 size for now.

src/mpsc.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ use crate::blocking_tx::*;
55
/// Multi producers, single consumer
66
use crate::channel::*;
77

8+
/// Initiate a unbounded channel for blocking context.
9+
///
10+
/// Sender will never block, so we use the same TxBlocking for threads
11+
pub fn unbounded_blocking<T: Unpin>() -> (MTx<T>, Rx<T>) {
12+
let (tx, rx) = crossbeam::channel::unbounded();
13+
let send_wakers = SendWakersBlocking::new();
14+
let recv_wakers = RecvWakersBlocking::new();
15+
let shared = ChannelShared::new(send_wakers, recv_wakers);
16+
let tx = MTx::new(tx, shared.clone());
17+
let rx = Rx::new(rx, shared);
18+
(tx, rx)
19+
}
20+
821
/// Initiate a unbounded channel.
922
///
1023
/// Although sender type is MTx, will never block.
@@ -18,6 +31,23 @@ pub fn unbounded_async<T: Unpin>() -> (MTx<T>, AsyncRx<T>) {
1831
(tx, rx)
1932
}
2033

34+
/// Initiate a bounded channel for blocking context
35+
///
36+
/// Special case: 0 size is not supported yet, threat it as 1 size for now.
37+
pub fn bounded_blocking<T: Unpin>(mut size: usize) -> (MTx<T>, Rx<T>) {
38+
if size == 0 {
39+
size = 1;
40+
}
41+
let (tx, rx) = crossbeam::channel::bounded(size);
42+
let send_wakers = SendWakersBlocking::new();
43+
let recv_wakers = RecvWakersBlocking::new();
44+
let shared = ChannelShared::new(send_wakers, recv_wakers);
45+
46+
let tx = MTx::new(tx, shared.clone());
47+
let rx = Rx::new(rx, shared);
48+
(tx, rx)
49+
}
50+
2151
/// Initiate a bounded channel that sender and receiver is async.
2252
///
2353
/// Special case: 0 size is not supported yet, threat it as 1 size for now.

src/spsc.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,21 @@ use crate::blocking_tx::*;
55
/// Single producer, single consumer
66
use crate::channel::*;
77

8+
/// Initiate a unbounded channel for blocking context.
9+
///
10+
/// Sender will never block, so we use the same TxBlocking for threads
11+
pub fn unbounded_blocking<T: Unpin>() -> (Tx<T>, Rx<T>) {
12+
let (tx, rx) = crossbeam::channel::unbounded();
13+
let send_wakers = SendWakersBlocking::new();
14+
let recv_wakers = RecvWakersBlocking::new();
15+
let shared = ChannelShared::new(send_wakers, recv_wakers);
16+
let tx = Tx::new(tx, shared.clone());
17+
let rx = Rx::new(rx, shared);
18+
(tx, rx)
19+
}
20+
821
/// Initiate a unbounded channel.
22+
///
923
/// Sender will never block, so we use the same TxBlocking for threads
1024
pub fn unbounded_async<T: Unpin>() -> (Tx<T>, AsyncRx<T>) {
1125
let (tx, rx) = crossbeam::channel::unbounded();
@@ -17,6 +31,23 @@ pub fn unbounded_async<T: Unpin>() -> (Tx<T>, AsyncRx<T>) {
1731
(tx, rx)
1832
}
1933

34+
/// Initiate a bounded channel for blocking context
35+
///
36+
/// Special case: 0 size is not supported yet, threat it as 1 size for now.
37+
pub fn bounded_blocking<T: Unpin>(mut size: usize) -> (Tx<T>, Rx<T>) {
38+
if size == 0 {
39+
size = 1;
40+
}
41+
let (tx, rx) = crossbeam::channel::bounded(size);
42+
let send_wakers = SendWakersBlocking::new();
43+
let recv_wakers = RecvWakersBlocking::new();
44+
let shared = ChannelShared::new(send_wakers, recv_wakers);
45+
46+
let tx = Tx::new(tx, shared.clone());
47+
let rx = Rx::new(rx, shared);
48+
(tx, rx)
49+
}
50+
2051
/// Initiate a bounded channel that sender and receiver are async
2152
///
2253
/// Special case: 0 size is not supported yet, threat it as 1 size for now.

0 commit comments

Comments
 (0)