Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::time::{Duration, Instant};
use crate::context::Context;
use crate::counter;
use crate::err::{
RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError, TrySendError,
ForceSendError, RecvError, RecvTimeoutError, SendError, SendTimeoutError, TryRecvError,
TrySendError,
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
Expand Down Expand Up @@ -410,6 +411,45 @@ impl<T> Sender<T> {
}
}

/// Sends a message to a channel without blocking. It will only fail if the channel is disconnected.
///
/// This method will either send a message into the channel immediately,
/// overwrite and return the last value in the channel or return an error if
/// the channel is full. The returned error contains the original message.
///
/// If called on a zero-capacity channel, this method will send the message only if there
/// happens to be a receive operation on the other side of the channel at the same time.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from the description what would happen if there is no receive operation. Would the call fail (despite what the start of the comment says)? Would the data be returned to the caller?

///
/// ```
/// use crossbeam_channel::{bounded, ForceSendError};
///
/// let (s, r) = bounded(3);
/// assert_eq!(s.force_send(0), Ok(None));
/// assert_eq!(s.force_send(1), Ok(None));
/// assert_eq!(s.force_send(2), Ok(None));
/// assert_eq!(s.force_send(3), Ok(Some(2)));
/// assert_eq!(r.recv(), Ok(0));
/// assert_eq!(s.force_send(4), Ok(None));
/// assert_eq!(r.recv(), Ok(1));
/// assert_eq!(r.recv(), Ok(3));
/// assert_eq!(s.force_send(5), Ok(None));
/// assert_eq!(s.force_send(6), Ok(None));
/// assert_eq!(s.force_send(7), Ok(Some(6)));
/// assert_eq!(s.force_send(8), Ok(Some(7)));
/// assert_eq!(r.recv(), Ok(4));
/// assert_eq!(r.recv(), Ok(5));
/// assert_eq!(r.recv(), Ok(8));
/// drop(r);
/// assert_eq!(s.force_send(9), Err(ForceSendError::Disconnected(9)));
/// ``````
pub fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match &self.flavor {
SenderFlavor::Array(chan) => chan.force_send(msg),
SenderFlavor::List(chan) => chan.force_send(msg),
SenderFlavor::Zero(chan) => chan.force_send(msg),
}
}

/// Blocks the current thread until a message is sent or the channel is disconnected.
///
/// If the channel is full and not disconnected, this call will block until the send operation
Expand Down
64 changes: 64 additions & 0 deletions crossbeam-channel/src/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ use std::fmt;
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

/// An error returned from the [`force_send`] method.
///
/// The error contains the message being sent so it can be recovered.
///
/// [`force_send`]: super::Sender::force_send
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum ForceSendError<T> {
Comment thread
bergkvist marked this conversation as resolved.
Outdated
/// The message could not be sent because the channel is disconnected.
Disconnected(T),
}

/// An error returned from the [`try_send`] method.
///
/// The error contains the message being sent so it can be recovered.
Expand Down Expand Up @@ -209,6 +220,59 @@ impl<T> TrySendError<T> {
}
}

impl<T> fmt::Debug for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Disconnected(..) => "Disconnected(..)".fmt(f),
}
}
}

impl<T> fmt::Display for ForceSendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Disconnected(..) => "sending on a disconnected channel".fmt(f),
}
}
}

impl<T: Send> error::Error for ForceSendError<T> {}

impl<T> From<SendError<T>> for ForceSendError<T> {
fn from(err: SendError<T>) -> Self {
match err {
SendError(t) => Self::Disconnected(t),
}
}
}

impl<T> ForceSendError<T> {
/// Unwraps the message.
///
/// # Examples
///
/// ```
/// use crossbeam_channel::bounded;
///
/// let (s, r) = bounded(0);
/// drop(r);
///
/// if let Err(err) = s.force_send("foo") {
/// assert_eq!(err.into_inner(), "foo");
/// }
/// ```
pub fn into_inner(self) -> T {
match self {
Self::Disconnected(v) => v,
}
}

/// Returns `true` if the send operation failed because the channel is disconnected.
pub fn is_disconnected(&self) -> bool {
matches!(self, Self::Disconnected(_))
}
}

impl<T> fmt::Debug for SendTimeoutError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"SendTimeoutError(..)".fmt(f)
Expand Down
23 changes: 22 additions & 1 deletion crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -325,6 +325,27 @@ impl<T> Channel<T> {
}
}

/// Force send a message into the channel. Only fails if the channel is disconnected
pub(crate) fn force_send(&self, mut msg: T) -> Result<Option<T>, ForceSendError<T>> {
let mut token = Token::default();
if self.start_send(&mut token) {
match unsafe { self.write(&mut token, msg) } {
Ok(()) => Ok(None),
Err(msg) => Err(ForceSendError::Disconnected(msg)),
}
} else {
let tail = self.tail.load(Ordering::Acquire);
let prev_index = match tail & (self.mark_bit - 1) {
0 => self.cap() - 1,
x => x - 1,
};
let queued_msg =
unsafe { (*self.buffer.get_unchecked(prev_index).msg.get()).assume_init_mut() };
std::mem::swap(&mut msg, queued_msg);
Ok(Some(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
11 changes: 10 additions & 1 deletion crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

Expand Down Expand Up @@ -417,6 +417,15 @@ impl<T> Channel<T> {
})
}

/// Forces a send, failing only if the channel is disconnected
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
match self.send(msg, None) {
Ok(()) => Ok(None),
Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError::Disconnected(err)),
Err(SendTimeoutError::Timeout(_)) => unreachable!(),
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
22 changes: 21 additions & 1 deletion crossbeam-channel/src/flavors/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{fmt, ptr};
use crossbeam_utils::Backoff;

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::err::{ForceSendError, RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::Waker;

Expand Down Expand Up @@ -216,6 +216,26 @@ impl<T> Channel<T> {
}
}

/// Send the message if a receiver is connected, otherwise returns it to the sender
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();

// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
Ok(None)
} else if inner.is_disconnected {
Err(ForceSendError::Disconnected(msg))
} else {
Ok(Some(msg))
}
}

/// Sends a message into the channel.
pub(crate) fn send(
&self,
Expand Down
4 changes: 2 additions & 2 deletions crossbeam-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,8 @@ pub use crate::{
after, at, bounded, never, tick, unbounded, IntoIter, Iter, Receiver, Sender, TryIter,
},
err::{
ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError, SendError,
SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
ForceSendError, ReadyTimeoutError, RecvError, RecvTimeoutError, SelectTimeoutError,
SendError, SendTimeoutError, TryReadyError, TryRecvError, TrySelectError, TrySendError,
},
select::{Select, SelectedOperation},
};