Skip to content

Commit

Permalink
bugfix of event process (#231)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiejingru authored Feb 1, 2021
1 parent c5e8c07 commit 170bd56
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 17 deletions.
52 changes: 39 additions & 13 deletions dmcontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
OnlineStatus = "online"
OfflineStatus = "offline"
TypeReportEvent = "report"
KeySysExtConf = "BAETYL_SYSTEM_EXT_CONF"
)

var (
Expand All @@ -31,11 +32,36 @@ var (
ErrResponseChannelNotExist = errors.New("response channel not exist")
)

type DevicePropConfigs struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
PropConfigs map[string]string `yaml:"propConfigs,omitempty" json:"propConfigs,omitempty"`
type DeviceProperty struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Mode string `json:"mode,omitempty" validate:"regexp=^(ro|rw)?$"`
Visitor PropertyVisitor `json:"visitor,omitempty"`
}

type PropertyVisitor struct {
Modbus *ModbusVisitor `json:"modbus,omitempty"`
Opcua *OpcuaVisitor `json:"opcua,omitempty"`
Custom *CustomVisitor `json:"custom,omitempty"`
}

type ModbusVisitor struct {
Function byte `json:"function" validate:"min=1,max=4"`
Address string `json:"address"`
Quantity uint16 `json:"quantity"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
Scale float64 `json:"scale"`
SwapByte bool `json:"swapByte"`
SwapRegister bool `json:"swapRegister"`
}

type OpcuaVisitor struct {
NodeID string `json:"nodeid,omitempty"`
Type string `json:"type,omitempty" validate:"regexp=^(int16|int32|int64|float32|float64|string|bool)?$"`
}

type CustomVisitor string

type Event struct {
Type string `yaml:"type,omitempty" json:"type,omitempty"`
Payload interface{} `yaml:"payload,omitempty" json:"payload,omitempty"`
Expand All @@ -62,7 +88,7 @@ type Context interface {
Online(info *DeviceInfo) error
Offline(info *DeviceInfo) error
GetDeviceAccessConfig() (string, error)
GetDevicePropConfigs() (map[string]DevicePropConfigs, error)
GetDevicePropConfigs() (map[string][]DeviceProperty, error)
Start()
io.Closer
}
Expand All @@ -75,7 +101,7 @@ type DmCtx struct {
eventCb EventCallback
deltaCb DeltaCallback
response sync.Map
msgs map[string]chan *v1.Message
msgChs map[string]chan *v1.Message
}

func NewContext(confFile string) Context {
Expand All @@ -98,7 +124,7 @@ func NewContext(confFile string) Context {
c.log.Error("failed to load system config, to use default config", log.Error(err))
utils.UnmarshalYAML(nil, sc)
}
c.Store(context.KeySysConf, sc)
c.Store(KeySysExtConf, sc)

var subs []mqtt2.QOSTopic
for _, dev := range sc.Devices {
Expand All @@ -109,8 +135,8 @@ func NewContext(confFile string) Context {
c.log.Warn("fail to create system broker client", log.Any("error", err))
}
c.mqtt = mqtt
c.msgs = make(map[string]chan *v1.Message, 1024)
if err := c.mqtt.Start(newObserver(c.msgs, c.log)); err != nil {
c.msgChs = make(map[string]chan *v1.Message)
if err := c.mqtt.Start(newObserver(c.msgChs, c.log)); err != nil {
c.log.Warn("fail to start mqtt client", log.Any("error", err))
}
return c
Expand All @@ -119,8 +145,8 @@ func NewContext(confFile string) Context {
func (c *DmCtx) Start() {
devices := c.SystemConfigExt().Devices
for _, dev := range devices {
c.msgs[dev.Name] = make(chan *v1.Message)
go c.processing(c.msgs[dev.Name])
c.msgChs[dev.Name] = make(chan *v1.Message, 1024)
go c.processing(c.msgChs[dev.Name])
}
}

Expand Down Expand Up @@ -222,7 +248,7 @@ func (c *DmCtx) processing(ch chan *v1.Message) {
}

func (c *DmCtx) SystemConfigExt() *SystemConfig {
v, ok := c.Load(context.KeySysConf)
v, ok := c.Load(KeySysExtConf)
if !ok {
return nil
}
Expand Down Expand Up @@ -343,8 +369,8 @@ func (c *DmCtx) GetDeviceAccessConfig() (string, error) {
return string(res), nil
}

func (c *DmCtx) GetDevicePropConfigs() (map[string]DevicePropConfigs, error) {
var res map[string]DevicePropConfigs
func (c *DmCtx) GetDevicePropConfigs() (map[string][]DeviceProperty, error) {
var res map[string][]DeviceProperty
if err := c.LoadCustomConfig(&res, DefaultPropsConf); err != nil {
return nil, errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions dmcontext/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ const (

type observer struct {
log *log.Logger
msgs map[string]chan *v1.Message
msgChs map[string]chan *v1.Message
}

func newObserver(msgs map[string]chan *v1.Message, log *log.Logger) mqtt.Observer {
return &observer{msgs: msgs, log: log}
func newObserver(msgChs map[string]chan *v1.Message, log *log.Logger) mqtt.Observer {
return &observer{msgChs: msgChs, log: log}
}

func ParseTopic(topic string) (string, error) {
Expand All @@ -49,7 +49,7 @@ func (o *observer) OnPublish(pkt *packet.Publish) error {
log.Any("payload", string(pkt.Message.Payload)))
return nil
}
if ch, ok := o.msgs[device]; ok {
if ch, ok := o.msgChs[device]; ok {
select {
case ch <- &msg:
default:
Expand Down
43 changes: 43 additions & 0 deletions spec/v1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,49 @@ func patch(doc, delta map[string]interface{}) (map[string]interface{}, error) {
return newDoc, nil
}

func getDeviceInfos(data map[string]interface{}) []DeviceInfo {
if data == nil {
return nil
}
devs, ok := data[KeyDevices]
if !ok || devs == nil {
return nil
}
res, ok := devs.([]DeviceInfo)
if ok {
return res
}
res = []DeviceInfo{}
dis, ok := devs.([]interface{})
if !ok {
return nil
}
for _, di := range dis {
dim := di.(map[string]interface{})
if dim == nil {
return nil
}
res = append(res, DeviceInfo{Name: dim["name"].(string), Version: dim["version"].(string)})
}
return res
}

func (r Report) DeviceInfos() []DeviceInfo {
return getDeviceInfos(r)
}

func (r Report) SetDeviceInfos(devs []DeviceInfo) {
r[KeyDevices] = devs
}

func (d Desire) DeviceInfos() []DeviceInfo {
return getDeviceInfos(d)
}

func (d Desire) SetDeviceInfos(devs []DeviceInfo) {
d[KeyDevices] = devs
}

func (r Report) AppInfos(isSys bool) []AppInfo {
if isSys {
return getAppInfos(KeySysApps, r)
Expand Down
5 changes: 5 additions & 0 deletions spec/v1/sync_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type NodeStats struct {
Extension interface{} `yaml:"extension,omitempty" json:"extension,omitempty"`
}

type DeviceInfo struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Version string `yaml:"version,omitempty" json:"version,omitempty"`
}

// AppInfo app info
type AppInfo struct {
Name string `yaml:"name,omitempty" json:"name,omitempty"`
Expand Down

0 comments on commit 170bd56

Please sign in to comment.