Skip to content

Commit 94801b1

Browse files
more cleanup
1 parent 766136a commit 94801b1

29 files changed

Lines changed: 319 additions & 272 deletions

File tree

broadcast/src/buffered/engine.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ where
158158
network.0,
159159
network.1,
160160
);
161-
let peer_set_subscription = &mut self.peer_provider.subscribe().await;
161+
let Ok(mut peer_set_subscription) = self.peer_provider.subscribe().await else {
162+
debug!("peer set subscription failed");
163+
return;
164+
};
162165

163166
select_loop! {
164167
self.context,

collector/fuzz/fuzz_targets/collector.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl Handler for FuzzHandler {
151151
type Request = FuzzRequest;
152152
type Response = FuzzResponse;
153153

154-
async fn process(
154+
fn process(
155155
&mut self,
156156
_origin: Self::PublicKey,
157157
request: Self::Request,
@@ -186,12 +186,7 @@ impl Monitor for FuzzMonitor {
186186
type PublicKey = PublicKey;
187187
type Response = FuzzResponse;
188188

189-
async fn collected(
190-
&mut self,
191-
_handler: Self::PublicKey,
192-
_response: Self::Response,
193-
_count: usize,
194-
) {
189+
fn collected(&mut self, _handler: Self::PublicKey, _response: Self::Response, _count: usize) {
195190
self.collected_count += 1;
196191
}
197192
}
@@ -379,9 +374,7 @@ fn fuzz(input: FuzzInput) {
379374

380375
if let Some(handler) = handlers.get_mut(&handler_idx) {
381376
let (tx, rx) = oneshot::channel();
382-
handler
383-
.process(peers[origin_idx].public_key(), request.clone(), tx)
384-
.await;
377+
handler.process(peers[origin_idx].public_key(), request.clone(), tx);
385378

386379
if should_respond {
387380
if let Ok(response) = rx.await {
@@ -400,9 +393,7 @@ fn fuzz(input: FuzzInput) {
400393
let handler_idx = (peer_idx as usize) % peers.len();
401394

402395
if let Some(monitor) = monitors.get_mut(&monitor_idx) {
403-
monitor
404-
.collected(peers[handler_idx].public_key(), response, count)
405-
.await;
396+
monitor.collected(peers[handler_idx].public_key(), response, count);
406397
}
407398
}
408399

collector/src/lib.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ commonware_macros::stability_scope!(ALPHA {
1515
use commonware_cryptography::{Committable, Digestible, PublicKey};
1616
use commonware_p2p::Recipients;
1717
use commonware_utils::channel::oneshot;
18-
use std::future::Future;
1918

2019
pub mod p2p;
2120

@@ -53,15 +52,17 @@ commonware_macros::stability_scope!(ALPHA {
5352
+ Digestible<Digest = <Self::Request as Digestible>::Digest>
5453
+ Codec;
5554

56-
/// Processes a `request` from an [Originator] and (optionally) send a response.
55+
/// Processes a `request` from an [Originator] and (optionally) sends a response.
5756
///
58-
/// If no response is needed, the `responder` should be dropped.
57+
/// Implementations should return promptly. If processing requires async work,
58+
/// enqueue it and use `response` to send the result later. If no response is
59+
/// needed, the `response` should be dropped.
5960
fn process(
6061
&mut self,
6162
origin: Self::PublicKey,
6263
request: Self::Request,
6364
response: oneshot::Sender<Self::Response>,
64-
) -> impl Future<Output = ()> + Send;
65+
);
6566
}
6667

6768
/// A [Monitor] collects responses from [Handler]s.
@@ -75,12 +76,15 @@ commonware_macros::stability_scope!(ALPHA {
7576
/// Called for each response collected with the number of responses collected so far for
7677
/// the same commitment.
7778
///
79+
/// Implementations should return promptly. If observing a collection event
80+
/// requires async work, enqueue it before returning.
81+
///
7882
/// [Monitor::collected] is only called once per `handler`.
7983
fn collected(
8084
&mut self,
8185
handler: Self::PublicKey,
8286
response: Self::Response,
8387
count: usize,
84-
) -> impl Future<Output = ()> + Send;
88+
);
8589
}
8690
});

collector/src/p2p/engine.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ where
125125

126126
// Create futures pools
127127
let mut processed: Pool<Result<(P, Rs), oneshot::error::RecvError>> = Pool::default();
128-
let mut monitored: Pool<()> = Pool::default();
129128
select_loop! {
130129
self.context,
131130
on_stopped => {
@@ -162,7 +161,6 @@ where
162161
// Send the response
163162
let _ = res_tx.send(Recipients::One(peer), reply, self.priority_response);
164163
},
165-
_ = monitored.next_completed() => {},
166164

167165
// Request from an originator
168166
message = req_rx.recv() => {
@@ -186,9 +184,8 @@ where
186184

187185
// Handle the request
188186
let (tx, rx) = oneshot::channel();
189-
let mut handler = self.handler.clone();
187+
self.handler.process(peer.clone(), msg, tx);
190188
processed.push(async move {
191-
handler.process(peer.clone(), msg, tx).await;
192189
Ok((peer, rx.await?))
193190
});
194191
},
@@ -227,11 +224,8 @@ where
227224
}
228225

229226
// Send the response to the monitor
230-
let mut monitor = self.monitor.clone();
231227
let count = responses.1.len();
232-
monitored.push(async move {
233-
monitor.collected(peer, msg, count).await;
234-
});
228+
self.monitor.collected(peer, msg, count);
235229
},
236230
}
237231
}

collector/src/p2p/mocks/handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl crate::Handler for Handler {
5858
type Request = Request;
5959
type Response = Response;
6060

61-
async fn process(
61+
fn process(
6262
&mut self,
6363
origin: Self::PublicKey,
6464
request: Self::Request,

collector/src/p2p/mocks/monitor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ impl crate::Monitor for Monitor {
3434
type PublicKey = PublicKey;
3535
type Response = Response;
3636

37-
async fn collected(
37+
fn collected(
3838
&mut self,
3939
handler: Self::PublicKey,
4040
response: Self::Response,

consensus/src/marshal/coding/shards/engine.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,10 @@ where
439439
);
440440
// Keep the handle alive to prevent the background receiver from being aborted.
441441
let _receiver_handle = receiver_service.start();
442-
let mut peer_set_subscription = self.peer_provider.subscribe().await;
442+
let Ok(mut peer_set_subscription) = self.peer_provider.subscribe().await else {
443+
debug!("peer set subscription failed");
444+
return;
445+
};
443446

444447
select_loop! {
445448
self.context,

examples/reshare/src/dkg/actor.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,10 @@ mod tests {
654654
use commonware_math::algebra::Random;
655655
use commonware_p2p::{utils::mocks::inert_channel, PeerSetSubscription, Provider};
656656
use commonware_runtime::{deterministic, Runner, Supervisor as _};
657-
use commonware_utils::{channel::mpsc, N3f1, NZUsize, TryCollect, NZU32};
657+
use commonware_utils::{
658+
channel::{mpsc, oneshot},
659+
N3f1, NZUsize, TryCollect, NZU32,
660+
};
658661
use core::marker::PhantomData;
659662
use std::collections::BTreeMap;
660663

@@ -670,13 +673,17 @@ mod tests {
670673
impl<P: PublicKey> Provider for NoopManager<P> {
671674
type PublicKey = P;
672675

673-
async fn peer_set(&mut self, _: u64) -> Option<TrackedPeers<Self::PublicKey>> {
674-
None
676+
fn peer_set(&mut self, _: u64) -> oneshot::Receiver<Option<TrackedPeers<Self::PublicKey>>> {
677+
let (sender, receiver) = oneshot::channel();
678+
let _ = sender.send(None);
679+
receiver
675680
}
676681

677-
async fn subscribe(&mut self) -> PeerSetSubscription<Self::PublicKey> {
682+
fn subscribe(&mut self) -> oneshot::Receiver<PeerSetSubscription<Self::PublicKey>> {
678683
let (_, rx) = mpsc::unbounded_channel();
679-
rx
684+
let (sender, receiver) = oneshot::channel();
685+
let _ = sender.send(rx);
686+
receiver
680687
}
681688
}
682689

p2p/src/authenticated/discovery/actors/dialer.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use commonware_runtime::{
1919
BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Resolver, SinkOf, Spawner,
2020
StreamOf,
2121
};
22-
use commonware_utils::futures::Pool;
23-
use futures::future::{self, Either};
2422
use commonware_stream::encrypted::{dial, Config as StreamConfig};
23+
use commonware_utils::channel::oneshot;
24+
use futures::future::{self, Either};
2525
use rand::seq::SliceRandom;
2626
use rand_core::CryptoRngCore;
2727
use std::time::{Duration, SystemTime};
@@ -32,10 +32,24 @@ type SupervisorMailbox<E, C> =
3232
Mailbox<spawner::Message<SinkOf<E>, StreamOf<E>, <C as Signer>::PublicKey>>;
3333

3434
enum TrackerQuery<C: PublicKey> {
35+
Dialable(oneshot::Receiver<Dialable<C>>),
36+
Dial(oneshot::Receiver<Option<Reservation<C>>>),
37+
}
38+
39+
enum TrackerReply<C: PublicKey> {
3540
Dialable(Dialable<C>),
3641
Dial(Option<Reservation<C>>),
3742
}
3843

44+
impl<C: PublicKey> TrackerQuery<C> {
45+
async fn recv(&mut self) -> TrackerReply<C> {
46+
match self {
47+
Self::Dialable(receiver) => TrackerReply::Dialable(receiver.await.unwrap_or_default()),
48+
Self::Dial(receiver) => TrackerReply::Dial(receiver.await.ok().flatten()),
49+
}
50+
}
51+
}
52+
3953
/// Configuration for the dialer actor.
4054
pub struct Config<C: Signer> {
4155
/// Configuration for the stream.
@@ -173,42 +187,41 @@ impl<
173187
mut supervisor: SupervisorMailbox<E, C>,
174188
) {
175189
let mut dial_deadline = self.context.current();
176-
let mut queries = Pool::default();
177-
let mut query_pending = false;
190+
let mut query: Option<TrackerQuery<C::PublicKey>> = None;
178191
select_loop! {
179192
self.context,
180193
on_start => {
181-
let wait_for_deadline = if query_pending {
194+
let wait_for_deadline = if query.is_some() {
182195
Either::Left(future::pending())
183196
} else {
184197
Either::Right(self.context.sleep_until(dial_deadline))
185198
};
199+
let wait_for_query = match &mut query {
200+
Some(query) => Either::Left(query.recv()),
201+
None => Either::Right(future::pending()),
202+
};
186203
},
187204
on_stopped => {
188205
debug!("context shutdown, stopping dialer");
189206
},
190207
_ = wait_for_deadline => {
191208
let now = self.context.current();
192209
if self.queue.is_empty() {
193-
let tracker = tracker.clone();
194-
queries.push(async move { TrackerQuery::Dialable(tracker.dialable().await) });
195-
query_pending = true;
210+
query = Some(TrackerQuery::Dialable(tracker.dialable()));
196211
continue;
197212
}
198213

199214
if let Some(peer) = self.queue.pop() {
200-
let tracker = tracker.clone();
201-
queries.push(async move { TrackerQuery::Dial(tracker.dial(peer).await) });
202-
query_pending = true;
215+
query = Some(TrackerQuery::Dial(tracker.dial(peer)));
203216
} else {
204217
dial_deadline = self.empty_queue_deadline(now, None);
205218
}
206219
},
207-
query = queries.next_completed() => {
208-
query_pending = false;
220+
reply = wait_for_query => {
221+
query = None;
209222
let now = self.context.current();
210-
match query {
211-
TrackerQuery::Dialable(dialable) => {
223+
match reply {
224+
TrackerReply::Dialable(dialable) => {
212225
self.queue = dialable.peers;
213226
self.queue.shuffle(self.context.as_mut());
214227
dial_deadline = if self.queue.is_empty() {
@@ -217,7 +230,7 @@ impl<
217230
now
218231
};
219232
}
220-
TrackerQuery::Dial(result) => {
233+
TrackerReply::Dial(result) => {
221234
if let Some(reservation) = result {
222235
self.dial_peer(reservation, &mut supervisor);
223236
dial_deadline = now + self.dial_frequency;

p2p/src/authenticated/discovery/actors/listener.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,10 @@ impl<E: Spawner + BufferPooler + Clock + Network + CryptoRngCore + Metrics, C: S
9797
) {
9898
let (peer, send, recv) = match listen(
9999
context,
100-
|peer| tracker.acceptable(peer),
100+
|peer| {
101+
let accepted = tracker.acceptable(peer);
102+
async move { accepted.await.unwrap_or(false) }
103+
},
101104
stream_cfg,
102105
stream,
103106
sink,
@@ -113,7 +116,7 @@ impl<E: Spawner + BufferPooler + Clock + Network + CryptoRngCore + Metrics, C: S
113116
debug!(?peer, ?address, "completed handshake");
114117

115118
// Attempt to claim the connection
116-
let Some(reservation) = tracker.listen(peer.clone()).await else {
119+
let Some(reservation) = tracker.listen(peer.clone()).await.ok().flatten() else {
117120
debug!(?peer, ?address, "unable to reserve connection to peer");
118121
return;
119122
};

0 commit comments

Comments
 (0)