Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::time::{Duration, Instant};
use crate::context::Context;
use crate::counter;
use crate::err::{
RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
ForceSendError, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError,
TrySendError,
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
Expand Down Expand Up @@ -410,6 +411,54 @@ impl<T> Sender<T> {
}
}

/// Sends a message to a channel without blocking. It will only fail if the channel is disconnected.
///
/// This method will either send a message into the channel immediately,
/// overwrite and return the last value in the channel or return an error if
/// the channel is full. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a pending receive operation on the other side of the channel at the same time,
/// otherwise it will return the value to the sender.
///
/// ```
/// use crossbeam_channel::{bounded, ForceSendError};
///
/// let (s, r) = bounded(3);
///
/// assert_eq!(s.force_send(0), Ok(None));
/// assert_eq!(s.force_send(1), Ok(None));
/// assert_eq!(s.force_send(2), Ok(None));
/// assert_eq!(s.force_send(3), Ok(Some(0)));
///
/// assert_eq!(r.recv(), Ok(1));
///
/// assert_eq!(s.force_send(4), Ok(None));
///
/// assert_eq!(r.recv(), Ok(2));
/// assert_eq!(r.recv(), Ok(3));
///
/// assert_eq!(s.force_send(5), Ok(None));
/// assert_eq!(s.force_send(6), Ok(None));
/// assert_eq!(s.force_send(7), Ok(Some(4)));
/// assert_eq!(s.force_send(8), Ok(Some(5)));
///
/// assert_eq!(r.recv(), Ok(6));
/// assert_eq!(r.recv(), Ok(7));
/// assert_eq!(r.recv(), Ok(8));
///
/// drop(r);
///
/// assert_eq!(s.force_send(9), Err(ForceSendError(9)));
/// ``````
pub fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.force_send(msg),
SenderFlavor::List(chan) => chan.force_send(msg),
SenderFlavor::Zero(chan) => chan.force_send(msg),
}
}

/// Blocks the current thread until a message is sent or the channel is disconnected.
///
/// If the channel is full and not disconnected, this call will block until the send operation
Expand Down
58 changes: 58 additions & 0 deletions crossbeam-channel/src/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ use std::fmt;
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

/// An error returned from the [`force_send`] method.
///
/// The message could not be sent because the channel is disconnected.
///
/// The error contains the message so it can be recovered.
///
/// [`force_send`]: super::Sender::force_send
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct ForceSendError<T>(pub T);

/// An error returned from the [`try_send`] method.
///
/// The error contains the message being sent so it can be recovered.
Expand Down Expand Up @@ -209,6 +219,54 @@ impl<T> TrySendError<T> {
}
}

impl<T> fmt::Debug for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self(..) => "ForceSendError(..)".fmt(f),
}
}
}

impl<T> fmt::Display for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self(..) => "sending on a disconnected channel".fmt(f),
}
}
}

impl<T: Send> error::Error for ForceSendError<T> {}

impl<T> From<SendError<T>> for ForceSendError<T> {
fn from(err: SendError<T>) -> Self {
match err {
SendError(t) => Self(t),
}
}
}

impl<T> ForceSendError<T> {
/// Unwraps the message.
///
/// # Examples
///
/// ```
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(0);
/// drop(r);
///
/// if let Err(err) = s.force_send("foo") {
/// assert_eq!(err.into_inner(), "foo");
/// }
/// ```
pub fn into_inner(self) -> T {
match self {
Self(v) => v,
}
}
}

impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
Expand Down
26 changes: 25 additions & 1 deletion crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -325,6 +325,30 @@ impl<T> Channel<T> {
}
}

/// Force send a message into the channel. Only fails if the channel is disconnected
///
/// Note that this is currently a naive implementation to make the sequential test pass
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
if self.is_disconnected() {
return Err(ForceSendError(msg));
}

let old_msg = if self.is_full() {
let old_msg = self.try_recv().ok();
if old_msg.is_none() {
return Err(ForceSendError(msg));
}
old_msg
} else {
None
};

self.try_send(msg)
.map_err(|e| ForceSendError(e.into_inner()))?;

Ok(old_msg)
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
11 changes: 10 additions & 1 deletion crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -417,6 +417,15 @@ impl<T> Channel<T> {
})
}

/// Forces a send, failing only if the channel is disconnected
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match self.send(msg, None) {
Ok(()) => Ok(None),
Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError(err)),
Err(SendTimeoutError::Timeout(_)) => unreachable!(),
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
22 changes: 21 additions & 1 deletion crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{fmt, ptr};
use crossbeam_utils::Backoff;

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::Waker;

Expand Down Expand Up @@ -216,6 +216,26 @@ impl<T> Channel<T> {
}
}

/// Send the message if a receiver is connected, otherwise returns it to the sender
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
Ok(None)
} else if inner.is_disconnected {
Err(ForceSendError(msg))
} else {
Ok(Some(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
4 changes: 2 additions & 2 deletions crossbeam-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ pub use crate::{
after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter,
},
err::{
ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
ForceSendError, ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError,
SendError, SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
},
select::{Select, SelectedOperation},
};