@@ -2,13 +2,15 @@ package providerquerymanager
22
33import (
44 "context"
5- "fmt"
65 "sync"
76 "time"
87
8+ "github.com/ipfs/boxo/bitswap/client/internal"
99 "github.com/ipfs/go-cid"
1010 logging "github.com/ipfs/go-log/v2"
1111 peer "github.com/libp2p/go-libp2p/core/peer"
12+ "go.opentelemetry.io/otel/attribute"
13+ "go.opentelemetry.io/otel/trace"
1214)
1315
1416var log = logging .Logger ("bitswap" )
@@ -39,7 +41,7 @@ type ProviderQueryNetwork interface {
3941}
4042
4143type providerQueryMessage interface {
42- debugMessage () string
44+ debugMessage ()
4345 handle (pqm * ProviderQueryManager )
4446}
4547
@@ -61,6 +63,7 @@ type newProvideQueryMessage struct {
6163}
6264
6365type cancelRequestMessage struct {
66+ ctx context.Context
6467 incomingProviders chan peer.ID
6568 k cid.Cid
6669}
@@ -121,6 +124,10 @@ func (pqm *ProviderQueryManager) SetFindProviderTimeout(findProviderTimeout time
121124func (pqm * ProviderQueryManager ) FindProvidersAsync (sessionCtx context.Context , k cid.Cid ) <- chan peer.ID {
122125 inProgressRequestChan := make (chan inProgressRequest )
123126
127+ var span trace.Span
128+ sessionCtx , span = internal .StartSpan (sessionCtx , "ProviderQueryManager.FindProvidersAsync" , trace .WithAttributes (attribute .Stringer ("cid" , k )))
129+ defer span .End ()
130+
124131 select {
125132 case pqm .providerQueryMessages <- & newProvideQueryMessage {
126133 ctx : sessionCtx ,
@@ -182,7 +189,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
182189 return
183190 case <- sessionCtx .Done ():
184191 if incomingProviders != nil {
185- pqm .cancelProviderRequest (k , incomingProviders )
192+ pqm .cancelProviderRequest (sessionCtx , k , incomingProviders )
186193 }
187194 return
188195 case provider , ok := <- incomingProviders :
@@ -199,11 +206,12 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k
199206 return returnedProviders
200207}
201208
202- func (pqm * ProviderQueryManager ) cancelProviderRequest (k cid.Cid , incomingProviders chan peer.ID ) {
209+ func (pqm * ProviderQueryManager ) cancelProviderRequest (ctx context. Context , k cid.Cid , incomingProviders chan peer.ID ) {
203210 cancelMessageChannel := pqm .providerQueryMessages
204211 for {
205212 select {
206213 case cancelMessageChannel <- & cancelRequestMessage {
214+ ctx : ctx ,
207215 incomingProviders : incomingProviders ,
208216 k : k ,
209217 }:
@@ -235,17 +243,22 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
235243 pqm .timeoutMutex .RLock ()
236244 findProviderCtx , cancel := context .WithTimeout (fpr .ctx , pqm .findProviderTimeout )
237245 pqm .timeoutMutex .RUnlock ()
246+ span := trace .SpanFromContext (findProviderCtx )
247+ span .AddEvent ("StartFindProvidersAsync" )
238248 providers := pqm .network .FindProvidersAsync (findProviderCtx , k , maxProviders )
239249 wg := & sync.WaitGroup {}
240250 for p := range providers {
241251 wg .Add (1 )
242252 go func (p peer.ID ) {
243253 defer wg .Done ()
254+ span .AddEvent ("FoundProvider" , trace .WithAttributes (attribute .Stringer ("peer" , p )))
244255 err := pqm .network .ConnectTo (findProviderCtx , p )
245256 if err != nil {
257+ span .RecordError (err , trace .WithAttributes (attribute .Stringer ("peer" , p )))
246258 log .Debugf ("failed to connect to provider %s: %s" , p , err )
247259 return
248260 }
261+ span .AddEvent ("ConnectedToProvider" , trace .WithAttributes (attribute .Stringer ("peer" , p )))
249262 select {
250263 case pqm .providerQueryMessages <- & receivedProviderMessage {
251264 ctx : findProviderCtx ,
@@ -326,16 +339,17 @@ func (pqm *ProviderQueryManager) run() {
326339 for {
327340 select {
328341 case nextMessage := <- pqm .providerQueryMessages :
329- log . Debug ( nextMessage .debugMessage () )
342+ nextMessage .debugMessage ()
330343 nextMessage .handle (pqm )
331344 case <- pqm .ctx .Done ():
332345 return
333346 }
334347 }
335348}
336349
337- func (rpm * receivedProviderMessage ) debugMessage () string {
338- return fmt .Sprintf ("Received provider (%s) for cid (%s)" , rpm .p .String (), rpm .k .String ())
350+ func (rpm * receivedProviderMessage ) debugMessage () {
351+ log .Debugf ("Received provider (%s) (%s)" , rpm .p , rpm .k )
352+ trace .SpanFromContext (rpm .ctx ).AddEvent ("ReceivedProvider" , trace .WithAttributes (attribute .Stringer ("provider" , rpm .p ), attribute .Stringer ("cid" , rpm .k )))
339353}
340354
341355func (rpm * receivedProviderMessage ) handle (pqm * ProviderQueryManager ) {
@@ -354,8 +368,9 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) {
354368 }
355369}
356370
357- func (fpqm * finishedProviderQueryMessage ) debugMessage () string {
358- return "Finished Provider Query on cid: " + fpqm .k .String ()
371+ func (fpqm * finishedProviderQueryMessage ) debugMessage () {
372+ log .Debugf ("Finished Provider Query on cid: %s" , fpqm .k )
373+ trace .SpanFromContext (fpqm .ctx ).AddEvent ("FinishedProviderQuery" , trace .WithAttributes (attribute .Stringer ("cid" , fpqm .k )))
359374}
360375
361376func (fpqm * finishedProviderQueryMessage ) handle (pqm * ProviderQueryManager ) {
@@ -371,21 +386,28 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) {
371386 requestStatus .cancelFn ()
372387}
373388
374- func (npqm * newProvideQueryMessage ) debugMessage () string {
375- return "New Provider Query on cid: " + npqm .k .String ()
389+ func (npqm * newProvideQueryMessage ) debugMessage () {
390+ log .Debugf ("New Provider Query on cid: %s" , npqm .k )
391+ trace .SpanFromContext (npqm .ctx ).AddEvent ("NewProvideQuery" , trace .WithAttributes (attribute .Stringer ("cid" , npqm .k )))
376392}
377393
378394func (npqm * newProvideQueryMessage ) handle (pqm * ProviderQueryManager ) {
379395 requestStatus , ok := pqm .inProgressRequestStatuses [npqm .k ]
380396 if ! ok {
381397
382398 ctx , cancelFn := context .WithCancel (pqm .ctx )
399+ span := trace .SpanFromContext (npqm .ctx )
400+ span .AddEvent ("NewQuery" , trace .WithAttributes (attribute .Stringer ("cid" , npqm .k )))
401+ ctx = trace .ContextWithSpan (ctx , span )
402+
383403 requestStatus = & inProgressRequestStatus {
384404 listeners : make (map [chan peer.ID ]struct {}),
385405 ctx : ctx ,
386406 cancelFn : cancelFn ,
387407 }
408+
388409 pqm .inProgressRequestStatuses [npqm .k ] = requestStatus
410+
389411 select {
390412 case pqm .incomingFindProviderRequests <- & findProviderRequest {
391413 k : npqm .k ,
@@ -394,6 +416,8 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
394416 case <- pqm .ctx .Done ():
395417 return
396418 }
419+ } else {
420+ trace .SpanFromContext (npqm .ctx ).AddEvent ("JoinQuery" , trace .WithAttributes (attribute .Stringer ("cid" , npqm .k )))
397421 }
398422 inProgressChan := make (chan peer.ID )
399423 requestStatus .listeners [inProgressChan ] = struct {}{}
@@ -406,8 +430,9 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
406430 }
407431}
408432
409- func (crm * cancelRequestMessage ) debugMessage () string {
410- return "Cancel provider query on cid: " + crm .k .String ()
433+ func (crm * cancelRequestMessage ) debugMessage () {
434+ log .Debugf ("Cancel provider query on cid: %s" , crm .k )
435+ trace .SpanFromContext (crm .ctx ).AddEvent ("CancelRequest" , trace .WithAttributes (attribute .Stringer ("cid" , crm .k )))
411436}
412437
413438func (crm * cancelRequestMessage ) handle (pqm * ProviderQueryManager ) {
0 commit comments