Skip to content

Commit 99c1891

Browse files
use unreliable
1 parent 2e16f57 commit 99c1891

29 files changed

Lines changed: 568 additions & 339 deletions

File tree

actor/src/benches/mailbox.rs

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use commonware_actor::{mailbox, Feedback, Lossy};
1+
use commonware_actor::{mailbox, Feedback, Unreliable};
22
use commonware_runtime::{
33
telemetry::metrics::{Metric, Registered, Registration},
44
Metrics, Name, Supervisor,
@@ -51,10 +51,10 @@ impl Metrics for NoopMetrics {
5151
}
5252
}
5353

54-
fn new<T: mailbox::LossyPolicy>(
54+
fn new<T: mailbox::UnreliablePolicy>(
5555
capacity: std::num::NonZeroUsize,
56-
) -> (mailbox::LossySender<T>, mailbox::LossyReceiver<T>) {
57-
mailbox::new_lossy(NoopMetrics, capacity)
56+
) -> (mailbox::UnreliableSender<T>, mailbox::UnreliableReceiver<T>) {
57+
mailbox::new_unreliable(NoopMetrics, capacity)
5858
}
5959

6060
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -89,7 +89,7 @@ impl Message {
8989
}
9090
}
9191

92-
impl mailbox::LossyPolicy for Message {
92+
impl mailbox::UnreliablePolicy for Message {
9393
type Overflow = VecDeque<Self>;
9494

9595
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
@@ -125,7 +125,7 @@ fn bench_enqueue_ready(c: &mut Criterion) {
125125
|(sender, _receiver)| {
126126
for _ in 0..MESSAGES {
127127
let result = sender.enqueue(black_box(Message::drop()));
128-
assert_eq!(result, Feedback::Ok);
128+
assert_eq!(result, Unreliable::new(Feedback::Ok));
129129
black_box(result);
130130
}
131131
},
@@ -145,7 +145,10 @@ fn bench_try_recv_ready(c: &mut Criterion) {
145145
|| {
146146
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
147147
for _ in 0..MESSAGES {
148-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
148+
assert_eq!(
149+
sender.enqueue(Message::drop()),
150+
Unreliable::new(Feedback::Ok)
151+
);
149152
}
150153
receiver
151154
},
@@ -170,10 +173,16 @@ fn bench_try_recv_overflow(c: &mut Criterion) {
170173
|| {
171174
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
172175
for _ in 0..CAPACITY {
173-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
176+
assert_eq!(
177+
sender.enqueue(Message::drop()),
178+
Unreliable::new(Feedback::Ok)
179+
);
174180
}
175181
for _ in 0..MESSAGES {
176-
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
182+
assert_eq!(
183+
sender.enqueue(Message::spill()),
184+
Unreliable::new(Feedback::Backoff)
185+
);
177186
}
178187
receiver
179188
},
@@ -199,7 +208,7 @@ fn bench_round_trip_ready(c: &mut Criterion) {
199208
|(sender, mut receiver)| {
200209
for _ in 0..MESSAGES {
201210
let result = sender.enqueue(black_box(Message::drop()));
202-
assert_eq!(result, Feedback::Ok);
211+
assert_eq!(result, Unreliable::new(Feedback::Ok));
203212
black_box(result);
204213
black_box(receiver.try_recv().unwrap());
205214
}
@@ -231,7 +240,7 @@ fn bench_recv_waiting(c: &mut Criterion) {
231240
.await;
232241

233242
let result = sender.enqueue(Message::drop());
234-
assert_eq!(result, Feedback::Ok);
243+
assert_eq!(result, Unreliable::new(Feedback::Ok));
235244
black_box(result);
236245
black_box(next.await.unwrap());
237246
}
@@ -252,13 +261,16 @@ fn bench_overflow_drop(c: &mut Criterion) {
252261
b.iter_batched(
253262
|| {
254263
let (sender, receiver) = new::<Message>(NZUsize!(1));
255-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
264+
assert_eq!(
265+
sender.enqueue(Message::drop()),
266+
Unreliable::new(Feedback::Ok)
267+
);
256268
(sender, receiver)
257269
},
258270
|(sender, _receiver)| {
259271
for _ in 0..MESSAGES {
260272
let result = sender.enqueue(black_box(Message::drop()));
261-
assert_eq!(result, Lossy::Rejected);
273+
assert_eq!(result, Unreliable::Rejected);
262274
black_box(result);
263275
}
264276
},
@@ -277,13 +289,16 @@ fn bench_overflow_spill(c: &mut Criterion) {
277289
b.iter_batched(
278290
|| {
279291
let (sender, receiver) = new::<Message>(NZUsize!(1));
280-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
292+
assert_eq!(
293+
sender.enqueue(Message::drop()),
294+
Unreliable::new(Feedback::Ok)
295+
);
281296
(sender, receiver)
282297
},
283298
|(sender, _receiver)| {
284299
for _ in 0..MESSAGES {
285300
let result = sender.enqueue(black_box(Message::spill()));
286-
assert_eq!(result, Feedback::Backoff);
301+
assert_eq!(result, Unreliable::new(Feedback::Backoff));
287302
black_box(result);
288303
}
289304
},
@@ -297,19 +312,28 @@ fn bench_overflow_spill(c: &mut Criterion) {
297312
fn replace_queue(
298313
newest: bool,
299314
) -> (
300-
mailbox::LossySender<Message>,
301-
mailbox::LossyReceiver<Message>,
315+
mailbox::UnreliableSender<Message>,
316+
mailbox::UnreliableReceiver<Message>,
302317
) {
303318
let (sender, receiver) = new::<Message>(NZUsize!(REPLACE_CAPACITY));
304319

305320
for _ in 0..REPLACE_CAPACITY {
306-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
321+
assert_eq!(
322+
sender.enqueue(Message::drop()),
323+
Unreliable::new(Feedback::Ok)
324+
);
307325
}
308-
assert_eq!(sender.enqueue(Message::replace()), Feedback::Backoff);
326+
assert_eq!(
327+
sender.enqueue(Message::replace()),
328+
Unreliable::new(Feedback::Backoff)
329+
);
309330

310331
if !newest {
311332
for _ in 1..REPLACE_CAPACITY {
312-
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
333+
assert_eq!(
334+
sender.enqueue(Message::spill()),
335+
Unreliable::new(Feedback::Backoff)
336+
);
313337
}
314338
}
315339

@@ -329,7 +353,7 @@ fn bench_overflow_replace(c: &mut Criterion) {
329353
|(sender, _receiver)| {
330354
for _ in 0..MESSAGES {
331355
let result = sender.enqueue(black_box(Message::replace()));
332-
assert_eq!(result, Feedback::Backoff);
356+
assert_eq!(result, Unreliable::new(Feedback::Backoff));
333357
black_box(result);
334358
}
335359
},
@@ -357,7 +381,7 @@ fn bench_concurrent_enqueue(c: &mut Criterion) {
357381
scope.spawn(move || {
358382
for _ in 0..PRODUCER_MESSAGES {
359383
let result = sender.enqueue(Message::drop());
360-
assert_eq!(result, Feedback::Ok);
384+
assert_eq!(result, Unreliable::new(Feedback::Ok));
361385
black_box(result);
362386
}
363387
});

actor/src/lib.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,19 @@ commonware_macros::stability_scope!(BETA {
2828
}
2929
}
3030

31-
/// Feedback from endpoints that may drop work under backpressure.
31+
/// Feedback from endpoints that may reject work under backpressure.
3232
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33-
pub enum Lossy<T> {
34-
/// The work was handled by the endpoint.
35-
Handled(T),
33+
pub enum Unreliable<T> {
34+
/// Feedback returned by the endpoint.
35+
Feedback(T),
3636
/// The work was rejected by the endpoint.
3737
Rejected,
3838
}
3939

40-
impl Lossy<Feedback> {
41-
/// Create handled feedback for an endpoint that may reject live work.
40+
impl Unreliable<Feedback> {
41+
/// Wrap endpoint feedback for an endpoint that may reject work.
4242
pub const fn new(feedback: Feedback) -> Self {
43-
Self::Handled(feedback)
43+
Self::Feedback(feedback)
4444
}
4545

4646
/// Create rejected feedback.
@@ -51,29 +51,17 @@ commonware_macros::stability_scope!(BETA {
5151
/// Returns `true` when the endpoint handled the submission.
5252
pub const fn accepted(self) -> bool {
5353
match self {
54-
Self::Handled(feedback) => feedback.accepted(),
54+
Self::Feedback(feedback) => feedback.accepted(),
5555
Self::Rejected => false,
5656
}
5757
}
5858
}
5959

60-
impl From<Feedback> for Lossy<Feedback> {
60+
impl From<Feedback> for Unreliable<Feedback> {
6161
fn from(feedback: Feedback) -> Self {
6262
Self::new(feedback)
6363
}
6464
}
6565

66-
impl PartialEq<Feedback> for Lossy<Feedback> {
67-
fn eq(&self, other: &Feedback) -> bool {
68-
matches!(self, Self::Handled(feedback) if feedback == other)
69-
}
70-
}
71-
72-
impl PartialEq<Lossy<Self>> for Feedback {
73-
fn eq(&self, other: &Lossy<Self>) -> bool {
74-
other == self
75-
}
76-
}
77-
7866
pub mod mailbox;
7967
});

0 commit comments

Comments
 (0)