diff --git a/consume.go b/consume.go index 4517c5d..ad80012 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "context" "errors" "fmt" "sync" @@ -32,6 +33,7 @@ type Consumer struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions + handlerMux *sync.RWMutex isClosedMux *sync.RWMutex isClosed bool @@ -89,6 +91,14 @@ func (consumer *Consumer) Run(handler Handler) error { return err } + handler = func(d Delivery) (action Action) { + if !consumer.handlerMux.TryRLock() { + return NackRequeue + } + defer consumer.handlerMux.RUnlock() + return handler(d) + } + for err := range consumer.reconnectErrCh { consumer.options.Logger.Infof("successful consumer recovery from: %v", err) err = consumer.startGoroutines( @@ -104,10 +114,26 @@ func (consumer *Consumer) Run(handler Handler) error { } // Close cleans up resources and closes the consumer. +// It waits for handler to finish before returning by default +// (use WithConsumerOptionsForceShutdown option to disable this behavior). +// Use CloseWithContext to specify a context to cancel the handler completion. // It does not close the connection manager, just the subscription // to the connection manager and the consuming goroutines. // Only call once. func (consumer *Consumer) Close() { + if consumer.options.CloseGracefully { + consumer.options.Logger.Infof("waiting for handler to finish...") + err := consumer.waitForHandlerCompletion(context.Background()) + if err != nil { + consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) + } + } + + consumer.cleanupResources() + +} + +func (consumer *Consumer) cleanupResources() { consumer.isClosedMux.Lock() defer consumer.isClosedMux.Unlock() consumer.isClosed = true @@ -124,6 +150,24 @@ func (consumer *Consumer) Close() { }() } +// CloseWithContext cleans up resources and closes the consumer. +// It waits for handler to finish before returning +// (use WithConsumerOptionsForceShutdown option to disable this behavior). +// Use the context to cancel the handler completion. +// CloseWithContext does not close the connection manager, just the subscription +// to the connection manager and the consuming goroutines. +// Only call once. +func (consumer *Consumer) CloseWithContext(ctx context.Context) { + if consumer.options.CloseGracefully { + err := consumer.waitForHandlerCompletion(ctx) + if err != nil { + consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err) + } + } + + consumer.cleanupResources() +} + // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue @@ -211,3 +255,23 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti } consumer.options.Logger.Infof("rabbit consumer goroutine closed") } + +func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { + if ctx == nil { + ctx = context.Background() + } else if ctx.Err() != nil { + return ctx.Err() + } + c := make(chan struct{}) + go func() { + consumer.handlerMux.Lock() + defer consumer.handlerMux.Unlock() + close(c) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-c: + return nil + } +} diff --git a/consumer_options.go b/consumer_options.go index 7de85cb..aa87bdd 100644 --- a/consumer_options.go +++ b/consumer_options.go @@ -28,6 +28,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions { }, ExchangeOptions: []ExchangeOptions{}, Concurrency: 1, + CloseGracefully: true, Logger: stdDebugLogger{}, QOSPrefetch: 10, QOSGlobal: false, @@ -64,6 +65,7 @@ func getDefaultBindingOptions() BindingOptions { type ConsumerOptions struct { RabbitConsumerOptions RabbitConsumerOptions QueueOptions QueueOptions + CloseGracefully bool ExchangeOptions []ExchangeOptions Concurrency int Logger logger.Logger @@ -311,6 +313,12 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { options.QOSGlobal = true } +// WithConsumerOptionsForceShutdown tells the consumer to not wait for +// the handler to complete in consumer.Close +func WithConsumerOptionsForceShutdown(options *ConsumerOptions) { + options.CloseGracefully = false +} + // WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means // multiple nodes in the cluster will have the messages distributed amongst them // for higher reliability