Skip to content

Commit 2a51df3

Browse files
[actor] Add Unreliable Feedback Type (#3849)
1 parent edc5cce commit 2a51df3

50 files changed

Lines changed: 1170 additions & 527 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

actor/src/benches/mailbox.rs

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use commonware_actor::{mailbox, Feedback};
1+
use commonware_actor::{mailbox, Feedback, Unreliable};
22
use commonware_runtime::{
33
telemetry::metrics::{Metric, Registered, Registration},
44
Metrics, Name, Supervisor,
@@ -51,10 +51,10 @@ impl Metrics for NoopMetrics {
5151
}
5252
}
5353

54-
fn new<T: mailbox::Policy>(
54+
fn new<T: mailbox::UnreliablePolicy>(
5555
capacity: std::num::NonZeroUsize,
56-
) -> (mailbox::Sender<T>, mailbox::Receiver<T>) {
57-
mailbox::new(NoopMetrics, capacity)
56+
) -> (mailbox::UnreliableSender<T>, mailbox::UnreliableReceiver<T>) {
57+
mailbox::new_unreliable(NoopMetrics, capacity)
5858
}
5959

6060
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@@ -89,7 +89,7 @@ impl Message {
8989
}
9090
}
9191

92-
impl mailbox::Policy for Message {
92+
impl mailbox::UnreliablePolicy for Message {
9393
type Overflow = VecDeque<Self>;
9494

9595
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
@@ -125,7 +125,7 @@ fn bench_enqueue_ready(c: &mut Criterion) {
125125
|(sender, _receiver)| {
126126
for _ in 0..MESSAGES {
127127
let result = sender.enqueue(black_box(Message::drop()));
128-
assert_eq!(result, Feedback::Ok);
128+
assert_eq!(result, Unreliable::new(Feedback::Ok));
129129
black_box(result);
130130
}
131131
},
@@ -145,7 +145,10 @@ fn bench_try_recv_ready(c: &mut Criterion) {
145145
|| {
146146
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
147147
for _ in 0..MESSAGES {
148-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
148+
assert_eq!(
149+
sender.enqueue(Message::drop()),
150+
Unreliable::new(Feedback::Ok)
151+
);
149152
}
150153
receiver
151154
},
@@ -170,10 +173,16 @@ fn bench_try_recv_overflow(c: &mut Criterion) {
170173
|| {
171174
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
172175
for _ in 0..CAPACITY {
173-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
176+
assert_eq!(
177+
sender.enqueue(Message::drop()),
178+
Unreliable::new(Feedback::Ok)
179+
);
174180
}
175181
for _ in 0..MESSAGES {
176-
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
182+
assert_eq!(
183+
sender.enqueue(Message::spill()),
184+
Unreliable::new(Feedback::Backoff)
185+
);
177186
}
178187
receiver
179188
},
@@ -199,7 +208,7 @@ fn bench_round_trip_ready(c: &mut Criterion) {
199208
|(sender, mut receiver)| {
200209
for _ in 0..MESSAGES {
201210
let result = sender.enqueue(black_box(Message::drop()));
202-
assert_eq!(result, Feedback::Ok);
211+
assert_eq!(result, Unreliable::new(Feedback::Ok));
203212
black_box(result);
204213
black_box(receiver.try_recv().unwrap());
205214
}
@@ -231,7 +240,7 @@ fn bench_recv_waiting(c: &mut Criterion) {
231240
.await;
232241

233242
let result = sender.enqueue(Message::drop());
234-
assert_eq!(result, Feedback::Ok);
243+
assert_eq!(result, Unreliable::new(Feedback::Ok));
235244
black_box(result);
236245
black_box(next.await.unwrap());
237246
}
@@ -252,13 +261,16 @@ fn bench_overflow_drop(c: &mut Criterion) {
252261
b.iter_batched(
253262
|| {
254263
let (sender, receiver) = new::<Message>(NZUsize!(1));
255-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
264+
assert_eq!(
265+
sender.enqueue(Message::drop()),
266+
Unreliable::new(Feedback::Ok)
267+
);
256268
(sender, receiver)
257269
},
258270
|(sender, _receiver)| {
259271
for _ in 0..MESSAGES {
260272
let result = sender.enqueue(black_box(Message::drop()));
261-
assert_eq!(result, Feedback::Rejected);
273+
assert_eq!(result, Unreliable::Rejected);
262274
black_box(result);
263275
}
264276
},
@@ -277,13 +289,16 @@ fn bench_overflow_spill(c: &mut Criterion) {
277289
b.iter_batched(
278290
|| {
279291
let (sender, receiver) = new::<Message>(NZUsize!(1));
280-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
292+
assert_eq!(
293+
sender.enqueue(Message::drop()),
294+
Unreliable::new(Feedback::Ok)
295+
);
281296
(sender, receiver)
282297
},
283298
|(sender, _receiver)| {
284299
for _ in 0..MESSAGES {
285300
let result = sender.enqueue(black_box(Message::spill()));
286-
assert_eq!(result, Feedback::Backoff);
301+
assert_eq!(result, Unreliable::new(Feedback::Backoff));
287302
black_box(result);
288303
}
289304
},
@@ -294,17 +309,31 @@ fn bench_overflow_spill(c: &mut Criterion) {
294309
group.finish();
295310
}
296311

297-
fn replace_queue(newest: bool) -> (mailbox::Sender<Message>, mailbox::Receiver<Message>) {
312+
fn replace_queue(
313+
newest: bool,
314+
) -> (
315+
mailbox::UnreliableSender<Message>,
316+
mailbox::UnreliableReceiver<Message>,
317+
) {
298318
let (sender, receiver) = new::<Message>(NZUsize!(REPLACE_CAPACITY));
299319

300320
for _ in 0..REPLACE_CAPACITY {
301-
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
321+
assert_eq!(
322+
sender.enqueue(Message::drop()),
323+
Unreliable::new(Feedback::Ok)
324+
);
302325
}
303-
assert_eq!(sender.enqueue(Message::replace()), Feedback::Backoff);
326+
assert_eq!(
327+
sender.enqueue(Message::replace()),
328+
Unreliable::new(Feedback::Backoff)
329+
);
304330

305331
if !newest {
306332
for _ in 1..REPLACE_CAPACITY {
307-
assert_eq!(sender.enqueue(Message::spill()), Feedback::Backoff);
333+
assert_eq!(
334+
sender.enqueue(Message::spill()),
335+
Unreliable::new(Feedback::Backoff)
336+
);
308337
}
309338
}
310339

@@ -324,7 +353,7 @@ fn bench_overflow_replace(c: &mut Criterion) {
324353
|(sender, _receiver)| {
325354
for _ in 0..MESSAGES {
326355
let result = sender.enqueue(black_box(Message::replace()));
327-
assert_eq!(result, Feedback::Backoff);
356+
assert_eq!(result, Unreliable::new(Feedback::Backoff));
328357
black_box(result);
329358
}
330359
},
@@ -352,7 +381,7 @@ fn bench_concurrent_enqueue(c: &mut Criterion) {
352381
scope.spawn(move || {
353382
for _ in 0..PRODUCER_MESSAGES {
354383
let result = sender.enqueue(Message::drop());
355-
assert_eq!(result, Feedback::Ok);
384+
assert_eq!(result, Unreliable::new(Feedback::Ok));
356385
black_box(result);
357386
}
358387
});

actor/src/lib.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ commonware_macros::stability_scope!(BETA {
1717
Ok,
1818
/// The submission exceeded configured capacity but was handled by the overflow policy.
1919
Backoff,
20-
/// The submission exceeded configured capacity and was rejected by the overflow policy.
21-
Rejected,
2220
/// The endpoint is closed.
2321
Closed,
2422
}
@@ -30,5 +28,49 @@ commonware_macros::stability_scope!(BETA {
3028
}
3129
}
3230

31+
/// Feedback from endpoints that may reject work under backpressure.
32+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33+
pub enum Unreliable<T> {
34+
/// Endpoint outcome from the submission attempt.
35+
Outcome(T),
36+
/// The work was rejected by the endpoint.
37+
Rejected,
38+
}
39+
40+
impl<T> Unreliable<T> {
41+
/// Wrap an outcome for an operation that may reject work.
42+
pub const fn new(outcome: T) -> Self {
43+
Self::Outcome(outcome)
44+
}
45+
46+
/// Create a rejected result.
47+
pub const fn rejected() -> Self {
48+
Self::Rejected
49+
}
50+
51+
/// Returns `true` when the operation was rejected before producing an outcome.
52+
pub const fn is_rejected(&self) -> bool {
53+
matches!(self, Self::Rejected)
54+
}
55+
56+
/// Returns the outcome when the operation was not rejected.
57+
pub fn outcome(self) -> Option<T> {
58+
match self {
59+
Self::Outcome(outcome) => Some(outcome),
60+
Self::Rejected => None,
61+
}
62+
}
63+
}
64+
65+
impl Unreliable<Feedback> {
66+
/// Returns `true` when the endpoint handled the submission.
67+
pub const fn accepted(self) -> bool {
68+
match self {
69+
Self::Outcome(feedback) => feedback.accepted(),
70+
Self::Rejected => false,
71+
}
72+
}
73+
}
74+
3375
pub mod mailbox;
3476
});

0 commit comments

Comments
 (0)