Skip to content

Commit d81465e

Browse files
committed
feat: add option to specify preferred peers for filter
1 parent c78b09d commit d81465e

File tree

4 files changed

+44
-5
lines changed

4 files changed

+44
-5
lines changed

waku/v2/api/filter/filter.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Sub struct {
5252
type subscribeParameters struct {
5353
batchInterval time.Duration
5454
multiplexChannelBuffer int
55+
preferredPeers peer.IDSlice
5556
}
5657

5758
type SubscribeOptions func(*subscribeParameters)
@@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
7576
}
7677
}
7778

79+
func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
80+
return func(params *subscribeParameters) {
81+
params.preferredPeers = peers
82+
}
83+
}
84+
7885
// Subscribe
7986
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
8087
sub := new(Sub)
@@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
197204
options := make([]filter.FilterSubscribeOption, 0)
198205
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
199206
for _, p := range apiSub.Config.Peers {
200-
options = append(options, filter.WithPeer(p))
207+
isExcludedPeer := false
208+
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
209+
if p == px {
210+
isExcludedPeer = true
211+
break
212+
}
213+
}
214+
if !isExcludedPeer {
215+
options = append(options, filter.WithPeer(p))
216+
}
201217
}
202218
if len(peersToExclude) > 0 {
203219
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))

waku/v2/api/filter/filter_manager.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package filter
22

33
import (
44
"context"
5+
"math/rand"
56
"sync"
67
"time"
78

89
"github.com/google/uuid"
10+
"github.com/libp2p/go-libp2p/core/peer"
911

1012
"go.uber.org/zap"
1113
"golang.org/x/exp/maps"
@@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
6163
OnNewEnvelope(env *protocol.Envelope) error
6264
}
6365

64-
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
66+
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
67+
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
6568
// This fn is being mocked in test
6669
mgr := new(FilterManager)
6770
mgr.ctx = ctx
@@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
162165
defer utils.LogOnPanic()
163166
ctx, cancel := context.WithCancel(mgr.ctx)
164167
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
168+
if len(mgr.params.preferredPeers) > 0 {
169+
//use one peer which is from preferred peers.
170+
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
171+
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
172+
config.Peers = []peer.ID{randomPreferredPeer}
173+
}
165174
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
166175
mgr.Lock()
167176
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}

waku/v2/api/filter/filter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
5454
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
5555
ctx, cancel := context.WithCancel(context.Background())
5656
s.Log.Info("About to perform API Subscribe()")
57-
params := subscribeParameters{300 * time.Second, 1024}
57+
params := subscribeParameters{300 * time.Second, 1024, nil}
5858
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
5959
s.Require().NoError(err)
6060
s.Require().Equal(apiSub.ContentFilter, contentFilter)

waku/v2/protocol/filter/client.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
333333
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
334334
}
335335
reqPeerCount := params.maxPeers - len(params.selectedPeers)
336+
for _, p := range params.selectedPeers {
337+
if params.peersToExclude == nil {
338+
params.peersToExclude = make(peermanager.PeerSet)
339+
}
340+
//exclude peers that are preferredpeers so that they don't get selected again.
341+
if _, ok := params.peersToExclude[p]; !ok {
342+
params.peersToExclude[p] = struct{}{}
343+
}
344+
}
336345

337346
if params.pm != nil && reqPeerCount > 0 {
338347

339348
wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
340-
params.selectedPeers, err = wf.pm.SelectPeers(
349+
selectedPeers, err := wf.pm.SelectPeers(
341350
peermanager.PeerSelectionCriteria{
342351
SelectionType: params.peerSelectionType,
343352
Proto: FilterSubscribeID_v20beta1,
@@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
350359
)
351360
if err != nil {
352361
wf.log.Error("peer selection returned err", zap.Error(err))
353-
return nil, nil, err
362+
if len(params.selectedPeers) == 0 {
363+
return nil, nil, err
364+
}
365+
}
366+
if len(selectedPeers) > 0 {
367+
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
354368
}
355369
}
356370
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))

0 commit comments

Comments
 (0)