38 changes: 23 additions & 15 deletions waku/v2/peermanager/peer_manager_test.go
@@ -15,6 +15,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@@ -125,36 +126,38 @@ func TestPeerSelection(t *testing.T) {
defer h3.Close()

protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, protocol)
require.NoError(t, err)

_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, protocol)
require.NoError(t, err)

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
var peerIDs peer.IDSlice

peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerIDs[0])
require.Len(t, peerIDs, 1) // Only 1 peer is returned randomly, because MaxPeers defaults to 1 when not set

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err)

_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1)
require.Equal(t, h2.ID(), peerIDs[0]) // Only h2 has this pubsub topic

//Test for selectWithLowestRTT
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err) // No peer has this pubsub topic
require.Empty(t, peerIDs)

peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // Both h2 and h3 have this pubsub topic, but only 1 peer is returned randomly because MaxPeers defaults to 1 when not set

peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)

peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)

h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
@@ -163,9 +166,14 @@ func TestPeerSelection(t *testing.T) {
require.NoError(t, err)

peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 3, peerIDs.Len())
require.Len(t, peerIDs, 3)
require.NoError(t, err)

//Test for selectWithLowestRTT
// NOTE: This test must run last because it involves pinging peers, which modifies the list of supported protocols
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // With LowestRTT, only 1 peer is returned, even if MaxPeers is set
}

func TestDefaultProtocol(t *testing.T) {
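For context, here is a minimal sketch of the call pattern TestPeerSelection exercises above, assuming an already-constructed *PeerManager. The examplePeerSelection helper is hypothetical and not part of this PR; the PeerSelectionCriteria fields and the peer.IDSlice return type are taken from the diff.

package peermanager

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
)

// examplePeerSelection sketches how a caller asks for peers that serve a
// protocol and pubsub topic. Omitting MaxPeers falls back to the default of
// a single, randomly chosen peer, as the test comments above note.
func examplePeerSelection(ctx context.Context, pm *PeerManager) (peer.IDSlice, error) {
	return pm.SelectPeers(PeerSelectionCriteria{
		SelectionType: Automatic,
		Proto:         libp2pProtocol.ID("test/protocol"),
		PubsubTopics:  []string{"/waku/2/rs/2/1"},
		MaxPeers:      2, // ask for up to two peers matching the criteria
		Ctx:           ctx,
	})
}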
112 changes: 71 additions & 41 deletions waku/v2/peermanager/peer_selection.go
@@ -7,11 +7,13 @@

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"golang.org/x/exp/maps"

wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

type PeerSet map[peer.ID]struct{}
@@ -46,7 +48,7 @@
return peers[0], nil
}

// SelectRandomPeer is used to return a random peer that supports a given protocol.
// SelectRandom is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol; otherwise it will choose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from the node peerstore
@@ -57,26 +59,40 @@
// This will require us to check for various factors such as:
// - which topics they track
// - latency?

peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil
} else if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
}
if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot",
zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics),
zap.Error(err))
return nil, err
} else if len(peerIDs) == 0 {
}
if len(peerIDs) == 0 {
peerIDs = make(PeerSet)
}
// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)

// if not found in serviceSlots or proto == WakuRelayIDv200 (service slots don't support WakuRelayIDv200)
peerSet := criteria.SpecificPeers
if len(peerSet) == 0 {
peerSet = pm.host.Peerstore().Peers()
}
filteredPeers, err := pm.FilterPeersByProto(peerSet, criteria.ExcludePeers, criteria.Proto)
if err != nil {
return nil, err
}
if len(filteredPeers) == 0 && criteria.Proto != relay.WakuRelayID_v200 {
return nil, utils.ErrNoPeersAvailable
}
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}

//Not passing excludePeers since filteredPeers already accounts for excluded ones.
randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
randomPeers, err := selectRandomPeers(filteredPeers, nil, min(criteria.MaxPeers, len(peerIDs)))
if err != nil && len(peerIDs) == 0 {
return nil, err
}
@@ -88,16 +104,14 @@
}

func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
i := 0
selectedPeers := make(PeerSet)
for pID := range filter {
if PeerInSet(excludePeers, pID) {
continue
}
//Go randomizes map iteration order, so no explicit random selection is needed.
selectedPeers[pID] = struct{}{}
i++
if i == count {
if len(selectedPeers) == count {
break
}
}
@@ -121,41 +135,57 @@
return peerSet
}

/*
selectServicePeer tries to select a peer from the serviceSlot first, based on the given criteria.
serviceSlots is a map of peerMap by protocol.ID.
- Slots are created automatically in getPeers.
- Slots are not created for relay.WakuRelayID_v200.
Therefore, for the Relay protocol, selectServicePeer always returns ErrNoPeersAvailable.
If there are no pubsubTopics criteria, a random peer from the selected slot is returned.
Otherwise, peers from the slot are filtered by pubsubTopics and random peers are selected from the filtered list.
If no peer is found in the slot, on-demand discovery is triggered for the given pubsubTopics and protocol.
The function retries once to fetch peers from the slot after discovery.
*/
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
peers := make(PeerSet)
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
} else { //PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
if PeerInSet(criteria.ExcludePeers, i) {
continue
}
keys = append(keys, i)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
} else {
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
continue
}
slot := pm.serviceSlots.getPeers(criteria.Proto)
if slot == nil {
continue
}
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
}

//PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for peerID := range slot.m {
if PeerInSet(criteria.ExcludePeers, peerID) {
continue
}
keys = append(keys, peerID)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
}

//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover at least 1 peer for the criteria
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)

//Try to fetch peers again.
}
if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
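For context, a standalone sketch of the pattern getRandom relies on after this change: Go randomizes map iteration order, so walking the map and stopping once len(selected) reaches the requested count yields a random-enough subset without an explicit shuffle. The pickN helper and its string keys are hypothetical, not go-waku code.

package main

import "fmt"

// pickN returns up to count keys from set, skipping anything in exclude.
// Go's randomized map iteration order supplies the randomness.
func pickN(set, exclude map[string]struct{}, count int) map[string]struct{} {
	selected := make(map[string]struct{})
	for k := range set {
		if _, skip := exclude[k]; skip {
			continue
		}
		selected[k] = struct{}{}
		if len(selected) == count {
			break
		}
	}
	return selected
}

func main() {
	peers := map[string]struct{}{"a": {}, "b": {}, "c": {}, "d": {}}
	exclude := map[string]struct{}{"b": {}}
	fmt.Println(pickN(peers, exclude, 2)) // prints two of a, c, d
}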
26 changes: 15 additions & 11 deletions waku/v2/peerstore/waku_peer_store.go
@@ -8,8 +8,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/maps"

"github.com/waku-org/go-waku/waku/v2/protocol"
)

// Origin is used to determine how the peer is identified,
@@ -233,17 +234,20 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
for _, p := range specificPeers {
peerMatch := true
peerTopics, err := ps.PubSubTopics(p)
if err == nil {
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
}
if peerMatch {
result = append(result, p)
if err != nil {
//Note: skipping a peer in case of an error as there would be others available.
continue
}

for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
} //Note: skipping a peer in case of an error as there would be others available.
}
if peerMatch {
result = append(result, p)
}
}
return result
}
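For context, a sketch of how callers typically reach the method refactored above, mirroring the call sites in peer_selection.go. The peersForTopic helper and the example package are hypothetical, and the peer.IDSlice return type is assumed from those call sites.

package example

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"

	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
)

// peersForTopic casts the host's peerstore to the Waku peerstore and filters
// its peers by a single pubsub topic, as SelectRandom does in peer_selection.go.
func peersForTopic(h host.Host, topic string) peer.IDSlice {
	wakuPS := h.Peerstore().(wps.WakuPeerstore)
	return wakuPS.PeersByPubSubTopics([]string{topic}, h.Peerstore().Peers()...)
}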