diff --git a/consume.go b/consume.go index 8d9b788..703ad37 100644 --- a/consume.go +++ b/consume.go @@ -92,14 +92,6 @@ func (consumer *Consumer) Run(handler Handler) error { return err } - handler = func(d Delivery) (action Action) { - if !consumer.handlerMu.TryRLock() { - return NackRequeue - } - defer consumer.handlerMu.RUnlock() - return handler(d) - } - for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( @@ -233,8 +225,17 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti break } + if !consumer.handlerMu.TryRLock() { + err := msg.Nack(false, true) + if err != nil { + consumer.options.Logger.Errorf("can't nack message: %v", err) + } + continue + } + if consumeOptions.RabbitConsumerOptions.AutoAck { handler(Delivery{msg}) + consumer.handlerMu.RUnlock() continue } @@ -255,6 +256,7 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti consumer.options.Logger.Errorf("can't nack message: %v", err) } } + consumer.handlerMu.RUnlock() } consumer.options.Logger.Infof("rabbit consumer goroutine closed") }