diff --git a/crossbeam-channel/src/channel.rs b/crossbeam-channel/src/channel.rs index 5447e3303..a9b614c36 100644 --- a/crossbeam-channel/src/channel.rs +++ b/crossbeam-channel/src/channel.rs @@ -10,7 +10,8 @@ use std::time::{Duration, Instant}; use crate::context::Context; use crate::counter; use crate::err::{ - RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError, + ForceSendError, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, + TrySendError, }; use crate::flavors; use crate::select::{Operation, SelectHandle, Token}; @@ -410,6 +411,54 @@ impl Sender { } } + /// Sends a message to a channel without blocking. It will only fail if the channel is disconnected. + /// + /// This method will either send a message into the channel immediately, + /// overwrite and return the last value in the channel or return an error if + /// the channel is full. The returned error contains the original message. + /// + /// If called on a zero-capacity channel, this method will send the message only if there + /// happens to be a pending receive operation on the other side of the channel at the same time, + /// otherwise it will return the value to the sender. + /// + /// ``` + /// use crossbeam_channel::{bounded, ForceSendError}; + /// + /// let (s, r) = bounded(3); + /// + /// assert_eq!(s.force_send(0), Ok(None)); + /// assert_eq!(s.force_send(1), Ok(None)); + /// assert_eq!(s.force_send(2), Ok(None)); + /// assert_eq!(s.force_send(3), Ok(Some(0))); + /// + /// assert_eq!(r.recv(), Ok(1)); + /// + /// assert_eq!(s.force_send(4), Ok(None)); + /// + /// assert_eq!(r.recv(), Ok(2)); + /// assert_eq!(r.recv(), Ok(3)); + /// + /// assert_eq!(s.force_send(5), Ok(None)); + /// assert_eq!(s.force_send(6), Ok(None)); + /// assert_eq!(s.force_send(7), Ok(Some(4))); + /// assert_eq!(s.force_send(8), Ok(Some(5))); + /// + /// assert_eq!(r.recv(), Ok(6)); + /// assert_eq!(r.recv(), Ok(7)); + /// assert_eq!(r.recv(), Ok(8)); + /// + /// drop(r); + /// + /// assert_eq!(s.force_send(9), Err(ForceSendError(9))); + /// `````` + pub fn force_send(&self, msg: T) -> Result, ForceSendError> { + match &self.flavor { + SenderFlavor::Array(chan) => chan.force_send(msg), + SenderFlavor::List(chan) => chan.force_send(msg), + SenderFlavor::Zero(chan) => chan.force_send(msg), + } + } + /// Blocks the current thread until a message is sent or the channel is disconnected. /// /// If the channel is full and not disconnected, this call will block until the send operation diff --git a/crossbeam-channel/src/err.rs b/crossbeam-channel/src/err.rs index 7306c46d5..f414ae48f 100644 --- a/crossbeam-channel/src/err.rs +++ b/crossbeam-channel/src/err.rs @@ -11,6 +11,16 @@ use std::fmt; #[derive(PartialEq, Eq, Clone, Copy)] pub struct SendError(pub T); +/// An error returned from the [`force_send`] method. +/// +/// The message could not be sent because the channel is disconnected. +/// +/// The error contains the message so it can be recovered. +/// +/// [`force_send`]: super::Sender::force_send +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct ForceSendError(pub T); + /// An error returned from the [`try_send`] method. /// /// The error contains the message being sent so it can be recovered. @@ -209,6 +219,54 @@ impl TrySendError { } } +impl fmt::Debug for ForceSendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + Self(..) => "ForceSendError(..)".fmt(f), + } + } +} + +impl fmt::Display for ForceSendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + Self(..) => "sending on a disconnected channel".fmt(f), + } + } +} + +impl error::Error for ForceSendError {} + +impl From> for ForceSendError { + fn from(err: SendError) -> Self { + match err { + SendError(t) => Self(t), + } + } +} + +impl ForceSendError { + /// Unwraps the message. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_channel::bounded; + /// + /// let (s, r) = bounded(0); + /// drop(r); + /// + /// if let Err(err) = s.force_send("foo") { + /// assert_eq!(err.into_inner(), "foo"); + /// } + /// ``` + pub fn into_inner(self) -> T { + match self { + Self(v) => v, + } + } +} + impl fmt::Debug for SendTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { "SendTimeoutError(..)".fmt(f) diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs index 206a05a86..2a9039130 100644 --- a/crossbeam-channel/src/flavors/array.rs +++ b/crossbeam-channel/src/flavors/array.rs @@ -18,7 +18,7 @@ use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; use crate::context::Context; -use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; use crate::waker::SyncWaker; @@ -325,6 +325,30 @@ impl Channel { } } + /// Force send a message into the channel. Only fails if the channel is disconnected + /// + /// Note that this is currently a naive implementation to make the sequential test pass + pub(crate) fn force_send(&self, msg: T) -> Result, ForceSendError> { + if self.is_disconnected() { + return Err(ForceSendError(msg)); + } + + let old_msg = if self.is_full() { + let old_msg = self.try_recv().ok(); + if old_msg.is_none() { + return Err(ForceSendError(msg)); + } + old_msg + } else { + None + }; + + self.try_send(msg) + .map_err(|e| ForceSendError(e.into_inner()))?; + + Ok(old_msg) + } + /// Sends a message into the channel. pub(crate) fn send( &self, diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs index e86551ad2..8751d6daa 100644 --- a/crossbeam-channel/src/flavors/list.rs +++ b/crossbeam-channel/src/flavors/list.rs @@ -11,7 +11,7 @@ use std::time::Instant; use crossbeam_utils::{Backoff, CachePadded}; use crate::context::Context; -use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; use crate::waker::SyncWaker; @@ -417,6 +417,15 @@ impl Channel { }) } + /// Forces a send, failing only if the channel is disconnected + pub(crate) fn force_send(&self, msg: T) -> Result, ForceSendError> { + match self.send(msg, None) { + Ok(()) => Ok(None), + Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError(err)), + Err(SendTimeoutError::Timeout(_)) => unreachable!(), + } + } + /// Sends a message into the channel. pub(crate) fn send( &self, diff --git a/crossbeam-channel/src/flavors/zero.rs b/crossbeam-channel/src/flavors/zero.rs index 08d226f87..ae617b56f 100644 --- a/crossbeam-channel/src/flavors/zero.rs +++ b/crossbeam-channel/src/flavors/zero.rs @@ -13,7 +13,7 @@ use std::{fmt, ptr}; use crossbeam_utils::Backoff; use crate::context::Context; -use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; +use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError}; use crate::select::{Operation, SelectHandle, Selected, Token}; use crate::waker::Waker; @@ -216,6 +216,26 @@ impl Channel { } } + /// Send the message if a receiver is connected, otherwise returns it to the sender + pub(crate) fn force_send(&self, msg: T) -> Result, ForceSendError> { + let token = &mut Token::default(); + let mut inner = self.inner.lock().unwrap(); + + // If there's a waiting receiver, pair up with it. + if let Some(operation) = inner.receivers.try_select() { + token.zero.0 = operation.packet; + drop(inner); + unsafe { + self.write(token, msg).ok().unwrap(); + } + Ok(None) + } else if inner.is_disconnected { + Err(ForceSendError(msg)) + } else { + Ok(Some(msg)) + } + } + /// Sends a message into the channel. pub(crate) fn send( &self, diff --git a/crossbeam-channel/src/lib.rs b/crossbeam-channel/src/lib.rs index 35876c160..2e444265a 100644 --- a/crossbeam-channel/src/lib.rs +++ b/crossbeam-channel/src/lib.rs @@ -370,8 +370,8 @@ pub use crate::{ after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter, }, err::{ - ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError, - SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError, + ForceSendError, ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, + SendError, SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError, }, select::{Select, SelectedOperation}, };