@@ -31,7 +31,6 @@ import (
31
31
"os/signal"
32
32
"runtime"
33
33
"sort"
34
- "strings"
35
34
"sync"
36
35
"sync/atomic"
37
36
"time"
@@ -229,8 +228,8 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
229
228
log .Error (err , unrecoverableErrMsg )
230
229
return true
231
230
} else if response .StatusCode == http .StatusGatewayTimeout || response .StatusCode == http .StatusServiceUnavailable || response .StatusCode == http .StatusBadGateway {
232
- serverError := fmt .Errorf ("received server error %d while requesting %v" , response .StatusCode , endpoint )
233
- log .Error (serverError , "attempt %d/%d" , retry + 1 , maxRetries )
231
+ serverError := fmt .Errorf ("received server error %d while requesting %v; " , response .StatusCode , endpoint )
232
+ log .Error (serverError , "attempt %d/%d; trying again " , retry + 1 , maxRetries )
234
233
235
234
rest .ExponentialBackoff (retry , maxRetries )
236
235
@@ -266,61 +265,55 @@ func ingest(ctx context.Context, bheUrl url.URL, bheClient *http.Client, in <-ch
266
265
// TODO: create/use a proper bloodhound client
267
266
func do (bheClient * http.Client , req * http.Request ) (* http.Response , error ) {
268
267
var (
269
- body []byte
270
268
res * http.Response
271
269
err error
272
270
maxRetries = 3
273
271
)
274
272
275
273
// copy the bytes in case we need to retry the request
276
- if req .Body != nil {
277
- if body , err = io .ReadAll (req .Body ); err != nil {
278
- return nil , err
279
- }
280
- if body != nil {
281
- req .Body = io .NopCloser (bytes .NewBuffer (body ))
282
- }
283
- }
274
+ if body , err := rest .CopyBody (req ); err != nil {
275
+ return nil , err
276
+ } else {
277
+ for retry := 0 ; retry < maxRetries ; retry ++ {
278
+ // Reusing http.Request requires rewinding the request body
279
+ // back to a working state
280
+ if body != nil && retry > 0 {
281
+ req .Body = io .NopCloser (bytes .NewBuffer (body ))
282
+ }
284
283
285
- for retry := 0 ; retry < maxRetries ; retry ++ {
286
- // Reusing http.Request requires rewinding the request body
287
- // back to a working state
288
- if body != nil && retry > 0 {
289
- req .Body = io .NopCloser (bytes .NewBuffer (body ))
290
- }
284
+ if res , err = bheClient .Do (req ); err != nil {
285
+ if rest .IsClosedConnectionErr (err ) {
286
+ // try again on force closed connections
287
+ log .Error (err , "remote host force closed connection while requesting %s; attempt %d/%d; trying again\n " , req .URL , retry + 1 , maxRetries )
288
+ rest .ExponentialBackoff (retry , maxRetries )
289
+ continue
290
+ }
291
+ // normal client error, dont attempt again
292
+ return nil , err
293
+ } else if res .StatusCode < http .StatusOK || res .StatusCode >= http .StatusBadRequest {
294
+ if res .StatusCode >= http .StatusInternalServerError {
295
+ // Internal server error, backoff and try again.
296
+ serverError := fmt .Errorf ("received server error %d while requesting %v" , res .StatusCode , req .URL )
297
+ log .Error (serverError , "attempt %d/%d" , retry + 1 , maxRetries )
291
298
292
- if res , err = bheClient .Do (req ); err != nil {
293
- if strings .Contains (err .Error (), rest .ClosedConnectionMsg ) || strings .HasSuffix (err .Error (), ": EOF" ) {
294
- // try again on force closed connections
295
- log .Error (err , "remote host force closed connection while requesting %s; attempt %d/%d; trying again\n " , req .URL , retry + 1 , maxRetries )
296
- rest .ExponentialBackoff (retry , maxRetries )
297
- continue
298
- }
299
- // normal client error, dont attempt again
300
- return nil , err
301
- } else if res .StatusCode < http .StatusOK || res .StatusCode >= http .StatusBadRequest {
302
- if res .StatusCode >= http .StatusInternalServerError {
303
- // Internal server error, backoff and try again.
304
- serverError := fmt .Errorf ("received server error %d while requesting %v" , res .StatusCode , req .URL )
305
- log .Error (serverError , "attempt %d/%d" , retry + 1 , maxRetries )
306
-
307
- rest .ExponentialBackoff (retry , maxRetries )
308
- continue
309
- }
310
- // bad request we do not need to retry
311
- var body json.RawMessage
312
- defer res .Body .Close ()
313
- if err := json .NewDecoder (res .Body ).Decode (& body ); err != nil {
314
- return nil , fmt .Errorf ("received unexpected response code from %v: %s; failure reading response body" , req .URL , res .Status )
299
+ rest .ExponentialBackoff (retry , maxRetries )
300
+ continue
301
+ }
302
+ // bad request we do not need to retry
303
+ var body json.RawMessage
304
+ defer res .Body .Close ()
305
+ if err := json .NewDecoder (res .Body ).Decode (& body ); err != nil {
306
+ return nil , fmt .Errorf ("received unexpected response code from %v: %s; failure reading response body" , req .URL , res .Status )
307
+ } else {
308
+ return nil , fmt .Errorf ("received unexpected response code from %v: %s %s" , req .URL , res .Status , body )
309
+ }
315
310
} else {
316
- return nil , fmt . Errorf ( "received unexpected response code from %v: %s %s" , req . URL , res . Status , body )
311
+ return res , nil
317
312
}
318
- } else {
319
- return res , nil
320
313
}
321
314
}
322
315
323
- return nil , fmt .Errorf ("unable to complete request | url=%s | attempts=%d | ERR=%w" , req .URL , maxRetries , err )
316
+ return nil , fmt .Errorf ("unable to complete request to url=%s; attempts=%d; ERR=%w" , req .URL , maxRetries , err )
324
317
}
325
318
326
319
type basicResponse [T any ] struct {
0 commit comments