Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap/httpnet`: New option `WithMaxDontHaveErrors(int)` (defaults to 100)
will trigger disconnections from bitswap peers that cannot provide any of
the blocks that are requested for the given number of requests in a
row. This is meant to limit bitswap HTTP-based optimistic requests for
blocks to discovered endpoints, which were previously considered permanently
peered upon discovery.

### Changed

### Removed
Expand Down
48 changes: 48 additions & 0 deletions bitswap/network/httpnet/error_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package httpnet

import (
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
)

// errThresholdCrossed is the sentinel error returned by
// errorTracker.logErrors when a peer's accumulated error count is above the
// threshold passed to that call.
var errThresholdCrossed = errors.New("the peer crossed the error threshold")

// errorTracker keeps a running error count per peer so that callers can
// detect when a peer has accumulated more errors than a given threshold.
type errorTracker struct {
	// ht is the Network this tracker was created for.
	ht *Network

	// mux guards the errors map.
	mux sync.RWMutex
	// errors holds the accumulated error count for each tracked peer. A
	// peer with no entry has a count of zero.
	errors map[peer.ID]int
}

// newErrorTracker returns an errorTracker bound to ht with an empty set of
// per-peer error counts.
func newErrorTracker(ht *Network) *errorTracker {
	et := &errorTracker{ht: ht}
	et.errors = make(map[peer.ID]int)
	return et
}

// stopTracking forgets any error count recorded for p. It is a no-op when p
// has no recorded errors.
func (et *errorTracker) stopTracking(p peer.ID) {
	et.mux.Lock()
	defer et.mux.Unlock()

	delete(et.errors, p)
}

// logErrors adds n to the running error count of p and stores the result.
// When the new total is strictly greater than threshold, errThresholdCrossed
// is returned; otherwise the result is nil. Passing n == 0 resets the count
// for p (its entry is removed) and always returns nil.
func (et *errorTracker) logErrors(p peer.ID, n int, threshold int) error {
	et.mux.Lock()
	defer et.mux.Unlock()

	// A zero increment is the "reset" signal: drop the entry altogether.
	if n == 0 {
		delete(et.errors, p)
		return nil
	}

	// A missing entry reads as zero, so this also starts tracking p.
	et.errors[p] += n
	if et.errors[p] <= threshold {
		return nil
	}
	return errThresholdCrossed
}
98 changes: 98 additions & 0 deletions bitswap/network/httpnet/error_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package httpnet

// Tests for the errorTracker implementation found in error_tracker.go.
import (
"sync"
"testing"

"github.com/libp2p/go-libp2p/core/peer"
)

// TestErrorTracker_StopTracking checks that stopTracking removes the error
// count previously recorded for a peer.
func TestErrorTracker_StopTracking(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Record some errors first: without an existing entry, the absence
	// check below would pass even if stopTracking did nothing.
	if err := et.logErrors(p, 5, 10); err != nil {
		t.Fatalf("Expected no error when logging errors but got %v", err)
	}
	if _, ok := et.errors[p]; !ok {
		t.Fatalf("Expected peer %s to be tracked after logging errors but it was not", p)
	}

	// Stop tracking the peer
	et.stopTracking(p)

	// Check if the error count is removed
	if _, ok := et.errors[p]; ok {
		t.Errorf("Expected peer %s to be untracked but it was still tracked", p)
	}
}

// TestErrorTracker_LogErrors_Reset checks that logging zero errors clears
// the count accumulated for a peer.
func TestErrorTracker_LogErrors_Reset(t *testing.T) {
	tracker := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Accumulate a few errors, staying below the threshold.
	if err := tracker.logErrors(p, 5, 10); err != nil {
		t.Errorf("Expected no error when logging errors but got %v", err)
	}

	// n == 0 asks the tracker to reset the count.
	if err := tracker.logErrors(p, 0, 10); err != nil {
		t.Errorf("Expected no error when resetting error count but got %v", err)
	}

	// A missing map entry reads as zero, which is what we expect here.
	if count := tracker.errors[p]; count != 0 {
		t.Errorf("Expected error count for peer %s to be 0 after reset but got %d", p, count)
	}
}

// TestErrorTracker_LogErrors_ThresholdCrossed checks that logErrors returns
// errThresholdCrossed once the accumulated count is above the threshold, and
// that reaching exactly the threshold is not yet an error.
func TestErrorTracker_LogErrors_ThresholdCrossed(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	// Log more errors than the threshold allows in a single call.
	err := et.logErrors(p, 11, 10)
	if err != errThresholdCrossed {
		t.Errorf("Expected errThresholdCrossed when logging errors above threshold but got %v", err)
	}

	// Check if the error count reflects the logged errors
	count, ok := et.errors[p]
	if !ok {
		t.Errorf("Expected peer %s to be tracked but it was not", p)
	}
	if count != 11 {
		t.Errorf("Expected error count for peer %s to be 11 after logging errors above threshold but got %d", p, count)
	}

	// Boundary: a count equal to the threshold has not crossed it.
	q := peer.ID("boundarypeer")
	if err := et.logErrors(q, 10, 10); err != nil {
		t.Errorf("Expected no error when error count equals threshold but got %v", err)
	}
}

// TestErrorTracker_ConcurrentAccess checks that concurrent logErrors calls
// for the same peer are all accounted for: numRoutines goroutines each log
// threshold/numRoutines single errors, so the final count must equal
// threshold and no call may report the threshold as crossed.
func TestErrorTracker_ConcurrentAccess(t *testing.T) {
	et := newErrorTracker(&Network{})
	p := peer.ID("testpeer")

	var wg sync.WaitGroup
	numRoutines := 10
	threshold := 100
	perRoutine := threshold / numRoutines

	for i := 0; i < numRoutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perRoutine; j++ {
				// The total never exceeds threshold, so every call
				// must succeed. t.Errorf is safe to call from
				// goroutines other than the test's own.
				if err := et.logErrors(p, 1, threshold); err != nil {
					t.Errorf("Expected no error while staying within threshold but got %v", err)
				}
			}
		}()
	}

	wg.Wait()

	// Check if the error count is correct. All writers have finished, so
	// reading the map directly is safe here.
	count, ok := et.errors[p]
	if !ok {
		t.Errorf("Expected peer %s to be tracked but it was not", p)
	}
	expectedCount := threshold
	actualCount := count
	if actualCount != expectedCount {
		t.Errorf("Expected error count for peer %s to be %d after concurrent logging but got %d", p, expectedCount, actualCount)
	}
}
21 changes: 21 additions & 0 deletions bitswap/network/httpnet/httpnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
DefaultInsecureSkipVerify = false
DefaultMaxBackoff = time.Minute
DefaultMaxHTTPAddressesPerPeer = 10
DefaultMaxDontHaveErrors = 100
DefaultHTTPWorkers = 64
)

Expand Down Expand Up @@ -155,6 +156,19 @@ func WithHTTPWorkers(n int) Option {
}
}

// WithMaxDontHaveErrors configures how many client errors in a row a peer
// may cause before we disconnect from it. For instance, with a value of 50,
// a peer answering 404 to 50 consecutive requests gets disconnected, and the
// upper layers are signalled to stop making requests to this peer and its
// endpoints. Requests that are already pending may still be performed, and
// the underlying HTTP connection might be kept around until it times out per
// the IdleConnTimeout. Requests resume when a provider record is found that
// causes us to "reconnect" to the peer.
//
// NOTE(review): errorTracker.logErrors only reports the threshold as crossed
// when the count is strictly greater than it — confirm whether the
// disconnection happens at the threshold or one error later.
func WithMaxDontHaveErrors(threshold int) Option {
	apply := func(ht *Network) {
		ht.maxDontHaveErrors = threshold
	}
	return apply
}

type Network struct {
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
// alignment.
Expand All @@ -168,6 +182,7 @@ type Network struct {
receivers []network.Receiver
connEvtMgr *network.ConnectEventManager
pinger *pinger
errorTracker *errorTracker
requestTracker *requestTracker
cooldownTracker *cooldownTracker

Expand All @@ -180,6 +195,7 @@ type Network struct {
maxIdleConns int
insecureSkipVerify bool
maxHTTPAddressesPerPeer int
maxDontHaveErrors int
httpWorkers int
allowlist map[string]struct{}
denylist map[string]struct{}
Expand Down Expand Up @@ -215,6 +231,7 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
maxIdleConns: DefaultMaxIdleConns,
insecureSkipVerify: DefaultInsecureSkipVerify,
maxHTTPAddressesPerPeer: DefaultMaxHTTPAddressesPerPeer,
maxDontHaveErrors: DefaultMaxDontHaveErrors,
httpWorkers: DefaultHTTPWorkers,
httpRequests: make(chan httpRequestInfo),
}
Expand Down Expand Up @@ -293,6 +310,9 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
pinger := newPinger(htnet, pingCid)
htnet.pinger = pinger

et := newErrorTracker(htnet)
htnet.errorTracker = et

for i := 0; i < htnet.httpWorkers; i++ {
go htnet.httpWorker(i)
}
Expand Down Expand Up @@ -520,6 +540,7 @@ func (ht *Network) DisconnectFrom(ctx context.Context, p peer.ID) error {
ht.host.Peerstore().SetAddrs(p, bsaddrs.Addrs, peerstore.TempAddrTTL)
}
ht.pinger.stopPinging(p)
ht.errorTracker.stopTracking(p)

// coolDownTracker: we leave untouched. We want to keep
// ongoing cooldowns there in case we reconnect to this peer.
Expand Down
96 changes: 84 additions & 12 deletions bitswap/network/httpnet/httpnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ var backoffCid = cid.MustParse("bafkreid6g5qrufgqj46djic7ntjnppaj5bg4urppjoyywrx
var _ network.Receiver = (*mockRecv)(nil)

// mockRecv is a test receiver (asserted above to satisfy network.Receiver)
// that exposes channels tests can wait on to observe received messages and
// connection events.
type mockRecv struct {
	// blocks, haves and donthaves are CID sets; presumably filled in by
	// ReceiveMessage from incoming messages — confirm against its body.
	blocks    map[cid.Cid]struct{}
	haves     map[cid.Cid]struct{}
	donthaves map[cid.Cid]struct{}

	// waitCh is signalled by ReceiveMessage.
	waitCh chan struct{}
	// waitConnectedCh is signalled by PeerConnected.
	waitConnectedCh chan struct{}
	// waitDisconnectedCh is signalled by PeerDisconnected.
	waitDisconnectedCh chan struct{}
}

func (recv *mockRecv) ReceiveMessage(ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) {
Expand All @@ -56,8 +58,8 @@ func (recv *mockRecv) ReceiveMessage(ctx context.Context, sender peer.ID, incomi
recv.waitCh <- struct{}{}
}

func (recv *mockRecv) wait(seconds time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), seconds*time.Second)
func (recv *mockRecv) wait(seconds int) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
defer cancel()
select {
case <-ctx.Done():
Expand All @@ -67,25 +69,49 @@ func (recv *mockRecv) wait(seconds time.Duration) error {
}
}

// waitConnected blocks until a connect event is signalled on
// waitConnectedCh, or returns an error once the given number of seconds has
// elapsed without one.
func (recv *mockRecv) waitConnected(seconds int) error {
	deadline := time.Now().Add(time.Duration(seconds) * time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()

	select {
	case <-recv.waitConnectedCh:
		return nil
	case <-ctx.Done():
		return errors.New("receiver waited too long without receiving a connect event")
	}
}

// waitDisconnected blocks until a disconnect event is signalled on
// waitDisconnectedCh, or returns an error once the given number of seconds
// has elapsed without one.
func (recv *mockRecv) waitDisconnected(seconds int) error {
	deadline := time.Now().Add(time.Duration(seconds) * time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), deadline)
	defer cancel()

	select {
	case <-recv.waitDisconnectedCh:
		return nil
	case <-ctx.Done():
		return errors.New("receiver waited too long without receiving a disconnect event")
	}
}

// ReceiveError deliberately does nothing: the mock ignores the errors it is
// handed.
func (recv *mockRecv) ReceiveError(err error) {}

// PeerConnected signals waitConnectedCh so that a test blocked in
// waitConnected can proceed. The peer ID is not recorded. The send blocks
// until the channel can accept the value.
func (recv *mockRecv) PeerConnected(p peer.ID) {
	var evt struct{}
	recv.waitConnectedCh <- evt
}

// PeerDisconnected signals waitDisconnectedCh so that a test blocked in
// waitDisconnected can proceed. The peer ID is not recorded. The send blocks
// until the channel can accept the value.
func (recv *mockRecv) PeerDisconnected(p peer.ID) {
	var evt struct{}
	recv.waitDisconnectedCh <- evt
}

// mockReceiver returns a mockRecv with empty CID sets and with each
// signalling channel buffered to hold a single pending event.
func mockReceiver(t *testing.T) *mockRecv {
	t.Helper()
	return &mockRecv{
		blocks:             make(map[cid.Cid]struct{}),
		haves:              make(map[cid.Cid]struct{}),
		donthaves:          make(map[cid.Cid]struct{}),
		waitCh:             make(chan struct{}, 1),
		waitConnectedCh:    make(chan struct{}, 1),
		waitDisconnectedCh: make(chan struct{}, 1),
	}
}
Expand Down Expand Up @@ -559,3 +585,49 @@ func TestBackOff(t *testing.T) {
t.Error("no blocks should have been received while on backoff")
}
}

// TestErrorTracking verifies that a peer is disconnected once the error
// threshold configured through WithMaxDontHaveErrors is crossed, and not
// before.
func TestErrorTracking(t *testing.T) {
	ctx := context.Background()
	recv := mockReceiver(t)
	htnet, mn := mockNetwork(t, recv, WithMaxDontHaveErrors(1))

	// Named "remote" rather than "peer" so that the imported peer package
	// is not shadowed inside this test.
	remote, err := mn.GenPeer()
	if err != nil {
		t.Fatal(err)
	}

	msrv := makeServer(t, 0, 0)
	connectToPeer(t, ctx, htnet, remote, msrv)

	err = recv.waitConnected(1)
	if err != nil {
		t.Fatal(err)
	}

	wl := makeCids(t, 0, 1)
	msg := makeWantsMessage(wl)

	// First request: the threshold of 1 must not be crossed yet.
	err = htnet.SendMessage(ctx, remote.ID(), msg)
	if err != nil {
		t.Fatal(err)
	}

	// Best effort: give the receiver up to a second to get the response.
	// The outcome that matters is asserted through the disconnect event.
	_ = recv.wait(1)
	err = recv.waitDisconnected(1)
	if err == nil { // we received a disconnect event
		t.Fatal("disconnect event not expected")
	}

	// Threshold was 1. This will trigger a disconnection.
	err = htnet.SendMessage(ctx, remote.ID(), msg)
	if err != nil {
		t.Fatal(err)
	}

	// Best effort again; a response may or may not arrive before the
	// disconnection.
	_ = recv.wait(1)
	err = recv.waitDisconnected(1)
	if err != nil {
		t.Fatal(err)
	}
}
Loading