Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion core/routing/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package routing

import (
"context"
"slices"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -33,6 +34,12 @@ const (
)

// QueryEvent is emitted for every notable event that happens during a DHT query.
//
// Publishers may mutate Responses (and the AddrInfo values it points at)
// freely after calling PublishQueryEvent: the event is deep-copied before
// it reaches subscribers. Subscribers must still treat their copy as
// read-only, since events are fanned out by pointer to a single consumer
// channel.
type QueryEvent struct {
ID peer.ID
Type QueryEventType
Expand Down Expand Up @@ -92,6 +99,10 @@ func RegisterForQueryEvents(ctx context.Context) (context.Context, <-chan *Query

// PublishQueryEvent publishes a query event to the query event channel
// associated with the given context, if any.
//
// The event's Responses slice (and each AddrInfo.Addrs slice it points
// at) is deep-copied before delivery, so the caller can safely keep
// mutating its own copy after this call returns.
func PublishQueryEvent(ctx context.Context, ev *QueryEvent) {
ich := ctx.Value(routingQueryKey{})
if ich == nil {
Expand All @@ -100,7 +111,31 @@ func PublishQueryEvent(ctx context.Context, ev *QueryEvent) {

// We *want* to panic here.
ech := ich.(*eventChannel)
ech.send(ctx, ev)
ech.send(ctx, cloneForPublish(ev))
}

// cloneForPublish returns ev with Responses (and each AddrInfo.Addrs)
// replaced by independent copies. Without this, a publisher mutating its
// AddrInfo slice after PublishQueryEvent returns would race with any
// subscriber reading Responses.
//
// The deeper Multiaddr values are treated as immutable by convention and
// are not copied.
func cloneForPublish(ev *QueryEvent) *QueryEvent {
if len(ev.Responses) == 0 {
return ev
}
out := *ev
out.Responses = make([]*peer.AddrInfo, len(ev.Responses))
for i, ai := range ev.Responses {
if ai == nil {
continue
}
cp := *ai
cp.Addrs = slices.Clone(ai.Addrs)
out.Responses[i] = &cp
}
return &out
}

// SubscribesToQueryEvents returns true if the context subscribes to query
Expand Down
51 changes: 51 additions & 0 deletions core/routing/query_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package routing

import (
"context"
"testing"

"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)

// TestPublishQueryEventResponsesRace shows that PublishQueryEvent forwards
// QueryEvent.Responses to subscribers by pointer. A publisher that mutates
// the AddrInfo it handed over (e.g. appending to AddrInfo.Addrs) races
// with any consumer reading Responses.
//
// This is a real hazard: DHT implementations publish a []*peer.AddrInfo
// of "closer peers" as a PeerResponse event and then continue processing
// the same slice, enriching AddrInfo.Addrs from a peerstore.
//
// Expected to fail under -race.
func TestPublishQueryEventResponsesRace(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
ctx, events := RegisterForQueryEvents(ctx)

ai := &peer.AddrInfo{
Addrs: []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/4001")},
}
more := ma.StringCast("/ip4/5.6.7.8/tcp/4001")

done := make(chan struct{})
go func() {
defer close(done)
for ev := range events {
for _, pi := range ev.Responses {
_ = len(pi.Addrs) // racy read of the AddrInfo.Addrs header
}
}
}()

for range 1000 {
PublishQueryEvent(ctx, &QueryEvent{
Type: PeerResponse,
Responses: []*peer.AddrInfo{ai},
})
ai.Addrs = append(ai.Addrs, more)
}

cancel()
<-done
}
Loading