Skip to content

Commit 456bec0

Browse files
authored
Merge pull request #166 from thibleroy/main
Consumer: close gracefully
2 parents 249fe54 + 0e96881 commit 456bec0

File tree

2 files changed

+72
-0
lines changed

2 files changed

+72
-0
lines changed

consume.go

+64
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rabbitmq
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"sync"
@@ -32,6 +33,7 @@ type Consumer struct {
3233
reconnectErrCh <-chan error
3334
closeConnectionToManagerCh chan<- struct{}
3435
options ConsumerOptions
36+
handlerMux *sync.RWMutex
3537

3638
isClosedMux *sync.RWMutex
3739
isClosed bool
@@ -89,6 +91,14 @@ func (consumer *Consumer) Run(handler Handler) error {
8991
return err
9092
}
9193

94+
handler = func(d Delivery) (action Action) {
95+
if !consumer.handlerMux.TryRLock() {
96+
return NackRequeue
97+
}
98+
defer consumer.handlerMux.RUnlock()
99+
return handler(d)
100+
}
101+
92102
for err := range consumer.reconnectErrCh {
93103
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
94104
err = consumer.startGoroutines(
@@ -104,10 +114,26 @@ func (consumer *Consumer) Run(handler Handler) error {
104114
}
105115

106116
// Close cleans up resources and closes the consumer.
117+
// It waits for handler to finish before returning by default
118+
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
119+
// Use CloseWithContext to specify a context to cancel the handler completion.
107120
// It does not close the connection manager, just the subscription
108121
// to the connection manager and the consuming goroutines.
109122
// Only call once.
110123
func (consumer *Consumer) Close() {
124+
if consumer.options.CloseGracefully {
125+
consumer.options.Logger.Infof("waiting for handler to finish...")
126+
err := consumer.waitForHandlerCompletion(context.Background())
127+
if err != nil {
128+
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
129+
}
130+
}
131+
132+
consumer.cleanupResources()
133+
134+
}
135+
136+
func (consumer *Consumer) cleanupResources() {
111137
consumer.isClosedMux.Lock()
112138
defer consumer.isClosedMux.Unlock()
113139
consumer.isClosed = true
@@ -124,6 +150,24 @@ func (consumer *Consumer) Close() {
124150
}()
125151
}
126152

153+
// CloseWithContext cleans up resources and closes the consumer.
154+
// It waits for handler to finish before returning
155+
// (use WithConsumerOptionsForceShutdown option to disable this behavior).
156+
// Use the context to cancel the handler completion.
157+
// CloseWithContext does not close the connection manager, just the subscription
158+
// to the connection manager and the consuming goroutines.
159+
// Only call once.
160+
func (consumer *Consumer) CloseWithContext(ctx context.Context) {
161+
if consumer.options.CloseGracefully {
162+
err := consumer.waitForHandlerCompletion(ctx)
163+
if err != nil {
164+
consumer.options.Logger.Warnf("error while waiting for handler to finish: %v", err)
165+
}
166+
}
167+
168+
consumer.cleanupResources()
169+
}
170+
127171
// startGoroutines declares the queue if it doesn't exist,
128172
// binds the queue to the routing key(s), and starts the goroutines
129173
// that will consume from the queue
@@ -213,3 +257,23 @@ func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOpti
213257
}
214258
consumer.options.Logger.Infof("rabbit consumer goroutine closed")
215259
}
260+
261+
func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error {
262+
if ctx == nil {
263+
ctx = context.Background()
264+
} else if ctx.Err() != nil {
265+
return ctx.Err()
266+
}
267+
c := make(chan struct{})
268+
go func() {
269+
consumer.handlerMux.Lock()
270+
defer consumer.handlerMux.Unlock()
271+
close(c)
272+
}()
273+
select {
274+
case <-ctx.Done():
275+
return ctx.Err()
276+
case <-c:
277+
return nil
278+
}
279+
}

consumer_options.go

+8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func getDefaultConsumerOptions(queueName string) ConsumerOptions {
2828
},
2929
ExchangeOptions: []ExchangeOptions{},
3030
Concurrency: 1,
31+
CloseGracefully: true,
3132
Logger: stdDebugLogger{},
3233
QOSPrefetch: 10,
3334
QOSGlobal: false,
@@ -64,6 +65,7 @@ func getDefaultBindingOptions() BindingOptions {
6465
type ConsumerOptions struct {
6566
RabbitConsumerOptions RabbitConsumerOptions
6667
QueueOptions QueueOptions
68+
CloseGracefully bool
6769
ExchangeOptions []ExchangeOptions
6870
Concurrency int
6971
Logger logger.Logger
@@ -311,6 +313,12 @@ func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) {
311313
options.QOSGlobal = true
312314
}
313315

316+
// WithConsumerOptionsForceShutdown tells the consumer to not wait for
317+
// the handler to complete in consumer.Close
318+
func WithConsumerOptionsForceShutdown(options *ConsumerOptions) {
319+
options.CloseGracefully = false
320+
}
321+
314322
// WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means
315323
// multiple nodes in the cluster will have the messages distributed amongst them
316324
// for higher reliability

0 commit comments

Comments
 (0)