Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "crossfire"
version = "2.0.0"
authors = ["plan <frostyplanet@gmail.com>"]
edition = "2024"
edition = "2021"
license = "Apache-2.0"
homepage = "https://github.com/frostyplanet/crossfire-rs"
readme = "README.md"
Expand All @@ -11,7 +11,7 @@ documentation = "https://docs.rs/crossfire"
keywords = ["async", "non-blocking", "lock-free", "channel"]
categories = ["concurrency", "data-structures"]
exclude = ["/ci/*", "/bors.toml"]
description = "channels for async and threads"
Comment thread
zach-schoenberger marked this conversation as resolved.
rust-version = "1.61"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -23,7 +23,7 @@ async-trait = "0"

[dev-dependencies]
tokio = { version = "1", features = ["time", "sync", "rt-multi-thread", "rt", "macros"] }
rand = "0.7.3"
rand = "0.9"
rstest = "0"
log = "0"
captains-log = "0"
Expand Down
7 changes: 7 additions & 0 deletions src/m_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::rx::*;
use crate::stream::AsyncStream;
use async_trait::async_trait;
use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

/// Receiver that works in blocking context. MC version of [`Rx<T>`] implements [Clone].
///
Expand Down Expand Up @@ -99,6 +101,11 @@ impl<T: Send + 'static> BlockingRxTrait<T> for MRx<T> {
self.0.try_recv()
}

#[inline(always)]
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
self.0.recv_timeout(timeout)
}

/// Probe possible messages in the channel (not accurate)
#[inline(always)]
fn len(&self) -> usize {
Expand Down
7 changes: 7 additions & 0 deletions src/m_tx.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::channel::*;
use crate::tx::*;
use async_trait::async_trait;
use crossbeam::channel::SendTimeoutError;
use crossbeam::channel::Sender;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

/// Sender that works in blocking context. MP version of [`Tx<T>`] implements [Clone].
///
Expand Down Expand Up @@ -90,6 +92,11 @@ impl<T: Send + 'static> BlockingTxTrait<T> for MTx<T> {
self.0.try_send(item)
}

#[inline(always)]
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
self.0.send_timeout(item, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
self.0.len()
Expand Down
36 changes: 36 additions & 0 deletions src/rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use crate::m_rx::*;
use crate::stream::AsyncStream;
use async_trait::async_trait;
use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
pub use crossbeam::channel::{RecvError, TryRecvError};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

/// Receiver that works in blocking context
pub struct Rx<T> {
Expand Down Expand Up @@ -68,6 +70,25 @@ impl<T> Rx<T> {
}
}

/// Waits for a message to be received from the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns Ok(T) when successful.
///
/// Returns Err([TryRecvError::Empty]) when channel is empty.
///
/// returns Err([TryRecvError::Disconnected]) when all Tx dropped.
#[inline]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
match self.recv.recv_timeout(timeout) {
Err(e) => return Err(e),
Ok(i) => {
self.shared.on_recv();
return Ok(i);
}
}
}

/// Probe possible messages in the channel (not accurate)
#[inline]
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -327,6 +348,16 @@ pub trait BlockingRxTrait<T: Send + 'static>: Send + 'static {
/// Returns Err([TryRecvError::Disconnected]) when all Tx dropped.
fn try_recv(&self) -> Result<T, TryRecvError>;

/// Waits for a message to be received from the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns Ok(T) when successful.
///
/// Returns Err([TryRecvError::Empty]) when channel is empty.
///
/// returns Err([TryRecvError::Disconnected]) when all Tx dropped.
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>;

/// Probe possible messages in the channel (not accurate)
fn len(&self) -> usize;

Expand All @@ -345,6 +376,11 @@ impl<T: Send + 'static> BlockingRxTrait<T> for Rx<T> {
Rx::try_recv(self)
}

#[inline(always)]
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
Rx::recv_timeout(self, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
Rx::len(self)
Expand Down
37 changes: 36 additions & 1 deletion src/tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::channel::*;
use async_trait::async_trait;
use crossbeam::channel::Sender;
pub use crossbeam::channel::{SendError, TrySendError};
use crossbeam::channel::{SendTimeoutError, Sender};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

/// Sender that works in blocking context
pub struct Tx<T> {
Expand Down Expand Up @@ -67,6 +68,25 @@ impl<T> Tx<T> {
}
}

/// Waits for a message to be sent into the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns `Ok(())` when successful.
///
/// Returns Err([TrySendError::Full]) on channel full for bounded channel.
///
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
#[inline]
pub fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
match self.sender.send_timeout(item, timeout) {
Err(e) => return Err(e),
Ok(_) => {
self.shared.on_recv();
return Ok(());
}
}
}

/// Probe possible messages in the channel (not accurate)
#[inline]
pub fn len(&self) -> usize {
Expand Down Expand Up @@ -306,6 +326,16 @@ pub trait BlockingTxTrait<T: Send + 'static>: Send + 'static {
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
fn try_send(&self, _item: T) -> Result<(), TrySendError<T>>;

/// Waits for a message to be sent into the channel, but only for a limited time.
/// Will block when channel is empty.
///
/// Returns `Ok(())` when successful.
///
/// Returns Err([TrySendError::Full]) on channel full for bounded channel.
///
/// Returns Err([TrySendError::Disconnected]) when all Rx dropped.
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>;

/// Probe possible messages in the channel (not accurate)
fn len(&self) -> usize;

Expand All @@ -324,6 +354,11 @@ impl<T: Send + 'static> BlockingTxTrait<T> for Tx<T> {
Tx::try_send(self, item)
}

#[inline(always)]
fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
Tx::send_timeout(&self, item, timeout)
}

#[inline(always)]
fn len(&self) -> usize {
Tx::len(self)
Expand Down