Skip to content

Commit f17417f

Browse files
Merge pull request #38 from fisherxu/sync
2 parents cd19bca + b165d75 commit f17417f

File tree

5 files changed

+64
-52
lines changed

5 files changed

+64
-52
lines changed

OWNERS

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ reviewers:
1414
- kadisi
1515
- subpathdev
1616
- anyushun
17+
- Iceber

pkg/common/util/conn.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ type UnixSocket struct {
1515
}
1616

1717
// NewUnixSocket create new socket
18-
func NewUnixSocket(filename string, size ...int) *UnixSocket {
19-
size1 := 10480
20-
if size != nil {
21-
size1 = size[0]
18+
func NewUnixSocket(filename string, sizes ...int) *UnixSocket {
19+
size := 10480
20+
if len(sizes) != 0 {
21+
size = sizes[0]
2222
}
23-
us := UnixSocket{filename: filename, bufsize: size1}
24-
return &us
23+
return &UnixSocket{filename: filename, bufsize: size}
2524
}
2625

2726
func (us *UnixSocket) createServer() {

pkg/core/context/context_channel.go

+17-22
Original file line numberDiff line numberDiff line change
@@ -160,22 +160,19 @@ func (ctx *ChannelContext) SendResp(message model.Message) {
160160

161161
// SendToGroup send msg to modules. Todo: do not stuck
162162
func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message) {
163-
// avoid exception because of channel closing
164-
// TODO: need reconstruction
165-
defer func() {
166-
if exception := recover(); exception != nil {
167-
klog.Warningf("Recover when sendToGroup message, exception: %+v", exception)
168-
}
169-
}()
170-
171163
send := func(ch chan model.Message) {
164+
// avoid exception because of channel closing
165+
// TODO: need reconstruction
166+
defer func() {
167+
if exception := recover(); exception != nil {
168+
klog.Warningf("Recover when sendToGroup message, exception: %+v", exception)
169+
}
170+
}()
172171
select {
173172
case ch <- message:
174173
default:
175174
klog.Warningf("the message channel is full, message: %+v", message)
176-
select {
177-
case ch <- message:
178-
}
175+
ch <- message
179176
}
180177
}
181178
if channelList := ctx.getTypeChannel(moduleType); channelList != nil {
@@ -190,14 +187,6 @@ func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message)
190187
// SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel
191188
// check timeout and the size of anon channel
192189
func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error {
193-
// avoid exception because of channel closing
194-
// TODO: need reconstruction
195-
defer func() {
196-
if exception := recover(); exception != nil {
197-
klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)
198-
}
199-
}()
200-
201190
if timeout <= 0 {
202191
timeout = MessageTimeoutDefault
203192
}
@@ -242,6 +231,13 @@ func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Mess
242231

243232
var timeoutCounter int32
244233
send := func(ch chan model.Message) {
234+
// avoid exception because of channel closing
235+
// TODO: need reconstruction
236+
defer func() {
237+
if exception := recover(); exception != nil {
238+
klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception)
239+
}
240+
}()
245241
sendTimer := time.NewTimer(time.Until(deadline))
246242
select {
247243
case ch <- message:
@@ -308,13 +304,12 @@ func (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message
308304
func (ctx *ChannelContext) delChannel(module string) {
309305
// delete module channel from channels map
310306
ctx.chsLock.Lock()
311-
_, exist := ctx.channels[module]
312-
if !exist {
307+
if _, exist := ctx.channels[module]; !exist {
308+
ctx.chsLock.Unlock()
313309
klog.Warningf("Failed to get channel, module:%s", module)
314310
return
315311
}
316312
delete(ctx.channels, module)
317-
318313
ctx.chsLock.Unlock()
319314

320315
// delete module channel from typechannels map

pkg/core/core.go

+11-12
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ func StartModules() {
1616

1717
modules := GetModules()
1818
for name, module := range modules {
19-
//Init the module
19+
// Init the module
2020
beehiveContext.AddModule(name)
21-
//Assemble typeChannels for sendToGroup
21+
// Assemble typeChannels for sendToGroup
2222
beehiveContext.AddModuleGroup(name, module.Group())
2323
go module.Start()
2424
klog.Infof("Starting module %v", name)
@@ -30,16 +30,15 @@ func GracefulShutdown() {
3030
c := make(chan os.Signal)
3131
signal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM,
3232
syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT)
33-
select {
34-
case s := <-c:
35-
klog.Infof("Get os signal %v", s.String())
36-
//Cleanup each modules
37-
beehiveContext.Cancel()
38-
modules := GetModules()
39-
for name, _ := range modules {
40-
klog.Infof("Cleanup module %v", name)
41-
beehiveContext.Cleanup(name)
42-
}
33+
s := <-c
34+
klog.Infof("Get os signal %v", s.String())
35+
36+
// Cleanup each modules
37+
beehiveContext.Cancel()
38+
modules := GetModules()
39+
for name := range modules {
40+
klog.Infof("Cleanup module %v", name)
41+
beehiveContext.Cleanup(name)
4342
}
4443
}
4544

pkg/core/model/message.go

+30-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package model
22

33
import (
4+
"encoding/json"
5+
"fmt"
46
"time"
57

68
uuid "github.com/satori/go.uuid"
@@ -15,13 +17,16 @@ const (
1517
ResponseOperation = "response"
1618
ResponseErrorOperation = "error"
1719

18-
ResourceTypePod = "pod"
19-
ResourceTypeConfigmap = "configmap"
20-
ResourceTypeSecret = "secret"
21-
ResourceTypeNode = "node"
22-
ResourceTypePodlist = "podlist"
23-
ResourceTypePodStatus = "podstatus"
24-
ResourceTypeNodeStatus = "nodestatus"
20+
ResourceTypePod = "pod"
21+
ResourceTypeConfigmap = "configmap"
22+
ResourceTypeSecret = "secret"
23+
ResourceTypeNode = "node"
24+
ResourceTypePodlist = "podlist"
25+
ResourceTypePodStatus = "podstatus"
26+
ResourceTypeNodeStatus = "nodestatus"
27+
ResourceTypeRule = "rule"
28+
ResourceTypeRuleEndpoint = "ruleendpoint"
29+
ResourceTypeRuleStatus = "rulestatus"
2530
)
2631

2732
// Message struct
@@ -118,27 +123,40 @@ func (msg *Message) GetID() string {
118123
return msg.Header.ID
119124
}
120125

121-
//GetParentID returns message parent id
126+
// GetParentID returns message parent id
122127
func (msg *Message) GetParentID() string {
123128
return msg.Header.ParentID
124129
}
125130

126-
//GetTimestamp returns message timestamp
131+
// GetTimestamp returns message timestamp
127132
func (msg *Message) GetTimestamp() int64 {
128133
return msg.Header.Timestamp
129134
}
130135

131-
//GetContent returns message content
136+
// GetContent returns message content
132137
func (msg *Message) GetContent() interface{} {
133138
return msg.Content
134139
}
135140

136-
//GetResourceVersion returns message resource version
141+
// GetContentData returns message content data
142+
func (msg *Message) GetContentData() ([]byte, error) {
143+
if data, ok := msg.Content.([]byte); ok {
144+
return data, nil
145+
}
146+
147+
data, err := json.Marshal(msg.Content)
148+
if err != nil {
149+
return nil, fmt.Errorf("marshal message content failed: %s", err)
150+
}
151+
return data, nil
152+
}
153+
154+
// GetResourceVersion returns message resource version
137155
func (msg *Message) GetResourceVersion() string {
138156
return msg.Header.ResourceVersion
139157
}
140158

141-
//UpdateID returns message object updating its ID
159+
// UpdateID returns message object updating its ID
142160
func (msg *Message) UpdateID() *Message {
143161
msg.Header.ID = uuid.NewV4().String()
144162
return msg

0 commit comments

Comments
 (0)