Skip to content

Commit a4e904e

Browse files
authored
bugfix: graceful consumer shutdown
1 parent ef6a380 commit a4e904e

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

consume.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -84,26 +84,26 @@ func NewConsumer(
8484
// Run starts consuming with automatic reconnection handling. Do not reuse the
8585
// consumer for anything other than to close it.
8686
func (consumer *Consumer) Run(handler Handler) error {
87-
err := consumer.startGoroutines(
88-
handler,
89-
consumer.options,
90-
)
91-
if err != nil {
92-
return err
93-
}
94-
95-
handler = func(d Delivery) (action Action) {
87+
handlerWrapper := func(d Delivery) (action Action) {
9688
if !consumer.handlerMu.TryRLock() {
9789
return NackRequeue
9890
}
9991
defer consumer.handlerMu.RUnlock()
10092
return handler(d)
10193
}
10294

95+
err := consumer.startGoroutines(
96+
handlerWrapper,
97+
consumer.options,
98+
)
99+
if err != nil {
100+
return err
101+
}
102+
103103
for err := range consumer.reconnectErrCh {
104104
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
105105
err = consumer.startGoroutines(
106-
handler,
106+
handlerWrapper,
107107
consumer.options,
108108
)
109109
if err != nil {

0 commit comments

Comments
 (0)