Skip to content

Commit

Permalink
refactor of device management (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiejingru authored Mar 31, 2021
1 parent b32706e commit 7b32a8f
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 13 deletions.
70 changes: 70 additions & 0 deletions dmcontext/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dmcontext

import (
"time"

mqtt2 "github.com/baetyl/baetyl-go/v2/mqtt"
v1 "github.com/baetyl/baetyl-go/v2/spec/v1"
)
Expand All @@ -19,6 +21,74 @@ type Topic struct {
GetResponse mqtt2.QOSTopic `yaml:"getResponse,omitempty" json:"getResponse,omitempty"`
}

func (a *AccessConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
var modbus ModbusAccessConfig
if err := unmarshal(&modbus); err == nil {
a.Modbus = &modbus
return nil
}
var opcua OpcuaAccessConfig
if err := unmarshal(&opcua); err == nil {
a.Opcua = &opcua
return nil
}
var custom CustomAccessConfig
if err := unmarshal(&custom); err != nil {
return err
}
a.Custom = &custom
return nil
}

type AccessConfig struct {
Modbus *ModbusAccessConfig `yaml:"modbus,omitempty" json:"modbus,omitempty"`
Opcua *OpcuaAccessConfig `yaml:"opcua,omitempty" json:"opcua,omitempty"`
Custom *CustomAccessConfig `yaml:"custom,omitempty" json:"custom,omitempty"`
}

type ModbusAccessConfig struct {
Tcp *TcpConfig `yaml:"tcp,omitempty" json:"tcp,omitempty"`
Rtu *RtuConfig `yaml:"rtu,omitempty" json:"rtu,omitempty"`
}

type TcpConfig struct {
Address string `yaml:"address,omitempty" json:"address,omitempty" validate:"required"`
Port uint16 `yaml:"port,omitempty" json:"port,omitempty" validate:"required"`
}

type RtuConfig struct {
Port string `yaml:"port,omitempty" json:"port,omitempty" validate:"required"`
BaudRate int `yaml:"baudrate,omitempty" json:"baudrate,omitempty" default:"19200"`
Parity string `yaml:"parity,omitempty" json:"parity,omitempty" default:"E" validate:"regexp=^(E|N|O)?$"`
DataBit int `yaml:"databit,omitempty" json:"databit,omitempty" default:"8" validate:"min=5, max=8"`
StopBit int `yaml:"stopbit,omitempty" json:"stopbit,omitempty" default:"1" validate:"min=1, max=2"`
}

type OpcuaAccessConfig struct {
ID byte `yaml:"id,omitempty" json:"id,omitempty"`
Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"`
Security OpcuaSecurity `yaml:"security,omitempty" json:"security,omitempty"`
Auth OpcuaAuth `yaml:"auth,omitempty" json:"auth,omitempty"`
Certificate OpcuaCertificate `yaml:"certificate,omitempty" json:"certificate,omitempty"`
}

type OpcuaSecurity struct {
Policy string `yaml:"policy,omitempty" json:"policy,omitempty"`
Mode string `yaml:"mode,omitempty" json:"mode,omitempty"`
}

type OpcuaAuth struct {
Username string `yaml:"username,omitempty" json:"username,omitempty"`
Password string `yaml:"password,omitempty" json:"password,omitempty"`
}

type OpcuaCertificate struct {
Cert string `yaml:"certFile,omitempty" json:"certFile,omitempty"`
Key string `yaml:"keyFile,omitempty" json:"keyFile,omitempty"`
}

type CustomAccessConfig string

type DeviceProperty struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Expand Down
120 changes: 107 additions & 13 deletions dmcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"io"
"io/ioutil"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -34,6 +35,18 @@ var (
ErrResponseChannelNotExist = errors.New("response channel not exist")
ErrAccessConfigNotExist = errors.New("access config not exist")
ErrPropsConfigNotExist = errors.New("properties config not exist")
ErrDeviceNotExist = errors.New("device not exist")
ErrTypeNotSupported = errors.New("type not supported")
)

const (
TypeInt16 = "int16"
TypeInt32 = "int32"
TypeInt64 = "int64"
TypeFloat32 = "float32"
TypeFloat64 = "float64"
TypeBool = "bool"
TypeString = "string"
)

type DeltaCallback func(*DeviceInfo, v1.Delta) error
Expand All @@ -49,8 +62,8 @@ type Context interface {
Online(device *DeviceInfo) error
Offline(device *DeviceInfo) error
GetDriverConfig() string
GetAccessConfig() map[string]string
GetDeviceAccessConfig(device *DeviceInfo) (string, error)
GetAccessConfig() map[string]AccessConfig
GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error)
GetPropertiesConfig() map[string][]DeviceProperty
GetDevicePropertiesConfig(device *DeviceInfo) ([]DeviceProperty, error)
Start()
Expand All @@ -69,7 +82,7 @@ type DmCtx struct {
msgChs map[string]chan *v1.Message
driverConfig string
propsConfig map[string][]DeviceProperty
accessConfig map[string]string
accessConfig map[string]AccessConfig
}

func NewContext(confFile string) Context {
Expand Down Expand Up @@ -146,13 +159,16 @@ func (c *DmCtx) processDelta(msg *v1.Message) error {
return nil
}
var delta v1.Delta
if err := msg.Content.Unmarshal(&delta); err != nil {
if err := msg.Content.ExactUnmarshal(&delta); err != nil {
return errors.Trace(err)
}
dev, ok := c.devices[deviceName]
if !ok {
c.log.Warn("delta callback can not find device", log.Any("device", deviceName))
return nil
return errors.Trace(ErrDeviceNotExist)
}
delta, err := c.parsePropertyValues(deviceName, delta)
if err != nil {
return errors.Trace(err)
}
if err := c.deltaCb(&dev, delta); err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -182,8 +198,8 @@ func (c *DmCtx) processEvent(msg *v1.Message) error {
}

func (c *DmCtx) processResponse(msg *v1.Message) error {
device := msg.Metadata[KeyDevice]
val, ok := c.response.Load(device)
deviceName := msg.Metadata[KeyDevice]
val, ok := c.response.Load(deviceName)
if !ok {
return errors.Trace(ErrResponseChannelNotExist)
}
Expand All @@ -192,12 +208,21 @@ func (c *DmCtx) processResponse(msg *v1.Message) error {
return errors.Trace(ErrInvalidChannel)
}
var shad *DeviceShadow
if err := msg.Content.Unmarshal(&shad); err != nil {
if err := msg.Content.ExactUnmarshal(&shad); err != nil {
return errors.Trace(err)
}
if !ok {
return errors.Trace(ErrInvalidMessage)
}
var err error
shad.Report, err = c.parsePropertyValues(deviceName, shad.Report)
if err != nil {
return errors.Trace(err)
}
shad.Desire, err = c.parsePropertyValues(deviceName, shad.Desire)
if err != nil {
return errors.Trace(err)
}
select {
case ch <- shad:
default:
Expand Down Expand Up @@ -351,15 +376,15 @@ func (c *DmCtx) Offline(info *DeviceInfo) error {
func (c *DmCtx) GetDriverConfig() string {
return c.driverConfig
}
func (c *DmCtx) GetAccessConfig() map[string]string {
func (c *DmCtx) GetAccessConfig() map[string]AccessConfig {
return c.accessConfig
}

func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (string, error) {
func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error) {
if cfg, ok := c.accessConfig[device.Name]; ok {
return cfg, nil
return &cfg, nil
} else {
return "", ErrAccessConfigNotExist
return nil, ErrAccessConfigNotExist
}
}

Expand All @@ -382,3 +407,72 @@ func unmarshalYAML(file string, out interface{}) error {
}
return yaml.Unmarshal(bs, out)
}

func (c *DmCtx) parsePropertyValues(devName string, props map[string]interface{}) (map[string]interface{}, error) {
res := make(map[string]interface{})
vals, ok := c.propsConfig[devName]
if !ok {
return nil, errors.Trace(ErrDeviceNotExist)
}
cfgs := make(map[string]DeviceProperty)
for _, val := range vals {
cfgs[val.Name] = val
}
for key, val := range props {
if cfg, ok := cfgs[key]; ok {
pVal, err := parsePropertyValue(cfg.Type, val)
if err != nil {
return nil, errors.Trace(err)
}
res[key] = pVal
} else {
return nil, errors.Trace(ErrPropsConfigNotExist)
}
}
return res, nil
}

func parsePropertyValue(tpy string, val interface{}) (interface{}, error) {
// it is json.Number (string actually) when val is number
switch tpy {
case TypeInt16:
num, _ := val.(json.Number)
i, err := strconv.ParseInt(num.String(), 10, 16)
if err != nil {
return nil, errors.Trace(err)
}
return int16(i), nil
case TypeInt32:
num, _ := val.(json.Number)
i, err := strconv.ParseInt(num.String(), 10, 32)
if err != nil {
return nil, errors.Trace(err)
}
return int32(i), nil
case TypeInt64:
num, _ := val.(json.Number)
i, err := strconv.ParseInt(num.String(), 10, 64)
if err != nil {
return nil, errors.Trace(err)
}
return i, nil
case TypeFloat32:
num, _ := val.(json.Number)
f, err := strconv.ParseFloat(num.String(), 32)
if err != nil {
return nil, errors.Trace(err)
}
return float32(f), nil
case TypeFloat64:
num, _ := val.(json.Number)
f, err := strconv.ParseFloat(num.String(), 64)
if err != nil {
return nil, errors.Trace(err)
}
return f, nil
case TypeBool, TypeString:
return val, nil
default:
return nil, errors.Trace(ErrTypeNotSupported)
}
}

0 comments on commit 7b32a8f

Please sign in to comment.