54 changes: 54 additions & 0 deletions p2p/host/peerstore/pstoreds/addr_book.go
@@ -470,6 +470,13 @@ func (ab *dsAddrBook) ClearAddrs(p peer.ID) {
}
}

// ttlIsConnected reports whether the given TTL marks the address as held by
// a live connection. Such entries are not subject to the per-peer cap and are
// never evicted to make room for incoming addrs.
func ttlIsConnected(ttl time.Duration) bool {
return ttl >= pstore.ConnectedAddrTTL
}

func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration, mode ttlWriteMode, _ bool) (err error) {
if len(addrs) == 0 {
return nil
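
For orientation, a minimal standalone sketch of how the ttlIsConnected threshold classifies the stock core/peerstore TTLs. The helper is re-declared here so the snippet compiles on its own, and only two of the standard constants are shown:

package main

import (
	"fmt"
	"time"

	pstore "github.com/libp2p/go-libp2p/core/peerstore"
)

// ttlIsConnected mirrors the helper above: any TTL at or above
// ConnectedAddrTTL marks an entry as held by a live connection.
func ttlIsConnected(ttl time.Duration) bool {
	return ttl >= pstore.ConnectedAddrTTL
}

func main() {
	fmt.Println(ttlIsConnected(pstore.TempAddrTTL))      // false: short-lived, subject to the cap
	fmt.Println(ttlIsConnected(pstore.ConnectedAddrTTL)) // true: protected from eviction
}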
@@ -517,6 +524,43 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
return existingEntry
}

// Per-peer cap on unconnected addrs. Entries held by a live connection
// (TTL >= ConnectedAddrTTL) are not counted and never evicted; this
// bounds peerstore pollution from sources like DHT gossip while leaving
// addresses tied to active sessions intact.
maxCap := ab.opts.MaxAddrsPerPeer
incomingIsUnconnected := !ttlIsConnected(ttl)
unconnectedCount := 0
if maxCap > 0 && incomingIsUnconnected {
for _, a := range pr.Addrs {
if !ttlIsConnected(time.Duration(a.Ttl)) {
unconnectedCount++
}
}
}
// evictNearestUnconnected drops the unconnected entry from pr.Addrs with
// the soonest expiry. Returns false when every remaining entry is held by
// a live connection, in which case the caller must drop the new addr.
evictNearestUnconnected := func() bool {
victim := -1
var soonest int64
for i, a := range pr.Addrs {
if ttlIsConnected(time.Duration(a.Ttl)) {
continue
}
if victim == -1 || a.Expiry < soonest {
victim = i
soonest = a.Expiry
}
}
if victim == -1 {
return false
}
delete(addrsMap, string(pr.Addrs[victim].Addr))
pr.Addrs = append(pr.Addrs[:victim], pr.Addrs[victim+1:]...)
return true
}

var entries []*pb.AddrBookRecord_AddrEntry
for _, incoming := range addrs {
existingEntry := updateExisting(incoming)
@@ -526,13 +570,23 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
// entries = append(entries, existingEntry)
// }
// } else {
if maxCap > 0 && incomingIsUnconnected && unconnectedCount >= maxCap {
if !evictNearestUnconnected() {
// Every existing addr is protected; drop the new one.
continue
}
unconnectedCount--
}
// new addr, add & broadcast
entry := &pb.AddrBookRecord_AddrEntry{
Addr: incoming.Bytes(),
Ttl: int64(ttl),
Expiry: newExp,
}
entries = append(entries, entry)
if incomingIsUnconnected {
unconnectedCount++
}

// note: there's a minor chance that writing the record will fail, in which case we would've broadcast
// the addresses without persisting them. This is very unlikely and not much of an issue.
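To read the new setAddrs logic outside the diff context, here is a self-contained model of the same cap-and-evict policy over a toy entry type. entry, insertWithCap, and the field names are illustrative stand-ins for the pb.AddrBookRecord types, not the real ones:

package main

import "fmt"

// entry is a simplified stand-in for an addr-book record entry: an
// address, an expiry, and whether a live connection holds it.
type entry struct {
	addr      string
	expiry    int64
	connected bool
}

// insertWithCap models the policy from the diff: unconnected entries are
// capped at maxCap per peer, evicting the unconnected entry with the
// soonest expiry; connected entries are never counted and never evicted.
func insertWithCap(entries []entry, in entry, maxCap int) []entry {
	if maxCap > 0 && !in.connected {
		unconnected := 0
		for _, e := range entries {
			if !e.connected {
				unconnected++
			}
		}
		if unconnected >= maxCap {
			victim := -1
			var soonest int64
			for i, e := range entries {
				if e.connected {
					continue
				}
				if victim == -1 || e.expiry < soonest {
					victim = i
					soonest = e.expiry
				}
			}
			if victim == -1 {
				return entries // every entry protected: drop the new addr
			}
			entries = append(entries[:victim], entries[victim+1:]...)
		}
	}
	return append(entries, in)
}

func main() {
	es := []entry{{"tcp/1", 100, false}, {"tcp/2", 50, false}}
	es = insertWithCap(es, entry{"tcp/3", 75, false}, 2)
	fmt.Println(es) // tcp/2 (soonest expiry) evicted: [{tcp/1 100 false} {tcp/3 75 false}]
}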
71 changes: 71 additions & 0 deletions p2p/host/peerstore/pstoreds/ds_test.go
@@ -5,12 +5,14 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"

mockclock "github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

@@ -54,6 +56,10 @@ func TestDsAddrBook(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 1024
// Shared addr-book suite inserts batches larger than the default
// per-peer cap; disable the cap so the suite exercises general
// behavior, not the cap path.
opts.MaxAddrsPerPeer = 0
clk := mockclock.NewMock()
opts.Clock = clk

@@ -64,6 +70,7 @@ func TestDsAddrBook(t *testing.T) {
opts := DefaultOpts()
opts.GCPurgeInterval = 1 * time.Second
opts.CacheSize = 0
opts.MaxAddrsPerPeer = 0
clk := mockclock.NewMock()
opts.Clock = clk

@@ -72,6 +79,70 @@ func TestDsAddrBook(t *testing.T) {
}
}

func TestDsMaxAddrsPerPeerEvictsNearestExpiry(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
opts := DefaultOpts()
opts.MaxAddrsPerPeer = 3
clk := mockclock.NewMock()
opts.Clock = clk

ds, closeDs := dsFactory(t)
defer closeDs()
ab, err := NewAddrBook(context.Background(), ds, opts)
require.NoError(t, err)
defer ab.Close()

const p = peer.ID("peer-cap")
a1 := ma.StringCast("/ip4/1.2.3.4/tcp/1")
a2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
a3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
a4 := ma.StringCast("/ip4/1.2.3.4/tcp/4")

ab.AddAddr(p, a1, time.Hour) // furthest expiry
ab.AddAddr(p, a2, 30*time.Minute) // middle
ab.AddAddr(p, a3, 10*time.Minute) // nearest expiry, will be evicted first
require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a3}, ab.Addrs(p))

ab.AddAddr(p, a4, 45*time.Minute)
require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a4}, ab.Addrs(p))
})
}
}

func TestDsMaxAddrsPerPeerDoesNotEvictConnected(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
opts := DefaultOpts()
opts.MaxAddrsPerPeer = 2
clk := mockclock.NewMock()
opts.Clock = clk

ds, closeDs := dsFactory(t)
defer closeDs()
ab, err := NewAddrBook(context.Background(), ds, opts)
require.NoError(t, err)
defer ab.Close()

const p = peer.ID("peer-connected")
live := ma.StringCast("/ip4/1.2.3.4/tcp/1")
a1 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
a2 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
a3 := ma.StringCast("/ip4/1.2.3.4/tcp/4")

ab.AddAddr(p, live, pstore.ConnectedAddrTTL)
ab.AddAddr(p, a1, 10*time.Minute)
ab.AddAddr(p, a2, 20*time.Minute)
require.ElementsMatch(t, []ma.Multiaddr{live, a1, a2}, ab.Addrs(p))

// Adding a third unconnected addr must evict an unconnected one
// (a1 has the soonest expiry), never the connected addr.
ab.AddAddr(p, a3, 30*time.Minute)
require.ElementsMatch(t, []ma.Multiaddr{live, a2, a3}, ab.Addrs(p))
})
}
}

func TestDsKeyBook(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
9 changes: 9 additions & 0 deletions p2p/host/peerstore/pstoreds/peerstore.go
@@ -23,6 +23,13 @@ type Options struct {
// MaxProtocols is the maximum number of protocols we store for one peer.
MaxProtocols int

// MaxAddrsPerPeer bounds the number of unconnected addresses stored per
// peer. When the limit is reached, the unconnected entry with the nearest
// expiry is evicted before the new entry is inserted. Addresses held by a
// live connection (TTL >= ConnectedAddrTTL) are not counted and never
// evicted. A value <= 0 disables the cap.
MaxAddrsPerPeer int

// Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
// automatically, but it'll be available on demand via explicit calls.
GCPurgeInterval time.Duration
@@ -42,13 +49,15 @@ type Options struct {
//
// * Cache size: 1024.
// * MaxProtocols: 1024.
// * MaxAddrsPerPeer: 64.
// * GC purge interval: 2 hours.
// * GC lookahead interval: disabled.
// * GC initial delay: 60 seconds.
func DefaultOpts() Options {
return Options{
CacheSize: 1024,
MaxProtocols: 1024,
MaxAddrsPerPeer: 64,
GCPurgeInterval: 2 * time.Hour,
GCLookaheadInterval: 0,
GCInitialDelay: 60 * time.Second,
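A hedged usage sketch of the new option, following the construction pattern the tests above use. newCappedAddrBook is a hypothetical helper, and the cap value of 16 is illustrative:

package example

import (
	"context"

	ds "github.com/ipfs/go-datastore"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
)

// newCappedAddrBook is a hypothetical helper: it builds the datastore-backed
// address book with a tighter per-peer cap than the new default of 64.
func newCappedAddrBook(ctx context.Context, store ds.Batching) error {
	opts := pstoreds.DefaultOpts() // MaxAddrsPerPeer defaults to 64
	opts.MaxAddrsPerPeer = 16      // any value <= 0 disables the cap
	ab, err := pstoreds.NewAddrBook(ctx, store, opts)
	if err != nil {
		return err
	}
	defer ab.Close()
	// ab now evicts the nearest-expiry unconnected address once a peer
	// holds 16 unconnected entries; connected entries are never counted.
	return nil
}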
74 changes: 74 additions & 0 deletions p2p/host/peerstore/pstoremem/addr_book.go
@@ -165,6 +165,12 @@ func (rc realclock) Now() time.Time {
const (
defaultMaxSignedPeerRecords = 100_000
defaultMaxUnconnectedAddrs = 1_000_000
// defaultMaxAddrsPerPeer bounds the number of unconnected addresses
// stored per peer. Set conservatively above observed real-world
// maxima (~26 for well-connected multi-transport nodes) so honest
// peers are not clipped while bounding DHT pollution where third
// parties gossip stale or misconfigured peerstore contents.
defaultMaxAddrsPerPeer = 64
)

// memoryAddrBook manages addresses.
@@ -174,6 +180,7 @@ type memoryAddrBook struct {
signedPeerRecords map[peer.ID]*peerRecordState
maxUnconnectedAddrs int
maxSignedPeerRecords int
maxAddrsPerPeer int

refCount sync.WaitGroup
cancel func()
@@ -196,6 +203,7 @@ func NewAddrBook(opts ...AddrBookOption) *memoryAddrBook {
clock: realclock{},
maxUnconnectedAddrs: defaultMaxUnconnectedAddrs,
maxSignedPeerRecords: defaultMaxSignedPeerRecords,
maxAddrsPerPeer: defaultMaxAddrsPerPeer,
}
for _, opt := range opts {
opt(ab)
@@ -232,6 +240,19 @@ func WithMaxSignedPeerRecords(n int) AddrBookOption {
}
}

// WithMaxAddressesPerPeer sets the maximum number of unconnected addresses
// stored per peer. When the limit is reached, the unconnected entry with
// the nearest expiry is evicted before the new entry is inserted.
// Addresses held by a live connection (TTL >= ConnectedAddrTTL) are not
// counted and never evicted. A value <= 0 disables the cap. Default is
// defaultMaxAddrsPerPeer.
func WithMaxAddressesPerPeer(n int) AddrBookOption {
return func(b *memoryAddrBook) error {
b.maxAddrsPerPeer = n
return nil
}
}

// background periodically schedules a gc
func (mab *memoryAddrBook) background(ctx context.Context) {
defer mab.refCount.Done()
@@ -330,6 +351,43 @@ func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
}
}

// numUnconnectedAddrsForPeerUnlocked returns how many of p's stored addrs
// are not held by a live connection.
func (mab *memoryAddrBook) numUnconnectedAddrsForPeerUnlocked(p peer.ID) int {
n := 0
for _, a := range mab.addrs.Addrs[p] {
if !a.IsConnected() {
n++
}
}
return n
}

// evictNearestExpiryUnconnectedForPeerUnlocked drops p's unconnected addr
// with the earliest expiry. Returns false when every remaining addr for p
// is held by a live connection, in which case the caller must drop the
// incoming addr rather than evicting a connected one.
//
// Shorter-TTL entries (e.g. DHT gossip at TempAddrTTL) expire sooner than
// identify-written peer-vouched addrs (RecentlyConnectedAddrTTL), so this
// rule naturally sheds gossip first without needing to classify sources.
func (mab *memoryAddrBook) evictNearestExpiryUnconnectedForPeerUnlocked(p peer.ID) bool {
var victim *expiringAddr
for _, a := range mab.addrs.Addrs[p] {
if a.IsConnected() {
continue
}
if victim == nil || a.Expiry.Before(victim.Expiry) {
victim = a
}
}
if victim == nil {
return false
}
mab.addrs.Delete(victim)
return true
}

func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
mab.mu.Lock()
defer mab.mu.Unlock()
@@ -364,6 +422,15 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl
}
a, found := mab.addrs.FindAddr(p, addr)
if !found {
// Enforce the per-peer cap on unconnected addrs. Entries held
// by a live connection are not counted. A non-positive cap
// disables the check.
if mab.maxAddrsPerPeer > 0 && !ttlIsConnected(ttl) && mab.numUnconnectedAddrsForPeerUnlocked(p) >= mab.maxAddrsPerPeer {
if !mab.evictNearestExpiryUnconnectedForPeerUnlocked(p) {
// Every existing addr is protected; drop the new one.
continue
}
}
// not found, announce it.
entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}
mab.addrs.Insert(entry)
@@ -430,6 +497,13 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
if !ttlIsConnected(ttl) && mab.addrs.NumUnconnectedAddrs() >= mab.maxUnconnectedAddrs {
continue
}
// Same per-peer cap check as addAddrsUnlocked: bound
// how many unconnected addrs we keep for one peer.
if mab.maxAddrsPerPeer > 0 && !ttlIsConnected(ttl) && mab.numUnconnectedAddrsForPeerUnlocked(p) >= mab.maxAddrsPerPeer {
if !mab.evictNearestExpiryUnconnectedForPeerUnlocked(p) {
continue
}
}
entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}
mab.addrs.Insert(entry)
mab.subManager.BroadcastAddr(p, addr)
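And the in-memory counterpart: a hedged sketch using the new functional option. The cap of 1 is deliberately tiny to make the eviction visible, and the peer ID and addresses are illustrative:

package main

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// Cap unconnected addrs at one per peer (illustrative; default is 64).
	ab := pstoremem.NewAddrBook(pstoremem.WithMaxAddressesPerPeer(1))
	defer ab.Close()

	p := peer.ID("example-peer")
	ab.AddAddr(p, ma.StringCast("/ip4/1.2.3.4/tcp/1"), 10*time.Minute)
	ab.AddAddr(p, ma.StringCast("/ip4/1.2.3.4/tcp/2"), time.Hour)

	// The nearest-expiry entry (tcp/1) has been evicted to make room.
	fmt.Println(ab.Addrs(p)) // [/ip4/1.2.3.4/tcp/2]
}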