Skip to content

Commit 1456893

Browse files
committed
Better error handling
Don't report consumers as ready until subscription succeeded
1 parent 41eba6e commit 1456893

4 files changed

Lines changed: 122 additions & 110 deletions

File tree

pkg/amqp091/consumer.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (c *Amqp091Consumer) Connect() {
6161
c.Channel = nil
6262
c.Connection = nil
6363

64-
for c.Connection == nil {
64+
utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
6565
uri := utils.NextURI(c.Config.ConsumerUri, &c.whichUri)
6666
dialCfg := amqp091.Config{
6767
Properties: amqp091.Table{
@@ -75,34 +75,26 @@ func (c *Amqp091Consumer) Connect() {
7575
conn, err := amqp091.DialConfig(uri, dialCfg)
7676
if err != nil {
7777
log.Error("consumer connection failed", "id", c.Id, "error", err.Error())
78-
select {
79-
case <-c.ctx.Done():
80-
return
81-
case <-time.After(config.ReconnectDelay):
82-
continue
83-
}
84-
} else {
85-
log.Debug("consumer connected", "id", c.Id, "uri", uri)
86-
c.Connection = conn
78+
return false
8779
}
80+
log.Debug("consumer connected", "id", c.Id, "uri", uri)
81+
c.Connection = conn
82+
return true
83+
})
84+
85+
if c.Connection == nil {
86+
return
8887
}
8988

90-
for c.Channel == nil {
89+
utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
9190
channel, err := c.Connection.Channel()
9291
if err != nil {
93-
if err == context.Canceled {
94-
return
95-
}
9692
log.Error("consumer failed to create a channel", "id", c.Id, "error", err.Error())
97-
select {
98-
case <-c.ctx.Done():
99-
return
100-
case <-time.After(config.ReconnectDelay):
101-
}
102-
} else {
103-
c.Channel = channel
93+
return false
10494
}
105-
}
95+
c.Channel = channel
96+
return true
97+
})
10698
}
10799

108100
func (c *Amqp091Consumer) Subscribe() {
@@ -165,7 +157,19 @@ func (c *Amqp091Consumer) Subscribe() {
165157
}
166158

167159
func (c *Amqp091Consumer) Start(consumerReady chan bool) {
168-
c.Subscribe()
160+
if !utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
161+
c.Subscribe()
162+
if c.Messages != nil {
163+
return true
164+
}
165+
log.Error("subscription failed, reconnecting", "id", c.Id, "terminus", c.Terminus)
166+
c.Connect()
167+
return false
168+
}) {
169+
close(consumerReady)
170+
c.Stop("context cancelled")
171+
return
172+
}
169173
close(consumerReady)
170174
log.Info("consumer started", "id", c.Id, "terminus", c.Terminus)
171175
var oooTracker *utils.OutOfOrderTracker
@@ -174,13 +178,13 @@ func (c *Amqp091Consumer) Start(consumerReady chan bool) {
174178
}
175179

176180
for i := 1; i <= c.Config.ConsumeCount; {
177-
for c.Messages == nil {
178-
select {
179-
case <-c.ctx.Done():
181+
if c.Messages == nil {
182+
if !utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
183+
c.Subscribe()
184+
return c.Messages != nil
185+
}) {
180186
c.Stop("context cancelled")
181187
return
182-
default:
183-
c.Subscribe()
184188
}
185189
}
186190

pkg/amqp10/consumer.go

Lines changed: 49 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (c *Amqp10Consumer) Connect() {
6565
c.Session = nil
6666
c.Connection = nil
6767

68-
for c.Connection == nil {
68+
utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
6969
uri := utils.NextURI(c.Config.ConsumerUri, &c.whichUri)
7070
hostname, vhost := hostAndVHost(uri)
7171
conn, err := amqp.Dial(c.ctx, uri, &amqp.ConnOptions{
@@ -80,33 +80,26 @@ func (c *Amqp10Consumer) Connect() {
8080
})
8181
if err != nil {
8282
log.Error("consumer failed to connect", "id", c.Id, "error", err.Error())
83-
select {
84-
case <-c.ctx.Done():
85-
return
86-
case <-time.After(config.ReconnectDelay):
87-
}
88-
} else {
89-
log.Debug("consumer connected", "id", c.Id, "uri", uri)
90-
c.Connection = conn
83+
return false
9184
}
85+
log.Debug("consumer connected", "id", c.Id, "uri", uri)
86+
c.Connection = conn
87+
return true
88+
})
89+
90+
if c.Connection == nil {
91+
return
9292
}
9393

94-
for c.Session == nil {
94+
utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
9595
session, err := c.Connection.NewSession(c.ctx, nil)
9696
if err != nil {
97-
if err == context.Canceled {
98-
return
99-
}
10097
log.Error("consumer failed to create a session", "id", c.Id, "error", err.Error())
101-
select {
102-
case <-c.ctx.Done():
103-
return
104-
case <-time.After(config.ReconnectDelay):
105-
}
106-
} else {
107-
c.Session = session
98+
return false
10899
}
109-
}
100+
c.Session = session
101+
return true
102+
})
110103
}
111104

112105
func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
@@ -120,53 +113,45 @@ func (c *Amqp10Consumer) CreateReceiver(ctx context.Context) {
120113
durability = amqp.DurabilityUnsettledState
121114
}
122115

123-
for c.Receiver == nil && c.Session != nil {
124-
select {
125-
case <-ctx.Done():
126-
return
127-
default:
128-
linkProperties := buildLinkProperties(c.Config, c.Id)
129-
receiverOpts := &amqp.ReceiverOptions{
130-
SourceDurability: durability,
131-
Credit: int32(c.Config.ConsumerCredits),
132-
Properties: linkProperties,
133-
Filters: buildLinkFilters(c.Config),
134-
RequestedSenderSettleMode: requestedSenderSettleMode(c.Config),
135-
OnLinkStateProperties: func(props map[string]any) {
136-
if active, ok := props["rabbitmq:active"]; ok {
137-
if activeBool, ok := active.(bool); ok {
138-
logArgs := []any{"id", c.Id}
139-
if priority, ok := linkProperties["rabbitmq:priority"]; ok {
140-
logArgs = append(logArgs, "priority", priority)
141-
}
142-
if activeBool {
143-
log.Info("consumer is active", logArgs...)
144-
} else {
145-
log.Info("consumer is not active", logArgs...)
146-
}
147-
}
116+
linkProperties := buildLinkProperties(c.Config, c.Id)
117+
receiverOpts := &amqp.ReceiverOptions{
118+
SourceDurability: durability,
119+
Credit: int32(c.Config.ConsumerCredits),
120+
Properties: linkProperties,
121+
Filters: buildLinkFilters(c.Config),
122+
RequestedSenderSettleMode: requestedSenderSettleMode(c.Config),
123+
OnLinkStateProperties: func(props map[string]any) {
124+
if active, ok := props["rabbitmq:active"]; ok {
125+
if activeBool, ok := active.(bool); ok {
126+
logArgs := []any{"id", c.Id}
127+
if priority, ok := linkProperties["rabbitmq:priority"]; ok {
128+
logArgs = append(logArgs, "priority", priority)
129+
}
130+
if activeBool {
131+
log.Info("consumer is active", logArgs...)
132+
} else {
133+
log.Info("consumer is not active", logArgs...)
148134
}
149-
},
150-
}
151-
if c.Config.Amqp.Browse {
152-
receiverOpts.SourceDistributionMode = "copy"
153-
}
154-
receiver, err := c.Session.NewReceiver(ctx, c.Terminus, receiverOpts)
155-
if err != nil {
156-
if err == context.Canceled {
157-
return
158-
}
159-
log.Error("consumer failed to create a receiver", "id", c.Id, "error", err.Error())
160-
select {
161-
case <-ctx.Done():
162-
return
163-
case <-time.After(config.ReconnectDelay):
164135
}
165-
} else {
166-
c.Receiver = receiver
167136
}
168-
}
137+
},
138+
}
139+
if c.Config.Amqp.Browse {
140+
receiverOpts.SourceDistributionMode = "copy"
169141
}
142+
143+
utils.Retry(ctx, config.ReconnectDelay, func() bool {
144+
if c.Session == nil {
145+
return true // session gone; caller must reconnect
146+
}
147+
receiver, err := c.Session.NewReceiver(ctx, c.Terminus, receiverOpts)
148+
if err != nil {
149+
log.Error("consumer failed to create a receiver", "id", c.Id, "error", err.Error())
150+
return false
151+
}
152+
c.Receiver = receiver
153+
return true
154+
})
170155
}
171156

172157
func (c *Amqp10Consumer) Start(consumerReady chan bool) {

pkg/stomp/consumer.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (c *StompConsumer) Connect() {
5858
c.Subscription = nil
5959
c.Connection = nil
6060

61-
for c.Connection == nil {
61+
utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
6262
uri := utils.NextURI(c.Config.ConsumerUri, &c.whichUri)
6363
useTLS := strings.HasPrefix(uri, "stomp+ssl://") || strings.HasPrefix(uri, "stomps://")
6464
parsedUri := utils.ParseURI(uri, "stomp", "61613")
@@ -69,9 +69,9 @@ func (c *StompConsumer) Connect() {
6969
}
7070

7171
log.Debug("connecting to broker", "id", c.Id, "broker", parsedUri.Broker)
72+
dialer := &net.Dialer{Timeout: dialTimeout}
7273
var conn *stomp.Conn
7374
var err error
74-
dialer := &net.Dialer{Timeout: dialTimeout}
7575
if useTLS {
7676
netConn, tlsErr := tls.DialWithDialer(dialer, "tcp", parsedUri.Broker, &tls.Config{
7777
InsecureSkipVerify: c.Config.InsecureSkipTLSVerify,
@@ -95,19 +95,13 @@ func (c *StompConsumer) Connect() {
9595
}
9696
}
9797
}
98-
9998
if err != nil {
10099
log.Error("consumer connection failed", "id", c.Id, "error", err.Error())
101-
select {
102-
case <-c.ctx.Done():
103-
return
104-
case <-time.After(config.ReconnectDelay):
105-
continue
106-
}
107-
} else {
108-
c.Connection = conn
100+
return false
109101
}
110-
}
102+
c.Connection = conn
103+
return true
104+
})
111105
}
112106

113107
func (c *StompConsumer) Subscribe() {
@@ -125,7 +119,19 @@ func (c *StompConsumer) Subscribe() {
125119
}
126120

127121
func (c *StompConsumer) Start(consumerReady chan bool) {
128-
c.Subscribe()
122+
if !utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
123+
c.Subscribe()
124+
if c.Subscription != nil {
125+
return true
126+
}
127+
log.Error("subscription failed, reconnecting", "id", c.Id, "destination", c.Topic)
128+
c.Connect()
129+
return false
130+
}) {
131+
close(consumerReady)
132+
c.Stop("context cancelled")
133+
return
134+
}
129135
close(consumerReady)
130136
log.Info("consumer started", "id", c.Id, "destination", c.Topic)
131137

@@ -135,13 +141,13 @@ func (c *StompConsumer) Start(consumerReady chan bool) {
135141
}
136142

137143
for i := 1; i <= c.Config.ConsumeCount; {
138-
for c.Subscription == nil {
139-
select {
140-
case <-c.ctx.Done():
144+
if c.Subscription == nil {
145+
if !utils.Retry(c.ctx, config.ReconnectDelay, func() bool {
146+
c.Subscribe()
147+
return c.Subscription != nil
148+
}) {
141149
c.Stop("context cancelled")
142150
return
143-
default:
144-
c.Subscribe()
145151
}
146152
}
147153

pkg/utils/utils.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package utils
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/binary"
67
"fmt"
78
"net/url"
@@ -410,6 +411,22 @@ func (t *OutOfOrderTracker) Check(publisherID int, seq uint64) SequenceCheckResu
410411
return SequenceCheckResult{Status: SequenceOK, LastSeq: lastSeq}
411412
}
412413

414+
// Retry calls f repeatedly until f returns true or ctx is cancelled.
415+
// A fixed delay is inserted between each failed attempt.
416+
// Returns true if f succeeded, false if ctx was cancelled first.
417+
func Retry(ctx context.Context, delay time.Duration, f func() bool) bool {
418+
for {
419+
if f() {
420+
return true
421+
}
422+
select {
423+
case <-ctx.Done():
424+
return false
425+
case <-time.After(delay):
426+
}
427+
}
428+
}
429+
413430
// PastTense converts message outcome verbs to their past tense form for logging.
414431
func PastTense(outcome string) string {
415432
switch outcome {

0 commit comments

Comments
 (0)