File tree 1 file changed +10
-10
lines changed
1 file changed +10
-10
lines changed Original file line number Diff line number Diff line change @@ -84,26 +84,26 @@ func NewConsumer(
84
84
// Run starts consuming with automatic reconnection handling. Do not reuse the
85
85
// consumer for anything other than to close it.
86
86
func (consumer * Consumer ) Run (handler Handler ) error {
87
- err := consumer .startGoroutines (
88
- handler ,
89
- consumer .options ,
90
- )
91
- if err != nil {
92
- return err
93
- }
94
-
95
- handler = func (d Delivery ) (action Action ) {
87
+ handlerWrapper := func (d Delivery ) (action Action ) {
96
88
if ! consumer .handlerMu .TryRLock () {
97
89
return NackRequeue
98
90
}
99
91
defer consumer .handlerMu .RUnlock ()
100
92
return handler (d )
101
93
}
102
94
95
+ err := consumer .startGoroutines (
96
+ handlerWrapper ,
97
+ consumer .options ,
98
+ )
99
+ if err != nil {
100
+ return err
101
+ }
102
+
103
103
for err := range consumer .reconnectErrCh {
104
104
consumer .options .Logger .Infof ("successful consumer recovery from: %v" , err )
105
105
err = consumer .startGoroutines (
106
- handler ,
106
+ handlerWrapper ,
107
107
consumer .options ,
108
108
)
109
109
if err != nil {
You can’t perform that action at this time.
0 commit comments