Skip to content

Commit c654d6d

Browse files
[collector] Minimal Panic Fix (#1560)
1 parent b295d5b commit c654d6d

11 files changed

Lines changed: 185 additions & 25 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ commonware-runtime = { version = "0.0.60", path = "runtime" }
4848
commonware-storage = { version = "0.0.60", path = "storage" }
4949
commonware-stream = { version = "0.0.60", path = "stream" }
5050
commonware-utils = { version = "0.0.60", path = "utils", default-features = false }
51+
anyhow = { version = "1.0.99", default-features = false }
5152
thiserror = { version = "2.0.12", default-features = false }
5253
bytes = { version = "1.7.1", default-features = false }
5354
sha2 = { version = "0.10.8", default-features = false }

collector/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ tracing = { workspace = true }
2525
governor = { workspace = true }
2626
prometheus-client = { workspace = true }
2727
thiserror = { workspace = true }
28+
anyhow = { workspace = true }
2829

2930
[dev-dependencies]
3031
tracing-subscriber = { workspace = true }

collector/src/lib.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,19 @@ use commonware_cryptography::{Committable, Digestible, PublicKey};
1515
use commonware_p2p::Recipients;
1616
use futures::channel::oneshot;
1717
use std::future::Future;
18+
use thiserror::Error;
1819

1920
pub mod p2p;
2021

22+
/// Errors that can occur when interacting with a [Originator].
23+
#[derive(Error, Debug)]
24+
pub enum Error {
25+
#[error("send failed: {0}")]
26+
SendFailed(anyhow::Error),
27+
#[error("canceled")]
28+
Canceled,
29+
}
30+
2131
/// An [Originator] sends requests out to a set of [Handler]s and collects replies.
2232
pub trait Originator: Clone + Send + 'static {
2333
/// The [PublicKey] of a recipient.
@@ -32,7 +42,7 @@ pub trait Originator: Clone + Send + 'static {
3242
&mut self,
3343
recipients: Recipients<Self::PublicKey>,
3444
request: Self::Request,
35-
) -> impl Future<Output = Vec<Self::PublicKey>> + Send;
45+
) -> impl Future<Output = Result<Vec<Self::PublicKey>, Error>> + Send;
3646

3747
/// Cancel a request by `commitment`, ignoring any future responses.
3848
///

collector/src/p2p/engine.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use super::{
22
ingress::{Mailbox, Message},
33
Config,
44
};
5-
use crate::p2p::{Handler, Monitor};
5+
use crate::{
6+
p2p::{Handler, Monitor},
7+
Error,
8+
};
69
use commonware_codec::Codec;
710
use commonware_cryptography::{Committable, Digestible, PublicKey};
811
use commonware_macros::select;
@@ -143,13 +146,12 @@ where
143146
self.priority_request
144147
).await {
145148
Ok(recipients) => {
146-
for peer in &recipients {
147-
entry.0.insert(peer.clone());
148-
}
149-
let _ = responder.send(recipients);
149+
entry.0.extend(recipients.iter().cloned());
150+
let _ = responder.send(Ok(recipients));
150151
}
151152
Err(err) => {
152153
error!(?err, ?commitment, "failed to send message");
154+
let _ = responder.send(Err(Error::SendFailed(err.into())));
153155
}
154156
}
155157
},

collector/src/p2p/ingress.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::Originator;
1+
use crate::{Error, Originator};
22
use commonware_codec::Codec;
33
use commonware_cryptography::{Committable, Digestible, PublicKey};
44
use commonware_p2p::Recipients;
@@ -12,7 +12,7 @@ pub enum Message<P: PublicKey, R: Committable + Digestible + Codec> {
1212
Send {
1313
request: R,
1414
recipients: Recipients<P>,
15-
responder: oneshot::Sender<Vec<P>>,
15+
responder: oneshot::Sender<Result<Vec<P>, Error>>,
1616
},
1717
Cancel {
1818
commitment: R::Commitment,
@@ -36,7 +36,7 @@ impl<P: PublicKey, R: Committable + Digestible + Codec> Originator for Mailbox<P
3636
type Request = R;
3737
type PublicKey = P;
3838

39-
async fn send(&mut self, recipients: Recipients<P>, request: R) -> Vec<P> {
39+
async fn send(&mut self, recipients: Recipients<P>, request: R) -> Result<Vec<P>, Error> {
4040
let (tx, rx) = oneshot::channel();
4141
let _ = self
4242
.sender
@@ -46,7 +46,7 @@ impl<P: PublicKey, R: Committable + Digestible + Codec> Originator for Mailbox<P
4646
responder: tx,
4747
})
4848
.await;
49-
rx.await.unwrap()
49+
rx.await.map_err(|_| Error::Canceled)?
5050
}
5151

5252
async fn cancel(&mut self, commitment: R::Commitment) {

collector/src/p2p/mocks/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
33
pub mod handler;
44
pub mod monitor;
5+
pub mod sender;
56
pub mod types;

collector/src/p2p/mocks/sender.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//! Mock sender implementations for testing.
2+
3+
use bytes::Bytes;
4+
use commonware_cryptography::PublicKey;
5+
use commonware_p2p::{Recipients, Sender};
6+
use thiserror::Error;
7+
8+
/// Errors that can be returned by [Failing].
9+
#[derive(Debug, Error)]
10+
pub enum Error {
11+
#[error("send failed")]
12+
Failed,
13+
}
14+
15+
/// A sender that always fails with [Error::Canceled].
16+
#[derive(Clone, Debug)]
17+
pub struct Failing<P: PublicKey> {
18+
_phantom: std::marker::PhantomData<P>,
19+
}
20+
21+
impl<P: PublicKey> Failing<P> {
22+
/// Creates a new failing sender.
23+
pub fn new() -> Self {
24+
Self {
25+
_phantom: std::marker::PhantomData,
26+
}
27+
}
28+
}
29+
30+
impl<P: PublicKey> Sender for Failing<P> {
31+
type PublicKey = P;
32+
type Error = Error;
33+
34+
async fn send(
35+
&mut self,
36+
_recipients: Recipients<P>,
37+
_message: Bytes,
38+
_priority: bool,
39+
) -> Result<Vec<P>, Self::Error> {
40+
Err(Error::Failed)
41+
}
42+
}

collector/src/p2p/mod.rs

Lines changed: 109 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ mod tests {
4949
},
5050
Config, Engine, Mailbox,
5151
};
52-
use crate::{Handler, Monitor, Originator};
52+
use crate::{Error, Handler, Monitor, Originator};
5353
use commonware_codec::Encode;
5454
use commonware_cryptography::{
5555
ed25519::{PrivateKey, PublicKey},
@@ -208,7 +208,8 @@ mod tests {
208208
let request = Request { id: 1, data: 1 };
209209
let recipients = mailbox1
210210
.send(Recipients::One(peers[1].clone()), request.clone())
211-
.await;
211+
.await
212+
.expect("send failed");
212213
assert_eq!(recipients, vec![peers[1].clone()]);
213214

214215
// Verify peer 2 received the request
@@ -275,7 +276,8 @@ mod tests {
275276
let commitment = request.commitment();
276277
let recipients = mailbox
277278
.send(Recipients::One(peers[1].clone()), request.clone())
278-
.await;
279+
.await
280+
.expect("send failed");
279281
assert_eq!(recipients, vec![peers[1].clone()]);
280282

281283
// Cancel immediately
@@ -356,7 +358,10 @@ mod tests {
356358

357359
// Broadcast request
358360
let request = Request { id: 3, data: 3 };
359-
let recipients = mailbox1.send(Recipients::All, request.clone()).await;
361+
let recipients = mailbox1
362+
.send(Recipients::All, request.clone())
363+
.await
364+
.expect("send failed");
360365
assert_eq!(recipients.len(), 2);
361366
assert!(recipients.contains(&peers[1]));
362367
assert!(recipients.contains(&peers[2]));
@@ -434,7 +439,8 @@ mod tests {
434439
for _ in 0..3 {
435440
let recipients = mailbox1
436441
.send(Recipients::One(peers[1].clone()), request.clone())
437-
.await;
442+
.await
443+
.expect("send failed");
438444
assert_eq!(recipients, vec![peers[1].clone()]);
439445
}
440446

@@ -507,10 +513,12 @@ mod tests {
507513
let request2 = Request { id: 20, data: 20 };
508514
mailbox1
509515
.send(Recipients::One(peers[1].clone()), request1)
510-
.await;
516+
.await
517+
.expect("send failed");
511518
mailbox1
512519
.send(Recipients::One(peers[1].clone()), request2)
513-
.await;
520+
.await
521+
.expect("send failed");
514522

515523
// Collect both responses
516524
let mut response10_received = false;
@@ -585,7 +593,8 @@ mod tests {
585593
let request = Request { id: 100, data: 100 };
586594
let recipients = mailbox1
587595
.send(Recipients::One(peers[1].clone()), request.clone())
588-
.await;
596+
.await
597+
.expect("send failed");
589598
assert_eq!(recipients, vec![peers[1].clone()]);
590599

591600
// Verify handler received request but didn't respond
@@ -632,7 +641,10 @@ mod tests {
632641

633642
// Send request with empty recipients list
634643
let request = Request { id: 1, data: 1 };
635-
let recipients = mailbox.send(Recipients::All, request.clone()).await;
644+
let recipients = mailbox
645+
.send(Recipients::All, request.clone())
646+
.await
647+
.expect("send failed");
636648
assert_eq!(recipients, Vec::<PublicKey>::new());
637649

638650
// Verify no responses collected
@@ -647,6 +659,92 @@ mod tests {
647659
});
648660
}
649661

662+
#[test_traced]
663+
fn test_send_fails_with_network_error() {
664+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
665+
executor.start(|context| async move {
666+
let (oracle, schemes, peers, connections) =
667+
setup_network_and_peers(&context, &[0, 1]).await;
668+
let mut schemes = schemes.into_iter();
669+
let mut connections = connections.into_iter();
670+
671+
// Setup peer 1 with a failing sender
672+
let scheme = schemes.next().unwrap();
673+
let conn = connections.next().unwrap();
674+
let (_, receiver1) = conn.0; // Request channel
675+
let sender1 = super::mocks::sender::Failing::<PublicKey>::new();
676+
let (sender2, receiver2) = conn.1; // Response channel
677+
let (engine, mut mailbox) = Engine::new(
678+
context.with_label(&format!("engine_{}", scheme.public_key())),
679+
Config {
680+
blocker: oracle.control(scheme.public_key()),
681+
monitor: MockMonitor::dummy(),
682+
handler: MockHandler::dummy(),
683+
mailbox_size: MAILBOX_SIZE,
684+
priority_request: false,
685+
request_codec: (),
686+
priority_response: false,
687+
response_codec: (),
688+
},
689+
);
690+
691+
// Start engine
692+
engine.start((sender1, receiver1), (sender2, receiver2));
693+
694+
// Send request
695+
let request = Request { id: 1, data: 1 };
696+
let err = mailbox
697+
.send(Recipients::One(peers[1].clone()), request.clone())
698+
.await
699+
.unwrap_err();
700+
assert!(matches!(err, Error::SendFailed(_)));
701+
});
702+
}
703+
704+
#[test_traced]
705+
fn test_send_fails_with_canceled() {
706+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
707+
executor.start(|context| async move {
708+
let (oracle, schemes, peers, connections) =
709+
setup_network_and_peers(&context, &[0, 1]).await;
710+
let mut schemes = schemes.into_iter();
711+
let mut connections = connections.into_iter();
712+
713+
// Setup peer 1 with a failing sender
714+
let scheme = schemes.next().unwrap();
715+
let conn = connections.next().unwrap();
716+
let (sender1, receiver1) = conn.0; // Request channel
717+
let (sender2, receiver2) = conn.1; // Response channel
718+
let (engine, mut mailbox) = Engine::new(
719+
context.with_label(&format!("engine_{}", scheme.public_key())),
720+
Config {
721+
blocker: oracle.control(scheme.public_key()),
722+
monitor: MockMonitor::dummy(),
723+
handler: MockHandler::dummy(),
724+
mailbox_size: MAILBOX_SIZE,
725+
priority_request: false,
726+
request_codec: (),
727+
priority_response: false,
728+
response_codec: (),
729+
},
730+
);
731+
732+
// Start engine
733+
let handle = engine.start((sender1, receiver1), (sender2, receiver2));
734+
735+
// Stop the engine (which will result in all further requests being canceled)
736+
handle.abort();
737+
738+
// Send request (will return Error::Canceled instead of Error::SendFailed)
739+
let request = Request { id: 1, data: 1 };
740+
let err = mailbox
741+
.send(Recipients::One(peers[1].clone()), request.clone())
742+
.await
743+
.unwrap_err();
744+
assert!(matches!(err, Error::Canceled));
745+
});
746+
}
747+
650748
#[test_traced]
651749
fn test_response_from_unknown_peer() {
652750
let executor = deterministic::Runner::timed(Duration::from_secs(10));
@@ -701,7 +799,8 @@ mod tests {
701799
let request_to_peer2 = Request { id: 42, data: 42 };
702800
let recipients = mailbox1
703801
.send(Recipients::One(peers[1].clone()), request_to_peer2.clone())
704-
.await;
802+
.await
803+
.expect("send failed");
705804
assert_eq!(recipients, vec![peers[1].clone()]);
706805

707806
// Send a response from peer 3 to peer 1

p2p/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub enum Recipients<P: PublicKey> {
3838
/// Interface for sending messages to a set of recipients.
3939
pub trait Sender: Clone + Debug + Send + 'static {
4040
/// Error that can occur when sending a message.
41-
type Error: Debug + StdError + Send;
41+
type Error: Debug + StdError + Send + Sync;
4242

4343
/// Public key type used to identify recipients.
4444
type PublicKey: PublicKey;
@@ -55,7 +55,7 @@ pub trait Sender: Clone + Debug + Send + 'static {
5555
/// Interface for receiving messages from arbitrary recipients.
5656
pub trait Receiver: Debug + Send + 'static {
5757
/// Error that can occur when receiving a message.
58-
type Error: Debug + StdError + Send;
58+
type Error: Debug + StdError + Send + Sync;
5959

6060
/// Public key type used to identify recipients.
6161
type PublicKey: PublicKey;

0 commit comments

Comments
 (0)