1
1
package addresspool
2
2
3
3
import (
4
+ "context"
4
5
"fmt"
6
+ "io"
5
7
"net"
8
+ "net/http"
6
9
"os"
7
10
"strconv"
8
11
"sync"
9
12
"time"
10
13
14
+ "github.com/go-chassis/foundation/httpclient"
11
15
"github.com/go-chassis/openlog"
12
16
13
17
"github.com/go-chassis/cari/discovery"
@@ -24,6 +28,16 @@ const (
24
28
healthProbeTimeout = time .Second
25
29
)
26
30
31
+ type HttpProbeOptions struct {
32
+ Client * httpclient.Requests
33
+ Protocol string
34
+ Path string
35
+ }
36
+
37
+ type Options struct {
38
+ HttpProbeOptions * HttpProbeOptions // used in available check if set, tcp will be used if not set
39
+ }
40
+
27
41
// Pool cloud server address pool
28
42
type Pool struct {
29
43
mutex sync.RWMutex
@@ -33,10 +47,11 @@ type Pool struct {
33
47
sameAzAddress []string
34
48
diffAzAddress []string
35
49
36
- status map [string ]string
37
- onceMonitor sync.Once
38
- quit chan struct {}
39
- onceQuit sync.Once
50
+ status map [string ]string
51
+ onceMonitor sync.Once
52
+ quit chan struct {}
53
+ onceQuit sync.Once
54
+ httpProbeOptions * HttpProbeOptions
40
55
}
41
56
42
57
func (p * Pool ) Close () {
@@ -46,23 +61,21 @@ func (p *Pool) Close() {
46
61
}
47
62
48
63
// NewPool Get registry pool instance
49
- func NewPool (addresses []string ) * Pool {
64
+ func NewPool (addresses []string , opts ... Options ) * Pool {
50
65
p := & Pool {
51
66
defaultAddress : removeDuplicates (addresses ),
52
67
status : make (map [string ]string ),
53
68
}
54
- p .appendAddressToStatus (addresses )
55
- p .monitor ()
56
- return p
57
- }
58
69
59
- func (p * Pool ) appendAddressToStatus (addresses []string ) {
60
- for _ , v := range addresses {
61
- if _ , ok := p .status [v ]; ok {
62
- continue
70
+ if len (opts ) > 0 && opts [0 ].HttpProbeOptions != nil {
71
+ p .httpProbeOptions = opts [0 ].HttpProbeOptions
72
+ if p .httpProbeOptions .Client == nil {
73
+ openlog .Info (fmt .Sprintf ("http client nil, make one with default options" ))
74
+ p .httpProbeOptions .Client , _ = httpclient .New (nil )
63
75
}
64
- p .status [v ] = statusAvailable
65
76
}
77
+ p .monitor ()
78
+ return p
66
79
}
67
80
68
81
func (p * Pool ) ResetAddress (addresses []string ) {
@@ -72,7 +85,6 @@ func (p *Pool) ResetAddress(addresses []string) {
72
85
p .diffAzAddress = []string {}
73
86
p .sameAzAddress = []string {}
74
87
p .status = make (map [string ]string )
75
- p .appendAddressToStatus (addresses )
76
88
}
77
89
78
90
func (p * Pool ) SetAddressByInstances (instances []* discovery.MicroServiceInstance ) error {
@@ -88,12 +100,10 @@ func (p *Pool) SetAddressByInstances(instances []*discovery.MicroServiceInstance
88
100
uniqueAddrList := removeDuplicates (addrList )
89
101
if p .isSameAzAddr (uniqueAddrList ) {
90
102
p .sameAzAddress = uniqueAddrList
91
- p .appendAddressToStatus (uniqueAddrList )
92
103
openlog .Info (fmt .Sprintf ("sync same az endpoints: %s" , uniqueAddrList ))
93
104
continue
94
105
}
95
106
p .diffAzAddress = uniqueAddrList
96
- p .appendAddressToStatus (uniqueAddrList )
97
107
openlog .Info (fmt .Sprintf ("sync different az endpoints: %s" , addrList ))
98
108
}
99
109
return nil
@@ -167,13 +177,12 @@ func (p *Pool) checkConnectivity() {
167
177
if _ , exist := status [v ]; exist {
168
178
continue
169
179
}
170
- conn , err := net . DialTimeout ( "tcp" , v , healthProbeTimeout )
180
+ err := p . doCheckConnectivity ( v )
171
181
if err != nil {
172
- openlog .Error (" connectivity unavailable: " + v )
182
+ openlog .Error (fmt . Sprintf ( "%s connectivity unavailable: %s" , v , err ) )
173
183
status [v ] = statusUnavailable
174
184
} else {
175
185
status [v ] = statusAvailable
176
- conn .Close ()
177
186
}
178
187
}
179
188
@@ -182,6 +191,54 @@ func (p *Pool) checkConnectivity() {
182
191
p .mutex .Unlock ()
183
192
}
184
193
194
+ func (p * Pool ) doCheckConnectivity (endpoint string ) error {
195
+ if p .httpProbeOptions != nil {
196
+ return p .doCheckConnectivityWithHttp (endpoint )
197
+ }
198
+
199
+ return p .doCheckConnectivityWithTcp (endpoint )
200
+ }
201
+
202
+ func (p * Pool ) doCheckConnectivityWithTcp (endpoint string ) error {
203
+ conn , err := net .DialTimeout ("tcp" , endpoint , healthProbeTimeout )
204
+ if err != nil {
205
+ return err
206
+ }
207
+
208
+ err = conn .Close ()
209
+ if err != nil {
210
+ openlog .Error (fmt .Sprintf ("close conn failed when check connectivity: %s" , err ))
211
+ }
212
+
213
+ return nil
214
+ }
215
+
216
+ func (p * Pool ) doCheckConnectivityWithHttp (endpoint string ) error {
217
+ u := p .httpProbeOptions .Protocol + "://" + endpoint + p .httpProbeOptions .Path
218
+ resp , err := p .httpProbeOptions .Client .Get (context .Background (), u , nil )
219
+ if err != nil {
220
+ return err
221
+ }
222
+
223
+ if resp .StatusCode >= http .StatusOK && resp .StatusCode < http .StatusMultipleChoices {
224
+ return nil
225
+ }
226
+ // do tcp check if api not exist, ensure to compatible with old scenes
227
+ if resp .StatusCode == http .StatusNotFound {
228
+ return p .doCheckConnectivityWithTcp (endpoint )
229
+ }
230
+
231
+ body , err := io .ReadAll (resp .Body )
232
+ if err != nil {
233
+ return fmt .Errorf ("http status: %s, read resp error: %s" , resp .Status , err )
234
+ }
235
+ err = resp .Body .Close ()
236
+ if err != nil {
237
+ openlog .Error (fmt .Sprintf ("close http resp.Body failed when check connectivity: %s" , err ))
238
+ }
239
+ return fmt .Errorf ("http status: %s, resp: %s" , resp .Status , string (body ))
240
+ }
241
+
185
242
func (p * Pool ) monitor () {
186
243
p .onceMonitor .Do (func () {
187
244
var interval time.Duration
0 commit comments