Skip to content

Commit 1b229b8

Browse files
[p2p] Migrate to Mailbox (#3795)
1 parent f4329d4 commit 1b229b8

114 files changed

Lines changed: 4803 additions & 4832 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

actor/src/benches/mailbox.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ impl Message {
9292
impl mailbox::Policy for Message {
9393
type Overflow = VecDeque<Self>;
9494

95-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
95+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
9696
match message.policy {
97-
Policy::Drop => {}
97+
Policy::Drop => false,
9898
Policy::Spill => {
9999
overflow.push_back(message);
100+
true
100101
}
101102
Policy::Replace => {
102103
if let Some(pending) = overflow
@@ -108,6 +109,7 @@ impl mailbox::Policy for Message {
108109
} else {
109110
overflow.push_back(message);
110111
}
112+
true
111113
}
112114
}
113115
}
@@ -256,7 +258,7 @@ fn bench_overflow_drop(c: &mut Criterion) {
256258
|(sender, _receiver)| {
257259
for _ in 0..MESSAGES {
258260
let result = sender.enqueue(black_box(Message::drop()));
259-
assert_eq!(result, Feedback::Backoff);
261+
assert_eq!(result, Feedback::Rejected);
260262
black_box(result);
261263
}
262264
},

actor/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ commonware_macros::stability_scope!(BETA {
1515
pub enum Feedback {
1616
/// The work was accepted within the configured capacity.
1717
Ok,
18-
/// The submission exceeded the configured capacity and requests sender backoff.
18+
/// The submission exceeded configured capacity but was handled by the overflow policy.
1919
Backoff,
20+
/// The submission exceeded configured capacity and was rejected by the overflow policy.
21+
Rejected,
2022
/// The endpoint is closed.
2123
Closed,
2224
}

actor/src/mailbox.rs

Lines changed: 64 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,15 @@ pub trait Policy: Sized {
9393

9494
/// Handle `message` when it cannot enter the bounded ready queue immediately.
9595
///
96-
/// Messages already in the ready queue are not provided here. Policy changes only apply to
97-
/// overflow retained beyond ready capacity. Policies may append, remove, replace, reorder, or
98-
/// clear overflow, and are responsible for bounding it when a hard memory limit is required.
96+
/// Returns `true` when the policy considered the message's effects. This includes
97+
/// retaining the message, coalescing it with retained work, replacing older retained work,
98+
/// or deliberately doing no work because the message is already satisfied, superseded, or no
99+
/// longer needed (for example, a request whose response channel is already closed). These
100+
/// no-op cases are still handled: there is no remaining work, so there is no loss to report.
101+
///
102+
/// Returns `false` only when the policy rejects the message under backpressure. This is the
103+
/// lossy case: the submitted work was not semantically handled, and callers that care should
104+
/// retry or treat the submission as failed.
99105
///
100106
/// # Warning
101107
///
@@ -106,7 +112,7 @@ pub trait Policy: Sized {
106112
/// This method should not unwind after mutating `overflow`. A panic, including one from a
107113
/// destructor triggered while editing `overflow`, can leave retained overflow data stranded in
108114
/// the mailbox.
109-
fn handle(overflow: &mut Self::Overflow, message: Self);
115+
fn handle(overflow: &mut Self::Overflow, message: Self) -> bool;
110116
}
111117

112118
// `activity` packs the published overflow state and in-flight overflow
@@ -299,9 +305,13 @@ impl<T: Policy> OverflowState<T> {
299305
};
300306

301307
// Preserve overflow order, or handle a still-full ready queue.
302-
T::handle(&mut queue, message);
308+
let handled = T::handle(&mut queue, message);
303309
mutation.publish(queue.is_empty());
304-
Feedback::Backoff
310+
if handled {
311+
Feedback::Backoff
312+
} else {
313+
Feedback::Rejected
314+
}
305315
}
306316

307317
fn refill(&self, ready: &Ready<T>) {
@@ -445,10 +455,10 @@ impl<T: Policy> Sender<T> {
445455
self.state.backoff.inc();
446456
}
447457

448-
// Wake on any handled enqueue because a receiver may have skipped
449-
// refill while this overflow mutation was active. By the time we wake,
450-
// the mutation has published its overflow state. Spurious wakes are
451-
// acceptable.
458+
// Wake after any non-closed slow-path enqueue because a receiver may
459+
// have skipped refill while this overflow mutation was active. By the
460+
// time we wake, the mutation has published its overflow state. Spurious
461+
// wakes are acceptable.
452462
if feedback != Feedback::Closed {
453463
self.state.waker.wake();
454464
}
@@ -631,7 +641,7 @@ mod tests {
631641
impl Policy for Message {
632642
type Overflow = VecDeque<Self>;
633643

634-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
644+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
635645
match message {
636646
Self::Update(value) => {
637647
if let Some(index) = overflow
@@ -641,21 +651,24 @@ mod tests {
641651
overflow.remove(index);
642652
}
643653
overflow.push_back(Self::Update(value));
654+
true
644655
}
645656
Self::Required(_) | Self::Buffered(_) => {
646657
overflow.push_back(message);
658+
true
647659
}
648660
Self::Hint(value) => {
649661
let Some(index) = overflow
650662
.iter()
651663
.rposition(|pending| matches!(pending, Self::Update(_)))
652664
else {
653-
return;
665+
return true;
654666
};
655667
overflow.remove(index);
656668
overflow.push_back(Self::Hint(value));
669+
true
657670
}
658-
Self::Vote(_) => {}
671+
Self::Vote(_) => false,
659672
}
660673
}
661674
}
@@ -667,8 +680,9 @@ mod tests {
667680
impl Policy for Ack {
668681
type Overflow = VecDeque<Self>;
669682

670-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
683+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
671684
overflow.push_back(message);
685+
true
672686
}
673687
}
674688

@@ -738,7 +752,7 @@ mod tests {
738752
async fn full_inbox_rejects_non_replaceable_message() {
739753
let (sender, mut receiver) = new(NZUsize!(1));
740754
assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok);
741-
assert_eq!(sender.enqueue(Message::Vote(2)), Feedback::Backoff);
755+
assert_eq!(sender.enqueue(Message::Vote(2)), Feedback::Rejected);
742756

743757
assert_eq!(receiver.recv().await, Some(Message::Vote(1)));
744758
}
@@ -780,6 +794,25 @@ mod tests {
780794
});
781795
}
782796

797+
#[test]
798+
fn rejected_feedback_is_not_accepted_or_counted_as_backoff() {
799+
let executor = deterministic::Runner::default();
800+
executor.start(|context| async move {
801+
let (sender, _receiver) = super::new(context.child("mailbox"), NZUsize!(1));
802+
assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok);
803+
let feedback = sender.enqueue(Message::Vote(2));
804+
805+
assert_eq!(feedback, Feedback::Rejected);
806+
assert!(!feedback.accepted());
807+
808+
let buffer = context.encode();
809+
assert!(
810+
buffer.contains("mailbox_backoff_total 0"),
811+
"unexpected backoff count in metrics: {buffer}"
812+
);
813+
});
814+
}
815+
783816
#[test]
784817
fn try_recv_drains_buffered_messages_after_senders_drop() {
785818
let (sender, mut receiver) = new(NZUsize!(1));
@@ -1007,9 +1040,10 @@ mod tests {
10071040
impl Policy for ClearingMessage {
10081041
type Overflow = VecDeque<Self>;
10091042

1010-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1043+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
10111044
overflow.push_back(message);
10121045
overflow.clear();
1046+
true
10131047
}
10141048
}
10151049

@@ -1038,8 +1072,9 @@ mod tests {
10381072
impl Policy for SpillMessage {
10391073
type Overflow = VecDeque<Self>;
10401074

1041-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1075+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
10421076
overflow.push_back(message);
1077+
true
10431078
}
10441079
}
10451080

@@ -1131,11 +1166,12 @@ mod loom_tests {
11311166
impl Policy for Message {
11321167
type Overflow = VecDeque<Self>;
11331168

1134-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1169+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
11351170
match message {
1136-
Self::Drop(_) => {}
1171+
Self::Drop(_) => false,
11371172
Self::Spill(_) => {
11381173
overflow.push_back(message);
1174+
true
11391175
}
11401176
}
11411177
}
@@ -1144,7 +1180,7 @@ mod loom_tests {
11441180
impl Policy for OrderedMessage {
11451181
type Overflow = VecDeque<Self>;
11461182

1147-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1183+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
11481184
let gate = match &message {
11491185
Self::Item(_) => None,
11501186
Self::Coordinated(_, gate) => Some(gate.clone()),
@@ -1156,15 +1192,16 @@ mod loom_tests {
11561192
thread::yield_now();
11571193
}
11581194
}
1195+
true
11591196
}
11601197
}
11611198

11621199
impl Policy for ReplacingMessage {
11631200
type Overflow = VecDeque<Self>;
11641201

1165-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1202+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
11661203
match message {
1167-
Self::FillReady => {}
1204+
Self::FillReady => false,
11681205
Self::Replace(_) => {
11691206
if let Some(pending) = overflow
11701207
.iter_mut()
@@ -1175,6 +1212,7 @@ mod loom_tests {
11751212
} else {
11761213
overflow.push_back(message);
11771214
}
1215+
true
11781216
}
11791217
}
11801218
}
@@ -1183,16 +1221,18 @@ mod loom_tests {
11831221
impl Policy for TrackedMessage {
11841222
type Overflow = VecDeque<Self>;
11851223

1186-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1224+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
11871225
overflow.push_back(message);
1226+
true
11881227
}
11891228
}
11901229

11911230
impl Policy for CyclicMessage {
11921231
type Overflow = VecDeque<Self>;
11931232

1194-
fn handle(overflow: &mut VecDeque<Self>, message: Self) {
1233+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
11951234
overflow.push_back(message);
1235+
true
11961236
}
11971237
}
11981238

broadcast/src/buffered/engine.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ where
188188
message,
189189
} => {
190190
trace!("mailbox: broadcast");
191-
self.handle_broadcast(&mut sender, recipients, message).await;
191+
self.handle_broadcast(&mut sender, recipients, message);
192192
}
193193
Message::Subscribe { digest, responder } => {
194194
trace!("mailbox: subscribe");
@@ -232,7 +232,7 @@ where
232232
////////////////////////////////////////
233233

234234
/// Handles a `broadcast` request from the application.
235-
async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
235+
fn handle_broadcast<Sr: Sender<PublicKey = P>>(
236236
&mut self,
237237
sender: &mut WrappedSender<Sr, M>,
238238
recipients: Recipients<P>,
@@ -243,9 +243,7 @@ where
243243
let _ = self.insert_message(self.public_key.clone(), digest, msg.clone());
244244

245245
// Broadcast the message to the network
246-
if let Err(err) = sender.send(recipients, msg, self.priority).await {
247-
error!(?err, "failed to send message");
248-
}
246+
sender.send(recipients, msg, self.priority);
249247
}
250248

251249
/// Handles a `subscribe` request from the application.

broadcast/src/buffered/ingress.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ impl<P: PublicKey, M: Digestible> Overflow<Message<P, M>> for Pending<P, M> {
7777
impl<P: PublicKey, M: Digestible> Policy for Message<P, M> {
7878
type Overflow = Pending<P, M>;
7979

80-
fn handle(overflow: &mut Self::Overflow, message: Self) {
80+
fn handle(overflow: &mut Self::Overflow, message: Self) -> bool {
8181
if message.response_closed() {
82-
return;
82+
return true;
8383
}
8484

8585
overflow.0.push_back(message);
86+
true
8687
}
8788
}
8889

0 commit comments

Comments
 (0)