Skip to content

Commit 70568a1

Browse files
[broadcast/buffer] Final Tweaks (#837)
1 parent 78180d8 commit 70568a1

2 files changed

Lines changed: 30 additions & 17 deletions

File tree

broadcast/src/buffered/engine.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ pub struct Engine<
3232
D: Digest,
3333
Cfg: CodecCfg,
3434
M: Digestible<D> + Codec<Cfg>,
35-
NetS: Sender<PublicKey = P>,
36-
NetR: Receiver<PublicKey = P>,
3735
> {
3836
////////////////////////////////////////
3937
// Interfaces
@@ -87,8 +85,6 @@ pub struct Engine<
8785
////////////////////////////////////////
8886
/// Metrics
8987
metrics: metrics::Metrics,
90-
91-
_phantom: std::marker::PhantomData<(NetS, NetR)>,
9288
}
9389

9490
impl<
@@ -97,9 +93,7 @@ impl<
9793
D: Digest,
9894
Cfg: CodecCfg,
9995
M: Digestible<D> + Codec<Cfg>,
100-
NetS: Sender<PublicKey = P>,
101-
NetR: Receiver<PublicKey = P>,
102-
> Engine<E, P, D, Cfg, M, NetS, NetR>
96+
> Engine<E, P, D, Cfg, M>
10397
{
10498
/// Creates a new engine with the given context and configuration.
10599
/// Returns the engine and a mailbox for sending messages to the engine.
@@ -120,20 +114,21 @@ impl<
120114
items: HashMap::new(),
121115
counts: HashMap::new(),
122116
metrics,
123-
124-
_phantom: std::marker::PhantomData,
125117
};
126118

127119
(result, mailbox)
128120
}
129121

130122
/// Starts the engine with the given network.
131-
pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
123+
pub fn start(
124+
mut self,
125+
network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
126+
) -> Handle<()> {
132127
self.context.spawn_ref()(self.run(network))
133128
}
134129

135130
/// Inner run loop called by `start`.
136-
async fn run(mut self, network: (NetS, NetR)) {
131+
async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
137132
let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
138133
let mut shutdown = self.context.stopped();
139134

@@ -204,9 +199,9 @@ impl<
204199
////////////////////////////////////////
205200

206201
/// Handles a `broadcast` request from the application.
207-
async fn handle_broadcast(
202+
async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
208203
&mut self,
209-
sender: &mut WrappedSender<NetS, Cfg, M>,
204+
sender: &mut WrappedSender<Sr, Cfg, M>,
210205
msg: M,
211206
responder: oneshot::Sender<Vec<P>>,
212207
) {

broadcast/src/buffered/mod.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,7 @@ mod tests {
121121
codec_config: (),
122122
};
123123
let (engine, engine_mailbox) =
124-
Engine::<_, PublicKey, Sha256Digest, _, TestMessage, _, _>::new(
125-
context.clone(),
126-
config,
127-
);
124+
Engine::<_, PublicKey, Sha256Digest, _, TestMessage>::new(context.clone(), config);
128125
mailboxes.insert(peer.clone(), engine_mailbox);
129126
engine.start(network);
130127
}
@@ -156,6 +153,27 @@ mod tests {
156153
assert_eq!(received_message.unwrap(), message);
157154
}
158155
assert_eq!(result.await.unwrap().len(), peers.len() - 1);
156+
157+
// Drop broadcast result
158+
let message = TestMessage::new(b"hello world again");
159+
let result = first_mailbox.broadcast(message.clone()).await;
160+
drop(result);
161+
162+
// Allow time for propagation
163+
context.sleep(Duration::from_secs(1)).await;
164+
165+
// Check that all peers received the new message
166+
let mut found = 0;
167+
for peer in peers.iter() {
168+
let mut mailbox = mailboxes.get(peer).unwrap().clone();
169+
let digest = message.digest();
170+
let receiver = mailbox.get(digest).await;
171+
if let Some(msg) = receiver {
172+
assert_eq!(msg, message);
173+
found += 1;
174+
}
175+
}
176+
assert!(found > 0, "No peers received the message");
159177
});
160178
}
161179

0 commit comments

Comments
 (0)