Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use multiaddr::{Multiaddr, Protocol};
use transport::Endpoint;
use types::ConnectionId;

use crate::transport::manager::DialFailureAddresses;
pub use bandwidth::BandwidthSink;
pub use error::Error;
pub use peer_id::PeerId;
Expand Down Expand Up @@ -198,6 +199,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
Expand All @@ -218,6 +220,7 @@ impl Litep2p {
config.fallback_names.clone(),
config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
Expand All @@ -233,6 +236,7 @@ impl Litep2p {
Vec::new(),
protocol.codec(),
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
Expand All @@ -252,6 +256,7 @@ impl Litep2p {
Vec::new(),
ping_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
Expand All @@ -275,6 +280,7 @@ impl Litep2p {
fallback_names,
kademlia_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::Required,
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
Expand All @@ -296,6 +302,7 @@ impl Litep2p {
Vec::new(),
identify_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
identify_config.public = Some(litep2p_config.keypair.public().into());

Expand All @@ -316,6 +323,7 @@ impl Litep2p {
Vec::new(),
bitswap_config.codec,
litep2p_config.keep_alive_timeout,
DialFailureAddresses::NotRequired,
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
Expand Down
52 changes: 35 additions & 17 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ pub(crate) mod handle;
/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::transport-manager";

/// Determines if a protocol requires the list of failed addresses upon a dial failure.
///
/// This is used during protocol registration with the `TransportManager` to specify
/// whether `InnerTransportEvent::DialFailure` events sent to this protocol should
/// include the specific multiaddresses that failed.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DialFailureAddresses {
/// The protocol needs the list of failed addresses.
Required,
/// The protocol does not need the list of failed addresses.
NotRequired,
}

/// The connection established result.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum ConnectionEstablishedResult {
Expand Down Expand Up @@ -106,6 +119,9 @@ pub struct ProtocolContext {

/// Fallback names for the protocol.
pub fallback_names: Vec<ProtocolName>,

/// Specifies if the protocol requires dial failure addresses.
pub dial_failure_mode: DialFailureAddresses,
}

impl ProtocolContext {
Expand All @@ -114,11 +130,20 @@ impl ProtocolContext {
codec: ProtocolCodec,
tx: Sender<InnerTransportEvent>,
fallback_names: Vec<ProtocolName>,
dial_failure_mode: DialFailureAddresses,
) -> Self {
Self {
tx,
codec,
fallback_names,
dial_failure_mode,
}
}

fn dial_failure_addresses(&self, addresses: &[Multiaddr]) -> Vec<Multiaddr> {
match self.dial_failure_mode {
DialFailureAddresses::Required => addresses.to_vec(),
DialFailureAddresses::NotRequired => Vec::new(),
}
}
}
Expand Down Expand Up @@ -332,6 +357,7 @@ impl TransportManager {
fallback_names: Vec<ProtocolName>,
codec: ProtocolCodec,
keep_alive_timeout: Duration,
dial_failure_mode: DialFailureAddresses,
) -> TransportService {
assert!(!self.protocol_names.contains(&protocol));

Expand All @@ -352,7 +378,7 @@ impl TransportManager {

self.protocols.insert(
protocol.clone(),
ProtocolContext::new(codec, sender, fallback_names.clone()),
ProtocolContext::new(codec, sender, fallback_names.clone(), dial_failure_mode),
);
self.protocol_names.insert(protocol);
self.protocol_names.extend(fallback_names);
Expand Down Expand Up @@ -1116,10 +1142,10 @@ impl TransportManager {
?protocol,
"dial failure, notify protocol",
);
match context.tx.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
}) {

let addresses = context.dial_failure_addresses(&[address.clone()]);

match context.tx.try_send(InnerTransportEvent::DialFailure { peer, addresses: addresses.clone() }) {
Ok(()) => {}
Err(_) => {
tracing::trace!(
Expand All @@ -1132,10 +1158,7 @@ impl TransportManager {
);
let _ = context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: vec![address.clone()],
})
.send(InnerTransportEvent::DialFailure { peer, addresses })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dharjeezy Can you also add to the doc of InnerTransportEvent & TransportEvent that addresses is only forwarded if the protocol was registered with DialFailureAddresses::Required, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done now @dmitry-markin. can we merge it in now?

.await;
}
}
Expand Down Expand Up @@ -1266,12 +1289,10 @@ impl TransportManager {
.collect::<Vec<_>>();

for (protocol, context) in &self.protocols {
let addresses = context.dial_failure_addresses(&addresses);
let _ = match context
.tx
.try_send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
}) {
.try_send(InnerTransportEvent::DialFailure { peer, addresses: addresses.clone() }) {
Ok(_) => Ok(()),
Err(_) => {
tracing::trace!(
Expand All @@ -1284,10 +1305,7 @@ impl TransportManager {

context
.tx
.send(InnerTransportEvent::DialFailure {
peer,
addresses: addresses.clone(),
})
.send(InnerTransportEvent::DialFailure { peer, addresses })
.await
}
};
Expand Down
Loading