Skip to content

Mqtt shared Connection Support #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@

#VS code
.vscode/

mqtt-shared
28 changes: 23 additions & 5 deletions activity/mqtt/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,14 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
if err != nil {
return nil, err
}

if settings.SharedConnection {

act := &Activity{
settings: settings,
}
return act, nil
}

options := initClientOption(ctx.Logger(), settings)

if strings.HasPrefix(settings.Broker, "ssl") {
Expand Down Expand Up @@ -134,6 +141,8 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
topic: ParseTopic(settings.Topic),
}
return act, nil


}

type Activity struct {
Expand All @@ -155,14 +164,23 @@ func (a *Activity) Eval(ctx activity.Context) (done bool, err error) {
if err != nil {
return true, err
}

topic := a.settings.Topic
if params := input.TopicParams; len(params) > 0 {
topic = a.topic.String(params)
}
if token := a.client.Publish(topic, byte(a.settings.Qos), a.settings.Retain, input.Message); token.Wait() && token.Error() != nil {
ctx.Logger().Debugf("Error in publishing: %v", err)
return true, token.Error()

if input.Connection != nil {
ctx.Logger().Info("Using Shared Connection to publish..", input.Message)
if token := input.Connection.GetConnection().(mqtt.Client).Publish(topic, byte(a.settings.Qos), a.settings.Retain, input.Message); token.Wait() && token.Error() != nil {
ctx.Logger().Info("Error in publishing..")
return true, token.Error()
}

}else {
if token := a.client.Publish(topic, byte(a.settings.Qos), a.settings.Retain, input.Message); token.Wait() && token.Error() != nil {
ctx.Logger().Debugf("Error in publishing: %v", err)
return true, token.Error()
}
}

ctx.Logger().Debugf("Published Message: %v", input.Message)
Expand Down
2 changes: 1 addition & 1 deletion activity/mqtt/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/pkg/errors v0.8.1 // indirect
github.com/project-flogo/core v0.9.0
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190514140710-3ec191127204 // indirect
)
14 changes: 11 additions & 3 deletions activity/mqtt/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@ package mqtt

import (
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/support/connection"
)

type Settings struct {
Broker string `md:"broker,required"` // The broker URL
Id string `md:"id,required"` // The id of client
Broker string `md:"broker"` // The broker URL
Id string `md:"id"` // The id of client
Username string `md:"username"` // The user's name
Password string `md:"password"` // The user's password
Store string `md:"store"` // The store for message persistence
CleanSession bool `md:"cleanSession"` // Clean session flag

Retain bool `md:"retain"` // Retain Messages
Topic string `md:"topic,required"` // The topic to publish to
Topic string `md:"topic,required"` // The topic to publish to
Qos int `md:"qos"` // The Quality of Service
SSLConfig map[string]interface{} `md:"sslConfig"` // SSL Configuration
SharedConnection bool `md:"sharedconnection,required"`

}

type Input struct {
Message interface{} `md:"message"` // The message to send
TopicParams map[string]string `md:"topicParams"` // The topic parameters
Connection connection.Manager `md:"connection"`
}

type Output struct {
Expand All @@ -31,6 +35,7 @@ func (i *Input) ToMap() map[string]interface{} {
return map[string]interface{}{
"message": i.Message,
"topicParams": i.TopicParams,
"connection": i.Connection,
}
}

Expand All @@ -41,6 +46,9 @@ func (i *Input) FromMap(values map[string]interface{}) error {
if err != nil {
return err
}
if values["connection"] != nil {
i.Connection, err = coerce.ToConnection(values["connection"])
}
return nil
}

Expand Down
107 changes: 107 additions & 0 deletions connections/mqtt/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mqtt

import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/connection"
)

func init() {

connection.RegisterManagerFactory(&Factory{})
}

type Settings struct {
Broker string `md:"broker"` //The Broker to connect to
Id string `md:"id"` // Id of the client
User string `md:"user"` // User name of the client
Password string `md:"password"` //Password of the client
Store string `md:"store"` //Cert Store
Cleansess bool `md:"cleansess"` //Cleansess flag
Close uint `md:"close"` //Time in millisecond to disconnect
}

type MqttSharedConn struct {
settings *Settings
conn mqtt.Client
}

type Factory struct {
}

func (f *Factory) Type() string {

return "mqtt:paho.mqtt.golang"
}

func (*Factory) NewManager(settings map[string]interface{}) (connection.Manager, error) {
settingStruct := &Settings{}
err := metadata.MapToStruct(settings, settingStruct, true)
if err != nil {
return nil, err
}
conn, err := getMqttConnection(settingStruct)
if err != nil {
//fmt.Printf("Mqtt Client initialization got error: [%s]", err.Error())
return nil, err
}
if token := conn.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}

sharedConn := &MqttSharedConn{settings: settingStruct, conn: conn}

return sharedConn, nil
}

func (h *MqttSharedConn) Type() string {

return "mqtt:paho.mqtt.golang"
}

func (h *MqttSharedConn) GetConnection() interface{} {

return h.conn
}

func (h *MqttSharedConn) ReleaseConnection(connection interface{}) {

h.conn.Disconnect(h.settings.Close)

}

func (h *MqttSharedConn) Start() error {

return nil
}

func getMqttConnection(settings *Settings) (mqtt.Client, error) {
options := initClientOption(settings)

mqttClient := mqtt.NewClient(options)

return mqttClient, nil
}

func initClientOption(settings *Settings) *mqtt.ClientOptions {

opts := mqtt.NewClientOptions()
opts.AddBroker(settings.Broker)
opts.SetClientID(settings.Id)
opts.SetUsername(settings.User)
opts.SetPassword(settings.Password)
b, err := coerce.ToBool(settings.Cleansess)
if err != nil {
//log.Error("Error converting \"cleansess\" to a boolean ", err.Error())
return nil
}
opts.SetCleanSession(b)
if storeType := settings.Store; storeType != ":memory:" {
if settings.Store != "" {
opts.SetStore(mqtt.NewFileStore(settings.Store))
}

}
return opts
}
11 changes: 11 additions & 0 deletions connections/mqtt/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/project-flogo/edge-contrib/connections/mqtt

go 1.12

require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a
github.com/stretchr/objx v0.2.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b // indirect
)
24 changes: 24 additions & 0 deletions connections/mqtt/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a h1:6S6rhPgntonQfPxNaW2E7AiESFt0beU34f7ZfV2p9mk=
github.com/project-flogo/core v0.9.3-0.20190726142805-ef75331bd75a/go.mod h1:QGWi7TDLlhGUaYH3n/16ImCuulbEHGADYEXyrcHhX7U=
github.com/project-flogo/core v0.9.3 h1:uZXHR9j1Byqt+x3faNnOqB8NlEfwE2gpCh40iQ+44oA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
71 changes: 71 additions & 0 deletions example/mqtt-shared/flogo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mellistibco should we have a top-level "examples" directory in this repo? or should we have an examples directory under the mqtt activity and mqtt trigger?

Maybe we should create a top-level mqtt directory like we are doing for mongodb and have an examples directory within it. and leave the old mqtt as is.

"name": "mqtt-shared",
"type": "flogo:app",
"version": "0.0.1",
"description": "",
"appModel": "1.0.0",
"imports": [
"github.com/project-flogo/contrib/activity/log",
"github.com/project-flogo/flow",
"github.com/project-flogo/edge-contrib/connections/mqtt",
"github.com/project-flogo/contrib/trigger/timer",
"github.com/project-flogo/edge-contrib/activity/mqtt"
],
"triggers": [
{
"id": "flogo-time",
"ref": "#timer",
"settings": null,
"handlers": [
{
"settings": null,
"actions": [
{
"ref": "#flow",
"settings": {
"flowURI": "res://flow:test"
}
}
]
}
]
}
],
"connections":{
"mymqttconn":{
"ref":"github.com/project-flogo/edge-contrib/connections/mqtt",
"settings": {
"broker" : "tcp://localhost:1883",
"id": "sender_1"
}
}
},
"resources": [
{
"id": "flow:test",
"data": {
"name": "test",
"description": "A sample flow",
"tasks": [
{
"id": "Mqtt-Activity",
"name": "Send Mqtt Message using Shared Activity",
"activity": {
"ref": "github.com/project-flogo/edge-contrib/activity/mqtt",
"settings": {
"sharedconnection": true,
"topic":"led",
"qos": "2",
"retain":true
},
"input": {
"connection": "conn://mymqttconn",
"message": "SAmple"
}
}
}
]
}
}
]
}
18 changes: 18 additions & 0 deletions example/mqtt-shared/src/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module main

go 1.12

require (
github.com/project-flogo/contrib/activity/log v0.9.0
github.com/project-flogo/contrib/trigger/timer v0.9.0
github.com/project-flogo/core v0.9.4-0.20190829220729-31eb91f2b8a7
github.com/project-flogo/edge-contrib/activity/mqtt v0.0.0-20190715122927-42d43a13e2a9
github.com/project-flogo/edge-contrib/connections/mqtt v0.0.0
github.com/project-flogo/flow v0.9.3
github.com/stretchr/testify v1.4.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)

replace github.com/project-flogo/edge-contrib/connections/mqtt => ../../../connections/mqtt

replace github.com/project-flogo/edge-contrib/activity/mqtt v0.0.0-20190715122927-42d43a13e2a9 => /Users/skothari-tibco/new-contrib/edge-contrib/edge-contrib/activity/mqtt/
Loading