99 "sync"
1010 "time"
1111
12+ backoffv4 "github.com/cenkalti/backoff/v4"
1213 golog "github.com/ipfs/go-log/v2"
1314 "github.com/libp2p/go-libp2p"
1415 "go.uber.org/zap"
@@ -23,6 +24,8 @@ import (
2324 "github.com/libp2p/go-libp2p/core/peerstore"
2425 "github.com/libp2p/go-libp2p/core/protocol"
2526 "github.com/libp2p/go-libp2p/p2p/discovery/backoff"
27+ "github.com/libp2p/go-libp2p/p2p/host/autorelay"
28+ "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
2629 ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
2730 ma "github.com/multiformats/go-multiaddr"
2831 "go.opencensus.io/stats"
@@ -89,7 +92,8 @@ type WakuNode struct {
8992 store ReceptorService
9093 rlnRelay RLNRelay
9194
92- wakuFlag enr.WakuEnrBitfield
95+ wakuFlag enr.WakuEnrBitfield
96+ circuitRelayNodes chan peer.AddrInfo
9397
9498 localNode * enode.LocalNode
9599
@@ -177,6 +181,34 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
177181 w .wg = & sync.WaitGroup {}
178182 w .keepAliveFails = make (map [peer.ID ]int )
179183 w .wakuFlag = enr .NewWakuEnrBitfield (w .opts .enableLightPush , w .opts .enableLegacyFilter , w .opts .enableStore , w .opts .enableRelay )
184+ w .circuitRelayNodes = make (chan peer.AddrInfo )
185+
186+ // Use circuit relay with nodes received on circuitRelayNodes channel
187+ params .libP2POpts = append (params .libP2POpts , libp2p .EnableAutoRelayWithPeerSource (
188+ func (ctx context.Context , numPeers int ) <- chan peer.AddrInfo {
189+ r := make (chan peer.AddrInfo )
190+ go func () {
191+ defer close (r )
192+ for ; numPeers != 0 ; numPeers -- {
193+ select {
194+ case v , ok := <- w .circuitRelayNodes :
195+ if ! ok {
196+ return
197+ }
198+ select {
199+ case r <- v :
200+ case <- ctx .Done ():
201+ return
202+ }
203+ case <- ctx .Done ():
204+ return
205+ }
206+ }
207+ }()
208+ return r
209+ },
210+ autorelay .WithMinInterval (0 ),
211+ ))
180212
181213 if params .enableNTP {
182214 w .timesource = timesource .NewNTPTimesource (w .opts .ntpURLs , w .log )
@@ -305,10 +337,11 @@ func (w *WakuNode) Start(ctx context.Context) error {
305337
306338 w .enrChangeCh = make (chan struct {}, 10 )
307339
308- w .wg .Add (3 )
340+ w .wg .Add (4 )
309341 go w .connectednessListener (ctx )
310342 go w .watchMultiaddressChanges (ctx )
311343 go w .watchENRChanges (ctx )
344+ go w .findRelayNodes (ctx )
312345
313346 err = w .bcaster .Start (ctx )
314347 if err != nil {
@@ -812,3 +845,56 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
812845 }
813846 return peers , nil
814847}
848+
849+ func (w * WakuNode ) findRelayNodes (ctx context.Context ) {
850+ defer w .wg .Done ()
851+
852+ // Feed peers more often right after the bootstrap, then backoff
853+ bo := backoffv4 .NewExponentialBackOff ()
854+ bo .InitialInterval = 15 * time .Second
855+ bo .Multiplier = 3
856+ bo .MaxInterval = 1 * time .Hour
857+ bo .MaxElapsedTime = 0 // never stop
858+ t := backoffv4 .NewTicker (bo )
859+ defer t .Stop ()
860+ for {
861+ select {
862+ case <- t .C :
863+ case <- ctx .Done ():
864+ return
865+ }
866+
867+ peers , err := w .Peers ()
868+ if err != nil {
869+ w .log .Error ("failed to fetch peers" , zap .Error (err ))
870+ continue
871+ }
872+
873+ // Shuffle peers
874+ rand .Seed (time .Now ().UnixNano ())
875+ rand .Shuffle (len (peers ), func (i , j int ) { peers [i ], peers [j ] = peers [j ], peers [i ] })
876+
877+ for _ , p := range peers {
878+ info := w .Host ().Peerstore ().PeerInfo (p .ID )
879+
880+ supportedProtocols , err := w .Host ().Peerstore ().SupportsProtocols (p .ID , proto .ProtoIDv2Hop )
881+ if err != nil {
882+ w .log .Error ("could not check supported protocols" , zap .Error (err ))
883+ continue
884+ }
885+
886+ if len (supportedProtocols ) == 0 {
887+ continue
888+ }
889+
890+ select {
891+ case <- ctx .Done ():
892+ w .log .Debug ("context done, auto-relay has enough peers" )
893+ return
894+
895+ case w .circuitRelayNodes <- info :
896+ w .log .Debug ("published auto-relay peer info" , zap .Any ("peer-id" , p .ID ))
897+ }
898+ }
899+ }
900+ }
0 commit comments