@@ -56,14 +56,101 @@ type upstreamResolverBase struct {
56
56
func newUpstreamResolverBase (ctx context.Context , statusRecorder * peer.Status ) * upstreamResolverBase {
57
57
ctx , cancel := context .WithCancel (ctx )
58
58
59
- return & upstreamResolverBase {
59
+ resolverBase := & upstreamResolverBase {
60
60
ctx : ctx ,
61
61
cancel : cancel ,
62
62
upstreamTimeout : upstreamTimeout ,
63
63
reactivatePeriod : reactivatePeriod ,
64
64
failsTillDeact : failsTillDeact ,
65
65
statusRecorder : statusRecorder ,
66
66
}
67
+
68
+ go resolverBase .watchPeersConnStatusChanges ()
69
+
70
+ return resolverBase
71
+ }
72
+
73
+ func (u * upstreamResolverBase ) watchPeersConnStatusChanges () {
74
+ var probeRunning atomic.Bool
75
+ var cancelBackOff context.CancelFunc
76
+
77
+ exponentialBackOff := & backoff.ExponentialBackOff {
78
+ InitialInterval : 200 * time .Millisecond ,
79
+ RandomizationFactor : 0.5 ,
80
+ Multiplier : 1.1 ,
81
+ MaxInterval : 5 * time .Second ,
82
+ MaxElapsedTime : 15 * time .Second ,
83
+ Stop : backoff .Stop ,
84
+ Clock : backoff .SystemClock ,
85
+ }
86
+ operation := func () error {
87
+ select {
88
+ case <- u .ctx .Done ():
89
+ return backoff .Permanent (fmt .Errorf ("exiting upstream retry loop for upstreams %s: parent context : %s" , u .upstreamServers , u .ctx .Err ()))
90
+ default :
91
+ }
92
+
93
+ u .probeAvailability ()
94
+ if u .disabled {
95
+ return fmt .Errorf ("probe faled" )
96
+ }
97
+ return nil
98
+ }
99
+
100
+ continualProbe := func () {
101
+ // probe continually for 30s when peer count >= 1
102
+ if u .statusRecorder .GetConnectedPeersCount () == 0 {
103
+ log .Debug ("O peer connected, running one more DNS probe" )
104
+ // cancel backoff operation
105
+ if cancelBackOff != nil {
106
+ cancelBackOff ()
107
+ cancelBackOff = nil
108
+ }
109
+ u .probeAvailability ()
110
+ return
111
+ }
112
+
113
+ if probeRunning .Load () {
114
+ log .Info ("restarting DNS probing" )
115
+ cancelBackOff ()
116
+ cancelBackOff = nil
117
+ }
118
+ defer func () {
119
+ u .mutex .Lock ()
120
+ log .Infof ("DNS probing finished, servers %s disabled: %t" , u .upstreamServers , u .disabled )
121
+ u .mutex .Unlock ()
122
+ probeRunning .Store (false )
123
+ }()
124
+ probeRunning .Store (true )
125
+
126
+ ctx , cancel := context .WithCancel (context .Background ())
127
+ cancelBackOff = cancel
128
+ err := backoff .Retry (func () error {
129
+ select {
130
+ case <- ctx .Done ():
131
+ log .Warn ("DNS probing cancelled" )
132
+ return backoff .Permanent (ctx .Err ())
133
+ default :
134
+ return operation ()
135
+ }
136
+ }, backoff .WithContext (exponentialBackOff , ctx ))
137
+ cancelBackOff = nil
138
+ if err != nil {
139
+ log .Warn ("DNS probe trigger by peer connection failed" )
140
+ u .disable (err )
141
+ return
142
+ }
143
+ }
144
+
145
+ for {
146
+ select {
147
+ case <- u .ctx .Done ():
148
+ return
149
+ case <- u .statusRecorder .GetPeersConnStatusChangeNotifier ():
150
+ log .Debugf ("probing DNS availability on/off for 30s" )
151
+ go continualProbe ()
152
+ }
153
+ }
67
154
}
68
155
69
156
func (u * upstreamResolverBase ) stop () {
@@ -163,7 +250,7 @@ func (u *upstreamResolverBase) checkUpstreamFails(err error) {
163
250
}
164
251
165
252
// probeAvailability tests all upstream servers simultaneously and
166
- // disables the resolver if none work
253
+ // disables the resolver if none of them responds, or re-enables it if it was disabled and at least one does
167
254
func (u * upstreamResolverBase ) probeAvailability () {
168
255
u .mutex .Lock ()
169
256
defer u .mutex .Unlock ()
@@ -174,11 +261,6 @@ func (u *upstreamResolverBase) probeAvailability() {
174
261
default :
175
262
}
176
263
177
- // avoid probe if upstreams could resolve at least one query and fails count is less than failsTillDeact
178
- if u .successCount .Load () > 0 && u .failsCount .Load () < u .failsTillDeact {
179
- return
180
- }
181
-
182
264
var success bool
183
265
var mu sync.Mutex
184
266
var wg sync.WaitGroup
@@ -190,7 +272,7 @@ func (u *upstreamResolverBase) probeAvailability() {
190
272
wg .Add (1 )
191
273
go func () {
192
274
defer wg .Done ()
193
- err := u .testNameserver (upstream , 500 * time . Millisecond )
275
+ err := u .testNameserver (upstream , probeTimeout )
194
276
if err != nil {
195
277
errors = multierror .Append (errors , err )
196
278
log .Warnf ("probing upstream nameserver %s: %s" , upstream , err )
@@ -208,6 +290,15 @@ func (u *upstreamResolverBase) probeAvailability() {
208
290
// didn't find a working upstream server, let's disable and try later
209
291
if ! success {
210
292
u .disable (errors .ErrorOrNil ())
293
+ return
294
+ }
295
+
296
+ if u .disabled {
297
+ log .Infof ("upstreams %s are responsive again. Adding them back to system" , u .upstreamServers )
298
+ u .failsCount .Store (0 )
299
+ u .successCount .Add (1 )
300
+ u .reactivate ()
301
+ u .disabled = false
211
302
}
212
303
}
213
304
@@ -223,37 +314,17 @@ func (u *upstreamResolverBase) waitUntilResponse() {
223
314
Clock : backoff .SystemClock ,
224
315
}
225
316
226
- operation := func () error {
227
- select {
228
- case <- u .ctx .Done ():
229
- return backoff .Permanent (fmt .Errorf ("exiting upstream retry loop for upstreams %s: parent context has been canceled" , u .upstreamServers ))
230
- default :
231
- }
232
-
233
- for _ , upstream := range u .upstreamServers {
234
- if err := u .testNameserver (upstream , probeTimeout ); err != nil {
235
- log .Tracef ("upstream check for %s: %s" , upstream , err )
236
- } else {
237
- // at least one upstream server is available, stop probing
238
- return nil
239
- }
317
+ err := backoff .Retry (func () error {
318
+ u .probeAvailability ()
319
+ if u .disabled {
320
+ return fmt .Errorf ("failed to enable upsstream" )
240
321
}
241
-
242
- log .Tracef ("checking connectivity with upstreams %s failed. Retrying in %s" , u .upstreamServers , exponentialBackOff .NextBackOff ())
243
- return fmt .Errorf ("upstream check call error" )
244
- }
245
-
246
- err := backoff .Retry (operation , exponentialBackOff )
322
+ return nil
323
+ }, exponentialBackOff )
247
324
if err != nil {
248
325
log .Warn (err )
249
326
return
250
327
}
251
-
252
- log .Infof ("upstreams %s are responsive again. Adding them back to system" , u .upstreamServers )
253
- u .failsCount .Store (0 )
254
- u .successCount .Add (1 )
255
- u .reactivate ()
256
- u .disabled = false
257
328
}
258
329
259
330
// isTimeout returns true if the given error is a network timeout error.
0 commit comments