Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Sub struct {
type subscribeParameters struct {
batchInterval time.Duration
multiplexChannelBuffer int
preferredPeers peer.IDSlice
}

type SubscribeOptions func(*subscribeParameters)
Expand All @@ -75,6 +76,12 @@ func defaultOptions() []SubscribeOptions {
}
}

func WithPreferredServiceNodes(peers peer.IDSlice) SubscribeOptions {
return func(params *subscribeParameters) {
params.preferredPeers = peers
}
}

// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
Expand Down Expand Up @@ -197,7 +204,16 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
isExcludedPeer := false
for _, px := range peersToExclude { // configured peer can be excluded if sub fails with it.
if p == px {
isExcludedPeer = true
break
}
}
if !isExcludedPeer {
options = append(options, filter.WithPeer(p))
}
}
if len(peersToExclude) > 0 {
apiSub.log.Debug("subscribing with peers to exclude", zap.Stringers("excluded-peers", peersToExclude))
Expand Down
11 changes: 10 additions & 1 deletion waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package filter

import (
"context"
"math/rand"
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"

"go.uber.org/zap"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -61,7 +63,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
Expand Down Expand Up @@ -162,6 +165,12 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
if len(mgr.params.preferredPeers) > 0 {
//use one peer which is from preferred peers.
randomIndex := rand.Intn(len(mgr.params.preferredPeers) - 1)
randomPreferredPeer := mgr.params.preferredPeers[randomIndex]
config.Peers = []peer.ID{randomPreferredPeer}
}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)
ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
params := subscribeParameters{300 * time.Second, 1024}
params := subscribeParameters{300 * time.Second, 1024, nil}
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log, &params)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
Expand Down
18 changes: 16 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,20 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
reqPeerCount := params.maxPeers - len(params.selectedPeers)
for _, p := range params.selectedPeers {
if params.peersToExclude == nil {
params.peersToExclude = make(peermanager.PeerSet)
}
//exclude peers that are preferredpeers so that they don't get selected again.
if _, ok := params.peersToExclude[p]; !ok {
params.peersToExclude[p] = struct{}{}
}
}

if params.pm != nil && reqPeerCount > 0 {

wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
Expand All @@ -350,7 +359,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
Expand Down
Loading