Skip to content

Commit 7385b32

Browse files
committed
feat(peerstore): replace stale addrs on newer signed peer record
Treat ConsumePeerRecord as replace-semantics rather than merge: when a peer publishes a newer signed peer record, addrs that were present in the previously stored signed record but omitted from the new one are evicted, so the peerstore reflects the peer's current self-advertised set instead of the accumulated union. Unsigned addrs (e.g. DHT gossip or identify exchanges without a signed record) are untouched, and addrs held by a live connection (TTL >= ConnectedAddrTTL) are kept so active sessions are not torn down. Both backends recover the prior addr set by decoding the stored envelope: pstoremem reuses the Envelope already kept on peerRecordState; pstoreds fetches it via GetPeerRecord and drops superseded addrs with deleteAddrs. Envelope.Record() caches on first access, so the diff is cheap. The shared CertifiedAddresses assertion is updated to match the new behavior.
1 parent 6572945 commit 7385b32

File tree

5 files changed

+263
-8
lines changed

5 files changed

+263
-8
lines changed

p2p/host/peerstore/pstoreds/addr_book.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,21 @@ func (ab *dsAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duratio
282282

283283
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord (contained in
284284
// a record.Envelope), which will expire after the given TTL.
285-
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
285+
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
286+
// for more details.
287+
//
288+
// The signed peer record's Seq is treated as monotonic per peer: a record
289+
// with a Seq lower than the last accepted one is rejected. Equal Seq is
290+
// accepted as a TTL refresh.
291+
//
292+
// When a newer signed record is accepted, addrs that were present in the
293+
// previously stored signed record but absent in the new one are evicted, so
294+
// the peerstore reflects the peer's current self-advertised set instead of
295+
// the union of every record we have ever seen. Unsigned addrs (added via
296+
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
297+
// exchange where the peer did not send a signed record) are not touched, and
298+
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
299+
// active sessions are not dropped.
286300
func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
287301
r, err := recordEnvelope.Record()
288302
if err != nil {
@@ -303,6 +317,15 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
303317
}
304318

305319
addrs := cleanAddrs(rec.Addrs, rec.PeerID)
320+
321+
// Diff against the previously stored signed record so we can drop addrs
322+
// the peer no longer advertises before adding the new ones.
323+
if superseded := ab.supersededSignedAddrs(rec.PeerID, addrs); len(superseded) > 0 {
324+
if err := ab.deleteAddrs(rec.PeerID, superseded); err != nil {
325+
return false, err
326+
}
327+
}
328+
306329
err = ab.setAddrs(rec.PeerID, addrs, ttl, ttlExtend, true)
307330
if err != nil {
308331
return false, err
@@ -315,6 +338,62 @@ func (ab *dsAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl tim
315338
return true, nil
316339
}
317340

341+
// supersededSignedAddrs returns addrs that were present in the previously
342+
// stored signed peer record for p but are absent in newAddrs. Addrs held by
343+
// a live connection (TTL >= ConnectedAddrTTL) are excluded so an active
344+
// session is not torn down when the peer rotates its advertised set.
345+
func (ab *dsAddrBook) supersededSignedAddrs(p peer.ID, newAddrs []ma.Multiaddr) []ma.Multiaddr {
346+
prevEnv := ab.GetPeerRecord(p)
347+
if prevEnv == nil {
348+
return nil
349+
}
350+
prev, err := prevEnv.Record()
351+
if err != nil {
352+
return nil
353+
}
354+
prevRec, ok := prev.(*peer.PeerRecord)
355+
if !ok {
356+
return nil
357+
}
358+
359+
newSet := make(map[string]struct{}, len(newAddrs))
360+
for _, a := range newAddrs {
361+
newSet[string(a.Bytes())] = struct{}{}
362+
}
363+
364+
pr, err := ab.loadRecord(p, true, false)
365+
if err != nil {
366+
return nil
367+
}
368+
pr.RLock()
369+
connected := make(map[string]struct{})
370+
for _, a := range pr.Addrs {
371+
if ttlIsConnected(time.Duration(a.Ttl)) {
372+
connected[string(a.Addr)] = struct{}{}
373+
}
374+
}
375+
pr.RUnlock()
376+
377+
var superseded []ma.Multiaddr
378+
for _, a := range prevRec.Addrs {
379+
key := string(a.Bytes())
380+
if _, still := newSet[key]; still {
381+
continue
382+
}
383+
if _, isConn := connected[key]; isConn {
384+
continue
385+
}
386+
superseded = append(superseded, a)
387+
}
388+
return superseded
389+
}
390+
391+
// ttlIsConnected reports whether the given TTL marks the address as held by
392+
// a live connection.
393+
func ttlIsConnected(ttl time.Duration) bool {
394+
return ttl >= pstore.ConnectedAddrTTL
395+
}
396+
318397
func (ab *dsAddrBook) latestPeerRecordSeq(p peer.ID) uint64 {
319398
pr, err := ab.loadRecord(p, true, false)
320399
if err != nil {

p2p/host/peerstore/pstoreds/ds_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@ import (
55
"testing"
66
"time"
77

8+
"github.com/libp2p/go-libp2p/core/crypto"
9+
"github.com/libp2p/go-libp2p/core/peer"
810
pstore "github.com/libp2p/go-libp2p/core/peerstore"
11+
"github.com/libp2p/go-libp2p/core/record"
12+
"github.com/libp2p/go-libp2p/core/test"
913
pt "github.com/libp2p/go-libp2p/p2p/host/peerstore/test"
1014

1115
mockclock "github.com/benbjohnson/clock"
1216
ds "github.com/ipfs/go-datastore"
1317
"github.com/ipfs/go-datastore/sync"
18+
ma "github.com/multiformats/go-multiaddr"
1419
"github.com/stretchr/testify/require"
1520
)
1621

@@ -72,6 +77,60 @@ func TestDsAddrBook(t *testing.T) {
7277
}
7378
}
7479

80+
// TestDsConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
81+
// newer signed peer record: addrs dropped from the new record are evicted,
82+
// while unsigned addrs and addrs held by a live connection are kept.
83+
func TestDsConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
84+
for name, dsFactory := range dstores {
85+
t.Run(name, func(t *testing.T) {
86+
opts := DefaultOpts()
87+
store, closeDs := dsFactory(t)
88+
defer closeDs()
89+
ab, err := NewAddrBook(context.Background(), store, opts)
90+
require.NoError(t, err)
91+
defer ab.Close()
92+
93+
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
94+
require.NoError(t, err)
95+
id, err := peer.IDFromPrivateKey(priv)
96+
require.NoError(t, err)
97+
98+
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
99+
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
100+
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
101+
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
102+
103+
rec1 := peer.NewPeerRecord()
104+
rec1.PeerID = id
105+
rec1.Seq = 1
106+
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
107+
env1, err := record.Seal(rec1, priv)
108+
require.NoError(t, err)
109+
110+
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
111+
require.NoError(t, err)
112+
require.True(t, accepted)
113+
114+
ab.AddAddr(id, connected, pstore.ConnectedAddrTTL)
115+
ab.AddAddr(id, unsigned, time.Hour)
116+
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
117+
118+
rec2 := peer.NewPeerRecord()
119+
rec2.PeerID = id
120+
rec2.Seq = 2
121+
rec2.Addrs = []ma.Multiaddr{keep}
122+
env2, err := record.Seal(rec2, priv)
123+
require.NoError(t, err)
124+
125+
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
126+
require.NoError(t, err)
127+
require.True(t, accepted)
128+
129+
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
130+
})
131+
}
132+
}
133+
75134
func TestDsKeyBook(t *testing.T) {
76135
for name, dsFactory := range dstores {
77136
t.Run(name, func(t *testing.T) {

p2p/host/peerstore/pstoremem/addr_book.go

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ func ttlIsConnected(ttl time.Duration) bool {
4444

4545
type peerRecordState struct {
4646
Envelope *record.Envelope
47-
Seq uint64
47+
// Seq is the sequence number from the stored signed peer record. Newer
48+
// records (higher Seq) supersede older ones for the same peer.
49+
Seq uint64
4850
}
4951

5052
// Essentially Go stdlib's Priority Queue example
@@ -289,8 +291,23 @@ func (mab *memoryAddrBook) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Du
289291
mab.addAddrs(p, addrs, ttl)
290292
}
291293

292-
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will expire after the given TTL.
293-
// See https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook for more details.
294+
// ConsumePeerRecord adds addresses from a signed peer.PeerRecord, which will
295+
// expire after the given TTL. See
296+
// https://godoc.org/github.com/libp2p/go-libp2p/core/peerstore#CertifiedAddrBook
297+
// for more details.
298+
//
299+
// The signed peer record's Seq is treated as monotonic per peer: a record with
300+
// a Seq lower than the last accepted one is rejected. Equal Seq is accepted as
301+
// a TTL refresh.
302+
//
303+
// When a newer signed record is accepted, addrs that were present in the
304+
// previously stored signed record but absent in the new one are evicted, so
305+
// the peerstore reflects the peer's current self-advertised set instead of
306+
// the union of every record we have ever seen. Unsigned addrs (added via
307+
// AddAddr / SetAddr from sources like DHT gossip, or from an identify
308+
// exchange where the peer did not send a signed record) are not touched, and
309+
// addrs held by a live connection (TTL >= ConnectedAddrTTL) are also kept so
310+
// active sessions are not dropped.
294311
func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, ttl time.Duration) (bool, error) {
295312
r, err := recordEnvelope.Record()
296313
if err != nil {
@@ -316,6 +333,33 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
316333
if !found && len(mab.signedPeerRecords) >= mab.maxSignedPeerRecords {
317334
return false, errors.New("too many signed peer records")
318335
}
336+
337+
// Drop addrs from the previous signed record that are absent in the
338+
// new one; addrs held by a live connection are preserved so we don't
339+
// drop an active session if the peer rotates its advertised set. The
340+
// prior addr set is recovered by decoding the stored envelope; that
341+
// call caches on first access (core/record/envelope.go), so repeated
342+
// lookups are cheap.
343+
if found {
344+
if prevRec := prevSignedAddrs(lastState); len(prevRec) > 0 {
345+
newAddrSet := make(map[string]struct{}, len(rec.Addrs))
346+
for _, a := range rec.Addrs {
347+
newAddrSet[string(a.Bytes())] = struct{}{}
348+
}
349+
for _, a := range prevRec {
350+
key := string(a.Bytes())
351+
if _, still := newAddrSet[key]; still {
352+
continue
353+
}
354+
ea, ok := mab.addrs.Addrs[rec.PeerID][key]
355+
if !ok || ea.IsConnected() {
356+
continue
357+
}
358+
mab.addrs.Delete(ea)
359+
}
360+
}
361+
}
362+
319363
mab.signedPeerRecords[rec.PeerID] = &peerRecordState{
320364
Envelope: recordEnvelope,
321365
Seq: rec.Seq,
@@ -324,6 +368,24 @@ func (mab *memoryAddrBook) ConsumePeerRecord(recordEnvelope *record.Envelope, tt
324368
return true, nil
325369
}
326370

371+
// prevSignedAddrs returns the addrs from the stored signed peer record, or
372+
// nil if the envelope is absent or can't be decoded. Envelope.Record() caches
373+
// its result, so repeated calls are cheap.
374+
func prevSignedAddrs(s *peerRecordState) []ma.Multiaddr {
375+
if s == nil || s.Envelope == nil {
376+
return nil
377+
}
378+
r, err := s.Envelope.Record()
379+
if err != nil {
380+
return nil
381+
}
382+
pr, ok := r.(*peer.PeerRecord)
383+
if !ok {
384+
return nil
385+
}
386+
return pr.Addrs
387+
}
388+
327389
func (mab *memoryAddrBook) maybeDeleteSignedPeerRecordUnlocked(p peer.ID) {
328390
if len(mab.addrs.Addrs[p]) == 0 {
329391
delete(mab.signedPeerRecords, p)

p2p/host/peerstore/pstoremem/addr_book_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/libp2p/go-libp2p/core/crypto"
1112
"github.com/libp2p/go-libp2p/core/peer"
13+
"github.com/libp2p/go-libp2p/core/peerstore"
14+
"github.com/libp2p/go-libp2p/core/record"
15+
"github.com/libp2p/go-libp2p/core/test"
1216
ma "github.com/multiformats/go-multiaddr"
1317
"github.com/stretchr/testify/require"
1418
)
@@ -186,6 +190,56 @@ func TestPeerLimits(t *testing.T) {
186190
require.Equal(t, 1024, ab.addrs.NumUnconnectedAddrs())
187191
}
188192

193+
// TestConsumePeerRecordReplacesStaleAddrs verifies replace-semantics on a
194+
// newer signed peer record: addrs dropped from the new record are evicted,
195+
// while unsigned addrs and addrs held by a live connection are kept.
196+
func TestConsumePeerRecordReplacesStaleAddrs(t *testing.T) {
197+
ab := NewAddrBook()
198+
defer ab.Close()
199+
200+
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
201+
require.NoError(t, err)
202+
id, err := peer.IDFromPrivateKey(priv)
203+
require.NoError(t, err)
204+
205+
keep := ma.StringCast("/ip4/1.2.3.4/tcp/1")
206+
drop := ma.StringCast("/ip4/1.2.3.4/tcp/2")
207+
unsigned := ma.StringCast("/ip4/1.2.3.4/tcp/3")
208+
connected := ma.StringCast("/ip4/1.2.3.4/tcp/4")
209+
210+
rec1 := peer.NewPeerRecord()
211+
rec1.PeerID = id
212+
rec1.Seq = 1
213+
rec1.Addrs = []ma.Multiaddr{keep, drop, connected}
214+
env1, err := record.Seal(rec1, priv)
215+
require.NoError(t, err)
216+
217+
accepted, err := ab.ConsumePeerRecord(env1, time.Hour)
218+
require.NoError(t, err)
219+
require.True(t, accepted)
220+
221+
// Pin `connected` via ConnectedAddrTTL and add an unsigned addr.
222+
ab.AddAddr(id, connected, peerstore.ConnectedAddrTTL)
223+
ab.AddAddr(id, unsigned, time.Hour)
224+
require.ElementsMatch(t, []ma.Multiaddr{keep, drop, connected, unsigned}, ab.Addrs(id))
225+
226+
// Newer record drops `drop` and only mentions `keep`. `drop` must go;
227+
// `unsigned` (never in a signed record) and `connected` (held by a
228+
// live connection) must stay.
229+
rec2 := peer.NewPeerRecord()
230+
rec2.PeerID = id
231+
rec2.Seq = 2
232+
rec2.Addrs = []ma.Multiaddr{keep}
233+
env2, err := record.Seal(rec2, priv)
234+
require.NoError(t, err)
235+
236+
accepted, err = ab.ConsumePeerRecord(env2, time.Hour)
237+
require.NoError(t, err)
238+
require.True(t, accepted)
239+
240+
require.ElementsMatch(t, []ma.Multiaddr{keep, connected, unsigned}, ab.Addrs(id))
241+
}
242+
189243
func BenchmarkPeerAddrs(b *testing.B) {
190244
sizes := [...]int{1, 10, 100, 1000, 10_000, 100_000, 1000_000}
191245
for _, sz := range sizes {

p2p/host/peerstore/test/addr_book_suite.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -475,8 +475,8 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
475475
t.Error("unable to retrieve signed routing record from addrbook")
476476
}
477477

478-
// Adding a new envelope should clear existing certified addresses.
479-
// Only the newly-added ones should remain
478+
// A newer signed record drops addrs the peer no longer advertises.
479+
// Unsigned addrs (added via plain AddAddrs) are retained.
480480
certifiedAddrs = certifiedAddrs[:3]
481481
rec4 := peer.NewPeerRecord()
482482
rec4.PeerID = id
@@ -488,8 +488,9 @@ func testCertifiedAddresses(m pstore.AddrBook, clk *mockClock.Mock) func(*testin
488488
if !accepted {
489489
t.Error("expected peer record to be accepted")
490490
}
491-
// AssertAddressesEqual(t, certifiedAddrs, m.Addrs(id))
492-
AssertAddressesEqual(t, allAddrs, m.Addrs(id))
491+
expectedAfterRec4 := append([]multiaddr.Multiaddr{}, certifiedAddrs...)
492+
expectedAfterRec4 = append(expectedAfterRec4, uncertifiedAddrs...)
493+
AssertAddressesEqual(t, expectedAfterRec4, m.Addrs(id))
493494

494495
// update TTL on signed addrs to -1 to remove them.
495496
// the signed routing record should be deleted

0 commit comments

Comments
 (0)