Skip to content

Commit c0fca00

Browse files
committed
MQTT: handle subscription errors
1 parent f4c9db2 commit c0fca00

3 files changed

Lines changed: 38 additions & 8 deletions

File tree

go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,10 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
219219
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
220220
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
221221
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
222-
github.com/onsi/ginkgo/v2 v2.28.1 h1:S4hj+HbZp40fNKuLUQOYLDgZLwNUVn19N3Atb98NCyI=
223-
github.com/onsi/ginkgo/v2 v2.28.1/go.mod h1:CLtbVInNckU3/+gC8LzkGUb9oF+e8W8TdUsxPwvdOgE=
224222
github.com/onsi/ginkgo/v2 v2.28.3 h1:4JvMdwtFU0imd8fHx25OJXoDMRexnf8v5NHKYSTTji4=
225223
github.com/onsi/ginkgo/v2 v2.28.3/go.mod h1:+aXOY+vzZ5mu2iI2HpTZUPmM//oQfsNFX6gU9kNcA44=
226224
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
227225
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
228-
github.com/onsi/gomega v1.39.1 h1:1IJLAad4zjPn2PsnhH70V4DKRFlrCzGBNrNaru+Vf28=
229-
github.com/onsi/gomega v1.39.1/go.mod h1:hL6yVALoTOxeWudERyfppUcZXjMwIMLnuSfruD2lcfg=
230226
github.com/onsi/gomega v1.40.0 h1:Vtol0e1MghCD2ZVIilPDIg44XSL9l2QAn8ZNaljWcJc=
231227
github.com/onsi/gomega v1.40.0/go.mod h1:M/Uqpu/8qTjtzCLUA2zJHX9Iilrau25x1PdoSRbWh5A=
232228
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=

pkg/mqtt/consumer.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,19 @@ func (c MqttConsumer) Start(cosumerReady chan bool) {
7474
token := client.Subscribe(topic, byte(c.Config.MqttConsumer.QoS), handler)
7575
token.Wait()
7676
if token.Error() != nil {
77-
log.Error("failed to subscribe", "id", c.Id, "error", token.Error())
77+
log.Error("failed to subscribe, reconnecting", "id", c.Id, "topic", topic, "error", token.Error())
78+
go func() {
79+
select {
80+
case <-c.ctx.Done():
81+
return
82+
case <-time.After(config.ReconnectDelay):
83+
}
84+
client.Disconnect(250)
85+
if t := client.Connect(); t.Wait() && t.Error() != nil {
86+
log.Error("consumer reconnect failed", "id", c.Id, "error", t.Error())
87+
}
88+
}()
89+
return
7890
}
7991
log.Info("consumer subscribed", "id", c.Id, "topic", topic)
8092
}

pkg/mqtt/consumer_v5.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,34 @@ func (c Mqtt5Consumer) Start(consumerReady chan bool) {
106106
Topic: topic,
107107
QoS: byte(c.Config.MqttConsumer.QoS),
108108
})
109-
log.Info("consumer subscribing", "id", c.Id, "topic", topic)
110109
}
111-
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
110+
if _, err := cm.Subscribe(c.ctx, &paho.Subscribe{
112111
Subscriptions: subscriptions,
113112
}); err != nil {
114-
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
113+
log.Error("failed to subscribe, retrying", "id", c.Id, "error", err)
114+
go func() {
115+
for {
116+
select {
117+
case <-c.ctx.Done():
118+
return
119+
case <-time.After(config.ReconnectDelay):
120+
}
121+
if _, retryErr := cm.Subscribe(c.ctx, &paho.Subscribe{
122+
Subscriptions: subscriptions,
123+
}); retryErr == nil {
124+
for _, sub := range subscriptions {
125+
log.Info("consumer subscribed", "id", c.Id, "topic", sub.Topic)
126+
}
127+
return
128+
} else {
129+
log.Error("failed to subscribe, retrying", "id", c.Id, "error", retryErr)
130+
}
131+
}
132+
}()
133+
} else {
134+
for _, sub := range subscriptions {
135+
log.Info("consumer subscribed", "id", c.Id, "topic", sub.Topic)
136+
}
115137
}
116138
},
117139
OnConnectError: func(err error) {

0 commit comments

Comments
 (0)