Skip to content

Commit 1a2f110

Browse files
authored
Create WebhookBroker struct (#80)
* Create WebhookBroker struct * stop using global config in webhook.go * ticker resource leak * linter
1 parent b544d0b commit 1a2f110

File tree

8 files changed

+100
-202
lines changed

8 files changed

+100
-202
lines changed

src/broker/webhook.go

+60-58
Original file line numberDiff line numberDiff line change
@@ -21,72 +21,77 @@ import (
2121
log "github.com/sirupsen/logrus"
2222
)
2323

24+
type WebhookBroker struct {
25+
// key is the hash of topic full name and pulsar url, and subscription name
26+
webhooks map[string]chan *SubCloseSignal
27+
dbHandler db.Db
28+
l *log.Entry
29+
sync.RWMutex
30+
}
31+
2432
// SubCloseSignal is a signal object to pass for channel
2533
type SubCloseSignal struct{}
2634

27-
// key is the hash of topic full name and pulsar url, and subscription name
28-
var webhooks = make(map[string]chan *SubCloseSignal)
29-
var whLock = sync.RWMutex{}
35+
func NewWebhookBroker(config *util.Configuration) *WebhookBroker {
36+
return &WebhookBroker{
37+
dbHandler: db.NewDbWithPanic(config.PbDbType),
38+
webhooks: make(map[string]chan *SubCloseSignal),
39+
l: log.WithFields(log.Fields{"app": "webhookbroker"}),
40+
}
41+
}
3042

3143
// ReadWebhook reads a thread safe map
32-
func ReadWebhook(key string) (chan *SubCloseSignal, bool) {
33-
whLock.RLock()
34-
defer whLock.RUnlock()
35-
c, ok := webhooks[key]
44+
func (wb *WebhookBroker) ReadWebhook(key string) (chan *SubCloseSignal, bool) {
45+
wb.RLock()
46+
defer wb.RUnlock()
47+
c, ok := wb.webhooks[key]
3648
return c, ok
3749
}
3850

3951
// WriteWebhook writes a key/value to a thread safe map
40-
func WriteWebhook(key string, c chan *SubCloseSignal) {
41-
whLock.Lock()
42-
defer whLock.Unlock()
43-
webhooks[key] = c
52+
func (wb *WebhookBroker) WriteWebhook(key string, c chan *SubCloseSignal) {
53+
wb.Lock()
54+
defer wb.Unlock()
55+
wb.webhooks[key] = c
4456
}
4557

4658
// DeleteWebhook deletes a key from a thread safe map
47-
func DeleteWebhook(key string) bool {
48-
whLock.Lock()
49-
defer whLock.Unlock()
50-
if c, ok := webhooks[key]; ok {
59+
func (wb *WebhookBroker) DeleteWebhook(key string) bool {
60+
wb.Lock()
61+
defer wb.Unlock()
62+
if c, ok := wb.webhooks[key]; ok {
5163
c <- &SubCloseSignal{}
52-
delete(webhooks, key)
64+
delete(wb.webhooks, key)
5365
//channel is deleted where it's been created with `defer`
5466
return ok
5567
}
5668
return false
5769
}
5870

59-
var singleDb db.Db
60-
6171
// Init initializes webhook configuration database
62-
func Init() {
63-
NewDbHandler()
64-
durationStr := util.AssignString(util.GetConfig().PbDbInterval, "180s")
72+
func Init(config *util.Configuration) {
73+
svr := NewWebhookBroker(config)
74+
durationStr := util.AssignString(config.PbDbInterval, "180s")
6575
duration, err := time.ParseDuration(durationStr)
6676
if err != nil {
67-
log.Errorf("specified duration %s error %v", durationStr, err)
77+
svr.l.Errorf("specified duration %s error %v", durationStr, err)
6878
duration, _ = time.ParseDuration("180s")
6979
}
70-
log.Warnf("beam database pull every %.0f seconds", duration.Seconds())
80+
svr.l.Infof("beam database pull every %.0f seconds", duration.Seconds())
7181

7282
go func() {
73-
run()
83+
svr.run()
7484
ticker := time.NewTicker(duration)
85+
defer ticker.Stop()
7586
for {
7687
select {
7788
case <-ticker.C:
78-
run()
89+
svr.run()
7990
}
8091
}
8192
}()
8293
}
8394

84-
// NewDbHandler gets a local copy of Db handler
85-
func NewDbHandler() {
86-
log.Infof("webhook database init...")
87-
singleDb = db.NewDbWithPanic(util.GetConfig().PbDbType)
88-
}
89-
9095
// pushWebhook sends data to a webhook interface
9196
func pushWebhook(url string, data []byte, headers []string) (int, *http.Response) {
9297

@@ -163,7 +168,7 @@ func pushAndAck(c pulsar.Consumer, msg pulsar.Message, url string, data []byte,
163168

164169
// ConsumeLoop consumes data from Pulsar topic
165170
// Do not use context since go vet will puke that requires cancel invoked in the same function
166-
func ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookConfig) error {
171+
func (wb *WebhookBroker) ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookConfig) error {
167172
headers := whCfg.Headers
168173
_, err := model.GetSubscriptionType(whCfg.SubscriptionType)
169174
if err != nil {
@@ -175,11 +180,11 @@ func ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookC
175180
}
176181
c, err := pulsardriver.GetPulsarConsumer(url, token, topic, whCfg.Subscription, whCfg.InitialPosition, whCfg.SubscriptionType, subscriptionKey)
177182
if err != nil {
178-
return fmt.Errorf("Failed to create Pulsar subscription %v", err)
183+
return fmt.Errorf("failed to create Pulsar subscription %v", err)
179184
}
180185

181186
terminate := make(chan *SubCloseSignal, 2)
182-
WriteWebhook(subscriptionKey, terminate)
187+
wb.WriteWebhook(subscriptionKey, terminate)
183188
defer close(terminate)
184189
ctx := context.Background()
185190

@@ -189,18 +194,18 @@ func ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookC
189194
retryMax := 3
190195
for {
191196
if retry > retryMax {
192-
cancelConsumer(subscriptionKey)
197+
wb.cancelConsumer(subscriptionKey)
193198
return fmt.Errorf("consumer retried %d times, max reached", retryMax)
194199
}
195200
msg, err := c.Receive(ctx)
196201
if err != nil {
197-
log.Infof("error from consumer loop receive: %v\n", err)
202+
wb.l.Infof("error from consumer loop receive: %v\n", err)
198203
retry++
199204
ticker := time.NewTicker(time.Duration(2*retry) * time.Second)
200205
defer ticker.Stop()
201206
select {
202207
case <-terminate:
203-
log.Infof("subscription %s received signal to exit consumer loop", subscriptionKey)
208+
wb.l.Infof("subscription %s received signal to exit consumer loop", subscriptionKey)
204209
return nil
205210
case <-ticker.C:
206211
//reconnect after error
@@ -211,8 +216,8 @@ func ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookC
211216
}
212217
} else if msg != nil {
213218
retry = 0
214-
if log.GetLevel() == log.DebugLevel {
215-
log.Debugf("PulsarMessageId:%#v", msg.ID())
219+
if wb.l.Level == log.DebugLevel {
220+
wb.l.Debugf("PulsarMessageId:%v", msg.ID())
216221
}
217222
headers = append(headers, fmt.Sprintf("PulsarMessageId:%#v", msg.ID()))
218223
headers = append(headers, "PulsarPublishedTime:"+msg.PublishTime().String())
@@ -229,61 +234,58 @@ func ConsumeLoop(url, token, topic, subscriptionKey string, whCfg model.WebhookC
229234
if json.Valid(data) {
230235
headers = append(headers, "content-type:application/json")
231236
}
232-
if log.GetLevel() == log.DebugLevel {
233-
log.Debug(string(data))
234-
}
235237
pushAndAck(c, msg, whCfg.URL, data, headers)
236238
}
237239
}
238240

239241
}
240242

241-
func run() {
243+
func (wb *WebhookBroker) run() {
242244
// key is hash of topic name and pulsar url, and subscription name
243245
subscriptionSet := make(map[string]bool)
244246

245-
for _, cfg := range LoadConfig() {
247+
for _, cfg := range wb.LoadConfig() {
246248
for _, whCfg := range cfg.Webhooks {
247249
topic := cfg.TopicFullName
248250
token := cfg.Token
249251
url := cfg.PulsarURL
250252
subscriptionKey := cfg.Key + whCfg.URL
251253
status := whCfg.WebhookStatus
252-
_, ok := ReadWebhook(subscriptionKey)
254+
_, ok := wb.ReadWebhook(subscriptionKey)
253255
if status == model.Activated {
254256
subscriptionSet[subscriptionKey] = true
255257
if !ok {
256-
log.Infof("start activated webhook for topic subscription %v", subscriptionKey)
257-
go ConsumeLoop(url, token, topic, subscriptionKey, whCfg)
258+
wb.l.Infof("start activated webhook for topic subscription %v", subscriptionKey)
259+
go wb.ConsumeLoop(url, token, topic, subscriptionKey, whCfg)
258260
}
259261
}
260262
}
261263
}
262264

263265
// cancel any webhook which is no longer required to be activated by the database
264-
for k := range webhooks {
265-
if subscriptionSet[k] != true {
266-
log.Infof("cancel webhook consumer subscription key %s", k)
267-
cancelConsumer(k)
266+
for k := range wb.webhooks {
267+
if !subscriptionSet[k] {
268+
wb.l.Infof("cancel webhook consumer subscription key %s", k)
269+
wb.cancelConsumer(k)
268270
}
269271
}
270-
log.Infof("load webhooks size %d", len(webhooks))
272+
wb.l.Infof("load webhooks size %d", len(wb.webhooks))
271273
}
272274

273275
// LoadConfig loads the entire topic documents from the database
274-
func LoadConfig() []*model.TopicConfig {
275-
cfgs, err := singleDb.Load()
276+
func (wb *WebhookBroker) LoadConfig() []*model.TopicConfig {
277+
cfgs, err := wb.dbHandler.Load()
276278
if err != nil {
277-
log.Errorf("failed to load topics from database error %v", err.Error())
279+
wb.l.Errorf("failed to load topics from database error %v", err.Error())
278280
}
279281

280282
return cfgs
281283
}
282284

283-
func cancelConsumer(key string) error {
284-
ok := DeleteWebhook(key)
285+
func (wb *WebhookBroker) cancelConsumer(key string) error {
286+
ok := wb.DeleteWebhook(key)
285287
if ok {
286-
log.Infof("cancel consumer %v", key)
288+
wb.l.Infof("cancel consumer %v", key)
287289
pulsardriver.CancelPulsarConsumer(key)
288290
return nil
289291
}

src/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func main() {
2828
}
2929

3030
exit := make(chan bool) // future use to exit the main program if in broker only mode
31-
util.Init()
31+
config := util.Init()
3232

3333
flag.Parse()
3434
log.Warnf("start server mode %s", mode)
@@ -37,7 +37,7 @@ func main() {
3737
}
3838

3939
if util.IsBrokerRequired(&mode) {
40-
broker.Init()
40+
broker.Init(config)
4141
}
4242
if util.IsHTTPRouterRequired(&mode) {
4343
route.Init()

src/route/router_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package route
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestEffectiveRoutes(t *testing.T) {
10+
receiverRoutesLen := len(ReceiverRoutes)
11+
restRoutesLen := len(RestRoutes)
12+
prometheusLen := len(PrometheusRoute)
13+
// mode := "hybrid"
14+
// assert.Equal(t, len(GetEffectiveRoutes(&mode)), (receiverRoutesLen + restRoutesLen + prometheusLen))
15+
mode := "rest"
16+
assert.Equal(t, len(GetEffectiveRoutes(&mode)), (receiverRoutesLen + restRoutesLen + prometheusLen))
17+
mode = "receiver"
18+
assert.Equal(t, len(GetEffectiveRoutes(&mode)), (receiverRoutesLen + restRoutesLen + prometheusLen))
19+
}

0 commit comments

Comments
 (0)