diff --git a/core/routing/query.go b/core/routing/query.go index a99eccaef0..0100c70870 100644 --- a/core/routing/query.go +++ b/core/routing/query.go @@ -2,6 +2,7 @@ package routing import ( "context" + "slices" "sync" "github.com/libp2p/go-libp2p/core/peer" @@ -33,6 +34,12 @@ const ( ) // QueryEvent is emitted for every notable event that happens during a DHT query. +// +// Publishers may mutate Responses (and the AddrInfo values it points at) +// freely after calling PublishQueryEvent: the event is deep-copied before +// it reaches subscribers. Subscribers must still treat their copy as +// read-only, since events are fanned out by pointer to a single consumer +// channel. type QueryEvent struct { ID peer.ID Type QueryEventType @@ -92,6 +99,10 @@ func RegisterForQueryEvents(ctx context.Context) (context.Context, <-chan *Query // PublishQueryEvent publishes a query event to the query event channel // associated with the given context, if any. +// +// The event's Responses slice (and each AddrInfo.Addrs slice it points +// at) is deep-copied before delivery, so the caller can safely keep +// mutating its own copy after this call returns. func PublishQueryEvent(ctx context.Context, ev *QueryEvent) { ich := ctx.Value(routingQueryKey{}) if ich == nil { @@ -100,7 +111,31 @@ func PublishQueryEvent(ctx context.Context, ev *QueryEvent) { // We *want* to panic here. ech := ich.(*eventChannel) - ech.send(ctx, ev) + ech.send(ctx, cloneForPublish(ev)) +} + +// cloneForPublish returns ev with Responses (and each AddrInfo.Addrs) +// replaced by independent copies. Without this, a publisher mutating its +// AddrInfo slice after PublishQueryEvent returns would race with any +// subscriber reading Responses. +// +// The deeper Multiaddr values are treated as immutable by convention and +// are not copied. +func cloneForPublish(ev *QueryEvent) *QueryEvent { + if len(ev.Responses) == 0 { + return ev + } + out := *ev + out.Responses = make([]*peer.AddrInfo, len(ev.Responses)) + for i, ai := range ev.Responses { + if ai == nil { + continue + } + cp := *ai + cp.Addrs = slices.Clone(ai.Addrs) + out.Responses[i] = &cp + } + return &out } // SubscribesToQueryEvents returns true if the context subscribes to query diff --git a/core/routing/query_race_test.go b/core/routing/query_race_test.go new file mode 100644 index 0000000000..4b85d58ffd --- /dev/null +++ b/core/routing/query_race_test.go @@ -0,0 +1,51 @@ +package routing + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" +) + +// TestPublishQueryEventResponsesRace shows that PublishQueryEvent forwards +// QueryEvent.Responses to subscribers by pointer. A publisher that mutates +// the AddrInfo it handed over (e.g. appending to AddrInfo.Addrs) races +// with any consumer reading Responses. +// +// This is a real hazard: DHT implementations publish a []*peer.AddrInfo +// of "closer peers" as a PeerResponse event and then continue processing +// the same slice, enriching AddrInfo.Addrs from a peerstore. +// +// Expected to fail under -race. +func TestPublishQueryEventResponsesRace(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + ctx, events := RegisterForQueryEvents(ctx) + + ai := &peer.AddrInfo{ + Addrs: []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/4001")}, + } + more := ma.StringCast("/ip4/5.6.7.8/tcp/4001") + + done := make(chan struct{}) + go func() { + defer close(done) + for ev := range events { + for _, pi := range ev.Responses { + _ = len(pi.Addrs) // racy read of the AddrInfo.Addrs header + } + } + }() + + for range 1000 { + PublishQueryEvent(ctx, &QueryEvent{ + Type: PeerResponse, + Responses: []*peer.AddrInfo{ai}, + }) + ai.Addrs = append(ai.Addrs, more) + } + + cancel() + <-done +}