@@ -133,6 +133,11 @@ func (r *Receiver) Close(ctx context.Context) error {
133133 r .clientMu .Lock ()
134134 defer r .clientMu .Unlock ()
135135
136+ return r .close (ctx )
137+ }
138+
139+ // closes the session. callers *must* hold the client write lock before calling!
140+ func (r * Receiver ) close (ctx context.Context ) error {
136141 if r .doneListening != nil {
137142 r .doneListening ()
138143 }
@@ -181,7 +186,10 @@ func (r *Receiver) Recover(ctx context.Context) error {
181186 closeCtx , cancel := context .WithTimeout (ctx , 10 * time .Second )
182187 closeCtx = tab .NewContext (closeCtx , span )
183188 defer cancel ()
184- _ = r .Close (closeCtx )
189+ // we must close then rebuild the session/link atomically
190+ r .clientMu .Lock ()
191+ defer r .clientMu .Unlock ()
192+ _ = r .close (closeCtx )
185193 return r .newSessionAndLink (ctx )
186194}
187195
@@ -336,7 +344,9 @@ func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
336344 ctx , span := r .startConsumerSpanFromContext (ctx , "sb.Receiver.listenForMessage" )
337345 defer span .End ()
338346
347+ r .clientMu .RLock ()
339348 msg , err := r .receiver .Receive (ctx )
349+ r .clientMu .RUnlock ()
340350 if err != nil {
341351 tab .For (ctx ).Debug (err .Error ())
342352 return nil , err
@@ -351,13 +361,11 @@ func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
351361}
352362
353363// newSessionAndLink will replace the session and link on the Receiver
364+ // NOTE: this does *not* take the write lock, callers must hold it as required!
354365func (r * Receiver ) newSessionAndLink (ctx context.Context ) error {
355366 ctx , span := r .startConsumerSpanFromContext (ctx , "sb.Receiver.newSessionAndLink" )
356367 defer span .End ()
357368
358- r .clientMu .Lock ()
359- defer r .clientMu .Unlock ()
360-
361369 client , err := r .namespace .newClient ()
362370 if err != nil {
363371 tab .For (ctx ).Error (err )
0 commit comments