Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion p2p/host/peerstore/pstoreds/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,21 @@ func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio

// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
// a record.Envelope), which will expire after the given TTL.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
// for more details.
//
// The signed peer record's Seq is treated as monotonic per peer: a record
// with a Seq lower than the last accepted one is rejected. Equal Seq is
// accepted as a TTL refresh.
//
// When a newer signed record is accepted, addrs that were present in the
// previously stored signed record but absent in the new one are evicted, so
// the peerstore reflects the peer's current self-advertised set instead of
// the union of every record we have ever seen. Unsigned addrs (added via
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
// exchange where the peer did not send a signed record) are not touched, and
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
// active sessions are not dropped.
func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
r, err := recordEnvelope.Record()
if err != nil {
Expand All @@ -303,6 +317,15 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
}

addrs := cleanAddrs(rec.Addrs, rec.PeerID)

// Diff against the previously stored signed record so we can drop addrs
// the peer no longer advertises before adding the new ones.
if superseded := ab.supersededSignedAddrs(rec.PeerID, addrs); len(superseded) > 0 {
if err := ab.deleteAddrs(rec.PeerID, superseded); err != nil {
return false, err
}
}

err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true)
if err != nil {
return false, err
Expand All @@ -315,6 +338,62 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
return true, nil
}

// supersededSignedAddrs returns addrs that were present in the previously
// stored signed peer record for p but are absent in newAddrs. Addrs held by
// a live connection (TTL >= ConnectedAddrTTL) are excluded so an active
// session is not torn down when the peer rotates its advertised set.
func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr) []ma.Multiaddr {
prevEnv := ab.GetPeerRecord(p)
if prevEnv == nil {
return nil
}
prev, err := prevEnv.Record()
if err != nil {
return nil
}
prevRec, ok := prev.(*peer.PeerRecord)
if !ok {
return nil
}

newSet := make(map[string]struct{}, len(newAddrs))
for _, a := range newAddrs {
newSet[string(a.Bytes())] = struct{}{}
}

pr, err := ab.loadRecord(p, true, false)
if err != nil {
return nil
}
pr.RLock()
connected := make(map[string]struct{})
for _, a := range pr.Addrs {
if ttlIsConnected(time.Duration(a.Ttl)) {
connected[string(a.Addr)] = struct{}{}
}
}
pr.RUnlock()

superseded := make([]ma.Multiaddr, 0, len(prevRec.Addrs))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A very minor nit: if you return an iter.Seq you avoid this allocation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd lean toward keeping the slice here: caller gates on len(superseded) > 0 and deleteInPlace iterates the set per surviving entry, so iter.Seq would just push the alloc into deleteAddrs or force a signature change.. feels like not worth the noise.

for _, a := range prevRec.Addrs {
key := string(a.Bytes())
if _, still := newSet[key]; still {
continue
}
if _, isConn := connected[key]; isConn {
continue
}
superseded = append(superseded, a)
}
return superseded
}

// ttlIsConnected reports whether the given TTL marks the address as held by
// a live connection.
func ttlIsConnected(ttl time.Duration) bool {
return ttl >= pstore.ConnectedAddrTTL
}

func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 {
pr, err := ab.loadRecord(p, true, false)
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions p2p/host/peerstore/pstoreds/ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/core/test"
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"

mockclock "github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -72,6 +77,60 @@ func TestDsAddrBook(t *testing.T) {
}
}

// TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
// newer signed peer record: addrs dropped from the new record are evicted,
// while unsigned addrs and addrs held by a live connection are kept.
func TestDsConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
opts := DefaultOpts()
store, closeDs := dsFactory(t)
defer closeDs()
ab, err := NewAddrBook(context.Background(), store, opts)
require.NoError(t, err)
defer ab.Close()

priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
require.NoError(t, err)

keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")

rec1 := peer.NewPeerRecord()
rec1.PeerID = id
rec1.Seq = 1
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
env1, err := record.Seal(rec1, priv)
require.NoError(t, err)

accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
require.NoError(t, err)
require.True(t, accepted)

ab.AddAddr(id, connected, pstore.ConnectedAddrTTL)
ab.AddAddr(id, unsigned, time.Hour)
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))

rec2 := peer.NewPeerRecord()
rec2.PeerID = id
rec2.Seq = 2
rec2.Addrs = []ma.Multiaddr{keep}
env2, err := record.Seal(rec2, priv)
require.NoError(t, err)

accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
require.NoError(t, err)
require.True(t, accepted)

require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
})
}
}

func TestDsKeyBook(t *testing.T) {
for name, dsFactory := range dstores {
t.Run(name, func(t *testing.T) {
Expand Down
68 changes: 65 additions & 3 deletions p2p/host/peerstore/pstoremem/addr_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ func ttlIsConnected(ttl time.Duration) bool {

type peerRecordState struct {
Envelope *record.Envelope
Seq uint64
// Seq is the sequence number from the stored signed peer record. Newer
// records (higher Seq) supersede older ones for the same peer.
Seq uint64
}

// Essentially Go stdlib's Priority Queue example
Expand Down Expand Up @@ -289,8 +291,23 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
mab.addAddrs(p, addrs, ttl)
}

// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL.
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will
// expire after the given TTL. See
// https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
// for more details.
//
// The signed peer record's Seq is treated as monotonic per peer: a record with
// a Seq lower than the last accepted one is rejected. Equal Seq is accepted as
// a TTL refresh.
//
// When a newer signed record is accepted, addrs that were present in the
// previously stored signed record but absent in the new one are evicted, so
// the peerstore reflects the peer's current self-advertised set instead of
// the union of every record we have ever seen. Unsigned addrs (added via
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
// exchange where the peer did not send a signed record) are not touched, and
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
// active sessions are not dropped.
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
r, err := recordEnvelope.Record()
if err != nil {
Expand All @@ -316,6 +333,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords {
return false, errors.New("too many signed peer records")
}

// Drop addrs from the previous signed record that are absent in the
// new one; addrs held by a live connection are preserved so we don't
// drop an active session if the peer rotates its advertised set. The
// prior addr set is recovered by decoding the stored envelope; that
// call caches on first access (core/record/envelope.go), so repeated
// lookups are cheap.
if found {
if prevRec := prevSignedAddrs(lastState); len(prevRec) > 0 {
newAddrSet := make(map[string]struct{}, len(rec.Addrs))
for _, a := range rec.Addrs {
newAddrSet[string(a.Bytes())] = struct{}{}
}
for _, a := range prevRec {
key := string(a.Bytes())
if _, still := newAddrSet[key]; still {
continue
}
ea, ok := mab.addrs.Addrs[rec.PeerID][key]
if !ok || ea.IsConnected() {
continue
}
mab.addrs.Delete(ea)
}
}
}

mab.signedPeerRecords[rec.PeerID] = &peerRecordState{
Envelope: recordEnvelope,
Seq: rec.Seq,
Expand All @@ -324,6 +368,24 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
return true, nil
}

// prevSignedAddrs returns the addrs from the stored signed peer record, or
// nil if the envelope is absent or can't be decoded. Envelope.Record() caches
// its result, so repeated calls are cheap.
func prevSignedAddrs(s *peerRecordState) []ma.Multiaddr {
if s == nil || s.Envelope == nil {
return nil
}
r, err := s.Envelope.Record()
if err != nil {
return nil
}
pr, ok := r.(*peer.PeerRecord)
if !ok {
return nil
}
return pr.Addrs
}

func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
if len(mab.addrs.Addrs[p]) == 0 {
delete(mab.signedPeerRecords, p)
Expand Down
54 changes: 54 additions & 0 deletions p2p/host/peerstore/pstoremem/addr_book_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/core/test"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -186,6 +190,56 @@ func TestPeerLimits(t *testing.T) {
require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs())
}

// TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
// newer signed peer record: addrs dropped from the new record are evicted,
// while unsigned addrs and addrs held by a live connection are kept.
func TestConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
ab := NewAddrBook()
defer ab.Close()

priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
require.NoError(t, err)

keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")

rec1 := peer.NewPeerRecord()
rec1.PeerID = id
rec1.Seq = 1
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
env1, err := record.Seal(rec1, priv)
require.NoError(t, err)

accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
require.NoError(t, err)
require.True(t, accepted)

// Pin `connected` via ConnectedAddrTTL and add an unsigned addr.
ab.AddAddr(id, connected, peerstore.ConnectedAddrTTL)
ab.AddAddr(id, unsigned, time.Hour)
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))

// Newer record drops `drop` and only mentions `keep`. `drop` must go;
// `unsigned` (never in a signed record) and `connected` (held by a
// live connection) must stay.
rec2 := peer.NewPeerRecord()
rec2.PeerID = id
rec2.Seq = 2
rec2.Addrs = []ma.Multiaddr{keep}
env2, err := record.Seal(rec2, priv)
require.NoError(t, err)

accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
require.NoError(t, err)
require.True(t, accepted)

require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
}

func BenchmarkPeerAddrs(b *testing.B) {
sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000}
for _, sz := range sizes {
Expand Down
9 changes: 5 additions & 4 deletions p2p/host/peerstore/test/addr_book_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,8 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
t.Error("unable to retrieve signed routing record from addrbook")
}

// Adding a new envelope should clear existing certified addresses.
// Only the newly-added ones should remain
Comment on lines -478 to -479
Copy link
Copy Markdown
Member Author

@lidel lidel Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ️ intention was documented, but never implemented. this PR aims to implement it

this PR does not purge all addrs, only one matching previous signed record, but we could also pivot it to do full clean so only signed addrs remain

(not feeling strongly, whatever feels better)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replacing it makes sense. The full purge is a bigger riskier change I think

// A newer signed record drops addrs the peer no longer advertises.
// Unsigned addrs (added via plain AddAddrs) are retained.
certifiedAddrs = certifiedAddrs[:3]
rec4 := peer.NewPeerRecord()
rec4.PeerID = id
Expand All @@ -488,8 +488,9 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
if !accepted {
t.Error("expected peer record to be accepted")
}
// AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
AssertAddressesEqual(t, allAddrs, m.Addrs(id))
expectedAfterRec4 := append([]multiaddr.Multiaddr{}, certifiedAddrs...)
expectedAfterRec4 = append(expectedAfterRec4, uncertifiedAddrs...)
AssertAddressesEqual(t, expectedAfterRec4, m.Addrs(id))

// update TTL on signed addrs to -1 to remove them.
// the signed routing record should be deleted
Expand Down
Loading