@@ -25,7 +25,6 @@ import (
25
25
"errors"
26
26
"fmt"
27
27
"io"
28
- "math"
29
28
"net/http"
30
29
"net/url"
31
30
"os"
@@ -215,16 +214,30 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
215
214
} else {
216
215
req .Header .Set ("User-Agent" , constants .UserAgent ())
217
216
req .Header .Set ("Accept" , "application/json" )
218
- req .Header .Set ("Prefer" , "wait=60" )
219
217
req .Header .Set ("Content-Encoding" , "gzip" )
220
218
for retry := 0 ; retry < maxRetries ; retry ++ {
221
219
// No retries on regular err cases, only on HTTP 504 Gateway Timeout and HTTP 503 Service Unavailable
222
220
if response , err := bheClient .Do (req ); err != nil {
221
+ if rest .IsClosedConnectionErr (err ) {
222
+ // try again on force closed connection
223
+ log .Error (err , fmt .Sprintf ("remote host force closed connection while requesting %s; attempt %d/%d; trying again" , req .URL , retry + 1 , maxRetries ))
224
+ rest .ExponentialBackoff (retry )
225
+
226
+ if retry == maxRetries - 1 {
227
+ log .Error (ErrExceededRetryLimit , "" )
228
+ hasErrors = true
229
+ }
230
+
231
+ continue
232
+ }
223
233
log .Error (err , unrecoverableErrMsg )
224
234
return true
225
- } else if response .StatusCode == http .StatusGatewayTimeout || response .StatusCode == http .StatusServiceUnavailable {
226
- backoff := math .Pow (5 , float64 (retry + 1 ))
227
- time .Sleep (time .Second * time .Duration (backoff ))
235
+ } else if response .StatusCode == http .StatusGatewayTimeout || response .StatusCode == http .StatusServiceUnavailable || response .StatusCode == http .StatusBadGateway {
236
+ serverError := fmt .Errorf ("received server error %d while requesting %v; attempt %d/%d; trying again" , response .StatusCode , endpoint , retry + 1 , maxRetries )
237
+ log .Error (serverError , "" )
238
+
239
+ rest .ExponentialBackoff (retry )
240
+
228
241
if retry == maxRetries - 1 {
229
242
log .Error (ErrExceededRetryLimit , "" )
230
243
hasErrors = true
@@ -256,19 +269,55 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
256
269
257
270
// TODO: create/use a proper bloodhound client
258
271
func do (bheClient * http.Client , req * http.Request ) (* http.Response , error ) {
259
- if res , err := bheClient .Do (req ); err != nil {
260
- return nil , fmt .Errorf ("failed to request %v: %w" , req .URL , err )
261
- } else if res .StatusCode < http .StatusOK || res .StatusCode >= http .StatusBadRequest {
262
- var body json.RawMessage
263
- defer res .Body .Close ()
264
- if err := json .NewDecoder (res .Body ).Decode (& body ); err != nil {
265
- return nil , fmt .Errorf ("received unexpected response code from %v: %s; failure reading response body" , req .URL , res .Status )
266
- } else {
267
- return nil , fmt .Errorf ("received unexpected response code from %v: %s %s" , req .URL , res .Status , body )
268
- }
272
+ var (
273
+ res * http.Response
274
+ maxRetries = 3
275
+ )
276
+
277
+ // copy the bytes in case we need to retry the request
278
+ if body , err := rest .CopyBody (req ); err != nil {
279
+ return nil , err
269
280
} else {
270
- return res , nil
281
+ for retry := 0 ; retry < maxRetries ; retry ++ {
282
+ // Reusing http.Request requires rewinding the request body
283
+ // back to a working state
284
+ if body != nil && retry > 0 {
285
+ req .Body = io .NopCloser (bytes .NewBuffer (body ))
286
+ }
287
+
288
+ if res , err = bheClient .Do (req ); err != nil {
289
+ if rest .IsClosedConnectionErr (err ) {
290
+ // try again on force closed connections
291
+ log .Error (err , fmt .Sprintf ("remote host force closed connection while requesting %s; attempt %d/%d; trying again" , req .URL , retry + 1 , maxRetries ))
292
+ rest .ExponentialBackoff (retry )
293
+ continue
294
+ }
295
+ // normal client error, dont attempt again
296
+ return nil , err
297
+ } else if res .StatusCode < http .StatusOK || res .StatusCode >= http .StatusBadRequest {
298
+ if res .StatusCode >= http .StatusInternalServerError {
299
+ // Internal server error, backoff and try again.
300
+ serverError := fmt .Errorf ("received server error %d while requesting %v" , res .StatusCode , req .URL )
301
+ log .Error (serverError , fmt .Sprintf ("attempt %d/%d; trying again" , retry + 1 , maxRetries ))
302
+
303
+ rest .ExponentialBackoff (retry )
304
+ continue
305
+ }
306
+ // bad request we do not need to retry
307
+ var body json.RawMessage
308
+ defer res .Body .Close ()
309
+ if err := json .NewDecoder (res .Body ).Decode (& body ); err != nil {
310
+ return nil , fmt .Errorf ("received unexpected response code from %v: %s; failure reading response body" , req .URL , res .Status )
311
+ } else {
312
+ return nil , fmt .Errorf ("received unexpected response code from %v: %s %s" , req .URL , res .Status , body )
313
+ }
314
+ } else {
315
+ return res , nil
316
+ }
317
+ }
271
318
}
319
+
320
+ return nil , fmt .Errorf ("unable to complete request to url=%s; attempts=%d;" , req .URL , maxRetries )
272
321
}
273
322
274
323
type basicResponse [T any ] struct {
0 commit comments