Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 50 additions & 21 deletions actor/src/benches/mailbox.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use commonware_actor::{mailbox, Feedback};
use commonware_actor::{mailbox, Feedback, Unreliable};
use commonware_runtime::{
telemetry::metrics::{Metric, Registered, Registration},
Metrics, Name, Supervisor,
Expand Down Expand Up @@ -51,10 +51,10 @@ impl Metrics for NoopMetrics {
}
}

fn new<T: mailbox::Policy>(
fn new<T: mailbox::UnreliablePolicy>(
capacity: std::num::NonZeroUsize,
) -> (mailbox::Sender<T>, mailbox::Receiver<T>) {
mailbox::new(NoopMetrics, capacity)
) -> (mailbox::UnreliableSender<T>, mailbox::UnreliableReceiver<T>) {
mailbox::new_unreliable(NoopMetrics, capacity)
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -89,7 +89,7 @@ impl Message {
}
}

impl mailbox::Policy for Message {
impl mailbox::UnreliablePolicy for Message {
type Overflow = VecDeque<Self>;

fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
Expand Down Expand Up @@ -125,7 +125,7 @@ fn bench_enqueue_ready(c: &mut Criterion) {
|(sender, _receiver)| {
for _ in 0..MESSAGES {
let result = sender.enqueue(black_box(Message::drop()));
assert_eq!(result, Feedback::Ok);
assert_eq!(result, Unreliable::new(Feedback::Ok));
black_box(result);
}
},
Expand All @@ -145,7 +145,10 @@ fn bench_try_recv_ready(c: &mut Criterion) {
|| {
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
for _ in 0..MESSAGES {
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
assert_eq!(
sender.enqueue(Message::drop()),
Unreliable::new(Feedback::Ok)
);
}
receiver
},
Expand All @@ -170,10 +173,16 @@ fn bench_try_recv_overflow(c: &mut Criterion) {
|| {
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
for _ in 0..CAPACITY {
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
assert_eq!(
sender.enqueue(Message::drop()),
Unreliable::new(Feedback::Ok)
);
}
for _ in 0..MESSAGES {
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
assert_eq!(
sender.enqueue(Message::spill()),
Unreliable::new(Feedback::Backoff)
);
}
receiver
},
Expand All @@ -199,7 +208,7 @@ fn bench_round_trip_ready(c: &mut Criterion) {
|(sender, mut receiver)| {
for _ in 0..MESSAGES {
let result = sender.enqueue(black_box(Message::drop()));
assert_eq!(result, Feedback::Ok);
assert_eq!(result, Unreliable::new(Feedback::Ok));
black_box(result);
black_box(receiver.try_recv().unwrap());
}
Expand Down Expand Up @@ -231,7 +240,7 @@ fn bench_recv_waiting(c: &mut Criterion) {
.await;

let result = sender.enqueue(Message::drop());
assert_eq!(result, Feedback::Ok);
assert_eq!(result, Unreliable::new(Feedback::Ok));
black_box(result);
black_box(next.await.unwrap());
}
Expand All @@ -252,13 +261,16 @@ fn bench_overflow_drop(c: &mut Criterion) {
b.iter_batched(
|| {
let (sender, receiver) = new::<Message>(NZUsize!(1));
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
assert_eq!(
sender.enqueue(Message::drop()),
Unreliable::new(Feedback::Ok)
);
(sender, receiver)
},
|(sender, _receiver)| {
for _ in 0..MESSAGES {
let result = sender.enqueue(black_box(Message::drop()));
assert_eq!(result, Feedback::Rejected);
assert_eq!(result, Unreliable::Rejected);
black_box(result);
}
},
Expand All @@ -277,13 +289,16 @@ fn bench_overflow_spill(c: &mut Criterion) {
b.iter_batched(
|| {
let (sender, receiver) = new::<Message>(NZUsize!(1));
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
assert_eq!(
sender.enqueue(Message::drop()),
Unreliable::new(Feedback::Ok)
);
(sender, receiver)
},
|(sender, _receiver)| {
for _ in 0..MESSAGES {
let result = sender.enqueue(black_box(Message::spill()));
assert_eq!(result, Feedback::Backoff);
assert_eq!(result, Unreliable::new(Feedback::Backoff));
black_box(result);
}
},
Expand All @@ -294,17 +309,31 @@ fn bench_overflow_spill(c: &mut Criterion) {
group.finish();
}

fn replace_queue(newest: bool) -> (mailbox::Sender<Message>, mailbox::Receiver<Message>) {
fn replace_queue(
newest: bool,
) -> (
mailbox::UnreliableSender<Message>,
mailbox::UnreliableReceiver<Message>,
) {
let (sender, receiver) = new::<Message>(NZUsize!(REPLACE_CAPACITY));

for _ in 0..REPLACE_CAPACITY {
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
assert_eq!(
sender.enqueue(Message::drop()),
Unreliable::new(Feedback::Ok)
);
}
assert_eq!(sender.enqueue(Message::replace()), Feedback::Backoff);
assert_eq!(
sender.enqueue(Message::replace()),
Unreliable::new(Feedback::Backoff)
);

if !newest {
for _ in 1..REPLACE_CAPACITY {
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
assert_eq!(
sender.enqueue(Message::spill()),
Unreliable::new(Feedback::Backoff)
);
}
}

Expand All @@ -324,7 +353,7 @@ fn bench_overflow_replace(c: &mut Criterion) {
|(sender, _receiver)| {
for _ in 0..MESSAGES {
let result = sender.enqueue(black_box(Message::replace()));
assert_eq!(result, Feedback::Backoff);
assert_eq!(result, Unreliable::new(Feedback::Backoff));
black_box(result);
}
},
Expand Down Expand Up @@ -352,7 +381,7 @@ fn bench_concurrent_enqueue(c: &mut Criterion) {
scope.spawn(move || {
for _ in 0..PRODUCER_MESSAGES {
let result = sender.enqueue(Message::drop());
assert_eq!(result, Feedback::Ok);
assert_eq!(result, Unreliable::new(Feedback::Ok));
black_box(result);
}
});
Expand Down
46 changes: 44 additions & 2 deletions actor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ commonware_macros::stability_scope!(BETA {
Ok,
/// The submission exceeded configured capacity but was handled by the overflow policy.
Backoff,
/// The submission exceeded configured capacity and was rejected by the overflow policy.
Rejected,
/// The endpoint is closed.
Comment thread
patrick-ogrady marked this conversation as resolved.
Closed,
}
Expand All @@ -30,5 +28,49 @@ commonware_macros::stability_scope!(BETA {
}
}

/// Feedback from endpoints that may reject work under backpressure.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Unreliable<T> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kept generic in case we start adding more "outcomes" ... if we eventually go a different route, we can do UnreliableFeedback or something.

/// Endpoint outcome from the submission attempt.
Outcome(T),
/// The work was rejected by the endpoint.
Rejected,
}

impl<T> Unreliable<T> {
/// Wrap an outcome for an operation that may reject work.
pub const fn new(outcome: T) -> Self {
Self::Outcome(outcome)
}

/// Create a rejected result.
pub const fn rejected() -> Self {
Self::Rejected
}

/// Returns `true` when the operation was rejected before producing an outcome.
pub const fn is_rejected(&self) -> bool {
matches!(self, Self::Rejected)
}

/// Returns the outcome when the operation was not rejected.
pub fn outcome(self) -> Option<T> {
match self {
Self::Outcome(outcome) => Some(outcome),
Self::Rejected => None,
}
}
}

impl Unreliable<Feedback> {
/// Returns `true` when the endpoint handled the submission.
pub const fn accepted(self) -> bool {
match self {
Self::Outcome(feedback) => feedback.accepted(),
Self::Rejected => false,
}
}
}

pub mod mailbox;
});
Loading
Loading