diff --git a/p2p/host/peerstore/pstoreds/addr_book.go b/p2p/host/peerstore/pstoreds/addr_book.go
index 7e95a203e4..674802c4fb 100644
--- a/p2p/host/peerstore/pstoreds/addr_book.go
+++ b/p2p/host/peerstore/pstoreds/addr_book.go
@@ -4,6 +4,7 @@ import (
     "bytes"
     "context"
     "fmt"
+    "slices"
     "sort"
     "sync"
     "time"
@@ -388,8 +389,9 @@ func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr)
     return superseded
 }
 
-// ttlIsConnected reports whether the given TTL marks the address as held by
-// a live connection.
+// ttlIsConnected reports whether the TTL marks the address as held by a
+// live connection. Such entries bypass the per-peer cap and survive
+// eviction.
 func ttlIsConnected(ttl time.Duration) bool {
     return ttl >= pstore.ConnectedAddrTTL
 }
@@ -596,6 +598,43 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
         return existingEntry
     }
 
+    // Per-peer cap on unconnected addrs. Entries held by a live connection
+    // (TTL >= ConnectedAddrTTL) bypass the cap and survive eviction. The
+    // cap bounds peerstore pollution from sources like DHT gossip while
+    // leaving addresses tied to active sessions intact.
+    maxCap := ab.opts.MaxAddrsPerPeer
+    incomingIsUnconnected := !ttlIsConnected(ttl)
+    unconnectedCount := 0
+    if maxCap > 0 && incomingIsUnconnected {
+        for _, a := range pr.Addrs {
+            if !ttlIsConnected(time.Duration(a.Ttl)) {
+                unconnectedCount++
+            }
+        }
+    }
+    // evictNearestUnconnected drops the unconnected entry from pr.Addrs with
+    // the soonest expiry. Returns false when every remaining entry is held by
+    // a live connection, in which case the caller must drop the new addr.
+    evictNearestUnconnected := func() bool {
+        victim := -1
+        var soonest int64
+        for i, a := range pr.Addrs {
+            if ttlIsConnected(time.Duration(a.Ttl)) {
+                continue
+            }
+            if victim == -1 || a.Expiry < soonest {
+                victim = i
+                soonest = a.Expiry
+            }
+        }
+        if victim == -1 {
+            return false
+        }
+        delete(addrsMap, string(pr.Addrs[victim].Addr))
+        pr.Addrs = slices.Delete(pr.Addrs, victim, victim+1)
+        return true
+    }
+
     var entries []*pb.AddrBookRecord_AddrEntry
     for _, incoming := range addrs {
         existingEntry := updateExisting(incoming)
@@ -605,6 +644,13 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
             // 	entries = append(entries, existingEntry)
             // }
             // } else {
+            if maxCap > 0 && incomingIsUnconnected && unconnectedCount >= maxCap {
+                if !evictNearestUnconnected() {
+                    // Every existing addr is protected; drop the new one.
+                    continue
+                }
+                unconnectedCount--
+            }
             // new addr, add & broadcast
             entry := &pb.AddrBookRecord_AddrEntry{
                 Addr: incoming.Bytes(),
@@ -612,6 +658,9 @@ func (ab *dsAddrBook) setAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
                 Expiry: newExp,
             }
             entries = append(entries, entry)
+            if incomingIsUnconnected {
+                unconnectedCount++
+            }
 
             // note: there's a minor chance that writing the record will fail, in which case we would've broadcast
             // the addresses without persisting them. This is very unlikely and not much of an issue.
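Not part of the patch: the eviction rule above, restated as a self-contained sketch. Entries held by a live connection (TTL >= ConnectedAddrTTL) are skipped outright; among the rest, the entry expiring soonest is the victim, and a result of -1 corresponds to the "every existing addr is protected, drop the incoming one" branch. The entry type and pickVictim helper are illustrative names only, not identifiers from this diff.

package main

import "fmt"

type entry struct {
    expiry    int64 // unix nanos, mirroring AddrBookRecord_AddrEntry.Expiry
    connected bool  // true when ttl >= ConnectedAddrTTL
}

// pickVictim returns the index of the unconnected entry with the soonest
// expiry, or -1 when every entry is held by a live connection.
func pickVictim(entries []entry) int {
    victim := -1
    var soonest int64
    for i, e := range entries {
        if e.connected {
            continue
        }
        if victim == -1 || e.expiry < soonest {
            victim, soonest = i, e.expiry
        }
    }
    return victim
}

func main() {
    fmt.Println(pickVictim([]entry{{expiry: 100}, {expiry: 50, connected: true}, {expiry: 70}})) // 2: soonest unconnected
    fmt.Println(pickVictim([]entry{{expiry: 50, connected: true}}))                              // -1: all protected
}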
diff --git a/p2p/host/peerstore/pstoreds/ds_test.go b/p2p/host/peerstore/pstoreds/ds_test.go
index c5aeb70beb..e50b9c005e 100644
--- a/p2p/host/peerstore/pstoreds/ds_test.go
+++ b/p2p/host/peerstore/pstoreds/ds_test.go
@@ -2,6 +2,7 @@ package pstoreds
 
 import (
     "context"
+    "fmt"
     "testing"
     "time"
 
@@ -59,6 +60,10 @@ func TestDsAddrBook(t *testing.T) {
         opts := DefaultOpts()
         opts.GCPurgeInterval = 1 * time.Second
         opts.CacheSize = 1024
+        // Shared addr-book suite inserts batches larger than the default
+        // per-peer cap; disable the cap so the suite exercises general
+        // behavior, not the cap path.
+        opts.MaxAddrsPerPeer = 0
         clk := mockclock.NewMock()
         opts.Clock = clk
 
@@ -69,6 +74,7 @@ func TestDsAddrBook(t *testing.T) {
         opts := DefaultOpts()
         opts.GCPurgeInterval = 1 * time.Second
         opts.CacheSize = 0
+        opts.MaxAddrsPerPeer = 0
         clk := mockclock.NewMock()
         opts.Clock = clk
 
@@ -77,6 +83,128 @@ func TestDsAddrBook(t *testing.T) {
     }
 }
 
+func TestDsMaxAddrsPerPeerEvictsNearestExpiry(t *testing.T) {
+    for name, dsFactory := range dstores {
+        t.Run(name, func(t *testing.T) {
+            opts := DefaultOpts()
+            opts.MaxAddrsPerPeer = 3
+            clk := mockclock.NewMock()
+            opts.Clock = clk
+
+            ds, closeDs := dsFactory(t)
+            defer closeDs()
+            ab, err := NewAddrBook(context.Background(), ds, opts)
+            require.NoError(t, err)
+            defer ab.Close()
+
+            const p = peer.ID("peer-cap")
+            a1 := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+            a2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+            a3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+            a4 := ma.StringCast("/ip4/1.2.3.4/tcp/4")
+
+            ab.AddAddr(p, a1, time.Hour)      // furthest expiry
+            ab.AddAddr(p, a2, 30*time.Minute) // middle
+            ab.AddAddr(p, a3, 10*time.Minute) // nearest expiry, will be evicted first
+            require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a3}, ab.Addrs(p))
+
+            ab.AddAddr(p, a4, 45*time.Minute)
+            require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a4}, ab.Addrs(p))
+        })
+    }
+}
+
+func TestDsMaxAddrsPerPeerEnforcedOnSetAddrs(t *testing.T) {
+    for name, dsFactory := range dstores {
+        t.Run(name, func(t *testing.T) {
+            opts := DefaultOpts()
+            opts.MaxAddrsPerPeer = 2
+            clk := mockclock.NewMock()
+            opts.Clock = clk
+
+            ds, closeDs := dsFactory(t)
+            defer closeDs()
+            ab, err := NewAddrBook(context.Background(), ds, opts)
+            require.NoError(t, err)
+            defer ab.Close()
+
+            const p = peer.ID("peer-setaddrs")
+            a1 := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+            a2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+            a3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+
+            ab.AddAddr(p, a1, time.Hour)      // furthest expiry
+            ab.AddAddr(p, a2, 10*time.Minute) // nearest expiry, eviction target
+            require.ElementsMatch(t, []ma.Multiaddr{a1, a2}, ab.Addrs(p))
+
+            // SetAddrs with a new addr hits the cap; nearest-expiry a2 must go.
+            ab.SetAddrs(p, []ma.Multiaddr{a3}, 30*time.Minute)
+            require.ElementsMatch(t, []ma.Multiaddr{a1, a3}, ab.Addrs(p))
+        })
+    }
+}
+
+// TestDsMaxAddrsPerPeerDisabled verifies that MaxAddrsPerPeer = 0 disables
+// the cap so callers can store more than the default 64 addrs per peer.
+func TestDsMaxAddrsPerPeerDisabled(t *testing.T) {
+    for name, dsFactory := range dstores {
+        t.Run(name, func(t *testing.T) {
+            opts := DefaultOpts()
+            opts.MaxAddrsPerPeer = 0
+            clk := mockclock.NewMock()
+            opts.Clock = clk
+
+            ds, closeDs := dsFactory(t)
+            defer closeDs()
+            ab, err := NewAddrBook(context.Background(), ds, opts)
+            require.NoError(t, err)
+            defer ab.Close()
+
+            const p = peer.ID("peer-disabled")
+            const n = 200 // well above the default cap of 64
+            addrs := make([]ma.Multiaddr, n)
+            for i := range addrs {
+                addrs[i] = ma.StringCast(fmt.Sprintf("/ip4/1.2.3.4/tcp/%d", i+1))
+                ab.AddAddr(p, addrs[i], time.Hour)
+            }
+            require.Len(t, ab.Addrs(p), n)
+        })
+    }
+}
+
+func TestDsMaxAddrsPerPeerDoesNotEvictConnected(t *testing.T) {
+    for name, dsFactory := range dstores {
+        t.Run(name, func(t *testing.T) {
+            opts := DefaultOpts()
+            opts.MaxAddrsPerPeer = 2
+            clk := mockclock.NewMock()
+            opts.Clock = clk
+
+            ds, closeDs := dsFactory(t)
+            defer closeDs()
+            ab, err := NewAddrBook(context.Background(), ds, opts)
+            require.NoError(t, err)
+            defer ab.Close()
+
+            const p = peer.ID("peer-connected")
+            live := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+            a1 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+            a2 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+            a3 := ma.StringCast("/ip4/1.2.3.4/tcp/4")
+
+            ab.AddAddr(p, live, pstore.ConnectedAddrTTL)
+            ab.AddAddr(p, a1, 10*time.Minute)
+            ab.AddAddr(p, a2, 20*time.Minute)
+            require.ElementsMatch(t, []ma.Multiaddr{live, a1, a2}, ab.Addrs(p))
+
+            // Adding a third unconnected addr must evict an unconnected one
+            // (a1 has the soonest expiry), never the connected addr.
+            ab.AddAddr(p, a3, 30*time.Minute)
+            require.ElementsMatch(t, []ma.Multiaddr{live, a2, a3}, ab.Addrs(p))
+        })
+    }
+}
+
 // TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
 // newer signed peer record: addrs dropped from the new record are evicted,
 // while unsigned addrs and addrs held by a live connection are kept.
diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go
index 895ac21514..39f66de51d 100644
--- a/p2p/host/peerstore/pstoreds/peerstore.go
+++ b/p2p/host/peerstore/pstoreds/peerstore.go
@@ -23,6 +23,13 @@ type Options struct {
     // MaxProtocols is the maximum number of protocols we store for one peer.
     MaxProtocols int
 
+    // MaxAddrsPerPeer bounds the number of unconnected addresses stored per
+    // peer. When the limit is reached, the unconnected entry with the nearest
+    // expiry is evicted before the new entry is inserted. Addresses held by a
+    // live connection (TTL >= ConnectedAddrTTL) are not counted and never
+    // evicted. A value <= 0 disables the cap.
+    MaxAddrsPerPeer int
+
     // Sweep interval to purge expired addresses from the datastore. If this is a zero value, GC will not run
     // automatically, but it'll be available on demand via explicit calls.
     GCPurgeInterval time.Duration
@@ -42,6 +49,7 @@ type Options struct {
     //
     // * Cache size: 1024.
     // * MaxProtocols: 1024.
+    // * MaxAddrsPerPeer: 64.
     // * GC purge interval: 2 hours.
     // * GC lookahead interval: disabled.
     // * GC initial delay: 60 seconds.
@@ -49,6 +57,7 @@ func DefaultOpts() Options {
     return Options{
         CacheSize:           1024,
         MaxProtocols:        1024,
+        MaxAddrsPerPeer:     64,
         GCPurgeInterval:     2 * time.Hour,
         GCLookaheadInterval: 0,
         GCInitialDelay:      60 * time.Second,
diff --git a/p2p/host/peerstore/pstoremem/addr_book.go b/p2p/host/peerstore/pstoremem/addr_book.go
index a09c172312..0ffad1b16e 100644
--- a/p2p/host/peerstore/pstoremem/addr_book.go
+++ b/p2p/host/peerstore/pstoremem/addr_book.go
@@ -167,6 +167,12 @@ func (rc realclock) Now() time.Time {
 const (
     defaultMaxSignedPeerRecords = 100_000
     defaultMaxUnconnectedAddrs  = 1_000_000
+    // defaultMaxAddrsPerPeer caps the unconnected addresses stored per
+    // peer. Sized well above observed real-world maxima (~26 for
+    // well-connected multi-transport nodes) to leave honest peers
+    // untouched while bounding DHT pollution from third parties gossiping
+    // stale or misconfigured peerstore contents.
+    defaultMaxAddrsPerPeer = 64
 )
 
 // memoryAddrBook manages addresses.
@@ -176,6 +182,7 @@ type memoryAddrBook struct {
     signedPeerRecords    map[peer.ID]*peerRecordState
     maxUnconnectedAddrs  int
     maxSignedPeerRecords int
+    maxAddrsPerPeer      int
 
     refCount sync.WaitGroup
     cancel   func()
@@ -198,6 +205,7 @@ func NewAddrBook(opts ...AddrBookOption) *memoryAddrBook {
         clock:                realclock{},
         maxUnconnectedAddrs:  defaultMaxUnconnectedAddrs,
         maxSignedPeerRecords: defaultMaxSignedPeerRecords,
+        maxAddrsPerPeer:      defaultMaxAddrsPerPeer,
     }
     for _, opt := range opts {
         opt(ab)
@@ -234,6 +242,18 @@ func WithMaxSignedPeerRecords(n int) AddrBookOption {
     }
 }
 
+// WithMaxAddressesPerPeer caps the unconnected addresses stored per peer.
+// When the cap is full, adding a new addr evicts the unconnected entry
+// with the nearest expiry. Addresses held by a live connection
+// (TTL >= ConnectedAddrTTL) bypass the cap and survive eviction. Pass 0
+// or a negative value to disable the cap. Defaults to 64.
+func WithMaxAddressesPerPeer(n int) AddrBookOption {
+    return func(b *memoryAddrBook) error {
+        b.maxAddrsPerPeer = n
+        return nil
+    }
+}
+
 // background periodically schedules a gc
 func (mab *memoryAddrBook) background(ctx context.Context) {
     defer mab.refCount.Done()
@@ -392,6 +412,43 @@ func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
     }
 }
 
+// numUnconnectedAddrsForPeerUnlocked returns how many of p's stored addrs
+// are not held by a live connection.
+func (mab *memoryAddrBook) numUnconnectedAddrsForPeerUnlocked(p peer.ID) int {
+    n := 0
+    for _, a := range mab.addrs.Addrs[p] {
+        if !a.IsConnected() {
+            n++
+        }
+    }
+    return n
+}
+
+// evictNearestExpiryUnconnectedForPeerUnlocked drops p's unconnected addr
+// with the earliest expiry. Returns false when every remaining addr for p
+// is held by a live connection; the caller must then drop the incoming
+// addr.
+//
+// Shorter-TTL entries (e.g. DHT gossip at TempAddrTTL) expire sooner than
+// identify-written peer-vouched addrs (RecentlyConnectedAddrTTL), so the
+// rule sheds gossip first without classifying sources.
+func (mab *memoryAddrBook) evictNearestExpiryUnconnectedForPeerUnlocked(p peer.ID) bool {
+    var victim *expiringAddr
+    for _, a := range mab.addrs.Addrs[p] {
+        if a.IsConnected() {
+            continue
+        }
+        if victim == nil || a.Expiry.Before(victim.Expiry) {
+            victim = a
+        }
+    }
+    if victim == nil {
+        return false
+    }
+    mab.addrs.Delete(victim)
+    return true
+}
+
 func (mab *memoryAddrBook) addAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
     mab.mu.Lock()
     defer mab.mu.Unlock()
@@ -426,6 +483,15 @@ func (mab *memoryAddrBook) addAddrsUnlocked(p peer.ID, addrs []ma.Multiaddr, ttl
         }
         a, found := mab.addrs.FindAddr(p, addr)
         if !found {
+            // Enforce the per-peer cap on unconnected addrs. Entries held
+            // by a live connection are not counted. A non-positive cap
+            // disables the check.
+            if mab.maxAddrsPerPeer > 0 && !ttlIsConnected(ttl) && mab.numUnconnectedAddrsForPeerUnlocked(p) >= mab.maxAddrsPerPeer {
+                if !mab.evictNearestExpiryUnconnectedForPeerUnlocked(p) {
+                    // Every existing addr is protected; drop the new one.
+                    continue
+                }
+            }
             // not found, announce it.
             entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}
             mab.addrs.Insert(entry)
@@ -492,6 +558,13 @@ func (mab *memoryAddrBook) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
             if !ttlIsConnected(ttl) && mab.addrs.NumUnconnectedAddrs() >= mab.maxUnconnectedAddrs {
                 continue
             }
+            // Same per-peer cap check as addAddrsUnlocked: bound
+            // how many unconnected addrs we keep for one peer.
+            if mab.maxAddrsPerPeer > 0 && !ttlIsConnected(ttl) && mab.numUnconnectedAddrsForPeerUnlocked(p) >= mab.maxAddrsPerPeer {
+                if !mab.evictNearestExpiryUnconnectedForPeerUnlocked(p) {
+                    continue
+                }
+            }
             entry := &expiringAddr{Addr: addr, Expiry: exp, TTL: ttl, Peer: p}
             mab.addrs.Insert(entry)
             mab.subManager.BroadcastAddr(p, addr)
diff --git a/p2p/host/peerstore/pstoremem/addr_book_test.go b/p2p/host/peerstore/pstoremem/addr_book_test.go
index 72cc474498..9fc9c70b7a 100644
--- a/p2p/host/peerstore/pstoremem/addr_book_test.go
+++ b/p2p/host/peerstore/pstoremem/addr_book_test.go
@@ -190,6 +190,73 @@ func TestPeerLimits(t *testing.T) {
     require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs())
 }
 
+// TestMaxAddrsPerPeerEvictsNearestExpiry verifies the per-peer cap evicts
+// the stored addr with the soonest expiry first, not just the oldest insert.
+func TestMaxAddrsPerPeerEvictsNearestExpiry(t *testing.T) {
+    ab := NewAddrBook(WithMaxAddressesPerPeer(3))
+    defer ab.Close()
+
+    const p = peer.ID("peer-cap")
+    a1 := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+    a2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+    a3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+    a4 := ma.StringCast("/ip4/1.2.3.4/tcp/4")
+
+    ab.AddAddr(p, a1, time.Hour)      // furthest expiry
+    ab.AddAddr(p, a2, 30*time.Minute) // middle
+    ab.AddAddr(p, a3, 10*time.Minute) // nearest expiry, will be evicted first
+    require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a3}, ab.Addrs(p))
+
+    // Adding a fourth addr forces eviction; a3 (nearest expiry) must go.
+    ab.AddAddr(p, a4, 45*time.Minute)
+    require.ElementsMatch(t, []ma.Multiaddr{a1, a2, a4}, ab.Addrs(p))
+}
+
+// TestMaxAddrsPerPeerEnforcedOnSetAddrs verifies the per-peer cap fires on
+// the SetAddrs path too, not only AddAddr.
+func TestMaxAddrsPerPeerEnforcedOnSetAddrs(t *testing.T) {
+    ab := NewAddrBook(WithMaxAddressesPerPeer(2))
+    defer ab.Close()
+
+    const p = peer.ID("peer-setaddrs")
+    a1 := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+    a2 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+    a3 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+
+    ab.AddAddr(p, a1, time.Hour)      // furthest expiry
+    ab.AddAddr(p, a2, 10*time.Minute) // nearest expiry, eviction target
+    require.ElementsMatch(t, []ma.Multiaddr{a1, a2}, ab.Addrs(p))
+
+    // SetAddrs with a new addr hits the cap; nearest-expiry a2 must go.
+    ab.SetAddrs(p, []ma.Multiaddr{a3}, 30*time.Minute)
+    require.ElementsMatch(t, []ma.Multiaddr{a1, a3}, ab.Addrs(p))
+}
+
+// TestMaxAddrsPerPeerDoesNotEvictConnected verifies that addrs held by live
+// connections (TTL >= ConnectedAddrTTL) are neither counted toward the cap
+// nor eligible for eviction.
+func TestMaxAddrsPerPeerDoesNotEvictConnected(t *testing.T) {
+    ab := NewAddrBook(WithMaxAddressesPerPeer(2))
+    defer ab.Close()
+
+    const p = peer.ID("peer-connected")
+    live := ma.StringCast("/ip4/1.2.3.4/tcp/1")
+    a1 := ma.StringCast("/ip4/1.2.3.4/tcp/2")
+    a2 := ma.StringCast("/ip4/1.2.3.4/tcp/3")
+    a3 := ma.StringCast("/ip4/1.2.3.4/tcp/4")
+
+    // Pin one addr via ConnectedAddrTTL. It should not count toward the cap.
+    ab.AddAddr(p, live, peerstore.ConnectedAddrTTL)
+    ab.AddAddr(p, a1, 10*time.Minute)
+    ab.AddAddr(p, a2, 20*time.Minute)
+    require.ElementsMatch(t, []ma.Multiaddr{live, a1, a2}, ab.Addrs(p))
+
+    // Adding a third unconnected addr must evict an unconnected one
+    // (a1 has the soonest expiry), never the connected addr.
+    ab.AddAddr(p, a3, 30*time.Minute)
+    require.ElementsMatch(t, []ma.Multiaddr{live, a2, a3}, ab.Addrs(p))
+}
+
 // TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
 // newer signed peer record: addrs dropped from the new record are evicted,
 // while unsigned addrs and addrs held by a live connection are kept.
diff --git a/p2p/host/peerstore/pstoremem/inmem_test.go b/p2p/host/peerstore/pstoremem/inmem_test.go
index 0a047eeede..06207b1919 100644
--- a/p2p/host/peerstore/pstoremem/inmem_test.go
+++ b/p2p/host/peerstore/pstoremem/inmem_test.go
@@ -48,8 +48,11 @@ func TestPeerstoreProtoStoreLimits(t *testing.T) {
 
 func TestInMemoryAddrBook(t *testing.T) {
     clk := mockClock.NewMock()
+    // Shared addr-book suite inserts batches larger than the default
+    // per-peer cap; disable the cap so the suite exercises general
+    // behavior, not the cap path.
     pt.TestAddrBook(t, func() (pstore.AddrBook, func()) {
-        ps, err := NewPeerstore(WithClock(clk))
+        ps, err := NewPeerstore(WithClock(clk), WithMaxAddressesPerPeer(0))
         require.NoError(t, err)
         return ps, func() { ps.Close() }
     }, clk)
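Not part of the patch: a minimal usage sketch of the new knob, assuming the import paths of the current go-libp2p layout. The in-memory book takes the WithMaxAddressesPerPeer option added above (a value of 0 disables the cap, as the test suites do); the datastore-backed book reads pstoreds Options.MaxAddrsPerPeer instead. The cap value 16 and the peer/addr values are arbitrary.

package main

import (
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p/core/peer"
    "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
    ma "github.com/multiformats/go-multiaddr"
)

func main() {
    // In-memory peerstore with a tighter per-peer cap than the default 64.
    // Pass 0 (or a negative value) to disable the cap entirely.
    ps, err := pstoremem.NewPeerstore(pstoremem.WithMaxAddressesPerPeer(16))
    if err != nil {
        panic(err)
    }
    defer ps.Close()

    p := peer.ID("example-peer")
    for i := 1; i <= 20; i++ {
        ps.AddAddr(p, ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 4000+i)), time.Hour)
    }
    // The cap keeps at most 16 unconnected addrs for this peer; the rest
    // were shed by nearest-expiry eviction as they were added.
    fmt.Println(len(ps.Addrs(p)))

    // The datastore-backed peerstore exposes the same setting via
    // pstoreds Options.MaxAddrsPerPeer (see DefaultOpts above).
}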