-
Notifications
You must be signed in to change notification settings - Fork 62
test(gossipsub): Topic Membership Tests #1201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
871efab
dc7f8d4
5790b6f
eced002
1c2e221
2923a2d
fda0d2b
eb2f6bf
9c0966e
27c2850
25df50d
f0c8c5b
46b7125
19d3ead
f42a763
e10e4d0
89473da
806592d
2d38e8a
d594c04
a12b56c
cb7ccae
ff6b274
567a456
cc8e976
e68685c
44dd7d1
d80abe0
a7e80de
be818b9
a3c29ae
aa34c7f
df674d5
1bfdded
49ce782
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ build/ | |
*.exe | ||
*.dll | ||
.vscode/ | ||
.idea/ | ||
.DS_Store | ||
tests/pubsub/testgossipsub | ||
examples/*.md | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
# Nim-LibP2P | ||
# Copyright (c) 2023-2024 Status Research & Development GmbH | ||
# Licensed under either of | ||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) | ||
# * MIT license ([LICENSE-MIT](LICENSE-MIT)) | ||
# at your option. | ||
# This file may not be copied, modified, or distributed except according to | ||
# those terms. | ||
|
||
{.used.} | ||
|
||
import std/[options, deques, sequtils, enumerate, algorithm, sets] | ||
import stew/byteutils | ||
import ../../libp2p/builders | ||
import ../../libp2p/errors | ||
import ../../libp2p/crypto/crypto | ||
import ../../libp2p/stream/bufferstream | ||
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, mcache, peertable] | ||
import ../../libp2p/protocols/pubsub/rpc/[message, messages] | ||
import ../../libp2p/switch | ||
import ../../libp2p/muxers/muxer | ||
import ../../libp2p/protocols/pubsub/rpc/protobuf | ||
import utils | ||
import chronos | ||
|
||
import ../helpers | ||
|
||
proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = | ||
discard | ||
|
||
const MsgIdSuccess = "msg id gen success" | ||
|
||
suite "GossipSub Topic Membership Tests": | ||
teardown: | ||
checkTrackers() | ||
|
||
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d | ||
|
||
# Generalized setup function to initialize one or more topics | ||
proc setupGossipSub( | ||
topics: seq[string], numPeers: int | ||
): (TestGossipSub, seq[Connection]) = | ||
let gossipSub = TestGossipSub.init(newStandardSwitch()) | ||
var conns = newSeq[Connection]() | ||
|
||
for topic in topics: | ||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]() | ||
gossipSub.topicParams[topic] = TopicParams.init() | ||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() | ||
|
||
for i in 0 ..< numPeers: | ||
let conn = TestBufferStream.new(noop) | ||
conns &= conn | ||
let peerId = randomPeerId() | ||
conn.peerId = peerId | ||
let peer = gossipSub.getPubSubPeer(peerId) | ||
peer.sendConn = conn | ||
gossipSub.gossipsub[topic].incl(peer) | ||
|
||
return (gossipSub, conns) | ||
|
||
# Wrapper function to initialize a single topic by converting it into a seq | ||
proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) = | ||
setupGossipSub(@[topic], numPeers) | ||
|
||
# Helper function to subscribe to topics | ||
proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) = | ||
for topic in topics: | ||
gossipSub.PubSub.subscribe( | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
topic, | ||
proc(topic: string, data: seq[byte]): Future[void] {.async.} = | ||
discard | ||
, | ||
) | ||
|
||
# Helper function to unsubscribe to topics | ||
proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) = | ||
for topic in topics: | ||
gossipSub.PubSub.unsubscribeAll(topic) | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures | ||
asyncTest "handle SUBSCRIBE to the topic": | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let topic = "test-topic" | ||
let (gossipSub, conns) = setupGossipSub(topic, 5) | ||
|
||
# Subscribe to the topic | ||
subscribeToTopics(gossipSub, @[topic]) | ||
|
||
# Check if the topic is present in the list of subscribed topics | ||
check gossipSub.topics.contains(topic) | ||
|
||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Check if the topic is added to gossipsub and the peers list is not empty | ||
check gossipSub.gossipsub[topic].len() > 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we move this line before There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what was exactly done? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its after subscription and I am testing subscribed value
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Close all peer connections and verify that they are properly cleaned up | ||
await allFuturesThrowing(conns.mapIt(it.close())) | ||
diegomrsantos marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Stop the gossipSub switch and wait for it to stop completely | ||
await gossipSub.switch.stop() | ||
|
||
# Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub | ||
asyncTest "handle UNSUBSCRIBE to the topic": | ||
let topic = "test-topic" | ||
let (gossipSub, conns) = setupGossipSub(topic, 5) | ||
|
||
# Subscribe to the topic first | ||
subscribeToTopics(gossipSub, @[topic]) | ||
|
||
# Now unsubscribe from the topic | ||
unsubscribeFromTopics(gossipSub, @[topic]) | ||
|
||
# Verify the topic is removed from relevant structures | ||
check topic notin gossipSub.topics | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
check topic notin gossipSub.mesh | ||
check topic in gossipSub.gossipsub | ||
|
||
# The topic should remain in gossipsub (for fanout) | ||
|
||
await allFuturesThrowing(conns.mapIt(it.close())) | ||
await gossipSub.switch.stop() | ||
|
||
# Test subscribing and unsubscribing multiple topics | ||
asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics": | ||
let topics = ["topic1", "topic2", "topic3"].toSeq() | ||
let (gossipSub, conns) = setupGossipSub(topics, 5) | ||
|
||
# Subscribe to multiple topics | ||
subscribeToTopics(gossipSub, topics) | ||
|
||
# Verify that all topics are added to the topics and gossipsub | ||
check gossipSub.topics.len == 3 | ||
for topic in topics: | ||
check gossipSub.gossipsub[topic].len() >= 0 | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Unsubscribe from all topics | ||
unsubscribeFromTopics(gossipSub, topics) | ||
|
||
# Ensure topics are removed from topics and mesh, but still present in gossipsub | ||
for topic in topics: | ||
check topic notin gossipSub.topics | ||
check topic notin gossipSub.mesh | ||
check topic in gossipSub.gossipsub | ||
|
||
await allFuturesThrowing(conns.mapIt(it.close())) | ||
await gossipSub.switch.stop() | ||
|
||
# Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters | ||
asyncTest "subscription limit test": | ||
let gossipSub = TestGossipSub.init(newStandardSwitch()) | ||
gossipSub.topicsHigh = 10 | ||
|
||
var conns = newSeq[Connection]() | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i in 0 .. gossipSub.topicsHigh + 5: | ||
let topic = "topic" & $i | ||
# Ensure all topics are properly initialized before subscribing | ||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]() | ||
gossipSub.topicParams[topic] = TopicParams.init() | ||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() | ||
|
||
if gossipSub.topics.len < gossipSub.topicsHigh: | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
gossipSub.PubSub.subscribe( | ||
topic, | ||
proc(topic: string, data: seq[byte]): Future[void] {.async.} = | ||
discard | ||
, | ||
) | ||
else: | ||
# Prevent subscription beyond the limit and log the error | ||
echo "Subscription limit reached for topic: ", topic | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Ensure that the number of subscribed topics does not exceed the limit | ||
check gossipSub.topics.len <= gossipSub.topicsHigh | ||
check gossipSub.topics.len == gossipSub.topicsHigh | ||
strinnityk marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would check the Also, maybe it'd be interesting to connect the nodes and check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
await allFuturesThrowing(conns.mapIt(it.close())) | ||
await gossipSub.switch.stop() |
Uh oh!
There was an error while loading. Please reload this page.