Skip to content

Commit 7ca3160

Browse files
authored
[broadcast] Add a result type to the broadcast (#804)
1 parent 1079326 commit 7ca3160

4 files changed

Lines changed: 76 additions & 32 deletions

File tree

broadcast/src/buffered/engine.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub struct Engine<
6161
// Messaging
6262
////////////////////////////////////////
6363
/// The mailbox for receiving messages.
64-
mailbox_receiver: mpsc::Receiver<Message<D, M>>,
64+
mailbox_receiver: mpsc::Receiver<Message<P, D, M>>,
6565

6666
/// Pending requests from the application.
6767
waiters: HashMap<D, Vec<oneshot::Sender<M>>>,
@@ -103,9 +103,9 @@ impl<
103103
{
104104
/// Creates a new engine with the given context and configuration.
105105
/// Returns the engine and a mailbox for sending messages to the engine.
106-
pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<D, M>) {
106+
pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<P, D, M>) {
107107
let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
108-
let mailbox = Mailbox::<D, M>::new(mailbox_sender);
108+
let mailbox = Mailbox::<P, D, M>::new(mailbox_sender);
109109
let metrics = metrics::Metrics::init(context.clone());
110110

111111
let result = Self {
@@ -154,9 +154,9 @@ impl<
154154
break;
155155
};
156156
match msg {
157-
Message::Broadcast{ message } => {
157+
Message::Broadcast{ message, responder } => {
158158
trace!("mailbox: broadcast");
159-
self.handle_broadcast(&mut net_sender, message).await;
159+
self.handle_broadcast(&mut net_sender, message, responder).await;
160160
}
161161
Message::Get{ digest, responder } => {
162162
trace!("mailbox: get");
@@ -199,16 +199,26 @@ impl<
199199
////////////////////////////////////////
200200

201201
/// Handles a `broadcast` request from the application.
202-
async fn handle_broadcast(&mut self, net_sender: &mut NetS, msg: M) {
202+
async fn handle_broadcast(
203+
&mut self,
204+
net_sender: &mut NetS,
205+
msg: M,
206+
responder: oneshot::Sender<Vec<P>>,
207+
) {
203208
// Store the message, continue even if it was already stored
204209
let _ = self.insert_message(self.public_key.clone(), msg.clone());
205210

206211
// Broadcast the message to the network
207212
let recipients = Recipients::All;
208213
let msg = Bytes::from(msg.encode());
209-
if let Err(err) = net_sender.send(recipients, msg, self.priority).await {
210-
warn!(?err, "failed to send message");
211-
}
214+
let sent_to = net_sender
215+
.send(recipients, msg, self.priority)
216+
.await
217+
.unwrap_or_else(|err| {
218+
error!(?err, "failed to send message");
219+
vec![]
220+
});
221+
let _ = responder.send(sent_to);
212222
}
213223

214224
/// Handles a `get` request from the application.

broadcast/src/buffered/ingress.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
use crate::Broadcaster;
22
use commonware_codec::{Codec, Config};
33
use commonware_cryptography::{Digest, Digestible};
4+
use commonware_utils::Array;
45
use futures::{
56
channel::{mpsc, oneshot},
67
SinkExt,
78
};
89

910
/// Message types that can be sent to the `Mailbox`
10-
pub enum Message<D, M> {
11+
pub enum Message<P: Array, D: Digest, M: Digestible<D>> {
1112
/// Broadcast a [`Message`](crate::Broadcaster::Message) to the network.
12-
Broadcast { message: M },
13+
///
14+
/// The responder will be sent a list of peers that received the message.
15+
Broadcast {
16+
message: M,
17+
responder: oneshot::Sender<Vec<P>>,
18+
},
1319

1420
/// Get a message by digest.
1521
///
@@ -23,17 +29,17 @@ pub enum Message<D, M> {
2329

2430
/// Ingress mailbox for [`Engine`](super::Engine).
2531
#[derive(Clone)]
26-
pub struct Mailbox<D: Digest, M: Digestible<D>> {
27-
sender: mpsc::Sender<Message<D, M>>,
32+
pub struct Mailbox<P: Array, D: Digest, M: Digestible<D>> {
33+
sender: mpsc::Sender<Message<P, D, M>>,
2834
}
2935

30-
impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
31-
pub(super) fn new(sender: mpsc::Sender<Message<D, M>>) -> Self {
36+
impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
37+
pub(super) fn new(sender: mpsc::Sender<Message<P, D, M>>) -> Self {
3238
Self { sender }
3339
}
3440
}
3541

36-
impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
42+
impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
3743
/// Get a message by digest.
3844
pub async fn get(&mut self, digest: D) -> oneshot::Receiver<M> {
3945
let (sender, receiver) = oneshot::channel();
@@ -48,13 +54,21 @@ impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
4854
}
4955
}
5056

51-
impl<Cfg: Config, D: Digest, M: Codec<Cfg> + Digestible<D>> Broadcaster<Cfg> for Mailbox<D, M> {
57+
impl<Cfg: Config, P: Array, D: Digest, M: Codec<Cfg> + Digestible<D>> Broadcaster<Cfg>
58+
for Mailbox<P, D, M>
59+
{
5260
type Message = M;
61+
type Response = Vec<P>;
5362

54-
async fn broadcast(&mut self, message: Self::Message) {
63+
async fn broadcast(&mut self, message: Self::Message) -> oneshot::Receiver<Vec<P>> {
64+
let (sender, receiver) = oneshot::channel();
5565
self.sender
56-
.send(Message::Broadcast { message })
66+
.send(Message::Broadcast {
67+
message,
68+
responder: sender,
69+
})
5770
.await
5871
.expect("mailbox closed");
72+
receiver
5973
}
6074
}

broadcast/src/buffered/mod.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,9 @@ mod tests {
108108
fn spawn_peer_engines(
109109
context: deterministic::Context,
110110
registrations: &mut Registrations,
111-
) -> BTreeMap<PublicKey, Mailbox<Sha256Digest, TestMessage>> {
112-
let mut mailboxes = BTreeMap::<PublicKey, Mailbox<Sha256Digest, TestMessage>>::new();
111+
) -> BTreeMap<PublicKey, Mailbox<PublicKey, Sha256Digest, TestMessage>> {
112+
let mut mailboxes =
113+
BTreeMap::<PublicKey, Mailbox<PublicKey, Sha256Digest, TestMessage>>::new();
113114
while let Some((peer, network)) = registrations.pop_first() {
114115
let context = context.with_label(&peer.to_string());
115116
let config = Config {
@@ -141,7 +142,7 @@ mod tests {
141142
// Send a single broadcast message from the first peer
142143
let message = TestMessage::new(b"hello world test message");
143144
let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
144-
first_mailbox.broadcast(message.clone()).await;
145+
let result = first_mailbox.broadcast(message.clone()).await;
145146

146147
// Allow time for propagation
147148
context.sleep(Duration::from_secs(1)).await;
@@ -154,6 +155,7 @@ mod tests {
154155
let received_message = receiver.await.ok();
155156
assert_eq!(received_message.unwrap(), message);
156157
}
158+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
157159
});
158160
}
159161

@@ -177,7 +179,8 @@ mod tests {
177179
let receiver_before = mailbox_a.get(digest_m1).await;
178180

179181
// Broadcast the message
180-
mailbox_a.broadcast(m1.clone()).await;
182+
let result = mailbox_a.broadcast(m1.clone()).await;
183+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
181184

182185
// Wait for the pre-broadcast retrieval to complete
183186
let msg_before = receiver_before
@@ -219,7 +222,7 @@ mod tests {
219222
let digest = message.digest();
220223
for i in 0..100 {
221224
// Broadcast the message
222-
first_mailbox.broadcast(message.clone()).await;
225+
let result = first_mailbox.broadcast(message.clone()).await;
223226
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
224227

225228
// Check if all peers received the message
@@ -233,6 +236,7 @@ mod tests {
233236
};
234237
all_received &= has;
235238
}
239+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
236240

237241
// If all received, we're done
238242
if all_received {
@@ -255,7 +259,8 @@ mod tests {
255259
// Broadcast a message
256260
let message = TestMessage::new(b"cached message");
257261
let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
258-
first_mailbox.broadcast(message.clone()).await;
262+
let result = first_mailbox.broadcast(message.clone()).await;
263+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
259264

260265
// Wait for propagation
261266
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
@@ -294,7 +299,8 @@ mod tests {
294299
drop(dummy2);
295300

296301
// Broadcast the message
297-
mailbox1.broadcast(message.clone()).await;
302+
let result = mailbox1.broadcast(message.clone()).await;
303+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
298304

299305
// Wait for propagation
300306
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
@@ -320,7 +326,8 @@ mod tests {
320326
messages.push(TestMessage::new(format!("message {}", i).as_bytes()));
321327
}
322328
for message in messages.iter() {
323-
mailbox.broadcast(message.clone()).await;
329+
let result = mailbox.broadcast(message.clone()).await;
330+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
324331
}
325332

326333
// Wait for propagation
@@ -359,11 +366,13 @@ mod tests {
359366
// Create and broadcast message M1 from A
360367
let m1 = TestMessage::new(b"message M1");
361368
let digest_m1 = m1.digest();
362-
mailbox_a.broadcast(m1.clone()).await;
369+
let result = mailbox_a.broadcast(m1.clone()).await;
370+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
363371
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
364372

365373
// Broadcast M1 from C
366-
mailbox_c.broadcast(m1.clone()).await;
374+
let result = mailbox_c.broadcast(m1.clone()).await;
375+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
367376
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
368377

369378
// M1 is now in A's and C's deques in B's engine
@@ -374,7 +383,8 @@ mod tests {
374383
new_messages_a.push(TestMessage::new(format!("A{}", i).as_bytes()));
375384
}
376385
for msg in &new_messages_a {
377-
mailbox_a.broadcast(msg.clone()).await;
386+
let result = mailbox_a.broadcast(msg.clone()).await;
387+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
378388
}
379389
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
380390

@@ -389,7 +399,8 @@ mod tests {
389399
new_messages_c.push(TestMessage::new(format!("C{}", i).as_bytes()));
390400
}
391401
for msg in &new_messages_c {
392-
mailbox_c.broadcast(msg.clone()).await;
402+
let result = mailbox_c.broadcast(msg.clone()).await;
403+
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
393404
}
394405
context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
395406

broadcast/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Disseminate data over a wide-area network.
22
33
use commonware_codec::{Codec, Config};
4+
use futures::channel::oneshot;
45
use std::future::Future;
56

67
pub mod buffered;
@@ -14,6 +15,14 @@ pub trait Broadcaster<Cfg: Config>: Clone + Send + 'static {
1415
/// - deserialized upon reception
1516
type Message: Codec<Cfg> + Clone + Send + 'static;
1617

18+
/// Response is the type of data that is returned once the message is broadcasted.
19+
///
20+
/// It may also indicate the success or failure of the broadcast attempt.
21+
type Response: Clone + Send + 'static;
22+
1723
/// Attempt to broadcast a message to the network.
18-
fn broadcast(&mut self, message: Self::Message) -> impl Future<Output = ()> + Send;
24+
fn broadcast(
25+
&mut self,
26+
message: Self::Message,
27+
) -> impl Future<Output = oneshot::Receiver<Self::Response>> + Send;
1928
}

0 commit comments

Comments
 (0)