Skip to content

feat(discovery): add jitter and fix backoff #2967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 28 additions & 32 deletions share/p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
"context"
"errors"
"fmt"
"math/rand"
"time"

"golang.org/x/sync/errgroup"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to keep as it was.


logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"golang.org/x/sync/errgroup"
)

var log = logging.Logger("share/discovery")
Expand All @@ -33,12 +35,12 @@

// logInterval defines the time interval at which a warning message will be logged
// if the desired number of nodes is not detected.
logInterval = 5 * time.Minute
logInterval = 5 * time.Minute
maxBackoffDuration = 10 * time.Minute
backoffInitialDuration = 5 * time.Second
jitterFactor = 0.1
)

// discoveryRetryTimeout defines the time interval between discovery attempts; it is a variable so that tests can override it
var discoveryRetryTimeout = retryTimeout

// Discovery combines the advertise and discover services and allows storing discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
Expand All @@ -57,7 +59,8 @@

cancel context.CancelFunc

params *Parameters
params *Parameters
backoffDuration time.Duration
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)
Expand Down Expand Up @@ -86,14 +89,15 @@
}
o := newOptions(opts...)
return &Discovery{
tag: tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
tag: tag,
set: newLimitedSet(params.PeersLimit),
host: h,
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: o.onUpdatedPeers,
params: params,
triggerDisc: make(chan struct{}),
backoffDuration: backoffInitialDuration,
}, nil
}

Expand Down Expand Up @@ -199,21 +203,25 @@
// It initiates peer discovery upon request and restarts the process until the soft limit is
// reached.
func (d *Discovery) discoveryLoop(ctx context.Context) {
t := time.NewTicker(discoveryRetryTimeout)
defer t.Stop()
backoffTimer := time.NewTimer(0)
defer backoffTimer.Stop()

warnTicker := time.NewTicker(logInterval)
defer warnTicker.Stop()

for {
// drain all previous ticks from the channel
drainChannel(t.C)
select {
case <-t.C:
case <-backoffTimer.C:
if !d.discover(ctx) {
// rerun discovery if the number of peers hasn't reached the limit
d.backoffDuration += backoffInitialDuration
if d.backoffDuration > maxBackoffDuration {
d.backoffDuration = maxBackoffDuration
}
jitter := time.Duration(float64(d.backoffDuration) * jitterFactor * (rand.Float64()*2 - 1))

Check failure on line 220 in share/p2p/discovery/discovery.go

View workflow job for this annotation

GitHub Actions / go-ci / Lint

G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec)
backoffTimer.Reset(d.backoffDuration + jitter)
continue
}
d.backoffDuration = backoffInitialDuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also reset backoffTimer here?

case <-warnTicker.C:
if d.set.Size() < d.set.Limit() {
log.Warnf(
Expand All @@ -222,8 +230,6 @@
d.tag, logInterval, d.set.Size(), d.set.Limit(),
)
}
// Do not break the loop; just continue
continue
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -375,13 +381,3 @@
d.host.ConnManager().Protect(peer.ID, d.tag)
return true
}

// drainChannel discards every value that is immediately available on c and
// returns as soon as a receive would block, so it never waits for new values.
//
// It also returns once c is found to be closed: a receive on a closed channel
// always succeeds, so without that check the loop would spin forever.
func drainChannel(c <-chan time.Time) {
	for {
		select {
		case _, ok := <-c:
			if !ok {
				// c is closed and fully drained; nothing more can arrive.
				return
			}
		default:
			// No value is ready right now.
			return
		}
	}
}
4 changes: 1 addition & 3 deletions share/p2p/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ const (
func TestDiscovery(t *testing.T) {
const nodes = 10 // higher number brings higher coverage

discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go

ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why so big? 30x more than before 👀

t.Cleanup(cancel)

tn := newTestnet(ctx, t)
Expand Down
Loading