Skip to content

Commit 5635735

Browse files
authored
fix(PeerManager): SelectRandom filter by protocol (#1296)
1 parent 84a4b1b commit 5635735

File tree

3 files changed

+109
-67
lines changed

3 files changed

+109
-67
lines changed

waku/v2/peermanager/peer_manager_test.go

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/multiformats/go-multiaddr"
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/stretchr/testify/require"
18+
1819
"github.com/waku-org/go-waku/tests"
1920
"github.com/waku-org/go-waku/waku/v2/discv5"
2021
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@@ -125,36 +126,38 @@ func TestPeerSelection(t *testing.T) {
125126
defer h3.Close()
126127

127128
protocol := libp2pProtocol.ID("test/protocol")
128-
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
129+
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, protocol)
129130
require.NoError(t, err)
130131

131-
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
132+
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, protocol)
132133
require.NoError(t, err)
133134

134-
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
135-
require.NoError(t, err)
135+
var peerIDs peer.IDSlice
136136

137-
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
137+
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
138138
require.NoError(t, err)
139-
require.Equal(t, h2.ID(), peerIDs[0])
139+
require.Len(t, peerIDs, 1) // Only 1 peer is returned randomly, because MaxPeers defaults to 1 when not set
140140

141-
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
142-
require.Error(t, utils.ErrNoPeersAvailable, err)
143-
144-
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
141+
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
145142
require.NoError(t, err)
143+
require.Len(t, peerIDs, 1)
144+
require.Equal(t, h2.ID(), peerIDs[0]) // Only h2 has this pubsub topic
146145

147-
//Test for selectWithLowestRTT
148-
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
146+
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
147+
require.Error(t, utils.ErrNoPeersAvailable, err) // No peer has this pubsub topic
148+
require.Empty(t, peerIDs)
149+
150+
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
149151
require.NoError(t, err)
152+
require.Len(t, peerIDs, 1) // Both h2 and h3 have this pubsub topic, but only 1 peer is returned randomly because MaxPeers defaults to 1 when not set
150153

151154
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
152-
require.Equal(t, 2, peerIDs.Len())
153155
require.NoError(t, err)
156+
require.Len(t, peerIDs, 2)
154157

155158
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
156-
require.Equal(t, 2, peerIDs.Len())
157159
require.NoError(t, err)
160+
require.Len(t, peerIDs, 2)
158161

159162
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
160163
require.NoError(t, err)
@@ -163,9 +166,14 @@ func TestPeerSelection(t *testing.T) {
163166
require.NoError(t, err)
164167

165168
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
166-
require.Equal(t, 3, peerIDs.Len())
169+
require.Len(t, peerIDs, 3)
167170
require.NoError(t, err)
168171

172+
//Test for selectWithLowestRTT
173+
// NOTE: This test must go the last because it involves pinging peers, which modifies the list of supported protocols
174+
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
175+
require.NoError(t, err)
176+
require.Len(t, peerIDs, 1) // With LowestRTT, only 1 peer is returned, even if MaxPeers is set
169177
}
170178

171179
func TestDefaultProtocol(t *testing.T) {

waku/v2/peermanager/peer_selection.go

Lines changed: 71 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77

88
"github.com/libp2p/go-libp2p/core/peer"
99
"github.com/libp2p/go-libp2p/core/protocol"
10+
"go.uber.org/zap"
11+
"golang.org/x/exp/maps"
12+
1013
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
1114
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
15+
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
1216
"github.com/waku-org/go-waku/waku/v2/utils"
13-
"go.uber.org/zap"
14-
"golang.org/x/exp/maps"
1517
)
1618

1719
type PeerSet map[peer.ID]struct{}
@@ -46,7 +48,7 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic
4648
return peers[0], nil
4749
}
4850

49-
// SelectRandomPeer is used to return a random peer that supports a given protocol.
51+
// SelectRandom is used to return a random peer that supports a given protocol.
5052
// If a list of specific peers is passed, the peer will be chosen from that list assuming
5153
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
5254
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
@@ -57,26 +59,40 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
5759
// This will require us to check for various factors such as:
5860
// - which topics they track
5961
// - latency?
62+
6063
peerIDs, err := pm.selectServicePeer(criteria)
6164
if err == nil && len(peerIDs) == criteria.MaxPeers {
6265
return maps.Keys(peerIDs), nil
63-
} else if !errors.Is(err, utils.ErrNoPeersAvailable) {
64-
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
65-
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
66+
}
67+
if !errors.Is(err, utils.ErrNoPeersAvailable) {
68+
pm.logger.Debug("could not retrieve random peer from slot",
69+
zap.String("protocol", string(criteria.Proto)),
70+
zap.Strings("pubsubTopics", criteria.PubsubTopics),
71+
zap.Error(err))
6672
return nil, err
67-
} else if len(peerIDs) == 0 {
73+
}
74+
if len(peerIDs) == 0 {
6875
peerIDs = make(PeerSet)
6976
}
70-
// if not found in serviceSlots or proto == WakuRelayIDv200
71-
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
77+
78+
// if not found in serviceSlots or proto == WakuRelayIDv200 (service slots don't support WakuRelayIDv200)
79+
peerSet := criteria.SpecificPeers
80+
if len(peerSet) == 0 {
81+
peerSet = pm.host.Peerstore().Peers()
82+
}
83+
filteredPeers, err := pm.FilterPeersByProto(peerSet, criteria.ExcludePeers, criteria.Proto)
7284
if err != nil {
7385
return nil, err
7486
}
87+
if len(filteredPeers) == 0 && criteria.Proto != relay.WakuRelayID_v200 {
88+
return nil, utils.ErrNoPeersAvailable
89+
}
7590
if len(criteria.PubsubTopics) > 0 {
7691
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
7792
}
93+
7894
//Not passing excludePeers as filterPeers are already considering excluded ones.
79-
randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
95+
randomPeers, err := selectRandomPeers(filteredPeers, nil, min(criteria.MaxPeers, len(peerIDs)))
8096
if err != nil && len(peerIDs) == 0 {
8197
return nil, err
8298
}
@@ -88,16 +104,14 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
88104
}
89105

90106
func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
91-
i := 0
92107
selectedPeers := make(PeerSet)
93108
for pID := range filter {
94109
if PeerInSet(excludePeers, pID) {
95110
continue
96111
}
97112
//Map's iterator in golang works using randomness and hence not random function is being used.
98113
selectedPeers[pID] = struct{}{}
99-
i++
100-
if i == count {
114+
if len(selectedPeers) == count {
101115
break
102116
}
103117
}
@@ -121,41 +135,57 @@ func PeerSliceToMap(peers peer.IDSlice) PeerSet {
121135
return peerSet
122136
}
123137

138+
/*
139+
selectServicePeer tries to select peer from serviceSlot first based on criteria.
140+
serviceSlots is a map of peerMap by protocol.ID.
141+
- Slots are created automatically in getPeers.
142+
- Slots are not created for relay.WakuRelayID_v200.
143+
Therefore for Relay protocol, selectServicePeer will always return ErrNoPeersAvailable.
144+
145+
If there is no pubsubTopics criteria, a random peer from the selected slot is returned.
146+
Otherwise, peers from the slot are filtered by pubsubTopics and random peers are selected from the filtered list.
147+
148+
If no peer is found in the slot, on-demand discovery is triggered for the given pubsubTopics and protocol.
149+
The function retries once to fetch peers from the slot after discovery.
150+
*/
124151
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
125152
peers := make(PeerSet)
126153
var err error
127154
for retryCnt := 0; retryCnt < 1; retryCnt++ {
128155
//Try to fetch from serviceSlot
129-
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
130-
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
131-
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
132-
} else { //PubsubTopic based selection
133-
slot.mu.RLock()
134-
keys := make([]peer.ID, 0, len(slot.m))
135-
for i := range slot.m {
136-
if PeerInSet(criteria.ExcludePeers, i) {
137-
continue
138-
}
139-
keys = append(keys, i)
140-
}
141-
slot.mu.RUnlock()
142-
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
143-
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
144-
for tmpPeer := range tmpPeers {
145-
peers[tmpPeer] = struct{}{}
146-
}
147-
if err == nil && len(peers) == criteria.MaxPeers {
148-
return peers, nil
149-
} else {
150-
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
151-
//Trigger on-demand discovery for this topic and connect to peer immediately.
152-
//For now discover atleast 1 peer for the criteria
153-
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
154-
//Try to fetch peers again.
155-
continue
156-
}
156+
slot := pm.serviceSlots.getPeers(criteria.Proto)
157+
if slot == nil {
158+
continue
159+
}
160+
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
161+
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
162+
}
163+
164+
//PubsubTopic based selection
165+
slot.mu.RLock()
166+
keys := make([]peer.ID, 0, len(slot.m))
167+
for peerID := range slot.m {
168+
if PeerInSet(criteria.ExcludePeers, peerID) {
169+
continue
157170
}
171+
keys = append(keys, peerID)
172+
}
173+
slot.mu.RUnlock()
174+
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
175+
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
176+
for tmpPeer := range tmpPeers {
177+
peers[tmpPeer] = struct{}{}
158178
}
179+
if err == nil && len(peers) == criteria.MaxPeers {
180+
return peers, nil
181+
}
182+
183+
//Trigger on-demand discovery for this topic and connect to peer immediately.
184+
//For now discover at least 1 peer for the criteria
185+
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
186+
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
187+
188+
//Try to fetch peers again.
159189
}
160190
if len(peers) == 0 {
161191
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))

waku/v2/peerstore/waku_peer_store.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import (
88
"github.com/libp2p/go-libp2p/core/network"
99
"github.com/libp2p/go-libp2p/core/peer"
1010
"github.com/libp2p/go-libp2p/core/peerstore"
11-
"github.com/waku-org/go-waku/waku/v2/protocol"
1211
"golang.org/x/exp/maps"
12+
13+
"github.com/waku-org/go-waku/waku/v2/protocol"
1314
)
1415

1516
// Origin is used to determine how the peer is identified,
@@ -233,17 +234,20 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
233234
for _, p := range specificPeers {
234235
peerMatch := true
235236
peerTopics, err := ps.PubSubTopics(p)
236-
if err == nil {
237-
for _, t := range pubSubTopics {
238-
if _, ok := peerTopics[t]; !ok {
239-
peerMatch = false
240-
break
241-
}
242-
}
243-
if peerMatch {
244-
result = append(result, p)
237+
if err != nil {
238+
//Note: skipping a peer in case of an error as there would be others available.
239+
continue
240+
}
241+
242+
for _, t := range pubSubTopics {
243+
if _, ok := peerTopics[t]; !ok {
244+
peerMatch = false
245+
break
245246
}
246-
} //Note: skipping a peer in case of an error as there would be others available.
247+
}
248+
if peerMatch {
249+
result = append(result, p)
250+
}
247251
}
248252
return result
249253
}

0 commit comments

Comments
 (0)