Skip to content

Commit fd0b4e4

Browse files
committed
mutex lock namespace and issues to prevent concurrent hook process
1 parent e9d8130 commit fd0b4e4

File tree

1 file changed

+54
-12
lines changed

1 file changed

+54
-12
lines changed

bot/bot.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package bot
33
import (
44
"fmt"
55
"github.com/bivas/rivi/util"
6+
"github.com/patrickmn/go-cache"
67
"net/http"
78
"path/filepath"
89
"strings"
10+
"sync"
11+
"time"
912
)
1013

1114
type HandledEventResult struct {
@@ -20,6 +23,10 @@ type Bot interface {
2023
type bot struct {
2124
defaultNamespace string
2225
configurations map[string]Configuration
26+
27+
cacheLocker *sync.Mutex
28+
namespaceMutexes *cache.Cache
29+
repoIssueMutexes *cache.Cache
2330
}
2431

2532
func (b *bot) getCurrentConfiguration(namespace string) (Configuration, error) {
@@ -34,21 +41,25 @@ func (b *bot) getCurrentConfiguration(namespace string) (Configuration, error) {
3441
return configuration, nil
3542
}
3643

37-
func (b *bot) HandleEvent(r *http.Request) *HandledEventResult {
38-
39-
workingConfiguration, err := b.getCurrentConfiguration(r.URL.Query().Get("namespace"))
40-
if err != nil {
41-
return &HandledEventResult{Message: err.Error()}
42-
}
43-
data, process := buildFromRequest(workingConfiguration.GetClientConfig(), r)
44-
if !process {
45-
return &HandledEventResult{Message: "Skipping rules processing (could be not supported event type)"}
44+
func (b *bot) processRules(configuration Configuration, data EventData) *HandledEventResult {
45+
id := fmt.Sprintf("%s/%s#%d", data.GetOwner(), data.GetRepo(), data.GetNumber())
46+
util.Logger.Debug("acquire global lock during rules process")
47+
b.cacheLocker.Lock()
48+
locker, exists := b.repoIssueMutexes.Get(id)
49+
if !exists {
50+
locker = &sync.Mutex{}
51+
b.repoIssueMutexes.Set(id, locker, cache.DefaultExpiration)
4652
}
53+
util.Logger.Debug("acquire repo issue %s lock during rules process", id)
54+
locker.(*sync.Mutex).Lock()
55+
defer locker.(*sync.Mutex).Unlock()
56+
util.Logger.Debug("release global lock during rules process")
57+
b.cacheLocker.Unlock()
4758
applied := make([]Rule, 0)
4859
result := &HandledEventResult{
4960
AppliedRules: []string{},
5061
}
51-
for _, rule := range workingConfiguration.GetRules() {
62+
for _, rule := range configuration.GetRules() {
5263
if rule.Accept(data) {
5364
util.Logger.Debug("Accepting rule %s for '%s'", rule.Name(), data.GetTitle())
5465
applied = append(applied, rule)
@@ -58,14 +69,45 @@ func (b *bot) HandleEvent(r *http.Request) *HandledEventResult {
5869
for _, rule := range applied {
5970
util.Logger.Debug("Applying rule %s for '%s'", rule.Name(), data.GetTitle())
6071
for _, action := range rule.Actions() {
61-
action.Apply(workingConfiguration, data)
72+
action.Apply(configuration, data)
6273
}
6374
}
6475
return result
6576
}
6677

78+
func (b *bot) HandleEvent(r *http.Request) *HandledEventResult {
79+
namespace := r.URL.Query().Get("namespace")
80+
b.cacheLocker.Lock()
81+
util.Logger.Debug("acquire global lock during namespace process")
82+
locker, exists := b.namespaceMutexes.Get(namespace)
83+
if !exists {
84+
locker = &sync.Mutex{}
85+
b.namespaceMutexes.Set(namespace, locker, cache.DefaultExpiration)
86+
}
87+
util.Logger.Debug("acquire namespace %s lock", namespace)
88+
locker.(*sync.Mutex).Lock()
89+
util.Logger.Debug("release global lock during namespace process")
90+
b.cacheLocker.Unlock()
91+
workingConfiguration, err := b.getCurrentConfiguration(namespace)
92+
if err != nil {
93+
return &HandledEventResult{Message: err.Error()}
94+
}
95+
data, process := buildFromRequest(workingConfiguration.GetClientConfig(), r)
96+
if !process {
97+
return &HandledEventResult{Message: "Skipping rules processing (could be not supported event type)"}
98+
}
99+
util.Logger.Debug("release namespace %s lock", namespace)
100+
locker.(*sync.Mutex).Unlock()
101+
return b.processRules(workingConfiguration, data)
102+
}
103+
67104
func New(configPaths ...string) (Bot, error) {
68-
b := &bot{configurations: make(map[string]Configuration)}
105+
b := &bot{
106+
configurations: make(map[string]Configuration),
107+
cacheLocker: &sync.Mutex{},
108+
namespaceMutexes: cache.New(time.Minute, 30*time.Second),
109+
repoIssueMutexes: cache.New(time.Minute, 20*time.Second),
110+
}
69111
for index, configPath := range configPaths {
70112
baseConfigPath := filepath.Base(configPath)
71113
namespace := strings.TrimSuffix(baseConfigPath, filepath.Ext(baseConfigPath))

0 commit comments

Comments
 (0)