Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap/httpnet`: New option `WithMaxDontHaveErrors(int)` (defaults to 100)
will trigger disconnections from bitswap peers that cannot provide any of
the blocks that are requested for the given number of requests in a
row. This is meant to limit bitswap HTTP-based optimistic requests for
blocks to discovered endpoints, which were before considered permanently
peered upon discovery.

### Changed

### Removed
Expand Down
65 changes: 65 additions & 0 deletions bitswap/network/httpnet/error_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package httpnet

import (
"errors"
"sync"
"sync/atomic"

"github.com/libp2p/go-libp2p/core/peer"
)

// errThresholdCrossed signals that a peer has accumulated more
// consecutive errors than the caller-supplied threshold allows.
var errThresholdCrossed = errors.New("the peer crossed the error threshold")

// errorTracker keeps a per-peer count of request errors so that the
// network can stop talking to peers that repeatedly fail to provide
// requested blocks.
type errorTracker struct {
	ht *Network

	// mux guards the errors map itself; the per-peer counters are
	// atomic and may be updated without holding the lock.
	mux    sync.RWMutex
	errors map[peer.ID]*atomic.Uint64
}

// newErrorTracker returns an errorTracker ready to record error counts
// for peers on the given network.
func newErrorTracker(ht *Network) *errorTracker {
	tracker := &errorTracker{
		ht: ht,
	}
	tracker.errors = make(map[peer.ID]*atomic.Uint64)
	return tracker
}

// startTracking ensures an error counter exists for p, creating a
// zeroed one when the peer is not yet tracked. Calling it again for an
// already-tracked peer leaves the existing count untouched.
func (et *errorTracker) startTracking(p peer.ID) {
	et.mux.Lock()
	defer et.mux.Unlock()

	if _, tracked := et.errors[p]; tracked {
		return
	}
	et.errors[p] = new(atomic.Uint64)
}

// stopTracking removes the error counter kept for p, if any.
func (et *errorTracker) stopTracking(p peer.ID) {
	et.mux.Lock()
	defer et.mux.Unlock()

	delete(et.errors, p)
}

// logErrors adds n to the current error count for p. If the total count
// is above the threshold, errThresholdCrossed is returned. If n is 0,
// the total count is reset to 0. Errors logged for an untracked peer
// (e.g. requests that were still pending after a disconnect) are
// silently ignored.
func (et *errorTracker) logErrors(p peer.ID, n uint64, threshold uint64) error {
	et.mux.RLock()
	count, ok := et.errors[p]
	et.mux.RUnlock()

	if !ok {
		// i.e. we disconnected but there were pending requests.
		// Debugf (not Debug): the message is a printf-style format.
		log.Debugf("logging errors for untracked peer: %s", p)
		return nil
	}

	if n == 0 { // reset error count
		count.Store(0)
		return nil
	}

	total := count.Add(n)
	if total > threshold {
		return errThresholdCrossed
	}
	return nil
}
141 changes: 141 additions & 0 deletions bitswap/network/httpnet/error_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package httpnet

// Tests for the errorTracker implementation found in error_tracker.go
import (
"sync"
"testing"

"github.com/libp2p/go-libp2p/core/peer"
)

// TestErrorTracker_StartTracking checks that startTracking initializes
// a zeroed error counter for the peer.
func TestErrorTracker_StartTracking(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Start tracking the peer
	et.startTracking(p)

	// Check if the error count is initialized to 0
	count, ok := et.errors[p]
	if !ok {
		// Fatalf: count is nil here, so continuing to count.Load()
		// below would panic instead of reporting a clean failure.
		t.Fatalf("Expected peer %s to be tracked but it was not", p)
	}
	if count.Load() != 0 {
		t.Errorf("Expected initial error count for peer %s to be 0 but got %d", p, count.Load())
	}
}

// TestErrorTracker_StopTracking checks that stopTracking removes the
// peer's error counter from the tracker.
func TestErrorTracker_StopTracking(t *testing.T) {
	tracker := newErrorTracker(&Network{})
	pid := peer.ID("testpeer")

	// Track the peer, then immediately stop tracking it.
	tracker.startTracking(pid)
	tracker.stopTracking(pid)

	// The counter entry must be gone.
	if _, stillTracked := tracker.errors[pid]; stillTracked {
		t.Errorf("Expected peer %s to be untracked but it was still tracked", pid)
	}
}

// TestErrorTracker_LogErrors_Reset checks that logging n=0 errors
// resets the accumulated count for a tracked peer.
func TestErrorTracker_LogErrors_Reset(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Start tracking the peer
	et.startTracking(p)

	// Log some errors below the threshold.
	err := et.logErrors(p, 5, 10)
	if err != nil {
		t.Errorf("Expected no error when logging errors but got %v", err)
	}

	// Passing n=0 resets the error count.
	err = et.logErrors(p, 0, 10)
	if err != nil {
		t.Errorf("Expected no error when resetting error count but got %v", err)
	}

	// Check if the error count is reset to 0
	count, ok := et.errors[p]
	if !ok {
		// Fatalf: count is nil here and dereferencing it below would panic.
		t.Fatalf("Expected peer %s to be tracked but it was not", p)
	}
	if count.Load() != 0 {
		t.Errorf("Expected error count for peer %s to be 0 after reset but got %d", p, count.Load())
	}
}

// TestErrorTracker_LogErrors_ThresholdCrossed checks that logging more
// errors than the threshold returns errThresholdCrossed and that the
// counter keeps the full total.
func TestErrorTracker_LogErrors_ThresholdCrossed(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Start tracking the peer
	et.startTracking(p)

	// 11 errors against a threshold of 10 must cross it.
	err := et.logErrors(p, 11, 10)
	if err != errThresholdCrossed {
		t.Errorf("Expected errThresholdCrossed when logging errors above threshold but got %v", err)
	}

	// Check if the error count reflects the logged errors
	count, ok := et.errors[p]
	if !ok {
		// Fatalf: count is nil here and dereferencing it below would panic.
		t.Fatalf("Expected peer %s to be tracked but it was not", p)
	}
	if count.Load() != 11 {
		// Message fixed: the expected value asserted here is 11, not 10.
		t.Errorf("Expected error count for peer %s to be 11 after logging errors above threshold but got %d", p, count.Load())
	}
}

// TestErrorTracker_LogErrors_UntrackedPeer checks that logging errors
// for a peer that was never tracked is a silent no-op.
func TestErrorTracker_LogErrors_UntrackedPeer(t *testing.T) {
	tracker := newErrorTracker(&Network{})
	pid := peer.ID("testpeer")

	// No startTracking call: the peer is unknown to the tracker.
	if err := tracker.logErrors(pid, 5, 10); err != nil {
		t.Errorf("Expected no error when logging errors for an untracked peer but got %v", err)
	}
}

// TestErrorTracker_ConcurrentAccess exercises logErrors from several
// goroutines at once so the race detector (go test -race) can catch
// unsynchronized access, and verifies the final count is exact.
func TestErrorTracker_ConcurrentAccess(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Start tracking the peer
	et.startTracking(p)

	var wg sync.WaitGroup
	numRoutines := 10
	threshold := uint64(100)

	for i := 0; i < numRoutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < int(threshold)/numRoutines; j++ {
				// The total never exceeds the threshold (100 > 100 is
				// false), so every call must succeed. Errorf, not
				// Fatalf: Fatalf must not be called from goroutines
				// other than the one running the test.
				if err := et.logErrors(p, 1, threshold); err != nil {
					t.Errorf("unexpected error from logErrors: %v", err)
				}
			}
		}()
	}

	wg.Wait()

	// Check if the error count is correct
	count, ok := et.errors[p]
	if !ok {
		// Fatalf: count is nil here and dereferencing it below would panic.
		t.Fatalf("Expected peer %s to be tracked but it was not", p)
	}
	expectedCount := threshold
	actualCount := count.Load()
	if actualCount != expectedCount {
		t.Errorf("Expected error count for peer %s to be %d after concurrent logging but got %d", p, expectedCount, actualCount)
	}
}
22 changes: 22 additions & 0 deletions bitswap/network/httpnet/httpnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
DefaultInsecureSkipVerify = false
DefaultMaxBackoff = time.Minute
DefaultMaxHTTPAddressesPerPeer = 10
DefaultMaxDontHaveErrors = 100
DefaultHTTPWorkers = 64
)

Expand Down Expand Up @@ -155,6 +156,19 @@ func WithHTTPWorkers(n int) Option {
}
}

// WithMaxDontHaveErrors sets the maximum number of client errors that a peer
// can cause in a row before we disconnect. For example, if set to 50, and a
// peer returns 404 to 50 requests in a row, we will disconnect and signal the
// upper layers to stop making requests to this peer and its endpoints. It may
// be that pending requests will still happen. The HTTP connection might be
// kept until it times-out per the IdleConnTimeout. Requests will resume if a
// provider record is found causing us to "reconnect" to the peer.
func WithMaxDontHaveErrors(threshold uint64) Option {
	return func(ht *Network) {
		ht.maxDontHaveErrors = threshold
	}
}

type Network struct {
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
// alignment.
Expand All @@ -168,6 +182,7 @@ type Network struct {
receivers []network.Receiver
connEvtMgr *network.ConnectEventManager
pinger *pinger
errorTracker *errorTracker
requestTracker *requestTracker
cooldownTracker *cooldownTracker

Expand All @@ -180,6 +195,7 @@ type Network struct {
maxIdleConns int
insecureSkipVerify bool
maxHTTPAddressesPerPeer int
maxDontHaveErrors uint64
httpWorkers int
allowlist map[string]struct{}
denylist map[string]struct{}
Expand Down Expand Up @@ -215,6 +231,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
maxIdleConns: DefaultMaxIdleConns,
insecureSkipVerify: DefaultInsecureSkipVerify,
maxHTTPAddressesPerPeer: DefaultMaxHTTPAddressesPerPeer,
maxDontHaveErrors: DefaultMaxDontHaveErrors,
httpWorkers: DefaultHTTPWorkers,
httpRequests: make(chan httpRequestInfo),
}
Expand Down Expand Up @@ -293,6 +310,9 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
pinger := newPinger(htnet, pingCid)
htnet.pinger = pinger

et := newErrorTracker(htnet)
htnet.errorTracker = et

for i := 0; i < htnet.httpWorkers; i++ {
go htnet.httpWorker(i)
}
Expand Down Expand Up @@ -464,6 +484,7 @@ func (ht *Network) Connect(ctx context.Context, pi peer.AddrInfo) error {

ht.connEvtMgr.Connected(p)
ht.pinger.startPinging(p)
ht.errorTracker.startTracking(p)
log.Debugf("connect success to %s (supports HEAD: %t)", p, supportsHead)
// We "connected"
return nil
Expand Down Expand Up @@ -520,6 +541,7 @@ func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error {
ht.host.Peerstore().SetAddrs(p, bsaddrs.Addrs, peerstore.TempAddrTTL)
}
ht.pinger.stopPinging(p)
ht.errorTracker.stopTracking(p)

// coolDownTracker: we leave untouched. We want to keep
// ongoing cooldowns there in case we reconnect to this peer.
Expand Down
Loading