diff --git a/actor/src/benches/mailbox.rs b/actor/src/benches/mailbox.rs index f52d4e3e656..caa0c5f6028 100644 --- a/actor/src/benches/mailbox.rs +++ b/actor/src/benches/mailbox.rs @@ -1,4 +1,4 @@ -use commonware_actor::{mailbox, Feedback}; +use commonware_actor::{mailbox, Feedback, Unreliable}; use commonware_runtime::{ telemetry::metrics::{Metric, Registered, Registration}, Metrics, Name, Supervisor, @@ -51,10 +51,10 @@ impl Metrics for NoopMetrics { } } -fn new( +fn new( capacity: std::num::NonZeroUsize, -) -> (mailbox::Sender, mailbox::Receiver) { - mailbox::new(NoopMetrics, capacity) +) -> (mailbox::UnreliableSender, mailbox::UnreliableReceiver) { + mailbox::new_unreliable(NoopMetrics, capacity) } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -89,7 +89,7 @@ impl Message { } } -impl mailbox::Policy for Message { +impl mailbox::UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(overflow: &mut VecDeque, message: Self) -> bool { @@ -125,7 +125,7 @@ fn bench_enqueue_ready(c: &mut Criterion) { |(sender, _receiver)| { for _ in 0..MESSAGES { let result = sender.enqueue(black_box(Message::drop())); - assert_eq!(result, Feedback::Ok); + assert_eq!(result, Unreliable::new(Feedback::Ok)); black_box(result); } }, @@ -145,7 +145,10 @@ fn bench_try_recv_ready(c: &mut Criterion) { || { let (sender, receiver) = new::(NZUsize!(CAPACITY)); for _ in 0..MESSAGES { - assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::drop()), + Unreliable::new(Feedback::Ok) + ); } receiver }, @@ -170,10 +173,16 @@ fn bench_try_recv_overflow(c: &mut Criterion) { || { let (sender, receiver) = new::(NZUsize!(CAPACITY)); for _ in 0..CAPACITY { - assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::drop()), + Unreliable::new(Feedback::Ok) + ); } for _ in 0..MESSAGES { - assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff); + assert_eq!( + sender.enqueue(Message::spill()), + Unreliable::new(Feedback::Backoff) + ); } receiver }, @@ -199,7 +208,7 @@ fn bench_round_trip_ready(c: &mut Criterion) { |(sender, mut receiver)| { for _ in 0..MESSAGES { let result = sender.enqueue(black_box(Message::drop())); - assert_eq!(result, Feedback::Ok); + assert_eq!(result, Unreliable::new(Feedback::Ok)); black_box(result); black_box(receiver.try_recv().unwrap()); } @@ -231,7 +240,7 @@ fn bench_recv_waiting(c: &mut Criterion) { .await; let result = sender.enqueue(Message::drop()); - assert_eq!(result, Feedback::Ok); + assert_eq!(result, Unreliable::new(Feedback::Ok)); black_box(result); black_box(next.await.unwrap()); } @@ -252,13 +261,16 @@ fn bench_overflow_drop(c: &mut Criterion) { b.iter_batched( || { let (sender, receiver) = new::(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::drop()), + Unreliable::new(Feedback::Ok) + ); (sender, receiver) }, |(sender, _receiver)| { for _ in 0..MESSAGES { let result = sender.enqueue(black_box(Message::drop())); - assert_eq!(result, Feedback::Rejected); + assert_eq!(result, Unreliable::Rejected); black_box(result); } }, @@ -277,13 +289,16 @@ fn bench_overflow_spill(c: &mut Criterion) { b.iter_batched( || { let (sender, receiver) = new::(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::drop()), + Unreliable::new(Feedback::Ok) + ); (sender, receiver) }, |(sender, _receiver)| { for _ in 0..MESSAGES { let result = sender.enqueue(black_box(Message::spill())); - assert_eq!(result, Feedback::Backoff); + assert_eq!(result, Unreliable::new(Feedback::Backoff)); black_box(result); } }, @@ -294,17 +309,31 @@ fn bench_overflow_spill(c: &mut Criterion) { group.finish(); } -fn replace_queue(newest: bool) -> (mailbox::Sender, mailbox::Receiver) { +fn replace_queue( + newest: bool, +) -> ( + mailbox::UnreliableSender, + mailbox::UnreliableReceiver, +) { let (sender, receiver) = new::(NZUsize!(REPLACE_CAPACITY)); for _ in 0..REPLACE_CAPACITY { - assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::drop()), + Unreliable::new(Feedback::Ok) + ); } - assert_eq!(sender.enqueue(Message::replace()), Feedback::Backoff); + assert_eq!( + sender.enqueue(Message::replace()), + Unreliable::new(Feedback::Backoff) + ); if !newest { for _ in 1..REPLACE_CAPACITY { - assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff); + assert_eq!( + sender.enqueue(Message::spill()), + Unreliable::new(Feedback::Backoff) + ); } } @@ -324,7 +353,7 @@ fn bench_overflow_replace(c: &mut Criterion) { |(sender, _receiver)| { for _ in 0..MESSAGES { let result = sender.enqueue(black_box(Message::replace())); - assert_eq!(result, Feedback::Backoff); + assert_eq!(result, Unreliable::new(Feedback::Backoff)); black_box(result); } }, @@ -352,7 +381,7 @@ fn bench_concurrent_enqueue(c: &mut Criterion) { scope.spawn(move || { for _ in 0..PRODUCER_MESSAGES { let result = sender.enqueue(Message::drop()); - assert_eq!(result, Feedback::Ok); + assert_eq!(result, Unreliable::new(Feedback::Ok)); black_box(result); } }); diff --git a/actor/src/lib.rs b/actor/src/lib.rs index 795fe344bf8..3195779ab3c 100644 --- a/actor/src/lib.rs +++ b/actor/src/lib.rs @@ -17,8 +17,6 @@ commonware_macros::stability_scope!(BETA { Ok, /// The submission exceeded configured capacity but was handled by the overflow policy. Backoff, - /// The submission exceeded configured capacity and was rejected by the overflow policy. - Rejected, /// The endpoint is closed. Closed, } @@ -30,5 +28,49 @@ commonware_macros::stability_scope!(BETA { } } + /// Feedback from endpoints that may reject work under backpressure. + #[derive(Clone, Copy, Debug, PartialEq, Eq)] + pub enum Unreliable { + /// Endpoint outcome from the submission attempt. + Outcome(T), + /// The work was rejected by the endpoint. + Rejected, + } + + impl Unreliable { + /// Wrap an outcome for an operation that may reject work. + pub const fn new(outcome: T) -> Self { + Self::Outcome(outcome) + } + + /// Create a rejected result. + pub const fn rejected() -> Self { + Self::Rejected + } + + /// Returns `true` when the operation was rejected before producing an outcome. + pub const fn is_rejected(&self) -> bool { + matches!(self, Self::Rejected) + } + + /// Returns the outcome when the operation was not rejected. + pub fn outcome(self) -> Option { + match self { + Self::Outcome(outcome) => Some(outcome), + Self::Rejected => None, + } + } + } + + impl Unreliable { + /// Returns `true` when the endpoint handled the submission. + pub const fn accepted(self) -> bool { + match self { + Self::Outcome(feedback) => feedback.accepted(), + Self::Rejected => false, + } + } + } + pub mod mailbox; }); diff --git a/actor/src/mailbox.rs b/actor/src/mailbox.rs index 07db962651e..0471bae2aac 100644 --- a/actor/src/mailbox.rs +++ b/actor/src/mailbox.rs @@ -5,7 +5,8 @@ //! The mailbox is split into two queues: a bounded `ready` queue //! that producers push to and the receiver pops from, and an unbounded //! `overflow` queue that holds messages displaced when ready is full. A -//! [`Policy`] decides how overflow is updated when overflow is contended. +//! [`Policy`] or [`UnreliablePolicy`] decides how overflow is updated when +//! overflow is contended. //! //! ```text //! senders @@ -39,7 +40,7 @@ //! Enqueue calls from the same sender will be delivered in order. Concurrent enqueue calls, //! however, are not globally ordered and may be observed in any interleaving. -use crate::Feedback; +use crate::{Feedback, Unreliable}; use commonware_runtime::{ telemetry::metrics::{Counter, MetricsExt as _}, Metrics, @@ -48,6 +49,7 @@ use std::{ collections::VecDeque, fmt, future::poll_fn, + marker::PhantomData, num::NonZeroUsize, sync::mpsc::TryRecvError, task::{Context, Poll}, @@ -91,17 +93,39 @@ pub trait Policy: Sized { /// Overflow storage used by this policy. type Overflow: Overflow; - /// Handle `message` when it cannot enter the bounded ready queue immediately. + /// Reliably handle `message` when it cannot enter the bounded ready queue immediately. /// - /// Returns `true` when the policy considered the message's effects. This includes - /// retaining the message, coalescing it with retained work, replacing older retained work, - /// or deliberately doing no work because the message is already satisfied, superseded, or no - /// longer needed (for example, a request whose response channel is already closed). These - /// no-op cases are still handled: there is no remaining work, so there is no loss to report. + /// This may retain the message, coalesce it with retained work, replace older retained work, + /// or deliberately do no work because the message is already satisfied, superseded, or no + /// longer needed (for example, a request whose response channel is already closed). /// - /// Returns `false` only when the policy rejects the message under backpressure. This is the - /// lossy case: the submitted work was not semantically handled, and callers that care should - /// retry or treat the submission as failed. + /// # Warning + /// + /// Do not enqueue into the same mailbox from this method or from destructors triggered by + /// editing `overflow`. This method runs while the mailbox holds its overflow lock, so same + /// mailbox re-entry can deadlock. + /// + /// This method should not unwind after mutating `overflow`. A panic, including one from a + /// destructor triggered while editing `overflow`, can leave retained overflow data stranded in + /// the mailbox. + fn handle(overflow: &mut Self::Overflow, message: Self); +} + +/// Overflow behavior for actor messages that can be rejected when an inbox is full. +pub trait UnreliablePolicy: Sized { + /// Overflow storage used by this policy. + type Overflow: Overflow; + + /// Unreliably handle `message` when it cannot enter the bounded ready queue immediately. + /// + /// Returns `true` when the policy considered the message's effects. This includes retaining + /// the message, coalescing it with retained work, replacing older retained work, or deliberately + /// doing no work because the message is already satisfied, superseded, or no longer needed. + /// + /// Returns `false` only when the policy rejects the message under backpressure without + /// retaining, coalescing, replacing, or otherwise handling it. This is the unreliable case: the + /// submitted work was not semantically handled, and callers that care should retry or treat the + /// submission as failed. /// /// # Warning /// @@ -115,6 +139,247 @@ pub trait Policy: Sized { fn handle(overflow: &mut Self::Overflow, message: Self) -> bool; } +// Marker types that select the mailbox overflow policy. +mod mode { + /// Uses a policy that always handles overflow messages. + pub(super) struct Reliable; + + /// Uses a policy that may reject overflow messages. + pub(super) struct Unreliable; +} + +trait Mode: Sized { + /// Overflow storage used by this mode. + type Overflow: Overflow; + /// Feedback returned from enqueue attempts. + type Feedback; + + /// Updates overflow for a full inbox and reports whether the message was handled. + fn handle(overflow: &mut Self::Overflow, message: T) -> bool; + /// Maps ready-path feedback into this mode's feedback type. + fn ready_feedback(feedback: Feedback) -> Self::Feedback; + /// Maps overflow handling into this mode's feedback type. + fn overflow_feedback(handled: bool) -> Self::Feedback; + /// Returns `true` when this feedback should count as backoff. + fn is_backoff(feedback: &Self::Feedback) -> bool; + /// Returns `true` when this feedback means the receiver is closed. + fn is_closed(feedback: &Self::Feedback) -> bool; +} + +impl Mode for mode::Reliable { + type Overflow = T::Overflow; + type Feedback = Feedback; + + fn handle(overflow: &mut Self::Overflow, message: T) -> bool { + T::handle(overflow, message); + true + } + + fn ready_feedback(feedback: Feedback) -> Self::Feedback { + feedback + } + + fn overflow_feedback(_handled: bool) -> Self::Feedback { + Feedback::Backoff + } + + fn is_backoff(feedback: &Self::Feedback) -> bool { + *feedback == Feedback::Backoff + } + + fn is_closed(feedback: &Self::Feedback) -> bool { + *feedback == Feedback::Closed + } +} + +impl Mode for mode::Unreliable { + type Overflow = T::Overflow; + type Feedback = Unreliable; + + fn handle(overflow: &mut Self::Overflow, message: T) -> bool { + T::handle(overflow, message) + } + + fn ready_feedback(feedback: Feedback) -> Self::Feedback { + Unreliable::new(feedback) + } + + fn overflow_feedback(handled: bool) -> Self::Feedback { + if handled { + Unreliable::new(Feedback::Backoff) + } else { + Unreliable::Rejected + } + } + + fn is_backoff(feedback: &Self::Feedback) -> bool { + *feedback == Unreliable::new(Feedback::Backoff) + } + + fn is_closed(feedback: &Self::Feedback) -> bool { + *feedback == Unreliable::new(Feedback::Closed) + } +} + +/// Sender half of a mailbox. +pub struct Sender { + state: Arc>, +} + +/// Sender half of an unreliable mailbox. +pub struct UnreliableSender { + state: Arc>, +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Self { + state: clone_sender_state(&self.state), + } + } +} + +impl Clone for UnreliableSender { + fn clone(&self) -> Self { + Self { + state: clone_sender_state(&self.state), + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + drop_sender_state(&self.state); + } +} + +impl Drop for UnreliableSender { + fn drop(&mut self) { + drop_sender_state(&self.state); + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt_sender_state("Sender", &self.state, f) + } +} + +impl fmt::Debug for UnreliableSender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt_sender_state("UnreliableSender", &self.state, f) + } +} + +impl Sender { + /// Submit a message without waiting for inbox capacity. + #[must_use = "caller must handle enqueue feedback"] + pub fn enqueue(&self, message: T) -> Feedback { + self.state.enqueue(message) + } +} + +impl UnreliableSender { + /// Submit a message without waiting for inbox capacity, allowing policy rejection. + #[must_use = "caller must handle enqueue feedback"] + pub fn enqueue(&self, message: T) -> Unreliable { + self.state.enqueue(message) + } +} + +/// Receiver half of a mailbox. +/// +/// Dropping the receiver closes the mailbox and drains buffered messages. +/// +/// Dropping the last sender disconnects the mailbox, but the receiver continues +/// returning buffered messages until ready and overflow are empty. +pub struct Receiver { + state: Arc>, +} + +/// Receiver half of an unreliable mailbox. +/// +/// Dropping the receiver closes the mailbox and drains buffered messages. +/// +/// Dropping the last sender disconnects the mailbox, but the receiver continues +/// returning buffered messages until ready and overflow are empty. +pub struct UnreliableReceiver { + state: Arc>, +} + +impl Receiver { + /// Receive the next message. + /// + /// Returns `None` after all senders are dropped and all buffered messages + /// have been drained. + pub async fn recv(&mut self) -> Option { + recv_from(&self.state).await + } + + /// Try to receive the next message without waiting. + /// + /// Returns [`TryRecvError::Disconnected`] after all senders are dropped and + /// all buffered messages have been drained. + pub fn try_recv(&mut self) -> Result { + try_recv_from(&self.state) + } +} + +impl UnreliableReceiver { + /// Receive the next message. + /// + /// Returns `None` after all senders are dropped and all buffered messages + /// have been drained. + pub async fn recv(&mut self) -> Option { + recv_from(&self.state).await + } + + /// Try to receive the next message without waiting. + /// + /// Returns [`TryRecvError::Disconnected`] after all senders are dropped and + /// all buffered messages have been drained. + pub fn try_recv(&mut self) -> Result { + try_recv_from(&self.state) + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.state.close(); + } +} + +impl Drop for UnreliableReceiver { + fn drop(&mut self) { + self.state.close(); + } +} + +/// Create a new bounded mailbox. +pub fn new(metrics: impl Metrics, capacity: NonZeroUsize) -> (Sender, Receiver) { + let state = new_state(metrics, capacity); + ( + Sender { + state: state.clone(), + }, + Receiver { state }, + ) +} + +/// Create a new bounded unreliable mailbox. +pub fn new_unreliable( + metrics: impl Metrics, + capacity: NonZeroUsize, +) -> (UnreliableSender, UnreliableReceiver) { + let state = new_state(metrics, capacity); + ( + UnreliableSender { + state: state.clone(), + }, + UnreliableReceiver { state }, + ) +} + // `activity` packs the published overflow state and in-flight overflow // mutations into one atomic word. The overflow lock serializes actual // overflow changes (this word lets the ready fast path avoid that lock when @@ -260,17 +525,19 @@ cfg_if::cfg_if! { } } -struct OverflowState { - queue: Mutex, +struct OverflowState> { + queue: Mutex, activity: AtomicUsize, + _phantom: PhantomData T>, } -impl OverflowState { +impl> OverflowState { #[allow(clippy::missing_const_for_fn)] fn new() -> Self { Self { - queue: Mutex::new(T::Overflow::default()), + queue: Mutex::new(M::Overflow::default()), activity: AtomicUsize::new(0), + _phantom: PhantomData, } } @@ -282,13 +549,18 @@ impl OverflowState { ready.push(message) } - fn enqueue(&self, ready: &Ready, message: T, is_closed: impl Fn() -> bool) -> Feedback { + fn enqueue_overflow( + &self, + ready: &Ready, + message: T, + is_closed: impl Fn() -> bool, + ) -> M::Feedback { // Mark overflow active so racing senders stay off the ready fast path. let mutation = Mutation::begin(&self.activity); let mut queue = lock(&self.queue); if is_closed() { mutation.publish(queue.is_empty()); - return Feedback::Closed; + return M::ready_feedback(Feedback::Closed); } // The fast-path push may have observed stale ready fullness. Retry @@ -298,7 +570,7 @@ impl OverflowState { match ready.push(message) { Ok(()) => { mutation.publish(queue.is_empty()); - return Feedback::Ok; + return M::ready_feedback(Feedback::Ok); } Err(message) => message, } @@ -307,13 +579,9 @@ impl OverflowState { }; // Preserve overflow order, or handle a still-full ready queue. - let handled = T::handle(&mut queue, message); + let handled = M::handle(&mut queue, message); mutation.publish(queue.is_empty()); - if handled { - Feedback::Backoff - } else { - Feedback::Rejected - } + M::overflow_feedback(handled) } fn refill(&self, ready: &Ready) { @@ -381,105 +649,56 @@ impl Drop for Mutation<'_> { } } -struct State { +struct State> { ready: Ready, - overflow: OverflowState, + overflow: OverflowState, backoff: Counter, closed: AtomicBool, senders: AtomicUsize, waker: AtomicWaker, } -/// Sender half of a mailbox. -pub struct Sender { - state: Arc>, -} - -impl Clone for Sender { - fn clone(&self) -> Self { - // Live sender count drives receiver disconnect detection. - self.state.senders.fetch_add(1, Ordering::Relaxed); - Self { - state: self.state.clone(), - } - } -} - -impl Drop for Sender { - fn drop(&mut self) { - let previous = self.state.senders.fetch_sub(1, Ordering::AcqRel); - assert!(previous > 0); - // Wake a receiver that is parked waiting for data or disconnect. - if previous == 1 { - self.state.waker.wake(); - } - } -} - -impl fmt::Debug for Sender { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Sender") - .field("capacity", &self.state.ready.capacity()) - .field("closed", &self.state.closed.load(Ordering::Acquire)) - .finish() - } -} - -impl Sender { - /// Submit a message without waiting for inbox capacity. - #[must_use = "caller must handle enqueue feedback"] - pub fn enqueue(&self, message: T) -> Feedback { +impl> State { + fn enqueue(&self, message: T) -> M::Feedback { // Receiver closure makes new sends fail immediately. - if self.state.closed.load(Ordering::Acquire) { - return Feedback::Closed; + if self.closed.load(Ordering::Acquire) { + return M::ready_feedback(Feedback::Closed); } // Common case: publish directly to ready without taking overflow lock. - let message = match self.state.overflow.try_ready(&self.state.ready, message) { + let message = match self.overflow.try_ready(&self.ready, message) { Ok(()) => { - if self.state.closed.load(Ordering::Acquire) { - self.state.overflow.drain(&self.state.ready); - return Feedback::Closed; + if self.closed.load(Ordering::Acquire) { + self.overflow.drain(&self.ready); + return M::ready_feedback(Feedback::Closed); } - self.state.waker.wake(); - return Feedback::Ok; + self.waker.wake(); + return M::ready_feedback(Feedback::Ok); } Err(message) => message, }; // Slow path: serialize through overflow and apply the policy. - let feedback = self.state.overflow.enqueue(&self.state.ready, message, || { - self.state.closed.load(Ordering::Acquire) - }); + let feedback = self + .overflow + .enqueue_overflow(&self.ready, message, || self.closed.load(Ordering::Acquire)); // Record any backoff. - if feedback == Feedback::Backoff { - self.state.backoff.inc(); + if M::is_backoff(&feedback) { + self.backoff.inc(); } // Wake after any non-closed slow-path enqueue because a receiver may // have skipped refill while this overflow mutation was active. By the // time we wake, the mutation has published its overflow state. Spurious // wakes are acceptable. - if feedback != Feedback::Closed { - self.state.waker.wake(); + if !M::is_closed(&feedback) { + self.waker.wake(); } feedback } -} -/// Receiver half of a mailbox. -/// -/// Dropping the receiver closes the mailbox and drains buffered messages. -/// -/// Dropping the last sender disconnects the mailbox, but the receiver continues -/// returning buffered messages until ready and overflow are empty. -pub struct Receiver { - state: Arc>, -} - -impl Receiver { - fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { // Fast path avoids waker churn when a message is already ready. if let Some(message) = self.pop() { return Poll::Ready(Some(message)); @@ -489,7 +708,7 @@ impl Receiver { return Poll::Ready(self.pop()); } - register_waker(&self.state.waker, cx.waker()); + register_waker(&self.waker, cx.waker()); // A sender can enqueue and wake after the first pop but before this // waker is installed. Re-check before sleeping so the wake is not lost. @@ -504,69 +723,78 @@ impl Receiver { } } - fn pop(&mut self) -> Option { - if let Some(message) = self.state.ready.pop() { + fn pop(&self) -> Option { + if let Some(message) = self.ready.pop() { // A freed ready slot may let the oldest overflow message advance. - self.state.overflow.refill(&self.state.ready); + self.overflow.refill(&self.ready); return Some(message); } // Empty ready may race with stale activity, so let `refill` // decide whether overflow is worth locking. - self.state.overflow.refill(&self.state.ready); - self.state.ready.pop() + self.overflow.refill(&self.ready); + self.ready.pop() } fn is_disconnected(&self) -> bool { - self.state.closed.load(Ordering::Acquire) || self.state.senders.load(Ordering::Acquire) == 0 - } - - /// Receive the next message. - /// - /// Returns `None` after all senders are dropped and all buffered messages - /// have been drained. - pub async fn recv(&mut self) -> Option { - poll_fn(|cx| self.poll_recv(cx)).await + self.closed.load(Ordering::Acquire) || self.senders.load(Ordering::Acquire) == 0 } - /// Try to receive the next message without waiting. - /// - /// Returns [`TryRecvError::Disconnected`] after all senders are dropped and - /// all buffered messages have been drained. - pub fn try_recv(&mut self) -> Result { - if let Some(message) = self.pop() { - return Ok(message); - } - if self.is_disconnected() { - return self.pop().ok_or(TryRecvError::Disconnected); - } - Err(TryRecvError::Empty) + fn close(&self) { + self.closed.store(true, Ordering::Release); + self.overflow.drain(&self.ready); } } -impl Drop for Receiver { - fn drop(&mut self) { - self.state.closed.store(true, Ordering::Release); - self.state.overflow.drain(&self.state.ready); - } -} - -/// Create a new bounded mailbox. -pub fn new(metrics: impl Metrics, capacity: NonZeroUsize) -> (Sender, Receiver) { - let state = Arc::new(State { +fn new_state>(metrics: impl Metrics, capacity: NonZeroUsize) -> Arc> { + Arc::new(State { ready: Ready::new(capacity.get()), overflow: OverflowState::new(), backoff: metrics.counter("backoff", "number of enqueue calls that requested backoff"), closed: AtomicBool::new(false), senders: AtomicUsize::new(1), waker: AtomicWaker::new(), - }); - ( - Sender { - state: state.clone(), - }, - Receiver { state }, - ) + }) +} + +fn clone_sender_state>(state: &Arc>) -> Arc> { + // Live sender count drives receiver disconnect detection. + state.senders.fetch_add(1, Ordering::Relaxed); + state.clone() +} + +fn drop_sender_state>(state: &State) { + let previous = state.senders.fetch_sub(1, Ordering::AcqRel); + assert!(previous > 0); + // Wake a receiver that is parked waiting for data or disconnect. + if previous == 1 { + state.waker.wake(); + } +} + +fn fmt_sender_state>( + name: &str, + state: &State, + f: &mut fmt::Formatter<'_>, +) -> fmt::Result { + f.debug_struct(name) + .field("capacity", &state.ready.capacity()) + .field("closed", &state.closed.load(Ordering::Acquire)) + .finish() +} + +async fn recv_from>(state: &State) -> Option { + poll_fn(|cx| state.poll_recv(cx)).await +} + +fn try_recv_from>(state: &State) -> Result { + if let Some(message) = state.pop() { + return Ok(message); + } + if state.is_disconnected() { + return state.pop().ok_or(TryRecvError::Disconnected); + } + Err(TryRecvError::Empty) } #[cfg(test)] @@ -631,6 +859,12 @@ mod tests { super::new(mocks::Metrics, capacity) } + fn new_unreliable( + capacity: NonZeroUsize, + ) -> (UnreliableSender, UnreliableReceiver) { + super::new_unreliable(mocks::Metrics, capacity) + } + #[derive(Debug, PartialEq, Eq)] enum Message { Update(u64), @@ -640,7 +874,7 @@ mod tests { Hint(u64), } - impl Policy for Message { + impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(overflow: &mut VecDeque, message: Self) -> bool { @@ -682,9 +916,8 @@ mod tests { impl Policy for Ack { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } @@ -728,10 +961,19 @@ mod tests { #[test_async] async fn full_inbox_replaces_stale_overflow_message() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Update(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Update(2)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Update(3)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Update(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Update(2)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Update(3)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Update(1))); assert_eq!(receiver.recv().await, Some(Message::Update(3))); @@ -739,11 +981,23 @@ mod tests { #[test_async] async fn policy_can_replace_stale_overflow_at_back() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Update(2)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Required(3)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Update(4)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Update(2)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Required(3)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Update(4)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Required(3))); @@ -752,18 +1006,27 @@ mod tests { #[test_async] async fn full_inbox_rejects_non_replaceable_message() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Vote(2)), Feedback::Rejected); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!(sender.enqueue(Message::Vote(2)), Unreliable::Rejected); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); } #[test_async] async fn full_inbox_retains_required_message() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Buffered(2))); @@ -771,9 +1034,15 @@ mod tests { #[test] fn try_recv_refills_from_overflow() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.try_recv(), Ok(Message::Vote(1))); assert_eq!(receiver.try_recv(), Ok(Message::Buffered(2))); @@ -783,10 +1052,19 @@ mod tests { fn backoff_metric_counts_backoff_feedback() { let executor = deterministic::Runner::default(); executor.start(|context| async move { - let (sender, _receiver) = super::new(context.child("mailbox"), NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Buffered(3)), Feedback::Backoff); + let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Buffered(3)), + Unreliable::new(Feedback::Backoff) + ); let buffer = context.encode(); assert!( @@ -797,14 +1075,17 @@ mod tests { } #[test] - fn rejected_feedback_is_not_accepted_or_counted_as_backoff() { + fn unreliable_rejected_feedback_is_not_accepted_or_counted_as_backoff() { let executor = deterministic::Runner::default(); executor.start(|context| async move { - let (sender, _receiver) = super::new(context.child("mailbox"), NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); + let (sender, _receiver) = super::new_unreliable(context.child("mailbox"), NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); let feedback = sender.enqueue(Message::Vote(2)); - assert_eq!(feedback, Feedback::Rejected); + assert_eq!(feedback, Unreliable::Rejected); assert!(!feedback.accepted()); let buffer = context.encode(); @@ -817,9 +1098,15 @@ mod tests { #[test] fn try_recv_drains_buffered_messages_after_senders_drop() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); drop(sender); assert_eq!(receiver.try_recv(), Ok(Message::Vote(1))); @@ -829,53 +1116,86 @@ mod tests { #[test] fn poll_recv_drains_buffered_messages_after_senders_drop() { - let (sender, mut receiver) = new(NZUsize!(1)); + let (sender, receiver) = new_unreliable(NZUsize!(1)); let wakes = Arc::new(WakeCounter::default()); let waker = waker_ref(&wakes); let mut cx = Context::from_waker(&waker); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); drop(sender); assert_eq!( - receiver.poll_recv(&mut cx), + receiver.state.poll_recv(&mut cx), Poll::Ready(Some(Message::Vote(1))) ); assert_eq!( - receiver.poll_recv(&mut cx), + receiver.state.poll_recv(&mut cx), Poll::Ready(Some(Message::Buffered(2))) ); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Ready(None)); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None)); } #[test] fn enqueue_uses_ready_capacity_after_partial_drain() { - let (sender, mut receiver) = new(NZUsize!(2)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Vote(2)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Required(3)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(2)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Vote(2)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Required(3)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.try_recv(), Ok(Message::Vote(1))); assert_eq!(receiver.try_recv(), Ok(Message::Vote(2))); - assert_eq!(sender.enqueue(Message::Vote(4)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Vote(4)), + Unreliable::new(Feedback::Ok) + ); assert_eq!(receiver.try_recv(), Ok(Message::Required(3))); assert_eq!(receiver.try_recv(), Ok(Message::Vote(4))); } #[test] fn receiver_refills_overflow_after_partial_drain() { - let (sender, mut receiver) = new(NZUsize!(3)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Vote(2)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Vote(3)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Required(4)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(3)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Vote(2)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Vote(3)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Required(4)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.try_recv(), Ok(Message::Vote(1))); assert_eq!(receiver.try_recv(), Ok(Message::Vote(2))); - assert_eq!(sender.enqueue(Message::Vote(5)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Vote(5)), + Unreliable::new(Feedback::Ok) + ); assert_eq!(receiver.try_recv(), Ok(Message::Vote(3))); assert_eq!(receiver.try_recv(), Ok(Message::Required(4))); assert_eq!(receiver.try_recv(), Ok(Message::Vote(5))); @@ -883,9 +1203,15 @@ mod tests { #[test_async] async fn full_inbox_retains_unmatched_replaceable_message() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Required(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Required(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Required(2))); @@ -893,11 +1219,23 @@ mod tests { #[test_async] async fn full_inbox_replaces_stale_overflow_after_ready_fills() { - let (sender, mut receiver) = new(NZUsize!(2)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Update(2)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Update(3)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Update(4)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(2)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Update(2)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Update(3)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Update(4)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Update(2))); @@ -906,10 +1244,19 @@ mod tests { #[test_async] async fn mailbox_capacity_is_soft_limit_for_required_messages() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Required(2)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Required(3)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Required(2)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Required(3)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Required(2))); @@ -918,19 +1265,34 @@ mod tests { #[test_async] async fn full_inbox_rejects_hint() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Hint(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Hint(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); } #[test_async] async fn full_inbox_can_replace_or_drop_by_message() { - let (sender, mut receiver) = new(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Update(2)), Feedback::Backoff); - assert_eq!(sender.enqueue(Message::Hint(3)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Update(2)), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + sender.enqueue(Message::Hint(3)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.recv().await, Some(Message::Vote(1))); assert_eq!(receiver.recv().await, Some(Message::Hint(3))); @@ -938,45 +1300,51 @@ mod tests { #[test_async] async fn empty_inbox_wakes_on_enqueue() { - let (sender, mut receiver) = new(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); let next = receiver.recv(); pin_mut!(next); assert!(next.as_mut().now_or_never().is_none()); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Ok) + ); assert_eq!(next.await, Some(Message::Vote(1))); } #[test] fn pending_recv_wakes_when_senders_drop() { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); let wakes = Arc::new(WakeCounter::default()); let waker = waker_ref(&wakes); let mut cx = Context::from_waker(&waker); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Pending); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending); assert_eq!(wakes.count(), 0); drop(sender); assert_eq!(wakes.count(), 1); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Ready(None)); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None)); } #[test] fn pending_recv_wakes_on_handled_overflow_enqueue() { - let (sender, mut receiver) = new(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable(NZUsize!(1)); let wakes = Arc::new(WakeCounter::default()); let waker = waker_ref(&wakes); let mut cx = Context::from_waker(&waker); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Pending); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending); assert_eq!(wakes.count(), 0); // Prime ready directly to isolate the overflow wake after registration. assert_eq!(sender.state.ready.push(Message::Vote(1)), Ok(())); - assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff); + assert_eq!( + sender.enqueue(Message::Buffered(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(wakes.count(), 1); assert_eq!(receiver.try_recv(), Ok(Message::Vote(1))); @@ -985,21 +1353,24 @@ mod tests { #[test] fn receiver_drop_blocks_ready_fast_path_feedback() { - let (sender, mut receiver) = new(NZUsize!(1)); + let (sender, receiver) = new_unreliable(NZUsize!(1)); let wakes = Arc::new(WakeCounter::default()); let waker = waker_ref(&wakes); let mut cx = Context::from_waker(&waker); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Pending); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending); drop(receiver); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Closed); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Closed) + ); assert_eq!(wakes.count(), 0); } #[test_async] async fn empty_inbox_closes_when_senders_drop() { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); drop(sender); assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); @@ -1008,10 +1379,13 @@ mod tests { #[test] fn enqueue_after_receiver_drop_returns_closed() { - let (sender, receiver) = new(NZUsize!(1)); + let (sender, receiver) = new_unreliable(NZUsize!(1)); drop(receiver); - assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Closed); + assert_eq!( + sender.enqueue(Message::Vote(1)), + Unreliable::new(Feedback::Closed) + ); } #[test_async] @@ -1042,10 +1416,9 @@ mod tests { impl Policy for ClearingMessage { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); overflow.clear(); - true } } @@ -1074,9 +1447,8 @@ mod tests { impl Policy for SpillMessage { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } @@ -1087,7 +1459,7 @@ mod tests { let waker = waker_ref(&wakes); let mut cx = Context::from_waker(&waker); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Pending); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending); assert_eq!(wakes.count(), 0); assert_eq!(sender.state.ready.push(SpillMessage::FillReady), Ok(())); @@ -1120,6 +1492,12 @@ mod loom_tests { super::new(mocks::Metrics, capacity) } + fn new_unreliable( + capacity: NonZeroUsize, + ) -> (UnreliableSender, UnreliableReceiver) { + super::new_unreliable(mocks::Metrics, capacity) + } + #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum Message { Drop(u8), @@ -1165,7 +1543,7 @@ mod loom_tests { } } - impl Policy for Message { + impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(overflow: &mut VecDeque, message: Self) -> bool { @@ -1182,7 +1560,7 @@ mod loom_tests { impl Policy for OrderedMessage { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { let gate = match &message { Self::Item(_) => None, Self::Coordinated(_, gate) => Some(gate.clone()), @@ -1194,11 +1572,10 @@ mod loom_tests { thread::yield_now(); } } - true } } - impl Policy for ReplacingMessage { + impl UnreliablePolicy for ReplacingMessage { type Overflow = VecDeque; fn handle(overflow: &mut VecDeque, message: Self) -> bool { @@ -1223,18 +1600,16 @@ mod loom_tests { impl Policy for TrackedMessage { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } impl Policy for CyclicMessage { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } @@ -1307,7 +1682,7 @@ mod loom_tests { #[test] fn sender_drop_racing_waker_registration_wakes_or_disconnects() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); let wakes = Arc::new(AtomicUsize::new(0)); let waker = counting_waker(wakes.clone()); let mut cx = Context::from_waker(&waker); @@ -1316,14 +1691,14 @@ mod loom_tests { drop(sender); }); - let poll = receiver.poll_recv(&mut cx); + let poll = receiver.state.poll_recv(&mut cx); close.join().unwrap(); match poll { Poll::Ready(None) => {} Poll::Pending => { assert!(wakes.load(Ordering::Acquire) > 0); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Ready(None)); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None)); } Poll::Ready(Some(_)) => panic!("unexpected message"), } @@ -1333,16 +1708,19 @@ mod loom_tests { #[test] fn sender_enqueue_then_drop_racing_poll_recv_drains_message() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); let wakes = Arc::new(AtomicUsize::new(0)); let waker = counting_waker(wakes.clone()); let mut cx = Context::from_waker(&waker); let enqueue = thread::spawn(move || { - assert_eq!(sender.enqueue(Message::Spill(0)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Spill(0)), + Unreliable::new(Feedback::Ok) + ); }); - let poll = receiver.poll_recv(&mut cx); + let poll = receiver.state.poll_recv(&mut cx); enqueue.join().unwrap(); match poll { @@ -1350,7 +1728,7 @@ mod loom_tests { Poll::Pending => { assert!(wakes.load(Ordering::Acquire) > 0); assert_eq!( - receiver.poll_recv(&mut cx), + receiver.state.poll_recv(&mut cx), Poll::Ready(Some(Message::Spill(0))) ); } @@ -1358,17 +1736,20 @@ mod loom_tests { Poll::Ready(Some(message)) => panic!("unexpected message: {message:?}"), } - assert_eq!(receiver.poll_recv(&mut cx), Poll::Ready(None)); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Ready(None)); }); } #[test] fn sender_enqueue_then_drop_racing_try_recv_drains_message() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); let enqueue = thread::spawn(move || { - assert_eq!(sender.enqueue(Message::Spill(0)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Spill(0)), + Unreliable::new(Feedback::Ok) + ); }); let result = receiver.try_recv(); @@ -1392,7 +1773,7 @@ mod loom_tests { #[test] fn handled_enqueue_wakes_registered_receiver() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); let wakes = Arc::new(AtomicUsize::new(0)); let waker = counting_waker(wakes.clone()); let mut cx = Context::from_waker(&waker); @@ -1400,7 +1781,10 @@ mod loom_tests { let next = receiver.recv(); pin_mut!(next); assert!(matches!(next.as_mut().poll(&mut cx), Poll::Pending)); - assert_eq!(sender.enqueue(Message::Spill(0)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Spill(0)), + Unreliable::new(Feedback::Ok) + ); assert_eq!(wakes.load(Ordering::Acquire), 1); assert_eq!( @@ -1413,12 +1797,12 @@ mod loom_tests { #[test] fn receiver_drop_racing_ready_fast_path_feedback_wakes_if_ready() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); let wakes = Arc::new(AtomicUsize::new(0)); let waker = counting_waker(wakes.clone()); let mut cx = Context::from_waker(&waker); - assert_eq!(receiver.poll_recv(&mut cx), Poll::Pending); + assert_eq!(receiver.state.poll_recv(&mut cx), Poll::Pending); let close = thread::spawn(move || { drop(receiver); @@ -1429,9 +1813,12 @@ mod loom_tests { if feedback.accepted() { assert!(wakes.load(Ordering::Acquire) > 0); } else { - assert_eq!(feedback, Feedback::Closed); + assert_eq!(feedback, Unreliable::new(Feedback::Closed)); } - assert_eq!(sender.enqueue(Message::Spill(1)), Feedback::Closed); + assert_eq!( + sender.enqueue(Message::Spill(1)), + Unreliable::new(Feedback::Closed) + ); }); } @@ -1565,7 +1952,7 @@ mod loom_tests { #[test] fn concurrent_close_and_ready_enqueue_remains_closed() { loom::model(|| { - let (sender, receiver) = new::(NZUsize!(1)); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); let enqueue_sender = sender.clone(); let enqueue = thread::spawn(move || { @@ -1578,15 +1965,21 @@ mod loom_tests { enqueue.join().unwrap(); close.join().unwrap(); - assert_eq!(sender.enqueue(Message::Spill(2)), Feedback::Closed); + assert_eq!( + sender.enqueue(Message::Spill(2)), + Unreliable::new(Feedback::Closed) + ); }); } #[test] fn concurrent_close_and_overflow_enqueue_remains_closed() { loom::model(|| { - let (sender, receiver) = new::(NZUsize!(1)); - assert_eq!(sender.enqueue(Message::Drop(0)), Feedback::Ok); + let (sender, receiver) = new_unreliable::(NZUsize!(1)); + assert_eq!( + sender.enqueue(Message::Drop(0)), + Unreliable::new(Feedback::Ok) + ); let enqueue_sender = sender.clone(); let enqueue = thread::spawn(move || { @@ -1599,16 +1992,22 @@ mod loom_tests { enqueue.join().unwrap(); close.join().unwrap(); - assert_eq!(sender.enqueue(Message::Spill(2)), Feedback::Closed); + assert_eq!( + sender.enqueue(Message::Spill(2)), + Unreliable::new(Feedback::Closed) + ); }); } #[test] fn concurrent_spill_and_refill_preserves_messages() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); let idle_sender = sender.clone(); - assert_eq!(sender.enqueue(Message::Spill(0)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Spill(0)), + Unreliable::new(Feedback::Ok) + ); let seen = Arc::new(AtomicUsize::new(0)); let enqueue = thread::spawn(move || { @@ -1639,9 +2038,12 @@ mod loom_tests { #[test] fn concurrent_spill_senders_preserve_messages() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); let idle_sender = sender.clone(); - assert_eq!(sender.enqueue(Message::Spill(0)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Spill(0)), + Unreliable::new(Feedback::Ok) + ); let sender_1 = sender.clone(); let enqueue_1 = thread::spawn(move || sender_1.enqueue(Message::Spill(1))); @@ -1664,20 +2066,29 @@ mod loom_tests { #[test] fn concurrent_replace_keeps_one_overflow_message() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(1)); + let (sender, mut receiver) = new_unreliable::(NZUsize!(1)); let idle_sender = sender.clone(); - assert_eq!(sender.enqueue(ReplacingMessage::FillReady), Feedback::Ok); + assert_eq!( + sender.enqueue(ReplacingMessage::FillReady), + Unreliable::new(Feedback::Ok) + ); assert_eq!( sender.enqueue(ReplacingMessage::Replace(1)), - Feedback::Backoff + Unreliable::new(Feedback::Backoff) ); let sender_1 = sender.clone(); let replace_1 = thread::spawn(move || sender_1.enqueue(ReplacingMessage::Replace(2))); let replace_2 = thread::spawn(move || sender.enqueue(ReplacingMessage::Replace(3))); - assert_eq!(replace_1.join().unwrap(), Feedback::Backoff); - assert_eq!(replace_2.join().unwrap(), Feedback::Backoff); + assert_eq!( + replace_1.join().unwrap(), + Unreliable::new(Feedback::Backoff) + ); + assert_eq!( + replace_2.join().unwrap(), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.try_recv(), Ok(ReplacingMessage::FillReady)); let retained = replacement_value(receiver.try_recv().unwrap()).unwrap(); @@ -1690,15 +2101,27 @@ mod loom_tests { #[test] fn stale_overflow_hint_retries_ready_before_policy() { loom::model(|| { - let (sender, mut receiver) = new::(NZUsize!(2)); - assert_eq!(sender.enqueue(Message::Drop(0)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Drop(1)), Feedback::Ok); - assert_eq!(sender.enqueue(Message::Spill(2)), Feedback::Backoff); + let (sender, mut receiver) = new_unreliable::(NZUsize!(2)); + assert_eq!( + sender.enqueue(Message::Drop(0)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Drop(1)), + Unreliable::new(Feedback::Ok) + ); + assert_eq!( + sender.enqueue(Message::Spill(2)), + Unreliable::new(Feedback::Backoff) + ); assert_eq!(receiver.try_recv(), Ok(Message::Drop(0))); assert_eq!(receiver.try_recv(), Ok(Message::Drop(1))); - assert_eq!(sender.enqueue(Message::Drop(3)), Feedback::Ok); + assert_eq!( + sender.enqueue(Message::Drop(3)), + Unreliable::new(Feedback::Ok) + ); assert_eq!(receiver.try_recv(), Ok(Message::Spill(2))); assert_eq!(receiver.try_recv(), Ok(Message::Drop(3))); }); diff --git a/broadcast/src/buffered/ingress.rs b/broadcast/src/buffered/ingress.rs index f53beb0c21f..e7373be96dd 100644 --- a/broadcast/src/buffered/ingress.rs +++ b/broadcast/src/buffered/ingress.rs @@ -77,13 +77,12 @@ impl Overflow> for Pending { impl Policy for Message { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { - return true; + return; } overflow.0.push_back(message); - true } } diff --git a/broadcast/src/buffered/mod.rs b/broadcast/src/buffered/mod.rs index bd6a5a80fb6..86865aef7dc 100644 --- a/broadcast/src/buffered/mod.rs +++ b/broadcast/src/buffered/mod.rs @@ -196,13 +196,13 @@ mod tests { let (current_responder, current_receiver) = commonware_utils::channel::oneshot::channel(); drop(current_receiver); - assert!( as Policy>::handle( + as Policy>::handle( &mut overflow, Message::Get { digest: current_get.digest(), responder: current_responder, }, - )); + ); let mut drained = VecDeque::new(); overflow.drain(|message| { diff --git a/collector/fuzz/fuzz_targets/collector.rs b/collector/fuzz/fuzz_targets/collector.rs index 00d267ac940..3f5ab2d997c 100644 --- a/collector/fuzz/fuzz_targets/collector.rs +++ b/collector/fuzz/fuzz_targets/collector.rs @@ -1,7 +1,7 @@ #![no_main] use arbitrary::Arbitrary; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_codec::{ Encode, EncodeSize, Error as CodecError, FixedSize, RangeCfg, Read, ReadExt, ReadRangeExt, Write, @@ -231,8 +231,8 @@ impl CheckedSender for MockCheckedSender { Vec::new() } - fn send(self, _message: impl Into + Send, _priority: bool) -> Feedback { - Feedback::Ok + fn send(self, _message: impl Into + Send, _priority: bool) -> Unreliable { + Unreliable::new(Feedback::Ok) } } diff --git a/collector/src/p2p/ingress.rs b/collector/src/p2p/ingress.rs index 124b413c570..eac7d8aa0a4 100644 --- a/collector/src/p2p/ingress.rs +++ b/collector/src/p2p/ingress.rs @@ -22,7 +22,7 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { match message { Self::Send { request, @@ -57,7 +57,6 @@ impl Policy for Message } } } - true } } diff --git a/collector/src/p2p/mocks/sender.rs b/collector/src/p2p/mocks/sender.rs index ab95819a9f2..5972619b7ee 100644 --- a/collector/src/p2p/mocks/sender.rs +++ b/collector/src/p2p/mocks/sender.rs @@ -1,6 +1,6 @@ //! Mock sender implementations for testing. -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_p2p::{CheckedSender, LimitedSender, Recipients}; use commonware_runtime::IoBufs; @@ -43,7 +43,7 @@ impl CheckedSender for CheckedFailing

{ Vec::new() } - fn send(self, _message: impl Into + Send, _priority: bool) -> Feedback { - Feedback::Closed + fn send(self, _message: impl Into + Send, _priority: bool) -> Unreliable { + Unreliable::new(Feedback::Closed) } } diff --git a/consensus/src/aggregation/mocks/reporter.rs b/consensus/src/aggregation/mocks/reporter.rs index 84793b09b78..b99805e4493 100644 --- a/consensus/src/aggregation/mocks/reporter.rs +++ b/consensus/src/aggregation/mocks/reporter.rs @@ -30,9 +30,8 @@ enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } diff --git a/consensus/src/marshal/coding/shards/mailbox.rs b/consensus/src/marshal/coding/shards/mailbox.rs index 0caffc91fb6..e427d5ac3c3 100644 --- a/consensus/src/marshal/coding/shards/mailbox.rs +++ b/consensus/src/marshal/coding/shards/mailbox.rs @@ -180,13 +180,12 @@ where { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { - return true; + return; } overflow.0.push_back(message); - true } } diff --git a/consensus/src/marshal/core/mailbox.rs b/consensus/src/marshal/core/mailbox.rs index 2f850450c0d..3be619d1dd8 100644 --- a/consensus/src/marshal/core/mailbox.rs +++ b/consensus/src/marshal/core/mailbox.rs @@ -477,10 +477,10 @@ impl Overflow> for Pending { impl Policy for Message { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { // A closed responder cannot be served if message.response_closed() { - return true; + return; } match message { // Coalesce hints: a single entry per height with a unioned target set @@ -498,14 +498,13 @@ impl Policy for Message { // Queue if the new message is still useful message => { if message.stale(overflow.height()) { - return true; + return; } overflow .messages .push_back(PendingMessage::Message(message)); } } - true } } @@ -1087,12 +1086,9 @@ mod tests { }, ); - assert!(::handle(&mut overflow, wait)); - assert!(::handle(&mut overflow, by_round)); - assert!(::handle( - &mut overflow, - by_commitment - )); + ::handle(&mut overflow, wait); + ::handle(&mut overflow, by_round); + ::handle(&mut overflow, by_commitment); let drained = drain(&mut overflow); assert_eq!(drained.len(), 3); @@ -1139,10 +1135,7 @@ mod tests { let (current_closed, current_closed_rx) = subscribe_by_digest(3); drop(current_closed_rx); - assert!(::handle( - &mut overflow, - current_closed - )); + ::handle(&mut overflow, current_closed); assert!(!has_subscription(&overflow, 1)); assert!(has_subscription(&overflow, 2)); @@ -1170,10 +1163,7 @@ mod tests { let (current_closed, current_closed_rx) = get_finalization(3); drop(current_closed_rx); - assert!(::handle( - &mut overflow, - current_closed - )); + ::handle(&mut overflow, current_closed); assert!(!has_get_block(&overflow, 1)); assert!(has_get_info(&overflow, 2)); diff --git a/consensus/src/marshal/resolver/handler.rs b/consensus/src/marshal/resolver/handler.rs index 2b66b7ea6fc..669175487ee 100644 --- a/consensus/src/marshal/resolver/handler.rs +++ b/consensus/src/marshal/resolver/handler.rs @@ -82,12 +82,11 @@ impl Overflow> for Pending { impl Policy for Message { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { - return true; + return; } overflow.0.push_back(message); - true } } diff --git a/consensus/src/ordered_broadcast/mocks/reporter.rs b/consensus/src/ordered_broadcast/mocks/reporter.rs index b2300a6f7de..d46583ca92d 100644 --- a/consensus/src/ordered_broadcast/mocks/reporter.rs +++ b/consensus/src/ordered_broadcast/mocks/reporter.rs @@ -29,9 +29,8 @@ enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } diff --git a/consensus/src/reporter.rs b/consensus/src/reporter.rs index df2e7f63d7c..5c0fa1c2dde 100644 --- a/consensus/src/reporter.rs +++ b/consensus/src/reporter.rs @@ -50,7 +50,6 @@ where const fn combine(a: Feedback, b: Feedback) -> Feedback { match (a, b) { (Feedback::Closed, _) | (_, Feedback::Closed) => Feedback::Closed, - (Feedback::Rejected, _) | (_, Feedback::Rejected) => Feedback::Rejected, (Feedback::Backoff, _) | (_, Feedback::Backoff) => Feedback::Backoff, (Feedback::Ok, Feedback::Ok) => Feedback::Ok, } @@ -146,14 +145,6 @@ mod tests { Feedback::Closed ); assert_eq!(combine(Feedback::Backoff, Feedback::Ok), Feedback::Backoff); - assert_eq!( - combine(Feedback::Rejected, Feedback::Ok), - Feedback::Rejected - ); - assert_eq!( - combine(Feedback::Rejected, Feedback::Backoff), - Feedback::Rejected - ); assert_eq!(combine(Feedback::Ok, Feedback::Ok), Feedback::Ok); } } diff --git a/consensus/src/simplex/actors/batcher/ingress.rs b/consensus/src/simplex/actors/batcher/ingress.rs index f0689b72ff5..f95688711ac 100644 --- a/consensus/src/simplex/actors/batcher/ingress.rs +++ b/consensus/src/simplex/actors/batcher/ingress.rs @@ -90,7 +90,7 @@ impl Overflow> for Pending { impl Policy for Message { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { match message { update @ Self::Update { current: new_current, @@ -107,7 +107,7 @@ impl Policy for Message { let old = (*old_current, *old_finalized); let new = (new_current, new_finalized); if new <= old { - return true; + return; } } overflow.update = Some(update); @@ -124,7 +124,7 @@ impl Policy for Message { Some(Self::Update { current: old_current, finalized: old_finalized, .. }) if Self::prunes(*old_current, *old_finalized, &new_vote) ) { - return true; + return; } // Ignore the constructed vote if it is a duplicate @@ -133,12 +133,11 @@ impl Policy for Message { .iter() .any(|old_vote| Self::similar(old_vote, &new_vote)) { - return true; + return; } overflow.constructed.push_back(new_vote); } } - true } } diff --git a/consensus/src/simplex/actors/resolver/ingress.rs b/consensus/src/simplex/actors/resolver/ingress.rs index 89be27f91e5..8883433118a 100644 --- a/consensus/src/simplex/actors/resolver/ingress.rs +++ b/consensus/src/simplex/actors/resolver/ingress.rs @@ -67,7 +67,7 @@ impl Overflow> for Pending { impl Policy for MailboxMessage { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { // Ignore the message if there exists a queued finalization // with a view greater than or equal to the new view let new_view = message.view(); @@ -76,7 +76,7 @@ impl Policy for MailboxMessage { Some(Self::Certificate(Certificate::Finalization(old_finalized))) if old_finalized.view() >= new_view ) { - return true; + return; } // Retain only the highest-view finalization and any messages with a view greater than the new view @@ -85,7 +85,7 @@ impl Policy for MailboxMessage { .messages .retain(|old_message| old_message.view() > new_view); overflow.finalization = Some(message); - return true; + return; } // Ignore the message if it is a duplicate @@ -109,10 +109,9 @@ impl Policy for MailboxMessage { _ => false, }) { - return true; + return; } overflow.messages.push_back(message); - true } } @@ -194,12 +193,11 @@ impl Overflow for HandlerPending { impl Policy for HandlerMessage { type Overflow = HandlerPending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { if message.response_closed() { - return true; + return; } overflow.0.push_back(message); - true } } diff --git a/consensus/src/simplex/actors/voter/ingress.rs b/consensus/src/simplex/actors/voter/ingress.rs index 9f3784e01d5..ad25c9b353a 100644 --- a/consensus/src/simplex/actors/voter/ingress.rs +++ b/consensus/src/simplex/actors/voter/ingress.rs @@ -77,7 +77,7 @@ impl Overflow> for Pending { impl Policy for Message { type Overflow = Pending; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { // Ignore the message if there exists a queued finalization // with a view greater than or equal to the new view let new_view = message.view(); @@ -86,7 +86,7 @@ impl Policy for Message { Some(Self::Verified(Certificate::Finalization(old_finalized), _)) if old_finalized.view() >= new_view ) { - return true; + return; } // Retain only the highest-view finalization and any messages with a view greater than the new view @@ -95,7 +95,7 @@ impl Policy for Message { .messages .retain(|old_message| old_message.view() > new_view); overflow.finalization = Some(message); - return true; + return; } // Ignore the message if it is a duplicate @@ -119,10 +119,9 @@ impl Policy for Message { _ => false, }) { - return true; + return; } overflow.messages.push_back(message); - true } } diff --git a/consensus/src/simplex/scheme/bls12381_threshold/mod.rs b/consensus/src/simplex/scheme/bls12381_threshold/mod.rs index 3f77ea06c87..3c03084c1e0 100644 --- a/consensus/src/simplex/scheme/bls12381_threshold/mod.rs +++ b/consensus/src/simplex/scheme/bls12381_threshold/mod.rs @@ -5,7 +5,7 @@ //! - [`standard`]: Certificates contain only a vote signature (requires half the computation to verify //! partial signatures and recover threshold signatures as [`vrf`]). //! -//! - [`vrf`]: Certificates contain a vote signature and a view signature (a seed that can be used +//! - [`vrf`]: Certificates contain a vote signature and a round signature (a seed that can be used //! as a VRF). //! //! # Non-Attributable Signatures diff --git a/consensus/src/simplex/scheme/bls12381_threshold/vrf.rs b/consensus/src/simplex/scheme/bls12381_threshold/vrf.rs index c775876cafb..898d57fb47c 100644 --- a/consensus/src/simplex/scheme/bls12381_threshold/vrf.rs +++ b/consensus/src/simplex/scheme/bls12381_threshold/vrf.rs @@ -1,6 +1,6 @@ //! BLS12-381 threshold VRF implementation of the [`Scheme`] trait for `simplex`. //! -//! Certificates contain a vote signature and a view signature (a seed that can be used +//! Certificates contain a vote signature and a round signature (a seed that can be used //! as a VRF). //! //! # Using the VRF @@ -314,7 +314,7 @@ where pub struct Signature { /// Signature over the consensus vote message (partial or recovered aggregate). pub vote_signature: V::Signature, - /// Signature over the per-view seed (partial or recovered aggregate). + /// Signature over the per-round seed (partial or recovered aggregate). pub seed_signature: V::Signature, } @@ -833,18 +833,18 @@ impl certificate::Scheme for Scheme { let vote_message = context.message(); entries.push((vote_namespace, vote_message, cert.vote_signature)); - // Seed signatures are per-view, so multiple certificates for the same view + // Seed signatures are per-round, so multiple certificates for the same round // (e.g., notarization and finalization) share the same seed. We only include // each unique seed once in the aggregate, but verify all certificates for a - // view have matching seeds. - if let Some(previous) = seeds.get(&context.view()) { + // round have matching seeds. + let seed_message = seed_message_from_subject(&context); + if let Some(previous) = seeds.get(&seed_message) { if *previous != cert.seed_signature { return false; } } else { - let seed_message = seed_message_from_subject(&context); - entries.push((&namespace.seed, seed_message, cert.seed_signature)); - seeds.insert(context.view(), cert.seed_signature); + entries.push((&namespace.seed, seed_message.clone(), cert.seed_signature)); + seeds.insert(seed_message, cert.seed_signature); } } @@ -2027,6 +2027,154 @@ mod tests { verify_certificates_rejects_malleability::(); } + fn assemble_notarization_certificate( + schemes: &[Scheme], + proposal: &Proposal, + ) -> Certificate { + let quorum = N3f1::quorum(schemes.len()) as usize; + let votes: Vec<_> = schemes + .iter() + .take(quorum) + .map(|scheme| scheme.sign(Subject::Notarize { proposal }).unwrap()) + .collect(); + + schemes[0] + .assemble::<_, N3f1>(votes, &Sequential) + .expect("assemble notarization certificate") + } + + fn assemble_finalization_certificate( + schemes: &[Scheme], + proposal: &Proposal, + ) -> Certificate { + let quorum = N3f1::quorum(schemes.len()) as usize; + let votes: Vec<_> = schemes + .iter() + .skip(schemes.len() - quorum) + .map(|scheme| scheme.sign(Subject::Finalize { proposal }).unwrap()) + .collect(); + + schemes[0] + .assemble::<_, N3f1>(votes, &Sequential) + .expect("assemble finalization certificate") + } + + fn verify_certificates_accepts_shared_round_seed() { + let mut rng = test_rng(); + let (schemes, verifier) = setup_signers::(4, 81); + let proposal = sample_proposal(Epoch::new(1), View::new(35), 19); + let notarization_certificate = assemble_notarization_certificate(&schemes, &proposal); + let finalization_certificate = assemble_finalization_certificate(&schemes, &proposal); + + assert!(verifier.verify_certificates::<_, Sha256Digest, _, N3f1>( + &mut rng, + [ + ( + Subject::Notarize { + proposal: &proposal, + }, + ¬arization_certificate, + ), + ( + Subject::Finalize { + proposal: &proposal, + }, + &finalization_certificate, + ), + ] + .into_iter(), + &Sequential, + )); + } + + #[test] + fn test_verify_certificates_accepts_shared_round_seed() { + verify_certificates_accepts_shared_round_seed::(); + verify_certificates_accepts_shared_round_seed::(); + } + + fn verify_certificates_rejects_cross_epoch_seed_replay() { + let mut rng = test_rng(); + let (schemes, verifier) = setup_signers::(4, 83); + let view = View::new(35); + let proposal1 = sample_proposal(Epoch::new(1), view, 19); + let proposal2 = sample_proposal(Epoch::new(2), view, 20); + let certificate1 = assemble_notarization_certificate(&schemes, &proposal1); + let certificate2 = assemble_notarization_certificate(&schemes, &proposal2); + + assert!(verifier.verify_certificates::<_, Sha256Digest, _, N3f1>( + &mut rng, + [ + ( + Subject::Notarize { + proposal: &proposal1, + }, + &certificate1, + ), + ( + Subject::Notarize { + proposal: &proposal2, + }, + &certificate2, + ), + ] + .into_iter(), + &Sequential, + )); + + let cert1 = certificate1.get().unwrap(); + let cert2 = certificate2.get().unwrap(); + let forged_certificate2: Certificate = Signature { + vote_signature: cert2.vote_signature, + seed_signature: cert1.seed_signature, + } + .into(); + + assert!(!verifier.verify_certificate::<_, Sha256Digest, N3f1>( + &mut rng, + Subject::Notarize { + proposal: &proposal2, + }, + &forged_certificate2, + &Sequential, + )); + + let batch = [ + ( + Subject::Notarize { + proposal: &proposal1, + }, + &certificate1, + ), + ( + Subject::Notarize { + proposal: &proposal2, + }, + &forged_certificate2, + ), + ]; + + assert!(!verifier.verify_certificates::<_, Sha256Digest, _, N3f1>( + &mut rng, + batch.iter().copied(), + &Sequential, + )); + assert_eq!( + verifier.verify_certificates_bisect::<_, Sha256Digest, N3f1>( + &mut rng, + &batch, + &Sequential, + ), + vec![true, false], + ); + } + + #[test] + fn test_verify_certificates_rejects_cross_epoch_seed_replay() { + verify_certificates_rejects_cross_epoch_seed_replay::(); + verify_certificates_rejects_cross_epoch_seed_replay::(); + } + #[cfg(feature = "arbitrary")] mod conformance { use super::*; diff --git a/examples/bridge/src/application/ingress.rs b/examples/bridge/src/application/ingress.rs index a711ff69044..49aa111bd5f 100644 --- a/examples/bridge/src/application/ingress.rs +++ b/examples/bridge/src/application/ingress.rs @@ -33,9 +33,8 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } diff --git a/examples/log/src/application/ingress.rs b/examples/log/src/application/ingress.rs index 93a9cd32465..4cb3cb2e396 100644 --- a/examples/log/src/application/ingress.rs +++ b/examples/log/src/application/ingress.rs @@ -18,9 +18,8 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } diff --git a/examples/reshare/src/dkg/ingress.rs b/examples/reshare/src/dkg/ingress.rs index 516898fb1e6..203bbe62644 100644 --- a/examples/reshare/src/dkg/ingress.rs +++ b/examples/reshare/src/dkg/ingress.rs @@ -47,9 +47,8 @@ where { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { overflow.push_back(message); - true } } diff --git a/examples/reshare/src/orchestrator/ingress.rs b/examples/reshare/src/orchestrator/ingress.rs index 3b2eeacff18..3f675469465 100644 --- a/examples/reshare/src/orchestrator/ingress.rs +++ b/examples/reshare/src/orchestrator/ingress.rs @@ -19,7 +19,7 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut VecDeque, message: Self) -> bool { + fn handle(overflow: &mut VecDeque, message: Self) { match message { Self::Enter(transition) => { let epoch = transition.epoch; @@ -42,7 +42,6 @@ impl Policy for Message { } } } - true } } diff --git a/p2p/src/authenticated/discovery/actors/peer/actor.rs b/p2p/src/authenticated/discovery/actors/peer/actor.rs index d7171df4add..594d553ff1f 100644 --- a/p2p/src/authenticated/discovery/actors/peer/actor.rs +++ b/p2p/src/authenticated/discovery/actors/peer/actor.rs @@ -34,9 +34,9 @@ pub struct Actor { max_peers: usize, mailbox: Mailbox, - control: mailbox::Receiver>, - high: mailbox::Receiver>, - low: mailbox::Receiver>, + control: mailbox::UnreliableReceiver>, + high: mailbox::UnreliableReceiver>, + low: mailbox::UnreliableReceiver>, sent_messages: CounterFamily>, received_messages: CounterFamily>, @@ -124,10 +124,10 @@ impl peer: &C, batch_size: usize, batch: &mut Vec, - control: &mut mailbox::Receiver>, + control: &mut mailbox::UnreliableReceiver>, pool: &commonware_runtime::BufferPool, - high: &mut mailbox::Receiver>, - low: &mut mailbox::Receiver>, + high: &mut mailbox::UnreliableReceiver>, + low: &mut mailbox::UnreliableReceiver>, rate_limits: &HashMap, sent_messages: &CounterFamily>, ) -> Result<(), Error> { @@ -459,7 +459,7 @@ mod tests { } fn create_channels(context: impl BufferPooler + Metrics) -> Channels { - let (router_sender, _router_receiver) = commonware_actor::mailbox::new::< + let (router_sender, _router_receiver) = commonware_actor::mailbox::new_unreliable::< router::Message, >( context.child("router_mailbox"), NZUsize!(10) diff --git a/p2p/src/authenticated/discovery/actors/peer/ingress.rs b/p2p/src/authenticated/discovery/actors/peer/ingress.rs index 2a9438b3786..2f44163810d 100644 --- a/p2p/src/authenticated/discovery/actors/peer/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/peer/ingress.rs @@ -1,5 +1,5 @@ use crate::authenticated::discovery::types; -use commonware_actor::mailbox::{self, Policy}; +use commonware_actor::mailbox::{self, UnreliablePolicy}; use commonware_cryptography::PublicKey; use commonware_runtime::Metrics; use std::{collections::VecDeque, fmt, num::NonZeroUsize}; @@ -17,7 +17,7 @@ pub enum Message { Kill, } -impl Policy for Message { +impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { @@ -31,11 +31,14 @@ impl Policy for Message { } } -pub struct Mailbox(mailbox::Sender>); +pub struct Mailbox(mailbox::UnreliableSender>); impl Mailbox { - pub fn new(metrics: impl Metrics, size: NonZeroUsize) -> (Self, mailbox::Receiver>) { - let (sender, receiver) = mailbox::new(metrics, size); + pub fn new( + metrics: impl Metrics, + size: NonZeroUsize, + ) -> (Self, mailbox::UnreliableReceiver>) { + let (sender, receiver) = mailbox::new_unreliable(metrics, size); (Self(sender), receiver) } diff --git a/p2p/src/authenticated/discovery/actors/router/actor.rs b/p2p/src/authenticated/discovery/actors/router/actor.rs index eb031c87dbb..b31f468c0e8 100644 --- a/p2p/src/authenticated/discovery/actors/router/actor.rs +++ b/p2p/src/authenticated/discovery/actors/router/actor.rs @@ -19,7 +19,7 @@ use tracing::debug; pub struct Actor { context: ContextCell, - control: mailbox::Receiver>, + control: mailbox::UnreliableReceiver>, connections: BTreeMap>, open_subscriptions: Vec>>, } @@ -30,7 +30,7 @@ impl Actor { pub fn new(context: E, cfg: Config) -> (Self, Mailbox

, Messenger

) { // Create mailbox let (control_sender, control_receiver) = - mailbox::new::>(context.child("mailbox"), cfg.mailbox_size); + mailbox::new_unreliable::>(context.child("mailbox"), cfg.mailbox_size); let pool = context.network_buffer_pool().clone(); // Create actor diff --git a/p2p/src/authenticated/discovery/actors/router/ingress.rs b/p2p/src/authenticated/discovery/actors/router/ingress.rs index d98a6d82a7c..ba218a7af7b 100644 --- a/p2p/src/authenticated/discovery/actors/router/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/router/ingress.rs @@ -8,8 +8,8 @@ use crate::{ Channel, Recipients, }; use commonware_actor::{ - mailbox::{self, Policy}, - Feedback, + mailbox::{self, UnreliablePolicy}, + Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{BufferPool, IoBufs}; @@ -39,7 +39,7 @@ pub enum Message { SubscribePeers { sender: ring::Sender> }, } -impl Policy for Message

{ +impl UnreliablePolicy for Message

{ type Overflow = VecDeque; fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { @@ -54,7 +54,7 @@ impl Policy for Message

{ } /// Mailbox for the router actor. -pub struct Mailbox(mailbox::Sender>); +pub struct Mailbox(mailbox::UnreliableSender>); impl Clone for Mailbox

{ fn clone(&self) -> Self { @@ -70,7 +70,7 @@ impl fmt::Debug for Mailbox

{ impl Mailbox

{ /// Returns a router mailbox around the provided sender. - pub const fn new(sender: mailbox::Sender>) -> Self { + pub const fn new(sender: mailbox::UnreliableSender>) -> Self { Self(sender) } @@ -92,7 +92,10 @@ impl Mailbox

{ /// This may fail during shutdown if the router has already exited, /// which is harmless since the router no longer tracks any peers. pub fn release(&self, peer: P) -> Feedback { - self.0.enqueue(Message::Release { peer }) + match self.0.enqueue(Message::Release { peer }) { + Unreliable::Outcome(feedback) => feedback, + Unreliable::Rejected => unreachable!("router release cannot be rejected"), + } } } @@ -120,7 +123,7 @@ impl Messenger

{ channel: Channel, message: IoBufs, priority: bool, - ) -> Feedback { + ) -> Unreliable { // Build Data and encode Payload::Data once for all recipients let encoded = types::Payload::

::encode_data(&self.pool, channel, message); @@ -157,8 +160,10 @@ mod tests { fn test_overflow_rejects_content_but_retains_control() { let executor = deterministic::Runner::default(); executor.start(|context| async move { - let (control_sender, mut receiver) = - mailbox::new::>(context.child("control_mailbox"), NZUsize!(1)); + let (control_sender, mut receiver) = mailbox::new_unreliable::>( + context.child("control_mailbox"), + NZUsize!(1), + ); let mailbox = Mailbox::new(control_sender.clone()); let messenger = Messenger::new( context.network_buffer_pool().clone(), @@ -173,11 +178,11 @@ mod tests { IoBuf::from(b"one").into(), false ), - Feedback::Ok + Unreliable::new(Feedback::Ok) ); assert_eq!( messenger.content(Recipients::One(peer), 7, IoBuf::from(b"two").into(), false), - Feedback::Rejected + Unreliable::Rejected ); assert_eq!( mailbox.release(PrivateKey::from_seed(2).public_key()), diff --git a/p2p/src/authenticated/discovery/actors/spawner/actor.rs b/p2p/src/authenticated/discovery/actors/spawner/actor.rs index deb56732fdd..6c7f3772ae0 100644 --- a/p2p/src/authenticated/discovery/actors/spawner/actor.rs +++ b/p2p/src/authenticated/discovery/actors/spawner/actor.rs @@ -37,7 +37,7 @@ pub struct Actor< peer_gossip_max_count: usize, info_verifier: InfoVerifier, - receiver: mailbox::Receiver>, + receiver: mailbox::UnreliableReceiver>, sent_messages: CounterFamily>, received_messages: CounterFamily>, diff --git a/p2p/src/authenticated/discovery/actors/spawner/ingress.rs b/p2p/src/authenticated/discovery/actors/spawner/ingress.rs index 2b656f8a0e3..344121dad92 100644 --- a/p2p/src/authenticated/discovery/actors/spawner/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/spawner/ingress.rs @@ -1,5 +1,5 @@ use crate::authenticated::{discovery::actors::tracker::Reservation, Mailbox}; -use commonware_actor::{mailbox::Policy, Feedback}; +use commonware_actor::{mailbox::UnreliablePolicy, Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_runtime::{Sink, Stream}; use commonware_stream::encrypted::{Receiver, Sender}; @@ -18,7 +18,7 @@ pub enum Message { }, } -impl Policy for Message { +impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -32,13 +32,13 @@ impl Policy for Message { impl Mailbox> { /// Send a message to the actor to spawn a new task for the given peer. /// - /// This may fail during shutdown if the spawner has already exited, - /// which is harmless since no new connections need to be spawned. + /// This may be rejected when the spawner is backlogged, or return closed after shutdown, which + /// is harmless since stale connections do not need to be spawned. pub fn spawn( &mut self, connection: (Sender, Receiver), reservation: Reservation

, - ) -> Feedback { + ) -> Unreliable { self.0.enqueue(Message::Spawn { peer: reservation.metadata().public_key().clone(), connection, @@ -154,10 +154,13 @@ mod tests { Reservation::new(Metadata::Listener(peer_1.clone()), releaser.clone()); let reservation_2 = Reservation::new(Metadata::Listener(peer_2.clone()), releaser); - assert_eq!(spawner.spawn(connection_1, reservation_1), Feedback::Ok); + assert_eq!( + spawner.spawn(connection_1, reservation_1), + Unreliable::new(Feedback::Ok) + ); assert_eq!( spawner.spawn(connection_2, reservation_2), - Feedback::Rejected + Unreliable::Rejected ); let release = tracker_receiver diff --git a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs index 4267c28e0ed..ed34fa24da3 100644 --- a/p2p/src/authenticated/discovery/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/discovery/actors/tracker/ingress.rs @@ -143,9 +143,8 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { overflow.push_back(message); - true } } diff --git a/p2p/src/authenticated/discovery/channels.rs b/p2p/src/authenticated/discovery/channels.rs index 851a8563949..e50b49a5949 100644 --- a/p2p/src/authenticated/discovery/channels.rs +++ b/p2p/src/authenticated/discovery/channels.rs @@ -4,8 +4,8 @@ use crate::{ Channel, Message as NetworkMessage, Recipients, }; use commonware_actor::{ - mailbox::{self, Policy}, - Feedback, + mailbox::{self, UnreliablePolicy}, + Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, IoBufs, Metrics, Quota}; @@ -18,7 +18,7 @@ use std::{ pub(crate) struct Inbound(pub(crate) NetworkMessage

); -impl Policy for Inbound

{ +impl UnreliablePolicy for Inbound

{ type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -44,7 +44,7 @@ impl crate::UnlimitedSender for UnlimitedSender

{ recipients: Recipients, message: impl Into + Send, priority: bool, - ) -> Feedback { + ) -> Unreliable { let message = message.into(); assert!( message.len() <= self.max_size as usize, @@ -110,7 +110,7 @@ where /// Channel to asynchronously receive messages from a channel. pub struct Receiver { - receiver: mailbox::Receiver>, + receiver: mailbox::UnreliableReceiver>, } impl Debug for Receiver

{ @@ -120,7 +120,7 @@ impl Debug for Receiver

{ } impl Receiver

{ - pub(super) const fn new(receiver: mailbox::Receiver>) -> Self { + pub(super) const fn new(receiver: mailbox::UnreliableReceiver>) -> Self { Self { receiver } } } @@ -146,7 +146,7 @@ impl crate::Receiver for Receiver

{ pub struct Channels { messenger: Messenger

, max_size: u32, - receivers: BTreeMap>)>, + receivers: BTreeMap>)>, } impl Channels

{ @@ -166,7 +166,7 @@ impl Channels

{ context: C, ) -> (Sender, Receiver

) { let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero"); - let (sender, receiver) = mailbox::new(context.child("mailbox"), backlog); + let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog); if self.receivers.insert(channel, (rate, sender)).is_some() { panic!("duplicate channel registration: {channel}"); } @@ -182,7 +182,7 @@ impl Channels

{ ) } - pub fn collect(self) -> BTreeMap>)> { + pub fn collect(self) -> BTreeMap>)> { self.receivers } } diff --git a/p2p/src/authenticated/discovery/mod.rs b/p2p/src/authenticated/discovery/mod.rs index e78a40461a5..e9bc6cf332e 100644 --- a/p2p/src/authenticated/discovery/mod.rs +++ b/p2p/src/authenticated/discovery/mod.rs @@ -267,7 +267,7 @@ mod tests { CheckedSender as _, Ingress, LimitedSender as _, Manager, Provider, Receiver, Recipients, Sender, }; - use commonware_actor::Feedback; + use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::{ed25519, Signer as _}; use commonware_macros::{select, select_loop, test_group, test_traced}; use commonware_runtime::{ @@ -2415,7 +2415,11 @@ mod tests { // broadcast. for i in 0..11 { let sent = messenger.content(Recipients::All, 0, message.clone().into(), false); - assert_ne!(sent, Feedback::Closed, "Broadcast {i} should be accepted"); + assert_ne!( + sent, + Unreliable::new(Feedback::Closed), + "Broadcast {i} should be accepted" + ); assert!(fast_receivers.low.recv().await.is_some()); } diff --git a/p2p/src/authenticated/lookup/actors/peer/actor.rs b/p2p/src/authenticated/lookup/actors/peer/actor.rs index bf2af0cb272..3bf05dda3df 100644 --- a/p2p/src/authenticated/lookup/actors/peer/actor.rs +++ b/p2p/src/authenticated/lookup/actors/peer/actor.rs @@ -29,8 +29,8 @@ pub struct Actor { send_batch_size: usize, control: ring::Receiver, - high: mailbox::Receiver>, - low: mailbox::Receiver>, + high: mailbox::UnreliableReceiver>, + low: mailbox::UnreliableReceiver>, sent_messages: CounterFamily>, received_messages: CounterFamily>, @@ -99,8 +99,8 @@ impl batch_size: usize, batch: &mut Vec, control: &mut ring::Receiver, - high: &mut mailbox::Receiver>, - low: &mut mailbox::Receiver>, + high: &mut mailbox::UnreliableReceiver>, + low: &mut mailbox::UnreliableReceiver>, rate_limits: &HashMap, sent_messages: &CounterFamily>, ) -> Result<(), Error> { @@ -127,8 +127,8 @@ impl async fn recv_prioritized( control: &mut ring::Receiver, - high: &mut mailbox::Receiver>, - low: &mut mailbox::Receiver>, + high: &mut mailbox::UnreliableReceiver>, + low: &mut mailbox::UnreliableReceiver>, ) -> Prioritized { select! { msg = control.next() => msg.map_or(Prioritized::Closed, Prioritized::Control), @@ -405,7 +405,7 @@ mod tests { } fn create_channels(context: impl BufferPooler + Metrics) -> Channels { - let (router_sender, _router_receiver) = commonware_actor::mailbox::new::< + let (router_sender, _router_receiver) = commonware_actor::mailbox::new_unreliable::< router::Message, >( context.child("router_mailbox"), NZUsize!(10) diff --git a/p2p/src/authenticated/lookup/actors/router/actor.rs b/p2p/src/authenticated/lookup/actors/router/actor.rs index 108bd26913c..f050b3d40b7 100644 --- a/p2p/src/authenticated/lookup/actors/router/actor.rs +++ b/p2p/src/authenticated/lookup/actors/router/actor.rs @@ -19,7 +19,7 @@ use tracing::debug; pub struct Actor { context: ContextCell, - control: mailbox::Receiver>, + control: mailbox::UnreliableReceiver>, connections: BTreeMap>, open_subscriptions: Vec>>, } @@ -30,7 +30,7 @@ impl Actor { pub fn new(context: E, cfg: Config) -> (Self, Mailbox

, Messenger

) { // Create mailbox let (control_sender, control_receiver) = - mailbox::new::>(context.child("mailbox"), cfg.mailbox_size); + mailbox::new_unreliable::>(context.child("mailbox"), cfg.mailbox_size); let pool = context.network_buffer_pool().clone(); // Create actor diff --git a/p2p/src/authenticated/lookup/actors/router/ingress.rs b/p2p/src/authenticated/lookup/actors/router/ingress.rs index 5b8ef368a15..74674119d25 100644 --- a/p2p/src/authenticated/lookup/actors/router/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/router/ingress.rs @@ -8,8 +8,8 @@ use crate::{ Channel, Recipients, }; use commonware_actor::{ - mailbox::{self, Policy}, - Feedback, + mailbox::{self, UnreliablePolicy}, + Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{BufferPool, IoBufs}; @@ -39,7 +39,7 @@ pub enum Message { SubscribePeers { sender: ring::Sender> }, } -impl Policy for Message

{ +impl UnreliablePolicy for Message

{ type Overflow = VecDeque; fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { @@ -54,7 +54,7 @@ impl Policy for Message

{ } /// Mailbox for the router actor. -pub struct Mailbox(mailbox::Sender>); +pub struct Mailbox(mailbox::UnreliableSender>); impl Clone for Mailbox

{ fn clone(&self) -> Self { @@ -70,7 +70,7 @@ impl fmt::Debug for Mailbox

{ impl Mailbox

{ /// Returns a router mailbox around the provided sender. - pub const fn new(sender: mailbox::Sender>) -> Self { + pub const fn new(sender: mailbox::UnreliableSender>) -> Self { Self(sender) } @@ -92,7 +92,10 @@ impl Mailbox

{ /// This may fail during shutdown if the router has already exited, /// which is harmless since the router no longer tracks any peers. pub fn release(&self, peer: P) -> Feedback { - self.0.enqueue(Message::Release { peer }) + match self.0.enqueue(Message::Release { peer }) { + Unreliable::Outcome(feedback) => feedback, + Unreliable::Rejected => unreachable!("router release cannot be rejected"), + } } } @@ -120,7 +123,7 @@ impl Messenger

{ channel: Channel, message: IoBufs, priority: bool, - ) -> Feedback { + ) -> Unreliable { // Build Data and encode Message::Data once for all recipients let encoded = types::Message::encode_data(&self.pool, channel, message); diff --git a/p2p/src/authenticated/lookup/actors/spawner/actor.rs b/p2p/src/authenticated/lookup/actors/spawner/actor.rs index bba49fe66dd..72fbf331029 100644 --- a/p2p/src/authenticated/lookup/actors/spawner/actor.rs +++ b/p2p/src/authenticated/lookup/actors/spawner/actor.rs @@ -30,7 +30,7 @@ pub struct Actor< send_batch_size: NonZeroUsize, ping_frequency: std::time::Duration, - receiver: mailbox::Receiver>, + receiver: mailbox::UnreliableReceiver>, sent_messages: CounterFamily>, received_messages: CounterFamily>, diff --git a/p2p/src/authenticated/lookup/actors/spawner/ingress.rs b/p2p/src/authenticated/lookup/actors/spawner/ingress.rs index 6c680467aa9..e17b72b29f0 100644 --- a/p2p/src/authenticated/lookup/actors/spawner/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/spawner/ingress.rs @@ -1,5 +1,5 @@ use crate::authenticated::{lookup::actors::tracker::Reservation, Mailbox}; -use commonware_actor::{mailbox::Policy, Feedback}; +use commonware_actor::{mailbox::UnreliablePolicy, Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_runtime::{Sink, Stream}; use commonware_stream::encrypted::{Receiver, Sender}; @@ -18,7 +18,7 @@ pub enum Message { }, } -impl Policy for Message { +impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -32,13 +32,13 @@ impl Policy for Message { impl Mailbox> { /// Send a message to the actor to spawn a new task for the given peer. /// - /// This may fail during shutdown if the spawner has already exited, - /// which is harmless since no new connections need to be spawned. + /// This may be rejected when the spawner is backlogged, or return closed after shutdown, which + /// is harmless since stale connections do not need to be spawned. pub fn spawn( &mut self, connection: (Sender, Receiver), reservation: Reservation

, - ) -> Feedback { + ) -> Unreliable { self.0.enqueue(Message::Spawn { peer: reservation.metadata().public_key().clone(), connection, @@ -154,10 +154,13 @@ mod tests { Reservation::new(Metadata::Listener(peer_1.clone()), releaser.clone()); let reservation_2 = Reservation::new(Metadata::Listener(peer_2.clone()), releaser); - assert_eq!(spawner.spawn(connection_1, reservation_1), Feedback::Ok); + assert_eq!( + spawner.spawn(connection_1, reservation_1), + Unreliable::new(Feedback::Ok) + ); assert_eq!( spawner.spawn(connection_2, reservation_2), - Feedback::Rejected + Unreliable::Rejected ); let release = tracker_receiver diff --git a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs index d1307d6bdfa..16b29b66972 100644 --- a/p2p/src/authenticated/lookup/actors/tracker/ingress.rs +++ b/p2p/src/authenticated/lookup/actors/tracker/ingress.rs @@ -117,9 +117,8 @@ pub enum Message { impl Policy for Message { type Overflow = VecDeque; - fn handle(overflow: &mut Self::Overflow, message: Self) -> bool { + fn handle(overflow: &mut Self::Overflow, message: Self) { overflow.push_back(message); - true } } diff --git a/p2p/src/authenticated/lookup/channels.rs b/p2p/src/authenticated/lookup/channels.rs index 26ce1f80471..a893821153f 100644 --- a/p2p/src/authenticated/lookup/channels.rs +++ b/p2p/src/authenticated/lookup/channels.rs @@ -5,8 +5,8 @@ use crate::{ Channel, Message as NetworkMessage, Recipients, }; use commonware_actor::{ - mailbox::{self, Policy}, - Feedback, + mailbox::{self, UnreliablePolicy}, + Feedback, Unreliable, }; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, IoBufs, Metrics, Quota}; @@ -19,7 +19,7 @@ use std::{ pub(crate) struct Inbound(pub(crate) NetworkMessage

); -impl Policy for Inbound

{ +impl UnreliablePolicy for Inbound

{ type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -45,7 +45,7 @@ impl crate::UnlimitedSender for UnlimitedSender

{ recipients: Recipients, message: impl Into + Send, priority: bool, - ) -> Feedback { + ) -> Unreliable { let message = message.into(); assert!( message.len() <= self.max_size as usize, @@ -111,7 +111,7 @@ where /// Channel to asynchronously receive messages from a channel. pub struct Receiver { - receiver: mailbox::Receiver>, + receiver: mailbox::UnreliableReceiver>, } impl Debug for Receiver

{ @@ -121,7 +121,7 @@ impl Debug for Receiver

{ } impl Receiver

{ - pub(super) const fn new(receiver: mailbox::Receiver>) -> Self { + pub(super) const fn new(receiver: mailbox::UnreliableReceiver>) -> Self { Self { receiver } } } @@ -147,7 +147,7 @@ impl crate::Receiver for Receiver

{ pub struct Channels { messenger: router::Messenger

, max_size: u32, - receivers: BTreeMap>)>, + receivers: BTreeMap>)>, } impl Channels

{ @@ -167,7 +167,7 @@ impl Channels

{ context: C, ) -> (Sender, Receiver

) { let backlog = NonZeroUsize::new(backlog).expect("message backlog must be non-zero"); - let (sender, receiver) = mailbox::new(context.child("mailbox"), backlog); + let (sender, receiver) = mailbox::new_unreliable(context.child("mailbox"), backlog); if self.receivers.insert(channel, (rate, sender)).is_some() { panic!("duplicate channel registration: {channel}"); } @@ -183,7 +183,7 @@ impl Channels

{ ) } - pub fn collect(self) -> BTreeMap>)> { + pub fn collect(self) -> BTreeMap>)> { self.receivers } } diff --git a/p2p/src/authenticated/lookup/mod.rs b/p2p/src/authenticated/lookup/mod.rs index ac9956271b6..158ac1aa04c 100644 --- a/p2p/src/authenticated/lookup/mod.rs +++ b/p2p/src/authenticated/lookup/mod.rs @@ -219,7 +219,7 @@ mod tests { Address, AddressableManager, CheckedSender as _, Ingress, LimitedSender as _, Provider, Receiver, Recipients, Sender, }; - use commonware_actor::Feedback; + use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::{ed25519, Signer as _}; use commonware_macros::{select, test_group, test_traced}; use commonware_runtime::{ @@ -2145,7 +2145,11 @@ mod tests { let message = IoBuf::from(vec![0u8; 100]); for i in 0..11 { let sent = messenger.content(Recipients::All, 0, message.clone().into(), false); - assert_ne!(sent, Feedback::Closed, "Broadcast {i} should be accepted"); + assert_ne!( + sent, + Unreliable::new(Feedback::Closed), + "Broadcast {i} should be accepted" + ); assert!(fast_receivers.low.recv().await.is_some()); } diff --git a/p2p/src/authenticated/mailbox.rs b/p2p/src/authenticated/mailbox.rs index c808102b105..b45daf51fd5 100644 --- a/p2p/src/authenticated/mailbox.rs +++ b/p2p/src/authenticated/mailbox.rs @@ -4,17 +4,20 @@ use std::num::NonZeroUsize; /// A mailbox wraps a sender for messages of type `T`. #[derive(Debug)] -pub struct Mailbox(pub(crate) mailbox::Sender); +pub struct Mailbox(pub(crate) mailbox::UnreliableSender); -impl Mailbox { +impl Mailbox { /// Returns a new mailbox with the given sender. - pub fn new(metrics: impl Metrics, size: NonZeroUsize) -> (Self, mailbox::Receiver) { - let (sender, receiver) = mailbox::new(metrics, size); + pub fn new( + metrics: impl Metrics, + size: NonZeroUsize, + ) -> (Self, mailbox::UnreliableReceiver) { + let (sender, receiver) = mailbox::new_unreliable(metrics, size); (Self(sender), receiver) } } -impl Clone for Mailbox { +impl Clone for Mailbox { fn clone(&self) -> Self { Self(self.0.clone()) } diff --git a/p2p/src/authenticated/relay.rs b/p2p/src/authenticated/relay.rs index c2e5d33697c..ae24ffdc562 100644 --- a/p2p/src/authenticated/relay.rs +++ b/p2p/src/authenticated/relay.rs @@ -1,6 +1,6 @@ use commonware_actor::{ - mailbox::{self, Policy}, - Feedback, + mailbox::{self, UnreliablePolicy}, + Feedback, Unreliable, }; use commonware_macros::select; use commonware_runtime::Metrics; @@ -14,7 +14,7 @@ impl Message { } } -impl Policy for Message { +impl UnreliablePolicy for Message { type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -23,21 +23,21 @@ impl Policy for Message { } pub(crate) struct Receivers { - pub(crate) low: mailbox::Receiver>, - pub(crate) high: mailbox::Receiver>, + pub(crate) low: mailbox::UnreliableReceiver>, + pub(crate) high: mailbox::UnreliableReceiver>, } #[derive(Clone, Debug)] pub struct Relay { - low: mailbox::Sender>, - high: mailbox::Sender>, + low: mailbox::UnreliableSender>, + high: mailbox::UnreliableSender>, } impl Relay { /// Creates a prioritized relay backed by bounded low and high priority mailboxes. pub fn new(metrics: impl Metrics, size: NonZeroUsize) -> (Self, Receivers) { - let (low_sender, low_receiver) = mailbox::new(metrics.child("low"), size); - let (high_sender, high_receiver) = mailbox::new(metrics.child("high"), size); + let (low_sender, low_receiver) = mailbox::new_unreliable(metrics.child("low"), size); + let (high_sender, high_receiver) = mailbox::new_unreliable(metrics.child("high"), size); ( Self { low: low_sender, @@ -52,9 +52,9 @@ impl Relay { /// Submits `message` to the priority channel selected by `priority`. /// - /// This never waits for capacity. [`Feedback::Rejected`] means the selected channel was full + /// This never waits for capacity. [`Unreliable::Rejected`] means the selected channel was full /// and did not handle the message, and [`Feedback::Closed`] means the receiver is gone. - pub fn send(&self, message: T, priority: bool) -> Feedback { + pub fn send(&self, message: T, priority: bool) -> Unreliable { let sender = if priority { &self.high } else { &self.low }; sender.enqueue(Message(message)) } @@ -71,10 +71,10 @@ pub enum Prioritized { } /// Awaits a message from control, high, or low priority receivers. -pub async fn recv_prioritized( - control: &mut mailbox::Receiver, - high: &mut mailbox::Receiver>, - low: &mut mailbox::Receiver>, +pub async fn recv_prioritized( + control: &mut mailbox::UnreliableReceiver, + high: &mut mailbox::UnreliableReceiver>, + low: &mut mailbox::UnreliableReceiver>, ) -> Prioritized { select! { msg = control.recv() => msg.map_or(Prioritized::Closed, Prioritized::Control), @@ -88,7 +88,7 @@ pub async fn recv_prioritized( } /// Attempts to receive one data message from a relay receiver. -pub(crate) fn try_recv(receiver: &mut mailbox::Receiver>) -> Option { +pub(crate) fn try_recv(receiver: &mut mailbox::UnreliableReceiver>) -> Option { receiver.try_recv().ok().map(Message::into_inner) } @@ -103,7 +103,7 @@ mod tests { let (relay, mut receivers) = Relay::new(Metrics, NZUsize!(1)); let data = 123; - assert_eq!(relay.send(data, true), Feedback::Ok); + assert_eq!(relay.send(data, true), Unreliable::new(Feedback::Ok)); match receivers.high.try_recv().map(Message::into_inner) { Ok(received_data) => { assert_eq!(data, received_data); @@ -113,7 +113,7 @@ mod tests { assert!(receivers.low.try_recv().is_err()); let data = 456; - assert_eq!(relay.send(data, false), Feedback::Ok); + assert_eq!(relay.send(data, false), Unreliable::new(Feedback::Ok)); match receivers.low.try_recv().map(Message::into_inner) { Ok(received_data) => { assert_eq!(data, received_data); @@ -127,8 +127,8 @@ mod tests { fn test_relay_rejects_on_overflow() { let (relay, mut receivers) = Relay::new(Metrics, NZUsize!(1)); - assert_eq!(relay.send(1, false), Feedback::Ok); - assert_eq!(relay.send(2, false), Feedback::Rejected); + assert_eq!(relay.send(1, false), Unreliable::new(Feedback::Ok)); + assert_eq!(relay.send(2, false), Unreliable::Rejected); assert_eq!(receivers.low.try_recv().map(Message::into_inner), Ok(1)); assert!(receivers.low.try_recv().is_err()); } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index d0880c2fd0b..71569321572 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -14,7 +14,7 @@ use commonware_macros::{stability_mod, stability_scope}; stability_mod!(ALPHA, pub mod simulated); stability_scope!(BETA { - use commonware_actor::Feedback; + use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_runtime::{IoBuf, IoBufs}; use commonware_utils::{ @@ -62,13 +62,14 @@ stability_scope!(BETA { /// # Returns /// /// Feedback from submitting the message for delivery. + /// [`Unreliable`] indicates that local submission may be rejected under backpressure. /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message. fn send( &mut self, recipients: Recipients, message: impl Into + Send, priority: bool, - ) -> Feedback; + ) -> Unreliable; } /// Interface for constructing a [`CheckedSender`] from a set of [`Recipients`], @@ -120,8 +121,9 @@ stability_scope!(BETA { /// # Returns /// /// Feedback from submitting the message for delivery. + /// [`Unreliable`] indicates that local submission may be rejected under backpressure. /// [`Feedback::accepted`] does not guarantee that the recipient will receive the message. - fn send(self, message: impl Into + Send, priority: bool) -> Feedback; + fn send(self, message: impl Into + Send, priority: bool) -> Unreliable; } /// Interface for sending messages to a set of recipients. diff --git a/p2p/src/simulated/network.rs b/p2p/src/simulated/network.rs index 294eae27049..9b669c36438 100644 --- a/p2p/src/simulated/network.rs +++ b/p2p/src/simulated/network.rs @@ -14,7 +14,7 @@ use crate::{ Channel, Message as NetworkMessage, PeerSetUpdate, Recipients, TrackedPeers, UnlimitedSender as _, }; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_codec::{DecodeExt, FixedSize}; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; @@ -960,7 +960,7 @@ impl crate::UnlimitedSender for UnlimitedSender { recipients: Recipients

, message: impl Into + Send, priority: bool, - ) -> Feedback { + ) -> Unreliable { let message = message.into().coalesce(); assert!( message.len() <= self.max_size as usize, @@ -970,7 +970,7 @@ impl crate::UnlimitedSender for UnlimitedSender { ); if !self.active.load(Ordering::Acquire) || self.sender.is_closed() { - return Feedback::Closed; + return Unreliable::new(Feedback::Closed); } // The simulated network handles send submissions and topology updates @@ -983,9 +983,9 @@ impl crate::UnlimitedSender for UnlimitedSender { message, priority, }) { - Feedback::Ok + Unreliable::new(Feedback::Ok) } else { - Feedback::Closed + Unreliable::new(Feedback::Closed) } } } @@ -1141,13 +1141,13 @@ impl<'a, P: PublicKey, E: Clock, F: SplitForwarder

> crate::CheckedSender crate::CheckedSender::recipients(&self.checked) } - fn send(self, message: impl Into + Send, priority: bool) -> Feedback { + fn send(self, message: impl Into + Send, priority: bool) -> Unreliable { // Convert to IoBuf here since forwarder needs to inspect the message let message = message.into().coalesce(); // Determine the set of recipients that will receive the message let Some(recipients) = (self.forwarder)(self.replica, &self.recipients, &message) else { - return Feedback::Rejected; + return Unreliable::Rejected; }; // Extract the inner sender and send directly with the new recipients diff --git a/p2p/src/utils/codec.rs b/p2p/src/utils/codec.rs index 40583947408..e525e01b279 100644 --- a/p2p/src/utils/codec.rs +++ b/p2p/src/utils/codec.rs @@ -1,7 +1,7 @@ //! Codec wrapper for [Sender] and [Receiver]. use crate::{Blocker, CheckedSender, Receiver, Recipients, Sender}; -use commonware_actor::{mailbox, Feedback}; +use commonware_actor::{mailbox, Feedback, Unreliable}; use commonware_codec::{Codec, Error}; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; @@ -86,7 +86,7 @@ impl<'a, S: Sender, V: Codec> CheckedWrappedSender<'a, S, V> { self.sender.recipients() } - pub fn send(self, message: V, priority: bool) -> Feedback { + pub fn send(self, message: V, priority: bool) -> Unreliable { let encoded = message.encode_with_pool(self.pool); self.sender.send(encoded, priority) } @@ -134,7 +134,7 @@ impl WrappedReceiver { /// messages are dropped (they would likely no longer be useful by the time we get back to them). struct Decoded(P, V); -impl mailbox::Policy for Decoded { +impl mailbox::UnreliablePolicy for Decoded { type Overflow = VecDeque; fn handle(_overflow: &mut Self::Overflow, _message: Self) -> bool { @@ -144,7 +144,7 @@ impl mailbox::Policy for Decoded { /// Receiver half for successfully decoded messages from a [`WrappedBackgroundReceiver`]. pub struct BackgroundReceiver { - receiver: mailbox::Receiver>, + receiver: mailbox::UnreliableReceiver>, } impl BackgroundReceiver { @@ -169,7 +169,7 @@ where receiver: R, codec_config: V::Cfg, blocker: B, - sender: mailbox::Sender>, + sender: mailbox::UnreliableSender>, max_concurrency: usize, } @@ -194,7 +194,7 @@ where channel_capacity: NonZeroUsize, strategy: &impl Strategy, ) -> (Self, BackgroundReceiver) { - let (tx, rx) = mailbox::new(context.child("mailbox"), channel_capacity); + let (tx, rx) = mailbox::new_unreliable(context.child("mailbox"), channel_capacity); ( Self { context: ContextCell::new(context), @@ -272,7 +272,7 @@ where fn handle_decode_result( blocker: &mut B, - sender: &mut mailbox::Sender>, + sender: &mut mailbox::UnreliableSender>, result: (P, Result), ) { let (peer, decode_result) = result; diff --git a/p2p/src/utils/limited.rs b/p2p/src/utils/limited.rs index 79daba2fc9a..9b1e21b696e 100644 --- a/p2p/src/utils/limited.rs +++ b/p2p/src/utils/limited.rs @@ -1,7 +1,7 @@ //! Rate-limited [`UnlimitedSender`] wrapper. use crate::{Recipients, UnlimitedSender}; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_runtime::{Clock, IoBufs, KeyedRateLimiter, Quota}; use commonware_utils::{channel::ring, sync::Mutex}; @@ -210,7 +210,7 @@ impl<'a, S: UnlimitedSender> crate::CheckedSender for CheckedSender<'a, S> { } } - fn send(self, message: impl Into + Send, priority: bool) -> Feedback { + fn send(self, message: impl Into + Send, priority: bool) -> Unreliable { self.sender.send(self.recipients, message, priority) } } @@ -260,10 +260,10 @@ mod tests { recipients: Recipients, message: impl Into + Send, priority: bool, - ) -> Feedback { + ) -> Unreliable { let message = message.into().coalesce(); self.sent.lock().push((recipients, message, priority)); - Feedback::Ok + Unreliable::new(Feedback::Ok) } } @@ -332,7 +332,10 @@ mod tests { let mut limited = LimitedSender::new(sender, quota_per_second(10), context, peers); let checked = limited.check(Recipients::One(key(1))).unwrap(); - assert_eq!(checked.send(IoBuf::from(b"hello"), false), Feedback::Ok); + assert_eq!( + checked.send(IoBuf::from(b"hello"), false), + Unreliable::new(Feedback::Ok) + ); }); } @@ -365,7 +368,10 @@ mod tests { let peers_list = vec![key(1), key(2), key(3)]; let checked = limited.check(Recipients::Some(peers_list)).unwrap(); - assert_eq!(checked.send(IoBuf::from(b"hello"), false), Feedback::Ok); + assert_eq!( + checked.send(IoBuf::from(b"hello"), false), + Unreliable::new(Feedback::Ok) + ); assert_sent_to(&sender, 0, &[key(1), key(2), key(3)]); }); } diff --git a/p2p/src/utils/mocks/mod.rs b/p2p/src/utils/mocks/mod.rs index 69fc4020088..5bb2807b1fa 100644 --- a/p2p/src/utils/mocks/mod.rs +++ b/p2p/src/utils/mocks/mod.rs @@ -1,7 +1,7 @@ //! Mock implementations for testing. use crate::{CheckedSender, LimitedSender, Receiver, Recipients}; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_runtime::{ telemetry::metrics::{Metric, Registered, Registration}, @@ -92,8 +92,8 @@ impl CheckedSender for InertCheckedSender

{ self.recipients.clone() } - fn send(self, _: impl Into + Send, _: bool) -> Feedback { - Feedback::Ok + fn send(self, _: impl Into + Send, _: bool) -> Unreliable { + Unreliable::new(Feedback::Ok) } } @@ -153,8 +153,8 @@ mod tests { vec![self.peer.clone()] } - fn send(self, _message: impl Into + Send, _priority: bool) -> Feedback { - Feedback::Rejected + fn send(self, _message: impl Into + Send, _priority: bool) -> Unreliable { + Unreliable::Rejected } } diff --git a/p2p/src/utils/mux.rs b/p2p/src/utils/mux.rs index d1afd8fafef..026991a0cd2 100644 --- a/p2p/src/utils/mux.rs +++ b/p2p/src/utils/mux.rs @@ -9,7 +9,7 @@ //! even if the muxer is already running. use crate::{Channel, CheckedSender, LimitedSender, Message, Receiver, Recipients, Sender}; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_codec::{varint::UInt, Encode, Error as CodecError, ReadExt}; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, ContextCell, Handle, IoBuf, IoBufs, Spawner}; @@ -301,9 +301,9 @@ impl GlobalSender { recipients: Recipients, payload: impl Into + Send, priority: bool, - ) -> Feedback { + ) -> Unreliable { self.check(recipients).map_or_else( - |_| Feedback::Rejected, + |_| Unreliable::Rejected, |checked| checked.with_subchannel(subchannel).send(payload, priority), ) } @@ -347,7 +347,7 @@ impl<'a, S: Sender> CheckedSender for CheckedGlobalSender<'a, S> { self.inner.recipients() } - fn send(self, message: impl Into + Send, priority: bool) -> Feedback { + fn send(self, message: impl Into + Send, priority: bool) -> Unreliable { let subchannel = UInt(self.subchannel.expect("subchannel not set")); let mut message = message.into(); message.prepend(subchannel.encode().into()); @@ -674,7 +674,7 @@ mod tests { unreachable!("rate-limited sender should not produce a checked sender"); } - fn send(self, _: impl Into + Send, _: bool) -> Feedback { + fn send(self, _: impl Into + Send, _: bool) -> Unreliable { unreachable!("rate-limited sender should not send"); } } @@ -695,7 +695,7 @@ mod tests { fn test_global_sender_rate_limited_send_rejected() { let mut sender = GlobalSender::new(RateLimitedSender); let feedback = sender.send(0, Recipients::One(pk(0)), b"rate-limited", false); - assert_eq!(feedback, Feedback::Rejected); + assert_eq!(feedback, Unreliable::Rejected); assert!(!feedback.accepted()); } diff --git a/resolver/src/p2p/fetcher.rs b/resolver/src/p2p/fetcher.rs index 148251dfa7d..4ce22e435f7 100644 --- a/resolver/src/p2p/fetcher.rs +++ b/resolver/src/p2p/fetcher.rs @@ -1,5 +1,5 @@ use crate::p2p::wire; -use commonware_actor::Feedback; +use commonware_actor::{Feedback, Unreliable}; use commonware_cryptography::PublicKey; use commonware_p2p::{utils::codec::WrappedSender, Recipients, Sender}; use commonware_runtime::{ @@ -292,7 +292,7 @@ where payload: wire::Payload::Request(key.clone()), }; match checked.send(message, self.priority_requests) { - Feedback::Ok | Feedback::Backoff => { + Unreliable::Outcome(Feedback::Ok | Feedback::Backoff) => { // Success - move from pending to active self.requests_sent.inc(Status::Success); self.pending.remove(&key); @@ -310,7 +310,7 @@ where self.key_to_id.insert(key, id); return; } - feedback @ (Feedback::Rejected | Feedback::Closed) => { + feedback @ (Unreliable::Rejected | Unreliable::Outcome(Feedback::Closed)) => { // Send was not handled, try next peer self.requests_sent.inc(Status::Dropped); debug!(?peer, ?feedback, "send failed"); @@ -536,6 +536,7 @@ where mod tests { use super::*; use crate::p2p::mocks::Key as MockKey; + use commonware_actor::Unreliable; use commonware_cryptography::{ ed25519::{PrivateKey, PublicKey}, Signer, @@ -565,7 +566,7 @@ mod tests { } } - fn send(self, message: impl Into + Send, priority: bool) -> Feedback { + fn send(self, message: impl Into + Send, priority: bool) -> Unreliable { self.sender.send(self.recipients, message, priority) } } @@ -581,8 +582,8 @@ mod tests { _recipients: Recipients, _message: impl Into + Send, _priority: bool, - ) -> Feedback { - Feedback::Rejected + ) -> Unreliable { + Unreliable::Rejected } } @@ -617,9 +618,9 @@ mod tests { recipients: Recipients, _message: impl Into + Send, _priority: bool, - ) -> Feedback { + ) -> Unreliable { match recipients { - Recipients::One(_) => Feedback::Ok, + Recipients::One(_) => Unreliable::new(Feedback::Ok), _ => unimplemented!(), } } diff --git a/resolver/src/p2p/ingress.rs b/resolver/src/p2p/ingress.rs index 5c0485d97b2..50e52b614b2 100644 --- a/resolver/src/p2p/ingress.rs +++ b/resolver/src/p2p/ingress.rs @@ -135,7 +135,7 @@ where { type Overflow = Pending; - fn handle(overflow: &mut Pending, message: Self) -> bool { + fn handle(overflow: &mut Pending, message: Self) { match message { Self::Fetch(keys) => { for key in keys { @@ -171,7 +171,6 @@ where overflow.modifications.push_back(predicate); } } - true } }