Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 0.50.0

- Send all topic subscriptions in a single hello RPC when connecting to a new peer, aligning with the GossipSub spec and other implementations (Go, Nim, JS).
See [PR 6385](https://github.com/libp2p/rust-libp2p/pull/6385).
- Raise MSRV to 1.88.0.
See [PR 6273](https://github.com/libp2p/rust-libp2p/pull/6273).

Expand Down
48 changes: 27 additions & 21 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3203,28 +3203,34 @@ where
}

tracing::debug!(peer=%peer_id, "New peer connected");
// We need to send our subscriptions to the newly-connected node.
for topic_hash in self.mesh.clone().into_keys() {
#[cfg(not(feature = "partial_messages"))]
let (requests_partial, supports_partial) = (false, false);
#[cfg(feature = "partial_messages")]
let Some(SubscriptionOpts {
requests_partial,
supports_partial,
}) = self.partial_messages_extension.opts(&topic_hash)
else {
tracing::error!("Partial subscription options should exist for subscribed topic");
return;
};
// Send all our topic subscriptions to the newly-connected peer in a single hello RPC.
let topics: Vec<_> = self
.mesh
.keys()
.cloned()
.filter_map(|topic_hash| {
#[cfg(not(feature = "partial_messages"))]
let (requests_partial, supports_partial) = (false, false);
#[cfg(feature = "partial_messages")]
let (requests_partial, supports_partial) = {
let Some(SubscriptionOpts {
requests_partial,
supports_partial,
}) = self.partial_messages_extension.opts(&topic_hash)
else {
tracing::error!(
"Partial subscription options should exist for subscribed topic"
);
return None;
};
(requests_partial, supports_partial)
};
Some((topic_hash, requests_partial, supports_partial))
})
.collect();

self.send_message(
peer_id,
RpcOut::Subscribe {
topic: topic_hash.clone(),
requests_partial,
supports_partial,
},
);
if !topics.is_empty() {
self.send_message(peer_id, RpcOut::SubscribeMany(topics));
}
}

Expand Down
80 changes: 67 additions & 13 deletions protocols/gossipsub/src/behaviour/tests/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@ fn test_subscribe() {
"Subscribe should add a new entry to the mesh[topic] hashmap"
);

// collect all the subscriptions
// collect all the subscriptions (hello RPC on connection is now a single SubscribeMany)
let subscriptions = queues
.into_values()
.fold(0, |mut collected_subscriptions, mut queue| {
while !queue.is_empty() {
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
collected_subscriptions += 1
match queue.try_pop() {
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
Some(RpcOut::SubscribeMany(topics)) => {
collected_subscriptions += topics.len()
}
_ => {}
}
}
collected_subscriptions
Expand Down Expand Up @@ -114,19 +118,23 @@ fn test_unsubscribe() {
"should be able to unsubscribe successfully from each topic",
);

// collect all the subscriptions
// collect all the subscriptions (hello RPC on connection is now a single SubscribeMany)
let subscriptions = queues
.into_values()
.fold(0, |mut collected_subscriptions, mut queue| {
while !queue.is_empty() {
if let Some(RpcOut::Subscribe { .. }) = queue.try_pop() {
collected_subscriptions += 1
match queue.try_pop() {
Some(RpcOut::Subscribe { .. }) => collected_subscriptions += 1,
Some(RpcOut::SubscribeMany(topics)) => {
collected_subscriptions += topics.len()
}
_ => {}
}
}
collected_subscriptions
});

// we sent a unsubscribe to all known peers, for two topics
// we sent subscriptions to all known peers for two topics (20 peers × 2 topics)
assert_eq!(subscriptions, 40);

// check we clean up internal structures
Expand Down Expand Up @@ -303,23 +311,22 @@ fn test_peer_added_on_connection() {
.to_subscribe(true)
.create_network();

// check that our subscriptions are sent to each of the peers
// collect all the SendEvents
// check that our subscriptions are sent to each of the peers as a single hello RPC
let subscriptions = queues.into_iter().fold(
HashMap::<libp2p_identity::PeerId, Vec<String>>::new(),
|mut collected_subscriptions, (peer, mut queue)| {
while !queue.is_empty() {
if let Some(RpcOut::Subscribe { topic, .. }) = queue.try_pop() {
let mut peer_subs = collected_subscriptions.remove(&peer).unwrap_or_default();
peer_subs.push(topic.into_string());
if let Some(RpcOut::SubscribeMany(topics)) = queue.try_pop() {
let peer_subs: Vec<String> =
topics.into_iter().map(|(t, _, _)| t.into_string()).collect();
collected_subscriptions.insert(peer, peer_subs);
}
}
collected_subscriptions
},
);

// check that there are two subscriptions sent to each peer
// check that there are two subscriptions sent to each peer in a single RPC
for peer_subs in subscriptions.values() {
assert!(peer_subs.contains(&String::from("topic1")));
assert!(peer_subs.contains(&String::from("topic2")));
Expand All @@ -339,6 +346,53 @@ fn test_peer_added_on_connection() {
}
}

/// Test that on new connection the hello RPC is a single batched message, not one per topic.
#[test]
fn test_hello_rpc_is_single_batched_message() {
    let topic_names = vec![
        String::from("alpha"),
        String::from("beta"),
        String::from("gamma"),
    ];
    let (_, _, queues, topic_hashes) = DefaultBehaviourTestBuilder::default()
        .peer_no(5)
        .topics(topic_names)
        .to_subscribe(true)
        .create_network();

    // Each peer's outbound queue must hold exactly one batched hello
    // covering every topic, and zero per-topic Subscribe messages.
    for (_, mut queue) in queues {
        let mut batched_hellos = 0;
        let mut single_subscribes = 0;

        while !queue.is_empty() {
            match queue.try_pop() {
                Some(RpcOut::SubscribeMany(topics)) => {
                    batched_hellos += 1;
                    // Every subscribed topic has to appear in the one hello packet.
                    let sent: Vec<_> = topics.into_iter().map(|(t, _, _)| t).collect();
                    assert!(
                        topic_hashes.iter().all(|topic_hash| sent.contains(topic_hash)),
                        "hello RPC must include all subscribed topics"
                    );
                }
                Some(RpcOut::Subscribe { .. }) => single_subscribes += 1,
                _ => {}
            }
        }

        assert_eq!(
            batched_hellos, 1,
            "exactly one batched hello RPC should be sent per peer"
        );
        assert_eq!(
            single_subscribes, 0,
            "no individual Subscribe RPCs should be sent on connection"
        );
    }
}

/// Test subscription handling
#[test]
fn test_handle_received_subscriptions() {
Expand Down
5 changes: 4 additions & 1 deletion protocols/gossipsub/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ impl Queue {
/// which will only happen for control and non priority messages.
pub(crate) fn try_push(&mut self, message: RpcOut) -> Result<(), Box<RpcOut>> {
match message {
RpcOut::Extensions(_) | RpcOut::Subscribe { .. } | RpcOut::Unsubscribe(_) => {
RpcOut::Extensions(_)
| RpcOut::Subscribe { .. }
| RpcOut::SubscribeMany(_)
| RpcOut::Unsubscribe(_) => {
self.priority
.try_push(message)
.expect("Shared is unbounded");
Expand Down
17 changes: 17 additions & 0 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ pub enum RpcOut {
requests_partial: bool,
supports_partial: bool,
},
/// Subscribe to multiple topics in a single RPC (hello packet on new connection).
SubscribeMany(Vec<(TopicHash, bool, bool)>),
/// Unsubscribe a topic.
Unsubscribe(TopicHash),
/// Send a GRAFT control message.
Expand Down Expand Up @@ -392,6 +394,7 @@ impl RpcOut {
matches!(
self,
RpcOut::Subscribe { .. }
| RpcOut::SubscribeMany(_)
| RpcOut::Unsubscribe(_)
| RpcOut::Graft(_)
| RpcOut::Prune(_)
Expand Down Expand Up @@ -431,6 +434,20 @@ impl From<RpcOut> for proto::RPC {
control: None,
partial: None,
},
RpcOut::SubscribeMany(topics) => proto::RPC {
publish: Vec::new(),
subscriptions: topics
.into_iter()
.map(|(topic, requests_partial, supports_partial)| proto::SubOpts {
subscribe: Some(true),
topic_id: Some(topic.into_string()),
requestsPartial: Some(requests_partial),
supportsPartial: Some(supports_partial),
})
.collect(),
control: None,
partial: None,
},
RpcOut::Unsubscribe(topic) => proto::RPC {
publish: Vec::new(),
subscriptions: vec![proto::SubOpts {
Expand Down
Loading