Skip to content

Commit b8d94d1

Browse files
authored
[feature] add outlier ejection (#579)
* Add basic outlier ejection * Add half-open nodes to token result * Removed map of nodeID to IP address * Add outlier ejection examples * Fix go-micro outlier adapter * Fix kitex outlier adapter * Fix kratos outlier adapter * Move outlier ejection examples * Fix bugs of outlier recycler * Fix OnCompleted of outlier MetricStatSlot * Add unit tests for outlier ejection * Add unit tests for rule_manager.go * Fix bugs of outlier unit tests * Fix data race of outlier unit tests * Update go version * Fix bugs of micro adapter
1 parent f13247d commit b8d94d1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+8134
-454
lines changed

api/tracer.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
package api
1616

1717
import (
18+
"github.com/pkg/errors"
19+
1820
"github.com/alibaba/sentinel-golang/core/base"
1921
"github.com/alibaba/sentinel-golang/logging"
20-
"github.com/pkg/errors"
2122
)
2223

2324
// TraceError records the provided error to the given SentinelEntry.
@@ -34,3 +35,16 @@ func TraceError(entry *base.SentinelEntry, err error) {
3435

3536
entry.SetError(err)
3637
}
38+
39+
func TraceCallee(entry *base.SentinelEntry, address string) {
40+
defer func() {
41+
if e := recover(); e != nil {
42+
logging.Error(errors.Errorf("%+v", e), "Failed to api.TraceCallee()")
43+
return
44+
}
45+
}()
46+
if entry == nil || address == "" {
47+
return
48+
}
49+
entry.SetPair("address", address)
50+
}

core/base/context.go

+16
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,22 @@ func (ctx *EntryContext) Rt() uint64 {
7575
return ctx.rt
7676
}
7777

78+
func (ctx *EntryContext) FilterNodes() []string {
79+
return ctx.RuleCheckResult.FilterNodes()
80+
}
81+
82+
func (ctx *EntryContext) HalfOpenNodes() []string {
83+
return ctx.RuleCheckResult.HalfOpenNodes()
84+
}
85+
86+
func (ctx *EntryContext) SetPair(key, val interface{}) {
87+
ctx.Data[key] = val
88+
}
89+
90+
func (ctx *EntryContext) GetPair(key interface{}) interface{} {
91+
return ctx.Data[key]
92+
}
93+
7894
func NewEmptyEntryContext() *EntryContext {
7995
return &EntryContext{}
8096
}

core/base/entry.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ package base
1717
import (
1818
"sync"
1919

20-
"github.com/alibaba/sentinel-golang/logging"
2120
"github.com/pkg/errors"
21+
22+
"github.com/alibaba/sentinel-golang/logging"
2223
)
2324

2425
type ExitHandler func(entry *SentinelEntry, ctx *EntryContext) error
@@ -55,6 +56,12 @@ func (e *SentinelEntry) SetError(err error) {
5556
}
5657
}
5758

59+
func (e *SentinelEntry) SetPair(key, val interface{}) {
60+
if e.ctx != nil {
61+
e.ctx.SetPair(key, val)
62+
}
63+
}
64+
5865
func (e *SentinelEntry) Context() *EntryContext {
5966
return e.ctx
6067
}

core/base/result.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ func (s TokenResultStatus) String() string {
8484
type TokenResult struct {
8585
status TokenResultStatus
8686

87-
blockErr *BlockError
88-
nanosToWait time.Duration
87+
blockErr *BlockError
88+
nanosToWait time.Duration
89+
filterNodes []string
90+
halfOpenNodes []string
8991
}
9092

9193
func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
@@ -154,6 +156,22 @@ func (r *TokenResult) NanosToWait() time.Duration {
154156
return r.nanosToWait
155157
}
156158

159+
func (r *TokenResult) FilterNodes() []string {
160+
return r.filterNodes
161+
}
162+
163+
func (r *TokenResult) HalfOpenNodes() []string {
164+
return r.halfOpenNodes
165+
}
166+
167+
func (r *TokenResult) SetFilterNodes(nodes []string) {
168+
r.filterNodes = nodes
169+
}
170+
171+
func (r *TokenResult) SetHalfOpenNodes(nodes []string) {
172+
r.halfOpenNodes = nodes
173+
}
174+
157175
func (r *TokenResult) String() string {
158176
var blockMsg string
159177
if r.blockErr == nil {

core/circuitbreaker/rule_manager.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ import (
1919
"reflect"
2020
"sync"
2121

22+
"github.com/pkg/errors"
23+
2224
"github.com/alibaba/sentinel-golang/logging"
2325
"github.com/alibaba/sentinel-golang/util"
24-
"github.com/pkg/errors"
2526
)
2627

2728
type CircuitBreakerGenFunc func(r *Rule, reuseStat interface{}) (CircuitBreaker, error)
@@ -262,7 +263,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
262263

263264
newBreakers := make(map[string][]CircuitBreaker, len(validResRulesMap))
264265
for res, resRules := range validResRulesMap {
265-
newCbsOfRes := buildResourceCircuitBreaker(res, resRules, breakersClone[res])
266+
newCbsOfRes := BuildResourceCircuitBreaker(res, resRules, breakersClone[res])
266267
if len(newCbsOfRes) > 0 {
267268
newBreakers[res] = newCbsOfRes
268269
}
@@ -275,7 +276,7 @@ func onRuleUpdate(rawResRulesMap map[string][]*Rule) (err error) {
275276
currentRules = rawResRulesMap
276277

277278
logging.Debug("[CircuitBreaker onRuleUpdate] Time statistics(ns) for updating circuit breaker rule", "timeCost", util.CurrentTimeNano()-start)
278-
logRuleUpdate(validResRulesMap)
279+
LogRuleUpdate(validResRulesMap)
279280
return nil
280281
}
281282

@@ -305,7 +306,7 @@ func onResourceRuleUpdate(res string, rawResRules []*Rule) (err error) {
305306
oldResCbs = append(oldResCbs, breakers[res]...)
306307
updateMux.RUnlock()
307308

308-
newCbsOfRes := buildResourceCircuitBreaker(res, rawResRules, oldResCbs)
309+
newCbsOfRes := BuildResourceCircuitBreaker(res, rawResRules, oldResCbs)
309310

310311
updateMux.Lock()
311312
if len(newCbsOfRes) == 0 {
@@ -341,7 +342,7 @@ func rulesFrom(rm map[string][]*Rule) []*Rule {
341342
return rules
342343
}
343344

344-
func logRuleUpdate(m map[string][]*Rule) {
345+
func LogRuleUpdate(m map[string][]*Rule) {
345346
rs := rulesFrom(m)
346347
if len(rs) == 0 {
347348
logging.Info("[CircuitBreakerRuleManager] Circuit breaking rules were cleared")
@@ -399,12 +400,12 @@ func ClearRulesOfResource(res string) error {
399400
return err
400401
}
401402

402-
// buildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
403-
func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
403+
// BuildResourceCircuitBreaker builds CircuitBreaker slice from rules. the resource of rules must be equals to res
404+
func BuildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []CircuitBreaker) []CircuitBreaker {
404405
newCbsOfRes := make([]CircuitBreaker, 0, len(rulesOfRes))
405406
for _, r := range rulesOfRes {
406407
if res != r.Resource {
407-
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.buildResourceCircuitBreaker()", "rule", r)
408+
logging.Error(errors.Errorf("unmatched resource name expect: %s, actual: %s", res, r.Resource), "Unmatched resource name in circuitBreaker.BuildResourceCircuitBreaker()", "rule", r)
408409
continue
409410
}
410411
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResCbs)
@@ -421,7 +422,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir
421422

422423
generator := cbGenFuncMap[r.Strategy]
423424
if generator == nil {
424-
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
425+
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to unsupported circuit breaking strategy", "rule", r)
425426
continue
426427
}
427428

@@ -433,7 +434,7 @@ func buildResourceCircuitBreaker(res string, rulesOfRes []*Rule, oldResCbs []Cir
433434
cb, e = generator(r, nil)
434435
}
435436
if cb == nil || e != nil {
436-
logging.Warn("[CircuitBreaker buildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
437+
logging.Warn("[CircuitBreaker BuildResourceCircuitBreaker] Ignoring the rule due to bad generated circuit breaker", "rule", r, "err", e.Error())
437438
continue
438439
}
439440

core/outlier/recycler.go

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 1999-2020 Alibaba Group Holding Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package outlier
16+
17+
import (
18+
"errors"
19+
"fmt"
20+
"sync"
21+
"time"
22+
23+
"github.com/alibaba/sentinel-golang/logging"
24+
)
25+
26+
const capacity = 200
27+
28+
var (
29+
// resource name ---> node recycler
30+
recyclers = make(map[string]*Recycler)
31+
recyclerMutex = new(sync.Mutex)
32+
recyclerCh = make(chan task, capacity)
33+
)
34+
35+
type task struct {
36+
nodes []string
37+
resource string
38+
}
39+
40+
func init() {
41+
go func() {
42+
defer func() {
43+
if err := recover(); err != nil {
44+
logging.Error(fmt.Errorf("%+v", err), "Unexpected panic when consuming recyclerCh")
45+
}
46+
}()
47+
for task := range recyclerCh {
48+
recycler := getRecyclerOfResource(task.resource)
49+
recycler.scheduleNodes(task.nodes)
50+
}
51+
}()
52+
}
53+
54+
// Recycler recycles node instance that have been invalidated for a long time
55+
type Recycler struct {
56+
resource string
57+
interval time.Duration
58+
status map[string]bool
59+
mtx sync.Mutex
60+
}
61+
62+
func getRecyclerOfResource(resource string) *Recycler {
63+
recyclerMutex.Lock()
64+
defer recyclerMutex.Unlock()
65+
if _, ok := recyclers[resource]; !ok {
66+
recycler := &Recycler{
67+
resource: resource,
68+
status: make(map[string]bool),
69+
}
70+
rule := getOutlierRuleOfResource(resource)
71+
if rule == nil {
72+
logging.Error(errors.New("nil outlier rule"), "Nil outlier rule in getRecyclerOfResource()")
73+
} else {
74+
if rule.RecycleIntervalS == 0 {
75+
recycler.interval = 10 * time.Minute
76+
} else {
77+
recycler.interval = time.Duration(rule.RecycleIntervalS * 1e9)
78+
}
79+
}
80+
recyclers[resource] = recycler
81+
}
82+
return recyclers[resource]
83+
}
84+
85+
func (r *Recycler) scheduleNodes(nodes []string) {
86+
r.mtx.Lock()
87+
defer r.mtx.Unlock()
88+
for _, node := range nodes {
89+
if _, ok := r.status[node]; !ok {
90+
r.status[node] = false
91+
nodeCopy := node // Copy values to correctly capture the closure for node.
92+
time.AfterFunc(r.interval, func() {
93+
r.recycle(nodeCopy)
94+
})
95+
}
96+
}
97+
}
98+
99+
func (r *Recycler) recover(node string) {
100+
r.mtx.Lock()
101+
defer r.mtx.Unlock()
102+
if _, ok := r.status[node]; ok {
103+
r.status[node] = true
104+
}
105+
}
106+
107+
func (r *Recycler) recycle(node string) {
108+
r.mtx.Lock()
109+
defer r.mtx.Unlock()
110+
if v, ok := r.status[node]; ok && !v {
111+
deleteNodeBreakerOfResource(r.resource, node)
112+
}
113+
delete(r.status, node)
114+
}

0 commit comments

Comments
 (0)