Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 27d94af

Browse files
authored
Fix hang when closing reciever (#188)
The fix in (#187) introduced a hang when closing the receiver. Don't hold the read lock before calling Receive() as it blocks. Return a connection closed error.
1 parent 4ace8d4 commit 27d94af

File tree

1 file changed

+16
-1
lines changed

1 file changed

+16
-1
lines changed

receiver.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,14 @@ func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
344344
ctx, span := r.startConsumerSpanFromContext(ctx, "sb.Receiver.listenForMessage")
345345
defer span.End()
346346

347+
var receiver *amqp.Receiver
347348
r.clientMu.RLock()
348-
msg, err := r.receiver.Receive(ctx)
349+
if r.receiver == nil {
350+
return nil, r.connClosedError(ctx)
351+
}
352+
receiver = r.receiver
349353
r.clientMu.RUnlock()
354+
msg, err := receiver.Receive(ctx)
350355
if err != nil {
351356
tab.For(ctx).Debug(err.Error())
352357
return nil, err
@@ -360,6 +365,16 @@ func (r *Receiver) listenForMessage(ctx context.Context) (*amqp.Message, error)
360365
return msg, nil
361366
}
362367

368+
func (r *Receiver) connClosedError(ctx context.Context) error {
369+
name := "Receiver"
370+
if r.Name != "" {
371+
name = r.Name
372+
}
373+
err := ErrConnectionClosed(name)
374+
tab.For(ctx).Error(err)
375+
return err
376+
}
377+
363378
// newSessionAndLink will replace the session and link on the Receiver
364379
// NOTE: this does *not* take the write lock, callers must hold it as required!
365380
func (r *Receiver) newSessionAndLink(ctx context.Context) error {

0 commit comments

Comments
 (0)