Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "crossfire"
version = "2.0.0"
authors = ["plan <frostyplanet@gmail.com>"]
edition = "2024"
edition = "2021"
license = "Apache-2.0"
homepage = "https://github.com/frostyplanet/crossfire-rs"
readme = "README.md"
Expand All @@ -12,6 +12,7 @@ keywords = ["async", "non-blocking", "lock-free", "channel"]
categories = ["concurrency", "data-structures"]
exclude = ["/ci/*", "/bors.toml"]
description = "channels for async and threads"
Comment thread
zach-schoenberger marked this conversation as resolved.
rust-version = "1.61"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -23,7 +24,7 @@ async-trait = "0"

[dev-dependencies]
tokio = { version = "1", features = ["time", "sync", "rt-multi-thread", "rt", "macros"] }
rand = "0.7.3"
rand = "0.9"
rstest = "0"
log = "0"
captains-log = "0"
Expand Down
2 changes: 1 addition & 1 deletion rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
edition = "2024"
edition = "2021"
fn_params_layout = "Compressed"
newline_style = "Unix"
use_small_heuristics = "Max"
Expand Down
7 changes: 7 additions & 0 deletions src/m_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::rx::*;
use crate::stream::AsyncStream;
use async_trait::async_trait;
use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

/// Receiver that works in blocking context. MC version of [`Rx<T>`] implements [Clone].
///
Expand Down Expand Up @@ -99,6 +101,11 @@ impl<T: Send + 'static> BlockingRxTrait<T> for MRx<T> {
self.0.try_recv()
}

#[inline(always)]
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
self.0.recv_timeout(timeout)
}

/// Probe possible messages in the channel (not accurate)
#[inline(always)]
fn len(&self) -> usize {
Expand Down
7 changes: 7 additions & 0 deletions src/m_tx.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::channel::*;
use crate::tx::*;
use async_trait::async_trait;
use crossbeam::channel::SendTimeoutError;
use crossbeam::channel::Sender;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

/// Sender that works in blocking context. MP version of [`Tx<T>`] implements [Clone].
///
Expand Down Expand Up @@ -90,6 +92,11 @@ impl<T: Send + 'static> BlockingTxTrait<T> for MTx<T> {
self.0.try_send(item)
}

#[inline(always)]
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
self.0.send_timeout(item, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
self.0.len()
Expand Down
37 changes: 36 additions & 1 deletion src/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use crate::m_rx::*;
use crate::stream::AsyncStream;
use async_trait::async_trait;
use crossbeam::channel::Receiver;
pub use crossbeam::channel::{RecvError, TryRecvError};
pub use crossbeam::channel::{RecvError, RecvTimeoutError, TryRecvError};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

/// Receiver that works in blocking context
pub struct Rx<T> {
Expand Down Expand Up @@ -68,6 +69,25 @@ impl<T> Rx<T> {
}
}

/// Waits for a message to be received from the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns Ok(T) when successful.
///
/// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
///
/// returns Err([RecvTimeoutError::Disconnected]) when all Tx dropped.
#[inline]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
match self.recv.recv_timeout(timeout) {
Err(e) => return Err(e),
Ok(i) => {
self.shared.on_recv();
return Ok(i);
}
}
}

/// Probe possible messages in the channel (not accurate)
#[inline]
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -327,6 +347,16 @@ pub trait BlockingRxTrait<T: Send + 'static>: Send + 'static {
/// Returns Err([TryRecvError::Disconnected]) when all Tx dropped.
fn try_recv(&self) -> Result<T, TryRecvError>;

/// Waits for a message to be received from the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns Ok(T) when successful.
///
/// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
///
/// returns Err([RecvTimeoutError::Disconnected]) when all Tx dropped.
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>;

/// Probe possible messages in the channel (not accurate)
fn len(&self) -> usize;

Expand All @@ -345,6 +375,11 @@ impl<T: Send + 'static> BlockingRxTrait<T> for Rx<T> {
Rx::try_recv(self)
}

#[inline(always)]
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
Rx::recv_timeout(self, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
Rx::len(self)
Expand Down
54 changes: 53 additions & 1 deletion src/tests/test_async_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use super::common::*;
use crate::*;
use log::*;
use rstest::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -49,6 +49,58 @@ async fn test_basic_1_tx_async_1_rx_blocking<T: AsyncTxTrait<usize>, R: Blocking
rt.shutdown_background(); // Prevent panic on runtime drop
}

#[rstest]
#[case(spsc::bounded_tx_async_rx_blocking::<usize>(100))]
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(100))]
#[case(mpmc::bounded_tx_async_rx_blocking::<usize>(100))]
#[tokio::test]
async fn test_timeout_1_tx_async_1_rx_blocking<
T: AsyncTxTrait<usize>,
R: BlockingRxTrait<usize>,
>(
setup_log: (), #[case] channel: (T, R),
) {
let _ = setup_log; // Disable unused var warning
let (tx, rx) = channel;

let rx_res = rx.try_recv();
assert!(rx_res.is_err());
assert!(rx_res.unwrap_err().is_empty());
let batch_1: usize = 100;
let batch_2: usize = 200;
let rt = get_runtime();
rt.spawn(async move {
for i in 0..batch_1 {
let tx_res = tx.send(i).await;
assert!(tx_res.is_ok());
}
for i in batch_1..(batch_1 + batch_2) {
assert!(tx.send(10 + i).await.is_ok());
tokio::time::sleep(Duration::from_millis(2)).await;
}

tokio::time::sleep(Duration::from_millis(200)).await;
assert!(tx.send(123).await.is_ok());
});
for _ in 0..(batch_1 + batch_2) {
match rx.recv() {
Ok(i) => {
debug!("recv {}", i);
}
Err(e) => {
panic!("error {}", e);
}
}
}

assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());

let res = rx.recv();
assert!(res.is_err());
rt.shutdown_background(); // Prevent panic on runtime drop
}

#[rstest]
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(10), 8)]
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(10), 100)]
Expand Down
50 changes: 49 additions & 1 deletion src/tests/test_blocking_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::*;
use log::*;
use rstest::*;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread;
use std::time::*;
Expand Down Expand Up @@ -52,6 +52,54 @@ async fn test_basic_1_tx_blocking_1_rx_async<T: BlockingTxTrait<usize>, R: Async
let _ = th.join();
}

#[rstest]
#[case(spsc::bounded_tx_blocking_rx_async::<usize>(10))]
#[case(mpsc::bounded_tx_blocking_rx_async::<usize>(10))]
#[case(mpmc::bounded_tx_blocking_rx_async::<usize>(10))]
#[tokio::test]
async fn test_timeout_1_tx_blocking_1_rx_async<
T: BlockingTxTrait<usize>,
R: AsyncRxTrait<usize>,
>(
setup_log: (), #[case] channel: (T, R),
) {
let _ = setup_log; // Disable unused var warning
let (tx, rx) = channel;
let rx_res = rx.try_recv();
assert!(rx_res.is_err());
assert!(rx_res.unwrap_err().is_empty());
for i in 0usize..10 {
let tx_res = tx.send(i);
assert!(tx_res.is_ok());
}
let tx_res = tx.send_timeout(11, Duration::from_millis(100));
assert!(tx_res.is_err());
assert!(tx_res.unwrap_err().is_timeout());

let th = thread::spawn(move || {
assert!(tx.send_timeout(10, Duration::from_millis(100)).is_err());
assert!(tx.send_timeout(10, Duration::from_millis(200)).is_ok());
});

tokio::time::sleep(Duration::from_millis(200)).await;

for i in 0usize..11 {
match rx.recv().await {
Ok(j) => {
debug!("recv {}", i);
assert_eq!(i, j);
}
Err(e) => {
panic!("error {}", e);
}
}
}
let res = rx.recv().await;
assert!(res.is_err());
debug!("rx close");
let _ = th.join();
}

#[rstest]
#[case(spsc::bounded_tx_blocking_rx_async::<usize>(1))]
#[case(mpsc::bounded_tx_blocking_rx_async::<usize>(1))]
Expand Down
37 changes: 36 additions & 1 deletion src/tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::channel::*;
use async_trait::async_trait;
use crossbeam::channel::Sender;
pub use crossbeam::channel::{SendError, TrySendError};
pub use crossbeam::channel::{SendError, SendTimeoutError, TrySendError};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

/// Sender that works in blocking context
pub struct Tx<T> {
Expand Down Expand Up @@ -67,6 +68,25 @@ impl<T> Tx<T> {
}
}

/// Waits for a message to be sent into the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns `Ok(())` when successful.
///
/// Returns Err([SendTimeoutError::Timeout]) when the message could not be sent because the channel is full and the operation timed out.
///
/// Returns Err([SendTimeoutError::Disconnected]) when all Rx dropped.
#[inline]
pub fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match self.sender.send_timeout(item, timeout) {
Err(e) => return Err(e),
Ok(_) => {
self.shared.on_recv();
return Ok(());
}
}
}

/// Probe possible messages in the channel (not accurate)
#[inline]
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -306,6 +326,16 @@ pub trait BlockingTxTrait<T: Send + 'static>: Send + 'static {
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
fn try_send(&self, _item: T) -> Result<(), TrySendError<T>>;

/// Waits for a message to be sent into the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns `Ok(())` when successful.
///
/// Returns Err([SendTimeoutError::Timeout]) when the message could not be sent because the channel is full and the operation timed out.
///
/// Returns Err([SendTimeoutError::Disconnected]) when all Rx dropped.
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>;

/// Probe possible messages in the channel (not accurate)
fn len(&self) -> usize;

Expand All @@ -324,6 +354,11 @@ impl<T: Send + 'static> BlockingTxTrait<T> for Tx<T> {
Tx::try_send(self, item)
}

#[inline(always)]
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
Tx::send_timeout(&self, item, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
Tx::len(self)
Expand Down