-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rz): emit NewExternalAddrOfPeer
when discovering peers
#5138
base: master
Are you sure you want to change the base?
Conversation
Yeah, I think we should check that by subscribing to the relevant events and only emitting an event if we find a new peer. That way, event traffic between the behaviours is reduced. |
What's the state of this PR? Is it ready for review? |
Should be ready now :) |
.map(|address| ToSwarm::NewExternalAddrOfPeer { | ||
peer_id, | ||
address: address.clone(), | ||
}) | ||
.collect::<Vec<_>>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.map(|address| ToSwarm::NewExternalAddrOfPeer { | |
peer_id, | |
address: address.clone(), | |
}) | |
.collect::<Vec<_>>() | |
.map(|address| ToSwarm::NewExternalAddrOfPeer { | |
peer_id, | |
address: address.clone(), | |
}) |
I think we don't need to collect here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, without collecting on the iterator, it would give a borrow error with peer_id
being used like it does. Could probably do something like the following to make it cleaner.
diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs
index 086491bd2..76bb2e338 100644
--- a/protocols/rendezvous/src/client.rs
+++ b/protocols/rendezvous/src/client.rs
@@ -413,18 +413,18 @@ impl Behaviour {
.record
.addresses()
.iter()
- .filter(|addr| {
+ .scan(peer_id, |peer_id, e| Some((*peer_id, e.clone())))
+ .filter(|(peer_id, addr)| {
!self.discovered_peers.iter().any(
|((discovered_peer_id, _), addrs)| {
- *discovered_peer_id == peer_id && addrs.contains(addr)
+ discovered_peer_id == peer_id && addrs.contains(addr)
},
)
})
- .map(|address| ToSwarm::NewExternalAddrOfPeer {
+ .map(|(peer_id, address)| ToSwarm::NewExternalAddrOfPeer {
peer_id,
- address: address.clone(),
+ address,
})
- .collect::<Vec<_>>()
}));
self.discovered_peers
Thoughts?
!self.discovered_peers.iter().any( | ||
|((discovered_peer_id, _), addrs)| { | ||
*discovered_peer_id == peer_id && addrs.contains(addr) | ||
}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me think whether it would make sense to make discovered_peers
a Hashmap<PeerId, Hashmap<Namespace, Vec<Multiaddr>>>
rather than Hasmap<(PeerId, Namespace), Vec<Multiaddr>>
.
That way we could here and in handle_pending_outbound_connection
directly get the addresses for a peer rather than having to iterate the whole hashmap.
We could then (or even without that change) add a method Behavior::discovered_addresses(peer_Id: PeerId) -> impl Iterator<Item=&Multiaddr>
that is used here and in handle_pending_outbound_connection
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like the following (minus the change in the registration iteration)?
diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs
index 086491bd2..117de8e2e 100644
--- a/protocols/rendezvous/src/client.rs
+++ b/protocols/rendezvous/src/client.rs
@@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use std::{
- collections::{HashMap, VecDeque},
+ collections::{hash_map::Entry, HashMap, VecDeque},
iter,
task::{Context, Poll},
time::Duration,
@@ -55,7 +55,7 @@ pub struct Behaviour {
///
/// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by
/// returning addresses from [`NetworkBehaviour::handle_pending_outbound_connection`].
- discovered_peers: HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
+ discovered_peers: HashMap<PeerId, HashMap<Namespace, Vec<Multiaddr>>>,
registered_namespaces: HashMap<(PeerId, Namespace), Ttl>,
@@ -310,13 +310,15 @@ impl NetworkBehaviour for Behaviour {
Poll::Pending => {}
}
- if let Poll::Ready(Some(expired_registration)) =
- self.expiring_registrations.poll_next_unpin(cx)
- {
- self.discovered_peers.remove(&expired_registration);
- return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
- peer: expired_registration.0,
- }));
+ if let Poll::Ready(Some((peer, ns))) = self.expiring_registrations.poll_next_unpin(cx) {
+ if let Entry::Occupied(mut entry) = self.discovered_peers.entry(peer) {
+ let list = entry.get_mut();
+ list.remove(&ns);
+ if list.is_empty() {
+ entry.remove();
+ }
+ }
+ return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer }));
}
return Poll::Pending;
@@ -337,11 +339,9 @@ impl NetworkBehaviour for Behaviour {
let addresses = self
.discovered_peers
- .iter()
- .filter_map(|((candidate, _), addresses)| (candidate == &peer).then_some(addresses))
- .flatten()
- .cloned()
- .collect();
+ .get(&peer)
+ .map(|discovered| discovered.values().flatten().cloned().collect())
+ .unwrap_or_default();
Ok(addresses)
}
@@ -406,26 +406,30 @@ impl Behaviour {
DiscoverResponse(Ok((registrations, cookie))) => {
if let Some((rendezvous_node, _ns)) = self.waiting_for_discovery.remove(request_id)
{
- self.events
- .extend(registrations.iter().flat_map(|registration| {
- let peer_id = registration.record.peer_id();
- registration
- .record
- .addresses()
- .iter()
- .filter(|addr| {
- !self.discovered_peers.iter().any(
- |((discovered_peer_id, _), addrs)| {
- *discovered_peer_id == peer_id && addrs.contains(addr)
- },
- )
- })
- .map(|address| ToSwarm::NewExternalAddrOfPeer {
- peer_id,
- address: address.clone(),
- })
- .collect::<Vec<_>>()
- }));
+ self.events.extend(
+ registrations
+ .iter()
+ .map(|registration| {
+ let peer_id = registration.record.peer_id();
+ (peer_id, registration)
+ })
+ .flat_map(|(peer_id, registration)| {
+ registration
+ .record
+ .addresses()
+ .iter()
+ .scan(peer_id, |peer_id, e| Some((*peer_id, e)))
+ .filter(|(peer_id, addr)| {
+ !self.discovered_peers.get(peer_id).is_some_and(|list| {
+ list.values().any(|addrs| addrs.contains(addr))
+ })
+ })
+ .map(|(peer_id, address)| ToSwarm::NewExternalAddrOfPeer {
+ peer_id,
+ address: address.clone(),
+ })
+ }),
+ );
self.discovered_peers
.extend(registrations.iter().map(|registration| {
@@ -434,7 +438,7 @@ impl Behaviour {
let addresses = registration.record.addresses().to_vec();
- ((peer_id, namespace), addresses)
+ (peer_id, HashMap::from_iter([(namespace, addresses)]))
}));
self.expiring_registrations
Not against including it in this PR (since it doesnt introduce a breaking change and does allow us to skip iterating over the whole map) but we can also have it in a separate PR too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I'd separate it to a new PR with the method suggest by Elena
Description
ToSwarm::NewExternalAddrOfPeer
#5105Notes & open questions
Change checklist