From 7b32a8f06e00d462641c3ede9173b3302862cde6 Mon Sep 17 00:00:00 2001 From: xiejingru Date: Wed, 31 Mar 2021 19:14:18 +0800 Subject: [PATCH] refactor of device management (#249) --- dmcontext/config.go | 70 +++++++++++++++++++++++++ dmcontext/context.go | 120 ++++++++++++++++++++++++++++++++++++++----- 2 files changed, 177 insertions(+), 13 deletions(-) diff --git a/dmcontext/config.go b/dmcontext/config.go index 66659113..28db9fc1 100644 --- a/dmcontext/config.go +++ b/dmcontext/config.go @@ -1,6 +1,8 @@ package dmcontext import ( + "time" + mqtt2 "github.com/baetyl/baetyl-go/v2/mqtt" v1 "github.com/baetyl/baetyl-go/v2/spec/v1" ) @@ -19,6 +21,74 @@ type Topic struct { GetResponse mqtt2.QOSTopic `yaml:"getResponse,omitempty" json:"getResponse,omitempty"` } +func (a *AccessConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + var modbus ModbusAccessConfig + if err := unmarshal(&modbus); err == nil { + a.Modbus = &modbus + return nil + } + var opcua OpcuaAccessConfig + if err := unmarshal(&opcua); err == nil { + a.Opcua = &opcua + return nil + } + var custom CustomAccessConfig + if err := unmarshal(&custom); err != nil { + return err + } + a.Custom = &custom + return nil +} + +type AccessConfig struct { + Modbus *ModbusAccessConfig `yaml:"modbus,omitempty" json:"modbus,omitempty"` + Opcua *OpcuaAccessConfig `yaml:"opcua,omitempty" json:"opcua,omitempty"` + Custom *CustomAccessConfig `yaml:"custom,omitempty" json:"custom,omitempty"` +} + +type ModbusAccessConfig struct { + Tcp *TcpConfig `yaml:"tcp,omitempty" json:"tcp,omitempty"` + Rtu *RtuConfig `yaml:"rtu,omitempty" json:"rtu,omitempty"` +} + +type TcpConfig struct { + Address string `yaml:"address,omitempty" json:"address,omitempty" validate:"required"` + Port uint16 `yaml:"port,omitempty" json:"port,omitempty" validate:"required"` +} + +type RtuConfig struct { + Port string `yaml:"port,omitempty" json:"port,omitempty" validate:"required"` + BaudRate int `yaml:"baudrate,omitempty" json:"baudrate,omitempty" default:"19200"` + Parity string `yaml:"parity,omitempty" json:"parity,omitempty" default:"E" validate:"regexp=^(E|N|O)?$"` + DataBit int `yaml:"databit,omitempty" json:"databit,omitempty" default:"8" validate:"min=5, max=8"` + StopBit int `yaml:"stopbit,omitempty" json:"stopbit,omitempty" default:"1" validate:"min=1, max=2"` +} + +type OpcuaAccessConfig struct { + ID byte `yaml:"id,omitempty" json:"id,omitempty"` + Timeout time.Duration `yaml:"timeout,omitempty" json:"timeout,omitempty"` + Security OpcuaSecurity `yaml:"security,omitempty" json:"security,omitempty"` + Auth OpcuaAuth `yaml:"auth,omitempty" json:"auth,omitempty"` + Certificate OpcuaCertificate `yaml:"certificate,omitempty" json:"certificate,omitempty"` +} + +type OpcuaSecurity struct { + Policy string `yaml:"policy,omitempty" json:"policy,omitempty"` + Mode string `yaml:"mode,omitempty" json:"mode,omitempty"` +} + +type OpcuaAuth struct { + Username string `yaml:"username,omitempty" json:"username,omitempty"` + Password string `yaml:"password,omitempty" json:"password,omitempty"` +} + +type OpcuaCertificate struct { + Cert string `yaml:"certFile,omitempty" json:"certFile,omitempty"` + Key string `yaml:"keyFile,omitempty" json:"keyFile,omitempty"` +} + +type CustomAccessConfig string + type DeviceProperty struct { Name string `yaml:"name,omitempty" json:"name,omitempty"` Type string `yaml:"type,omitempty" json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"` diff --git a/dmcontext/context.go b/dmcontext/context.go index 1f00138d..4bf265fe 100644 --- a/dmcontext/context.go +++ b/dmcontext/context.go @@ -4,6 +4,7 @@ import ( "encoding/json" "io" "io/ioutil" + "strconv" "sync" "time" @@ -34,6 +35,18 @@ var ( ErrResponseChannelNotExist = errors.New("response channel not exist") ErrAccessConfigNotExist = errors.New("access config not exist") ErrPropsConfigNotExist = errors.New("properties config not exist") + ErrDeviceNotExist = errors.New("device not exist") + ErrTypeNotSupported = errors.New("type not supported") +) + +const ( + TypeInt16 = "int16" + TypeInt32 = "int32" + TypeInt64 = "int64" + TypeFloat32 = "float32" + TypeFloat64 = "float64" + TypeBool = "bool" + TypeString = "string" ) type DeltaCallback func(*DeviceInfo, v1.Delta) error @@ -49,8 +62,8 @@ type Context interface { Online(device *DeviceInfo) error Offline(device *DeviceInfo) error GetDriverConfig() string - GetAccessConfig() map[string]string - GetDeviceAccessConfig(device *DeviceInfo) (string, error) + GetAccessConfig() map[string]AccessConfig + GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error) GetPropertiesConfig() map[string][]DeviceProperty GetDevicePropertiesConfig(device *DeviceInfo) ([]DeviceProperty, error) Start() @@ -69,7 +82,7 @@ type DmCtx struct { msgChs map[string]chan *v1.Message driverConfig string propsConfig map[string][]DeviceProperty - accessConfig map[string]string + accessConfig map[string]AccessConfig } func NewContext(confFile string) Context { @@ -146,13 +159,16 @@ func (c *DmCtx) processDelta(msg *v1.Message) error { return nil } var delta v1.Delta - if err := msg.Content.Unmarshal(&delta); err != nil { + if err := msg.Content.ExactUnmarshal(&delta); err != nil { return errors.Trace(err) } dev, ok := c.devices[deviceName] if !ok { - c.log.Warn("delta callback can not find device", log.Any("device", deviceName)) - return nil + return errors.Trace(ErrDeviceNotExist) + } + delta, err := c.parsePropertyValues(deviceName, delta) + if err != nil { + return errors.Trace(err) } if err := c.deltaCb(&dev, delta); err != nil { return errors.Trace(err) @@ -182,8 +198,8 @@ func (c *DmCtx) processEvent(msg *v1.Message) error { } func (c *DmCtx) processResponse(msg *v1.Message) error { - device := msg.Metadata[KeyDevice] - val, ok := c.response.Load(device) + deviceName := msg.Metadata[KeyDevice] + val, ok := c.response.Load(deviceName) if !ok { return errors.Trace(ErrResponseChannelNotExist) } @@ -192,12 +208,21 @@ func (c *DmCtx) processResponse(msg *v1.Message) error { return errors.Trace(ErrInvalidChannel) } var shad *DeviceShadow - if err := msg.Content.Unmarshal(&shad); err != nil { + if err := msg.Content.ExactUnmarshal(&shad); err != nil { return errors.Trace(err) } if !ok { return errors.Trace(ErrInvalidMessage) } + var err error + shad.Report, err = c.parsePropertyValues(deviceName, shad.Report) + if err != nil { + return errors.Trace(err) + } + shad.Desire, err = c.parsePropertyValues(deviceName, shad.Desire) + if err != nil { + return errors.Trace(err) + } select { case ch <- shad: default: @@ -351,15 +376,15 @@ func (c *DmCtx) Offline(info *DeviceInfo) error { func (c *DmCtx) GetDriverConfig() string { return c.driverConfig } -func (c *DmCtx) GetAccessConfig() map[string]string { +func (c *DmCtx) GetAccessConfig() map[string]AccessConfig { return c.accessConfig } -func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (string, error) { +func (c *DmCtx) GetDeviceAccessConfig(device *DeviceInfo) (*AccessConfig, error) { if cfg, ok := c.accessConfig[device.Name]; ok { - return cfg, nil + return &cfg, nil } else { - return "", ErrAccessConfigNotExist + return nil, ErrAccessConfigNotExist } } @@ -382,3 +407,72 @@ func unmarshalYAML(file string, out interface{}) error { } return yaml.Unmarshal(bs, out) } + +func (c *DmCtx) parsePropertyValues(devName string, props map[string]interface{}) (map[string]interface{}, error) { + res := make(map[string]interface{}) + vals, ok := c.propsConfig[devName] + if !ok { + return nil, errors.Trace(ErrDeviceNotExist) + } + cfgs := make(map[string]DeviceProperty) + for _, val := range vals { + cfgs[val.Name] = val + } + for key, val := range props { + if cfg, ok := cfgs[key]; ok { + pVal, err := parsePropertyValue(cfg.Type, val) + if err != nil { + return nil, errors.Trace(err) + } + res[key] = pVal + } else { + return nil, errors.Trace(ErrPropsConfigNotExist) + } + } + return res, nil +} + +func parsePropertyValue(tpy string, val interface{}) (interface{}, error) { + // it is json.Number (string actually) when val is number + switch tpy { + case TypeInt16: + num, _ := val.(json.Number) + i, err := strconv.ParseInt(num.String(), 10, 16) + if err != nil { + return nil, errors.Trace(err) + } + return int16(i), nil + case TypeInt32: + num, _ := val.(json.Number) + i, err := strconv.ParseInt(num.String(), 10, 32) + if err != nil { + return nil, errors.Trace(err) + } + return int32(i), nil + case TypeInt64: + num, _ := val.(json.Number) + i, err := strconv.ParseInt(num.String(), 10, 64) + if err != nil { + return nil, errors.Trace(err) + } + return i, nil + case TypeFloat32: + num, _ := val.(json.Number) + f, err := strconv.ParseFloat(num.String(), 32) + if err != nil { + return nil, errors.Trace(err) + } + return float32(f), nil + case TypeFloat64: + num, _ := val.(json.Number) + f, err := strconv.ParseFloat(num.String(), 64) + if err != nil { + return nil, errors.Trace(err) + } + return f, nil + case TypeBool, TypeString: + return val, nil + default: + return nil, errors.Trace(ErrTypeNotSupported) + } +}