Skip to content

Commit

Permalink
bugfix of device management (#247)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiejingru authored Mar 29, 2021
1 parent 6784c3b commit b956d61
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 98 deletions.
53 changes: 47 additions & 6 deletions dmcontext/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package dmcontext

import (
"github.com/baetyl/baetyl-go/v2/context"
mqtt2 "github.com/baetyl/baetyl-go/v2/mqtt"
v1 "github.com/baetyl/baetyl-go/v2/spec/v1"
)

type SystemConfig struct {
context.SystemConfig `yaml:",inline" json:",inline"`
Devices []DeviceInfo `yaml:"devices,omitempty" json:"devices,omitempty"`
}

type DeviceInfo struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Version string `yaml:"version,omitempty" json:"version,omitempty"`
Expand All @@ -23,3 +18,49 @@ type Topic struct {
Get mqtt2.QOSTopic `yaml:"get,omitempty" json:"get,omitempty"`
GetResponse mqtt2.QOSTopic `yaml:"getResponse,omitempty" json:"getResponse,omitempty"`
}

type DeviceProperty struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Mode string `yaml:"mode,omitempty" json:"mode,omitempty" validate:"regexp=^(ro|rw)?$"`
Visitor PropertyVisitor `yaml:"visitor,omitempty" json:"visitor,omitempty"`
}

type PropertyVisitor struct {
Modbus *ModbusVisitor `yaml:"modbus,omitempty" json:"modbus,omitempty"`
Opcua *OpcuaVisitor `yaml:"opcua,omitempty" json:"opcua,omitempty"`
Custom *CustomVisitor `yaml:"custom,omitempty" json:"custom,omitempty"`
}

type ModbusVisitor struct {
Function byte `yaml:"function" json:"function" validate:"min=1,max=4"`
Address string `yaml:"address" json:"address"`
Quantity uint16 `yaml:"quantity" json:"quantity"`
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Scale float64 `yaml:"scale" json:"scale"`
SwapByte bool `yaml:"swapByte" json:"swapByte"`
SwapRegister bool `yaml:"swapRegister" json:"swapRegister"`
}

type OpcuaVisitor struct {
NodeID string `yaml:"nodeid,omitempty" json:"nodeid,omitempty"`
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
}

type CustomVisitor string

type Event struct {
Type string `yaml:"type,omitempty" json:"type,omitempty"`
Payload interface{} `yaml:"payload,omitempty" json:"payload,omitempty"`
}

type DeviceShadow struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Report v1.Report `yaml:"report,omitempty" json:"report,omitempty"`
Desire v1.Desire `yaml:"desire,omitempty" json:"desire,omitempty"`
}

type driverConfig struct {
Devices []DeviceInfo `yaml:"devices,omitempty" json:"devices,omitempty"`
Driver string `yaml:"driver,omitempty" json:"driver,omitempty"`
}
186 changes: 96 additions & 90 deletions dmcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"gopkg.in/yaml.v2"

"github.com/baetyl/baetyl-go/v2/context"
"github.com/baetyl/baetyl-go/v2/errors"
"github.com/baetyl/baetyl-go/v2/log"
Expand All @@ -16,92 +18,58 @@ import (
)

const (
DefaultAccessConf = "/etc/baetyl/access.yml"
DefaultPropsConf = "/etc/baetyl/props.yml"
DefaultAccessConf = "etc/baetyl/access.yml"
DefaultPropsConf = "etc/baetyl/props.yml"
DefaultDriverConf = "etc/baetyl/conf.yml"
KeyDevice = "device"
KeyStatus = "status"
OnlineStatus = "online"
OfflineStatus = "offline"
TypeReportEvent = "report"
KeySysExtConf = "BAETYL_SYSTEM_EXT_CONF"
)

var (
ErrInvalidMessage = errors.New("invalid device message")
ErrInvalidChannel = errors.New("invalid channel")
ErrResponseChannelNotExist = errors.New("response channel not exist")
ErrAccessConfigNotExist = errors.New("access config not exist")
ErrPropsConfigNotExist = errors.New("properties config not exist")
)

type DeviceProperty struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Mode string `json:"mode,omitempty" validate:"regexp=^(ro|rw)?$"`
Visitor PropertyVisitor `json:"visitor,omitempty"`
}

type PropertyVisitor struct {
Modbus *ModbusVisitor `json:"modbus,omitempty"`
Opcua *OpcuaVisitor `json:"opcua,omitempty"`
Custom *CustomVisitor `json:"custom,omitempty"`
}

type ModbusVisitor struct {
Function byte `json:"function" validate:"min=1,max=4"`
Address string `json:"address"`
Quantity uint16 `json:"quantity"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Scale float64 `json:"scale"`
SwapByte bool `json:"swapByte"`
SwapRegister bool `json:"swapRegister"`
}

type OpcuaVisitor struct {
NodeID string `json:"nodeid,omitempty"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
}

type CustomVisitor string

type Event struct {
Type string `yaml:"type,omitempty" json:"type,omitempty"`
Payload interface{} `yaml:"payload,omitempty" json:"payload,omitempty"`
}

type DeviceShadow struct {
Name string `json:"name,omitempty"`
Report v1.Report `json:"report,omitempty"`
Desire v1.Desire `json:"desire,omitempty"`
}

type DeltaCallback func(*DeviceInfo, v1.Delta) error

type EventCallback func(*DeviceInfo, *Event) error

type Context interface {
context.Context
SystemConfigExt() *SystemConfig
GetAllDevices() []DeviceInfo
ReportDeviceProperties(*DeviceInfo, v1.Report) error
GetDeviceProperties(info *DeviceInfo) (*DeviceShadow, error)
GetDeviceProperties(device *DeviceInfo) (*DeviceShadow, error)
RegisterDeltaCallback(cb DeltaCallback) error
RegisterEventCallback(cb EventCallback) error
Online(info *DeviceInfo) error
Offline(info *DeviceInfo) error
GetDeviceAccessConfig() (string, error)
GetDevicePropConfigs() (map[string][]DeviceProperty, error)
Online(device *DeviceInfo) error
Offline(device *DeviceInfo) error
GetDriverConfig() string
GetAccessConfig() map[string]string
GetDeviceAccessConfig(device *DeviceInfo) (string, error)
GetPropertiesConfig() map[string][]DeviceProperty
GetDevicePropertiesConfig(device *DeviceInfo) ([]DeviceProperty, error)
Start()
io.Closer
}

type DmCtx struct {
context.Context
log *log.Logger
mqtt *mqtt2.Client
tomb utils.Tomb
eventCb EventCallback
deltaCb DeltaCallback
response sync.Map
msgChs map[string]chan *v1.Message
log *log.Logger
mqtt *mqtt2.Client
tomb utils.Tomb
eventCb EventCallback
deltaCb DeltaCallback
response sync.Map
devices map[string]DeviceInfo
msgChs map[string]chan *v1.Message
driverConfig string
propsConfig map[string][]DeviceProperty
accessConfig map[string]string
}

func NewContext(confFile string) Context {
Expand All @@ -119,17 +87,31 @@ func NewContext(confFile string) Context {
lfs = append(lfs, log.Any("service", c.ServiceName()))
}
c.log = log.With(lfs...)
sc := new(SystemConfig)
if err := c.LoadCustomConfig(sc); err != nil {
c.log.Error("failed to load system config, to use default config", log.Error(err))
utils.UnmarshalYAML(nil, sc)

if err := unmarshalYAML(DefaultAccessConf, &c.accessConfig); err != nil {
c.log.Error("failed to load access config, to use default config", log.Error(err))
utils.UnmarshalYAML(nil, &c.accessConfig)
}

if err := unmarshalYAML(DefaultPropsConf, &c.propsConfig); err != nil {
c.log.Error("failed to load props config, to use default config", log.Error(err))
utils.UnmarshalYAML(nil, &c.propsConfig)
}
c.Store(KeySysExtConf, sc)

var dCfg driverConfig
if err := unmarshalYAML(DefaultDriverConf, &dCfg); err != nil {
c.log.Error("failed to load driver config, to use default config", log.Error(err))
utils.UnmarshalYAML(nil, &dCfg)
}
c.driverConfig = dCfg.Driver

devices := make(map[string]DeviceInfo)
var subs []mqtt2.QOSTopic
for _, dev := range sc.Devices {
for _, dev := range dCfg.Devices {
subs = append(subs, dev.Delta, dev.Event, dev.GetResponse)
devices[dev.Name] = dev
}
c.devices = devices
mqtt, err := c.Context.NewSystemBrokerClient(subs)
if err != nil {
c.log.Warn("fail to create system broker client", log.Any("error", err))
Expand All @@ -143,9 +125,8 @@ func NewContext(confFile string) Context {
}

func (c *DmCtx) Start() {
devices := c.SystemConfigExt().Devices
for _, dev := range devices {
c.msgChs[dev.Name] = make(chan *v1.Message, 1024)
for name, dev := range c.devices {
c.msgChs[name] = make(chan *v1.Message, 1024)
go c.processing(c.msgChs[dev.Name])
}
}
Expand All @@ -159,7 +140,7 @@ func (c *DmCtx) Close() error {
}

func (c *DmCtx) processDelta(msg *v1.Message) error {
device := msg.Metadata[KeyDevice]
deviceName := msg.Metadata[KeyDevice]
if c.deltaCb == nil {
c.log.Debug("delta callback not set and message will not be process")
return nil
Expand All @@ -168,14 +149,19 @@ func (c *DmCtx) processDelta(msg *v1.Message) error {
if err := msg.Content.Unmarshal(&delta); err != nil {
return errors.Trace(err)
}
if err := c.deltaCb(&DeviceInfo{Name: device}, delta); err != nil {
dev, ok := c.devices[deviceName]
if !ok {
c.log.Warn("delta callback can not find device", log.Any("device", deviceName))
return nil
}
if err := c.deltaCb(&dev, delta); err != nil {
return errors.Trace(err)
}
return nil
}

func (c *DmCtx) processEvent(msg *v1.Message) error {
device := msg.Metadata[KeyDevice]
deviceName := msg.Metadata[KeyDevice]
if c.eventCb == nil {
c.log.Debug("event callback not set and message will not be process")
return nil
Expand All @@ -184,7 +170,12 @@ func (c *DmCtx) processEvent(msg *v1.Message) error {
if err := msg.Content.Unmarshal(&event); err != nil {
return errors.Trace(err)
}
if err := c.eventCb(&DeviceInfo{Name: device}, &event); err != nil {
dev, ok := c.devices[deviceName]
if !ok {
c.log.Warn("event callback can not find device", log.Any("device", deviceName))
return nil
}
if err := c.eventCb(&dev, &event); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -247,16 +238,12 @@ func (c *DmCtx) processing(ch chan *v1.Message) {
}
}

func (c *DmCtx) SystemConfigExt() *SystemConfig {
v, ok := c.Load(KeySysExtConf)
if !ok {
return nil
}
return v.(*SystemConfig)
}

func (c *DmCtx) GetAllDevices() []DeviceInfo {
return c.SystemConfigExt().Devices
var deviceList []DeviceInfo
for _, dev := range c.devices {
deviceList = append(deviceList, dev)
}
return deviceList
}

func (c *DmCtx) ReportDeviceProperties(info *DeviceInfo, report v1.Report) error {
Expand Down Expand Up @@ -361,18 +348,37 @@ func (c *DmCtx) Offline(info *DeviceInfo) error {
return nil
}

func (c *DmCtx) GetDeviceAccessConfig() (string, error) {
res, err := ioutil.ReadFile(DefaultAccessConf)
if err != nil {
return "", errors.Trace(err)
func (c *DmCtx) GetDriverConfig() string {
return c.driverConfig
}
func (c *DmCtx) GetAccessConfig() map[string]string {
return c.accessConfig
}

func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (string, error) {
if cfg, ok := c.accessConfig[device.Name]; ok {
return cfg, nil
} else {
return "", ErrAccessConfigNotExist
}
return string(res), nil
}

func (c *DmCtx) GetDevicePropConfigs() (map[string][]DeviceProperty, error) {
var res map[string][]DeviceProperty
if err := c.LoadCustomConfig(&res, DefaultPropsConf); err != nil {
return nil, errors.Trace(err)
func (c *DmCtx) GetPropertiesConfig() map[string][]DeviceProperty {
return c.propsConfig
}

func (c *DmCtx) GetDevicePropertiesConfig(device *DeviceInfo) ([]DeviceProperty, error) {
if cfg, ok := c.propsConfig[device.Name]; ok {
return cfg, nil
} else {
return nil, ErrPropsConfigNotExist
}
}

func unmarshalYAML(file string, out interface{}) error {
bs, err := ioutil.ReadFile(file)
if err != nil {
return err
}
return res, nil
return yaml.Unmarshal(bs, out)
}
4 changes: 2 additions & 2 deletions spec/v1/lazy_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func TestSpecV1_LazyValue(t *testing.T) {
s := "test"
b := true
msg4 := &Message{
Kind: MessageReport,
Content: LazyValue{Value: map[string]interface{}{"int64": i64, "string": s, "bool": b}},
Kind: MessageReport,
Content: LazyValue{Value: map[string]interface{}{"int64": i64, "string": s, "bool": b}},
}
data5, err := json.Marshal(msg4)
assert.NoError(t, err)
Expand Down

0 comments on commit b956d61

Please sign in to comment.