Skip to content

Commit 6f099c3

Browse files
committed
Improve CircuitBreaker rule loading API (#236)
1 parent 4e8f5a0 commit 6f099c3

File tree

7 files changed

+316
-102
lines changed

7 files changed

+316
-102
lines changed

core/circuitbreaker/rule.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
ErrorRatio
1818
// ErrorCount strategy changes the circuit breaker state based on error amount
1919
ErrorCount
20+
// Last entry of strategies
21+
UnsupportedStrategy
2022
)
2123

2224
func (s Strategy) String() string {
@@ -76,16 +78,16 @@ func (r *Rule) ResourceName() string {
7678
return r.Resource
7779
}
7880

79-
func (r *Rule) isEqualsToBase(newRule *Rule) bool {
81+
func (r *Rule) equalsToBase(newRule *Rule) bool {
8082
if newRule == nil {
8183
return false
8284
}
8385
return r.Resource == newRule.Resource && r.Strategy == newRule.Strategy && r.RetryTimeoutMs == newRule.RetryTimeoutMs &&
8486
r.MinRequestAmount == newRule.MinRequestAmount && r.StatIntervalMs == newRule.StatIntervalMs
8587
}
8688

87-
func (r *Rule) isEqualsTo(newRule *Rule) bool {
88-
if !r.isEqualsToBase(newRule) {
89+
func (r *Rule) equalsTo(newRule *Rule) bool {
90+
if !r.equalsToBase(newRule) {
8991
return false
9092
}
9193

core/circuitbreaker/rule_manager.go

+69-23
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func GetRules() []Rule {
104104

105105
// ClearRules clear all the previous rules.
106106
func ClearRules() error {
107-
_, err := LoadRules(nil)
107+
_, err, _ := LoadRules(nil)
108108
return err
109109
}
110110

@@ -114,10 +114,10 @@ func ClearRules() error {
114114
//
115115
// bool: was designed to indicate whether the internal map has been changed
116116
// error: was designed to indicate whether occurs the error.
117-
func LoadRules(rules []*Rule) (bool, error) {
118-
// TODO in order to avoid invalid update, should check consistent with last update rules
119-
err := onRuleUpdate(rules)
120-
return true, err
117+
// []*Rule: was designed to return failed rules. If there is an error, it returns input rules.
118+
func LoadRules(rules []*Rule) (bool, error, []*Rule) {
119+
ret, err, failedRules := onRuleUpdate(rules)
120+
return ret, err, failedRules
121121
}
122122

123123
func getBreakersOfResource(resource string) []CircuitBreaker {
@@ -140,7 +140,7 @@ func calculateReuseIndexFor(r *Rule, oldResCbs []CircuitBreaker) (equalIdx, reus
140140

141141
for idx, oldTc := range oldResCbs {
142142
oldRule := oldTc.BoundRule()
143-
if oldRule.isEqualsTo(r) {
143+
if oldRule.equalsTo(r) {
144144
// break if there is equivalent rule
145145
equalIdx = idx
146146
break
@@ -169,23 +169,35 @@ func insertCbToCbMap(cb CircuitBreaker, res string, m map[string][]CircuitBreake
169169
}
170170

171171
// Concurrent safe to update rules
172-
func onRuleUpdate(rules []*Rule) (err error) {
172+
func onRuleUpdate(rules []*Rule) (ret bool, err error, failedRules []*Rule) {
173+
var start uint64
174+
newBreakerRules := make(map[string][]*Rule)
175+
173176
defer func() {
174177
if r := recover(); r != nil {
178+
// Set to false since rules are not updated due to panic
175179
var ok bool
180+
181+
ret = false
176182
err, ok = r.(error)
177183
if !ok {
178184
err = fmt.Errorf("%+v", r)
179185
}
186+
failedRules = rules
187+
return
180188
}
189+
logging.Debug("Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano() - start)
190+
logRuleUpdate(newBreakerRules)
181191
}()
182192

183-
newBreakerRules := make(map[string][]*Rule)
193+
// Preset slice capacity to avoid dynamic allocation
194+
failedRules = make([]*Rule, 0, len(rules))
184195
for _, rule := range rules {
185196
if rule == nil {
186197
continue
187198
}
188199
if err := IsValid(rule); err != nil {
200+
failedRules = append(failedRules, rule)
189201
logging.Warn("Ignoring invalid circuit breaking rule when loading new rules", "rule", rule, "err", err)
190202
continue
191203
}
@@ -195,26 +207,35 @@ func onRuleUpdate(rules []*Rule) (err error) {
195207
if !ok {
196208
ruleSet = make([]*Rule, 0, 1)
197209
}
210+
211+
// Deduplicate loading rules
212+
for _, cmpRule := range ruleSet {
213+
if rule.equalsTo(cmpRule) {
214+
rule = nil
215+
break
216+
}
217+
}
218+
if rule == nil {
219+
continue
220+
}
221+
198222
ruleSet = append(ruleSet, rule)
199223
newBreakerRules[classification] = ruleSet
200224
}
201225

202226
newBreakers := make(map[string][]CircuitBreaker)
203-
// in order to avoid growing, build newBreakers in advance
227+
// Preset slice capacity to avoid dynamic allocation
204228
for res, rules := range newBreakerRules {
205229
newBreakers[res] = make([]CircuitBreaker, 0, len(rules))
206230
}
231+
toAddBreakerRules := make(map[string][]*Rule)
232+
for res, rules := range toAddBreakerRules {
233+
toAddBreakerRules[res] = make([]*Rule, 0, len(rules))
234+
}
207235

208-
start := util.CurrentTimeNano()
236+
start = util.CurrentTimeNano()
209237
updateMux.Lock()
210-
defer func() {
211-
updateMux.Unlock()
212-
if r := recover(); r != nil {
213-
return
214-
}
215-
logging.Debug("Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
216-
logRuleUpdate(newBreakerRules)
217-
}()
238+
defer updateMux.Unlock()
218239

219240
for res, resRules := range newBreakerRules {
220241
emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0)
@@ -227,6 +248,9 @@ func onRuleUpdate(rules []*Rule) (err error) {
227248

228249
// First check equals scenario
229250
if equalIdx >= 0 {
251+
newRuleSet := toAddBreakerRules[res]
252+
newRuleSet = append(newRuleSet, r)
253+
toAddBreakerRules[res] = newRuleSet
230254
// reuse the old cb
231255
equalOldCb := oldResCbs[equalIdx]
232256
insertCbToCbMap(equalOldCb, res, newBreakers)
@@ -237,6 +261,7 @@ func onRuleUpdate(rules []*Rule) (err error) {
237261

238262
generator := cbGenFuncMap[r.Strategy]
239263
if generator == nil {
264+
failedRules = append(failedRules, r)
240265
logging.Warn("Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
241266
continue
242267
}
@@ -249,20 +274,41 @@ func onRuleUpdate(rules []*Rule) (err error) {
249274
cb, e = generator(r, nil)
250275
}
251276
if cb == nil || e != nil {
277+
failedRules = append(failedRules, r)
252278
logging.Warn("Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e)
253279
continue
254280
}
255281

256-
if reuseStatIdx >= 0 {
257-
breakers[res] = append(oldResCbs[:reuseStatIdx], oldResCbs[reuseStatIdx+1:]...)
282+
newRuleSet, ok := toAddBreakerRules[res]
283+
if !ok {
284+
newRuleSet = make([]*Rule, 0, 1)
258285
}
286+
newRuleSet = append(newRuleSet, r)
287+
toAddBreakerRules[res] = newRuleSet
259288
insertCbToCbMap(cb, res, newBreakers)
289+
// Set to true since a new rule was just added
290+
ret = true
260291
}
261292
}
262293

263-
breakerRules = newBreakerRules
294+
// Instead of adding more new rules, the rule set could also have been reduced to less rules.
295+
// In this case, we compare the added rules with old ones on their sizes.
296+
if len(toAddBreakerRules) != len(breakerRules) {
297+
ret = true
298+
}
299+
if ret == false {
300+
for res, addRules := range toAddBreakerRules {
301+
originRules, ok := breakerRules[res]
302+
if !ok || (len(addRules) != len(originRules)) {
303+
ret = true
304+
break
305+
}
306+
}
307+
}
308+
309+
breakerRules = toAddBreakerRules
264310
breakers = newBreakers
265-
return nil
311+
return
266312
}
267313

268314
func rulesFrom(rm map[string][]*Rule) []*Rule {
@@ -338,7 +384,7 @@ func IsValid(r *Rule) error {
338384
if len(r.Resource) == 0 {
339385
return errors.New("empty resource name")
340386
}
341-
if int(r.Strategy) < int(SlowRequestRatio) || int(r.Strategy) > int(ErrorCount) {
387+
if uint(r.Strategy) >= uint(UnsupportedStrategy) {
342388
return errors.New("invalid Strategy")
343389
}
344390
if r.StatIntervalMs <= 0 {

0 commit comments

Comments
 (0)