Skip to content

Commit 8f09050

Browse files
author
Thibault Leroy
committed
fix(shutdown): lock in handlerGoroutine
1 parent ef6a380 commit 8f09050

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

consume.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,6 @@ func (consumer *Consumer) Run(handler Handler) error {
9292
return err
9393
}
9494

95-
handler = func(d Delivery) (action Action) {
96-
if !consumer.handlerMu.TryRLock() {
97-
return NackRequeue
98-
}
99-
defer consumer.handlerMu.RUnlock()
100-
return handler(d)
101-
}
102-
10395
for err := range consumer.reconnectErrCh {
10496
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
10597
err = consumer.startGoroutines(
@@ -233,8 +225,17 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
233225
break
234226
}
235227

228+
if !consumer.handlerMu.TryRLock() {
229+
err := msg.Nack(false, true)
230+
if err != nil {
231+
consumer.options.Logger.Errorf("can't nack message: %v", err)
232+
}
233+
continue
234+
}
235+
236236
if consumeOptions.RabbitConsumerOptions.AutoAck {
237237
handler(Delivery{msg})
238+
consumer.handlerMu.RUnlock()
238239
continue
239240
}
240241

@@ -255,6 +256,7 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
255256
consumer.options.Logger.Errorf("can't nack message: %v", err)
256257
}
257258
}
259+
consumer.handlerMu.RUnlock()
258260
}
259261
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
260262
}

0 commit comments

Comments
 (0)