Skip to content

MQTT Trigger: Reconnect on initial connection #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions trigger/mqtt/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type Trigger struct {
logger log.Logger
options *mqtt.ClientOptions
client mqtt.Client
stop chan struct{}
}
type clientHandler struct {
//client mqtt.Client
Expand Down Expand Up @@ -209,6 +210,8 @@ func (t *Trigger) Initialize(ctx trigger.InitContext) error {

t.logger.Debugf("Client options: %v", options)

t.stop = make(chan struct{}, 1)

t.handlers = make(map[string]*clientHandler)

for _, handler := range ctx.GetHandlers() {
Expand Down Expand Up @@ -250,23 +253,11 @@ func initClientOption(settings *Settings) *mqtt.ClientOptions {

// Start implements trigger.Trigger.Start
func (t *Trigger) Start() error {

client := mqtt.NewClient(t.options)

if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

t.client = client

for _, handler := range t.handlers {
parsed := ParseTopic(handler.settings.Topic)
if token := client.Subscribe(parsed.String(), byte(handler.settings.Qos), t.getHanlder(handler, parsed)); token.Wait() && token.Error() != nil {
t.logger.Errorf("Error subscribing to topic %s: %s", handler.settings.Topic, token.Error())
return token.Error()
}

t.logger.Debugf("Subscribed to topic: %s", handler.settings.Topic)
if err := t.connect(); err != nil {
return err
}

return nil
Expand All @@ -286,10 +277,63 @@ func (t *Trigger) Stop() error {

t.client.Disconnect(250)

t.stop <- struct{}{}

return nil
}

// Connect to the configured MQTT broker. If connection was established successful it subscribes to each configured topic.
func (t *Trigger) connect() error {
if t.settings.AutoReconnect {
go func() {
CONNECT:
for {
select {
case <-t.stop:
return
default:
if token := t.client.Connect(); token.Wait() && token.Error() != nil {
t.logger.Errorf("Could not connect to %s: %s. Try to reconnect in 5 seconds", t.settings.Broker, token.Error())
time.Sleep(time.Second * 5)
continue
}

// If subscribing to topic fails, should we stop the trigger or just keep it running?
if err := t.subscribe(); err != nil {
t.logger.Errorf("Failed to subscribe: %s", err.Error())
}

break CONNECT
}
}
}()
} else {
if token := t.client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}

if err := t.subscribe(); err != nil {
return err
}
}
return nil
}

func (t *Trigger) subscribe() error {
for _, handler := range t.handlers {
parsed := ParseTopic(handler.settings.Topic)
if token := t.client.Subscribe(parsed.String(), byte(handler.settings.Qos), t.getHandler(handler, parsed)); token.Wait() && token.Error() != nil {
t.logger.Errorf("Error subscribing to topic %s: %s", handler.settings.Topic, token.Error())
return token.Error()
}

t.logger.Debugf("Subscribed to topic: %s", handler.settings.Topic)
}

return nil
}

func (t *Trigger) getHanlder(handler *clientHandler, parsed Topic) func(mqtt.Client, mqtt.Message) {
func (t *Trigger) getHandler(handler *clientHandler, parsed Topic) func(mqtt.Client, mqtt.Message) {
return func(client mqtt.Client, msg mqtt.Message) {
topic := msg.Topic()
qos := msg.Qos()
Expand Down
54 changes: 53 additions & 1 deletion trigger/mqtt/trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const testConfigLocalgetHandler string = `{
"id": "trigger1-mqtt",
"ref": "github.com/project-flogo/device-contrib/trigger/mqtt",
"settings": {
"autoreconnect": true,
"autoReconnect": true,
"broker": "tcp://localhost:1883",
"cleansess": false,
"enableTLS": false,
Expand Down Expand Up @@ -185,6 +185,58 @@ func TestRestTrigger_Initialize(t *testing.T) {
assert.Nil(t, err)
}

func TestTrigger_Start_Reconnect(t *testing.T) {

command := exec.Command("docker", "stop", "mqtt")
command.Run()

f := &Factory{}

config := &trigger.Config{}
err := json.Unmarshal([]byte(testConfigLocalgetHandler), config)
assert.Nil(t, err)

done := make(chan bool, 1)
actions := map[string]action.Action{"dummyTest": test.NewDummyAction(func() {
done <- true
})}

trg, err := test.InitTrigger(f, config, actions)
assert.Nil(t, err)
assert.NotNil(t, trg)

err = trg.Start()
assert.Nil(t, err)

command = exec.Command("docker", "start", "mqtt")
err = command.Run()
assert.Nil(t, err)

// Give the broker a second to start
time.Sleep(time.Second * 1)

options := mqtt.NewClientOptions()
options.AddBroker("tcp://localhost:1883")
options.SetClientID("TestAbc123")
client := mqtt.NewClient(options)
token := client.Connect()
token.Wait()
assert.Nil(t, token.Error())

token = client.Publish("test/a/b/req/c/d", 0, true, []byte(`{"message": "hello world"}`))
token.Wait()
assert.Nil(t, token.Error())
select {
case <-done:
case <-time.Tick(time.Second * 10):
t.Fatal("didn't get message in time")
}
client.Disconnect(50)

err = trg.Stop()
assert.Nil(t, err)
}

func TestRestTrigger_getHanlder(t *testing.T) {

f := &Factory{}
Expand Down