Skip to content

Commit c24f08a

Browse files
Add CheckReadiness method and set defaultCheckScIntervalInSecond=15 (#75)
* Add CheckReadiness method and set defaultCheckScIntervalInSecond=15
1 parent b02623c commit c24f08a

File tree

2 files changed

+199
-11
lines changed

2 files changed

+199
-11
lines changed

Diff for: addresspool/pool.go

+55-1
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,16 @@ const (
2424
statusAvailable string = "available"
2525
statusUnavailable string = "unavailable"
2626

27-
defaultCheckScIntervalInSecond = 25 // default sc instance health check interval in second
27+
defaultCheckScIntervalInSecond = 15 // default sc instance health check interval in second
2828
healthProbeTimeout = time.Second
2929
)
3030

31+
const (
32+
ReadinessSuccess = 0 // 连续两次成功
33+
ReadinessFailed = 1 // 连续三次失败
34+
ReadinessIndeterminate = 2 // 其他状态
35+
)
36+
3137
type HttpProbeOptions struct {
3238
Client *httpclient.Requests
3339
Protocol string
@@ -52,6 +58,7 @@ type Pool struct {
5258
quit chan struct{}
5359
onceQuit sync.Once
5460
httpProbeOptions *HttpProbeOptions
61+
statusHistory []map[string]string
5562
}
5663

5764
func (p *Pool) Close() {
@@ -65,6 +72,7 @@ func NewPool(addresses []string, opts ...Options) *Pool {
6572
p := &Pool{
6673
defaultAddress: removeDuplicates(addresses),
6774
status: make(map[string]string),
75+
statusHistory: make([]map[string]string, 0, 4),
6876
}
6977

7078
if len(opts) > 0 && opts[0].HttpProbeOptions != nil {
@@ -85,6 +93,7 @@ func (p *Pool) ResetAddress(addresses []string) {
8593
p.diffAzAddress = []string{}
8694
p.sameAzAddress = []string{}
8795
p.status = make(map[string]string)
96+
p.statusHistory = make([]map[string]string, 0, 4)
8897
}
8998

9099
func (p *Pool) SetAddressByInstances(instances []*discovery.MicroServiceInstance) error {
@@ -166,6 +175,45 @@ func (p *Pool) filterAvailableAddress(addresses []string) []string {
166175
return result
167176
}
168177

178+
func (p *Pool) CheckReadiness() int {
179+
p.mutex.RLock()
180+
defer p.mutex.RUnlock()
181+
statusHistory := p.statusHistory
182+
183+
if len(statusHistory) < 2 {
184+
return ReadinessIndeterminate
185+
}
186+
187+
successCnt := 0
188+
failedCnt := 0
189+
for _, status := range statusHistory {
190+
if !existAvailableEndpointInStatus(status) {
191+
successCnt = 0
192+
failedCnt++
193+
continue
194+
}
195+
successCnt++
196+
failedCnt = 0
197+
}
198+
if successCnt >= 2 {
199+
return ReadinessSuccess
200+
}
201+
if failedCnt == 3 {
202+
return ReadinessFailed
203+
}
204+
205+
return ReadinessIndeterminate
206+
}
207+
208+
func existAvailableEndpointInStatus(status map[string]string) bool {
209+
for _, v := range status {
210+
if v == statusAvailable {
211+
return true
212+
}
213+
}
214+
return false
215+
}
216+
169217
func (p *Pool) checkConnectivity() {
170218
toCheckedAddressList := make([]string, 0, len(p.defaultAddress)+len(p.sameAzAddress)+len(p.diffAzAddress))
171219
toCheckedAddressList = append(toCheckedAddressList, p.defaultAddress...)
@@ -188,6 +236,11 @@ func (p *Pool) checkConnectivity() {
188236

189237
p.mutex.Lock()
190238
p.status = status
239+
p.statusHistory = append(p.statusHistory, status)
240+
cnt := len(p.statusHistory)
241+
if cnt > 3 {
242+
p.statusHistory = p.statusHistory[(cnt - 3):]
243+
}
191244
p.mutex.Unlock()
192245
}
193246

@@ -256,6 +309,7 @@ func (p *Pool) monitor() {
256309
ticker := time.NewTicker(interval * time.Second)
257310
p.quit = make(chan struct{})
258311

312+
p.checkConnectivity()
259313
go func() {
260314
for {
261315
select {

Diff for: addresspool/pool_test.go

+144-10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"net/http"
55
"net/http/httptest"
66
"os"
7+
"sync"
78
"testing"
89
"time"
910

@@ -27,10 +28,7 @@ func TestNewPool(t *testing.T) {
2728
assert.Equal(t, defaultAddr, addr)
2829

2930
// check monitor started
30-
assert.NotEqual(t, statusAvailable, pool.status[defaultAddr]) // unavailable by default
31-
32-
time.Sleep(2*time.Second + 100*time.Millisecond)
33-
assert.Equal(t, statusAvailable, pool.status[defaultAddr])
31+
assert.Equal(t, statusAvailable, pool.status[defaultAddr]) // available by default
3432

3533
mockHttpServer.Close()
3634
time.Sleep(2*time.Second + 100*time.Millisecond)
@@ -176,12 +174,6 @@ func TestAddressPool_checkConnectivity(t *testing.T) {
176174
// init, all address is available
177175
defaultAddr := "127.0.0.1:30000"
178176
p := NewPool([]string{defaultAddr, server1Addr, server2Addr})
179-
assert.NotEqual(t, statusAvailable, p.status[defaultAddr])
180-
assert.NotEqual(t, statusAvailable, p.status[server1Addr])
181-
assert.NotEqual(t, statusAvailable, p.status[server2Addr])
182-
183-
// check connectivity, default address status should be unavailable, as it is fake
184-
p.checkConnectivity()
185177
assert.Equal(t, statusUnavailable, p.status[defaultAddr])
186178
assert.Equal(t, statusAvailable, p.status[server1Addr])
187179
assert.Equal(t, statusAvailable, p.status[server2Addr])
@@ -194,3 +186,145 @@ func TestAddressPool_checkConnectivity(t *testing.T) {
194186
assert.Equal(t, statusUnavailable, p.status[server1Addr])
195187
assert.Equal(t, statusUnavailable, p.status[server2Addr])
196188
}
189+
190+
func TestPool_CheckReadiness(t *testing.T) {
191+
type fields struct {
192+
mutex sync.RWMutex
193+
statusHistory []map[string]string
194+
}
195+
tests := []struct {
196+
name string
197+
fields fields
198+
want int
199+
}{
200+
{
201+
name: "success",
202+
fields: fields{
203+
statusHistory: []map[string]string{
204+
{
205+
"1.1.1.1:30110": statusAvailable,
206+
},
207+
{
208+
"1.1.1.1:30110": statusAvailable,
209+
},
210+
{
211+
"1.1.1.1:30110": statusAvailable,
212+
},
213+
},
214+
},
215+
want: ReadinessSuccess,
216+
},
217+
{
218+
name: "success",
219+
fields: fields{
220+
statusHistory: []map[string]string{
221+
{
222+
"1.1.1.1:30110": statusUnavailable,
223+
},
224+
{
225+
"1.1.1.1:30110": statusAvailable,
226+
},
227+
{
228+
"1.1.1.1:30110": statusAvailable,
229+
},
230+
},
231+
},
232+
want: ReadinessSuccess,
233+
},
234+
{
235+
name: "success",
236+
fields: fields{
237+
statusHistory: []map[string]string{
238+
{
239+
"1.1.1.1:30110": statusUnavailable,
240+
},
241+
{
242+
"1.1.1.1:30110": statusAvailable,
243+
},
244+
{
245+
"1.1.1.1:30110": statusUnavailable,
246+
"1.1.1.2:30110": statusAvailable,
247+
},
248+
},
249+
},
250+
want: ReadinessSuccess,
251+
},
252+
{
253+
name: "indeterminate",
254+
fields: fields{
255+
statusHistory: []map[string]string{
256+
{
257+
"1.1.1.1:30110": statusAvailable,
258+
},
259+
{
260+
"1.1.1.1:30110": statusAvailable,
261+
},
262+
{
263+
"1.1.1.1:30110": statusUnavailable,
264+
},
265+
},
266+
},
267+
want: ReadinessIndeterminate,
268+
},
269+
{
270+
name: "indeterminate",
271+
fields: fields{
272+
statusHistory: []map[string]string{
273+
{
274+
"1.1.1.1:30110": statusAvailable,
275+
},
276+
{
277+
"1.1.1.1:30110": statusUnavailable,
278+
},
279+
{
280+
"1.1.1.1:30110": statusUnavailable,
281+
},
282+
},
283+
},
284+
want: ReadinessIndeterminate,
285+
},
286+
{
287+
name: "indeterminate",
288+
fields: fields{
289+
statusHistory: []map[string]string{
290+
{
291+
"1.1.1.1:30110": statusUnavailable,
292+
},
293+
{
294+
"1.1.1.1:30110": statusAvailable,
295+
},
296+
{
297+
"1.1.1.1:30110": statusUnavailable,
298+
},
299+
},
300+
},
301+
want: ReadinessIndeterminate,
302+
},
303+
{
304+
name: "failed",
305+
fields: fields{
306+
statusHistory: []map[string]string{
307+
{
308+
"1.1.1.1:30110": statusUnavailable,
309+
},
310+
{
311+
"1.1.1.1:30110": statusUnavailable,
312+
},
313+
{
314+
"1.1.1.1:30110": statusUnavailable,
315+
},
316+
},
317+
},
318+
want: ReadinessFailed,
319+
},
320+
}
321+
for _, tt := range tests {
322+
t.Run(tt.name, func(t *testing.T) {
323+
p := &Pool{
324+
mutex: tt.fields.mutex,
325+
statusHistory: tt.fields.statusHistory,
326+
}
327+
assert.Equalf(t, tt.want, p.CheckReadiness(), "CheckReadiness()")
328+
})
329+
}
330+
}

0 commit comments

Comments
 (0)