Skip to content

Commit 30d420f

Browse files
authored
Don't use exponential backoff in http_forwarder for non-retryable errors when fail-fast-on-non-retryable-errors set to true (#773)
* Add logic to provide a contextual error when invoking the `post` function returned from `constructPost` in `handler_http_forwarder_v2.go`, which indicates whether the request is retryable or not. Does not attempt exponential backoff if not retryable. * add guards for errors.As() calls to ensure expected behavior, fall back to old error format if not ok. Use RetryableErrorCodes instead of using inverse logic for `NonRetryableErrorCodes` * Add Test to confirm backoff is not triggered on bad certificate * restructure code so we don't call b.NextBackOff unless needed * use a guard clause to exit on non-retryable errors instead of nested conditionals * always return `RequestError` from postInstanceFn * add specific atomic counter for fast failure * make new failfast behaviour opt-in via configuration
1 parent 0cbb24e commit 30d420f

File tree

8 files changed

+825
-115
lines changed

8 files changed

+825
-115
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,11 @@ following configuration options:
146146
in both `custom-header` and `dynamic-header`, the value set by `custom-header` takes precedence. Not required, default
147147
is empty. Example: `--dynamic-headers='["region", "service"]'`.
148148
This is an experimental feature and it may be removed or changed in future versions.
149+
- `fast-fail-on-non-retryable-errors`: boolean flag that controls whether to immediately drop messages that encounter
150+
non-retryable HTTP errors (e.g., 4xx status codes other than 408 and 429). When set to `true`, non-retryable errors
151+
immediately drop the message without retry attempts. When set to `false`, even non-retryable errors will attempt retries
152+
via exponential backoff up to the `max-request-elapsed-time` limit. Note: Some errors (e.g., TLS certificate validation
153+
failures) are always treated as non-retryable. Defaults to `false`.
149154

150155
The following settings from the previous section are also supported:
151156
- `expiry-*`

cmd/gostatsd/main.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,13 @@ func constructServer(v *viper.Viper) (*statsd.Server, error) {
174174
fmt.Sprintf("version:%s", Version),
175175
fmt.Sprintf("commit:%s", GitCommit),
176176
},
177-
DisabledSubTypes: gostatsd.DisabledSubMetrics(v),
178-
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
179-
HistogramLimit: v.GetUint32(gostatsd.ParamTimerHistogramLimit),
180-
DisableInternalEvents: v.GetBool(gostatsd.ParamDisableInternalEvents),
181-
Viper: v,
182-
TransportPool: pool,
177+
DisabledSubTypes: gostatsd.DisabledSubMetrics(v),
178+
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
179+
HistogramLimit: v.GetUint32(gostatsd.ParamTimerHistogramLimit),
180+
DisableInternalEvents: v.GetBool(gostatsd.ParamDisableInternalEvents),
181+
Viper: v,
182+
TransportPool: pool,
183+
FastFailOnNonRetryableErrors: v.GetBool(gostatsd.ParamFastFailOnNonRetryableErrors),
183184
}, nil
184185
}
185186

cmd/lambda-extension/main.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,10 @@ func NewServer(v *viper.Viper, logger logrus.FieldLogger) *statsd.Server {
9595
fmt.Sprintf("version:%s", Version),
9696
fmt.Sprintf("commit:%s", GitCommit),
9797
},
98-
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
99-
Viper: v,
100-
TransportPool: transport.NewTransportPool(logger, v),
98+
BadLineRateLimitPerSecond: rate.Limit(v.GetFloat64(gostatsd.ParamBadLinesPerMinute) / 60.0),
99+
Viper: v,
100+
TransportPool: transport.NewTransportPool(logger, v),
101+
FastFailOnNonRetryableErrors: v.GetBool(gostatsd.ParamFastFailOnNonRetryableErrors),
101102
}
102103

103104
if v.GetBool(gostatsd.ParamLambdaExtensionManualFlush) {

defaults_and_params.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ const (
9797
DefaultLogRawMetric = false
9898
// DefaultDisableInternalEvents is the default value for disabling internal events being sent
9999
DefaultDisableInternalEvents = false
100+
// DefaultFastFailOnNonRetryableErrors is the default value for whether to forego exponential backoff on non-retryable errors
101+
DefaultFastFailOnNonRetryableErrors = false
100102
)
101103

102104
const (
@@ -184,6 +186,8 @@ const (
184186
ParamLambdaExtensionManualFlush = "lambda-extension-manual-flush"
185187
// ParamLambdaExtensionTelemetryAddress enables the manual flushing of metrics in forwarder mode, the flush interval is ignored
186188
ParamLambdaExtensionTelemetryAddress = "lambda-extension-telemetry-address"
189+
// ParamFastFailOnNonRetryableErrors disables exponential backoff on http client errors and non retryable http status codes
190+
ParamFastFailOnNonRetryableErrors = "fast-fail-on-non-retryable-errors"
187191
)
188192

189193
// AddFlags adds flags to the specified FlagSet.
@@ -227,6 +231,8 @@ func AddFlags(fs *pflag.FlagSet) {
227231
fs.Uint32(ParamTimerHistogramLimit, DefaultTimerHistogramLimit, "upper limit of timer histogram buckets (MaxUint32 by default)")
228232
fs.Bool(ParamLogRawMetric, DefaultLogRawMetric, "Print metrics received from network to stdout in JSON format")
229233
fs.Bool(ParamDisableInternalEvents, DefaultDisableInternalEvents, "Disables sending internal events from gostatsd")
234+
fs.Bool(ParamFastFailOnNonRetryableErrors, DefaultFastFailOnNonRetryableErrors, "Disables exponential backoff and retry on non-retryable http statuses and client errors if true.")
235+
230236
}
231237

232238
func minInt(a, b int) int {

pkg/statsd/handler_http_forwarder_v2.go

Lines changed: 107 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package statsd
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
78
"io"
89
"io/ioutil"
910
"maps"
1011
"net/http"
12+
"net/url"
13+
"slices"
1114
"strings"
1215
"sync"
1316
"sync/atomic"
@@ -31,27 +34,43 @@ import (
3134
)
3235

3336
const (
34-
defaultConsolidatorFlushInterval = 1 * time.Second
35-
defaultCompress = true
36-
defaultCompressionType = "zlib"
37-
defaultCompressionLevel = 9
38-
defaultApiEndpoint = ""
39-
defaultMaxRequestElapsedTime = 30 * time.Second
40-
defaultMaxRequests = 1000
41-
defaultConcurrentMerge = 1
42-
defaultTransport = "default"
37+
defaultConsolidatorFlushInterval = 1 * time.Second
38+
defaultCompress = true
39+
defaultCompressionType = "zlib"
40+
defaultCompressionLevel = 9
41+
defaultApiEndpoint = ""
42+
defaultMaxRequestElapsedTime = 30 * time.Second
43+
defaultMaxRequests = 1000
44+
defaultConcurrentMerge = 1
45+
defaultTransport = "default"
46+
defaultFastFailOnNonRetryableError = false
4347
)
4448

49+
var (
50+
RetryableErrorCodes = []int{408, 429, 500, 502, 503, 504}
51+
)
52+
53+
type RequestError struct {
54+
StatusCode int
55+
Retryable bool
56+
Err error
57+
}
58+
59+
func (r *RequestError) Error() string {
60+
return fmt.Sprintf("status %d: retryable %t: err %v", r.StatusCode, r.Retryable, r.Err)
61+
}
62+
4563
// HttpForwarderHandlerV2 is a PipelineHandler which sends metrics to another gostatsd instance
4664
type HttpForwarderHandlerV2 struct {
47-
postId uint64 // atomic - used for an id in logs
48-
messagesInvalid uint64 // atomic - messages which failed to be created
49-
messagesCreated uint64 // atomic - messages which were created
50-
messagesSent uint64 // atomic - messages successfully sent
51-
messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
52-
messagesDropped uint64 // atomic - final failure
53-
postLatencyTotal atomic.Int64 // total of the time taken to send messages in a flush interval
54-
postLatencyMax atomic.Int64 // maximum time taken to send a message in a flush interval
65+
postId uint64 // atomic - used for an id in logs
66+
messagesInvalid uint64 // atomic - messages which failed to be created
67+
messagesCreated uint64 // atomic - messages which were created
68+
messagesSent uint64 // atomic - messages successfully sent
69+
messagesRetried uint64 // atomic - retries (first send is not a retry, final failure is not a retry)
70+
messagesDroppedFastFail uint64 // atomic - messages which failed due to non-retryable errors
71+
messagesDropped uint64 // atomic - final failure
72+
postLatencyTotal atomic.Int64 // total of the time taken to send messages in a flush interval
73+
postLatencyMax atomic.Int64 // maximum time taken to send a message in a flush interval
5574

5675
lastSuccessfulSend atomic.Int64
5776

@@ -72,7 +91,8 @@ type HttpForwarderHandlerV2 struct {
7291
consolidator *gostatsd.MetricConsolidator
7392
consolidatedMetrics chan []*gostatsd.MetricMap
7493

75-
flushCoordinator flush.Coordinator
94+
flushCoordinator flush.Coordinator
95+
fastFailOnNonRetryableErrors bool
7696
}
7797

7898
var (
@@ -84,16 +104,17 @@ var (
84104
// as part of the configuration passed through.
85105
func newHTTPForwarderHandlerViperConfig(overrides *viper.Viper) *viper.Viper {
86106
values := map[string]any{
87-
"transport": defaultTransport,
88-
"compress": defaultCompress,
89-
"compression-type": defaultCompressionType,
90-
"compression-level": defaultCompressionLevel,
91-
"api-endpoint": defaultApiEndpoint,
92-
"max-requests": defaultMaxRequests,
93-
"max-request-elapsed-time": defaultMaxRequestElapsedTime,
94-
"consolidator-slots": gostatsd.DefaultMaxParsers,
95-
"flush-interval": defaultConsolidatorFlushInterval,
96-
"concurrent-merge": defaultConcurrentMerge,
107+
"transport": defaultTransport,
108+
"compress": defaultCompress,
109+
"compression-type": defaultCompressionType,
110+
"compression-level": defaultCompressionLevel,
111+
"api-endpoint": defaultApiEndpoint,
112+
"max-requests": defaultMaxRequests,
113+
"max-request-elapsed-time": defaultMaxRequestElapsedTime,
114+
"consolidator-slots": gostatsd.DefaultMaxParsers,
115+
"flush-interval": defaultConsolidatorFlushInterval,
116+
"concurrent-merge": defaultConcurrentMerge,
117+
"fast-fail-on-non-retryable-errors": defaultFastFailOnNonRetryableError,
97118
}
98119
maps.Copy(values, util.GetSubViper(overrides, "http-transport").AllSettings())
99120

@@ -123,6 +144,7 @@ func NewHttpForwarderHandlerV2FromViper(logger logrus.FieldLogger, v *viper.Vipe
123144
values.GetStringSlice("dynamic-headers"),
124145
pool,
125146
fc,
147+
values.GetBool("fast-fail-on-non-retryable-errors"),
126148
)
127149
}
128150

@@ -143,6 +165,7 @@ func NewHttpForwarderHandlerV2(
143165
dynHeaderNames []string,
144166
pool *transport.TransportPool,
145167
fc flush.Coordinator,
168+
failFastOnNonRetryableErrors bool,
146169
) (*HttpForwarderHandlerV2, error) {
147170
if apiEndpoint == "" {
148171
return nil, fmt.Errorf("api-endpoint is required")
@@ -178,14 +201,15 @@ func NewHttpForwarderHandlerV2(
178201
}
179202

180203
logger.WithFields(logrus.Fields{
181-
"api-endpoint": apiEndpoint,
182-
"compress": compress,
183-
"compression-type": compressionTypeStr,
184-
"compression-level": compressionLevel,
185-
"max-request-elapsed-time": maxRequestElapsedTime,
186-
"max-requests": maxRequests,
187-
"consolidator-slots": consolidatorSlots,
188-
"flush-interval": flushInterval,
204+
"api-endpoint": apiEndpoint,
205+
"compress": compress,
206+
"compression-type": compressionTypeStr,
207+
"compression-level": compressionLevel,
208+
"max-request-elapsed-time": maxRequestElapsedTime,
209+
"max-requests": maxRequests,
210+
"consolidator-slots": consolidatorSlots,
211+
"flush-interval": flushInterval,
212+
"fast-fail-on-non-retryable-errors": failFastOnNonRetryableErrors,
189213
}).Info("created HttpForwarderHandler")
190214

191215
// Default set of headers used for the forwarder
@@ -232,21 +256,22 @@ func NewHttpForwarderHandlerV2(
232256
}
233257

234258
return &HttpForwarderHandlerV2{
235-
logger: logger.WithField("component", "http-forwarder-handler-v2"),
236-
apiEndpoint: apiEndpoint,
237-
maxRequestElapsedTime: maxRequestElapsedTime,
238-
metricsSem: metricsSem,
239-
metricsMergingSem: metricsMergingSem,
240-
compress: compress,
241-
compressionType: compressionType,
242-
compressionLevel: compressionLevel,
243-
consolidator: consolidator,
244-
consolidatedMetrics: ch,
245-
client: httpClient.Client,
246-
headers: headers,
247-
dynHeaderNames: dynHeaderNamesWithColon,
248-
done: make(chan struct{}),
249-
flushCoordinator: fc,
259+
logger: logger.WithField("component", "http-forwarder-handler-v2"),
260+
apiEndpoint: apiEndpoint,
261+
maxRequestElapsedTime: maxRequestElapsedTime,
262+
metricsSem: metricsSem,
263+
metricsMergingSem: metricsMergingSem,
264+
compress: compress,
265+
compressionType: compressionType,
266+
compressionLevel: compressionLevel,
267+
consolidator: consolidator,
268+
consolidatedMetrics: ch,
269+
client: httpClient.Client,
270+
headers: headers,
271+
dynHeaderNames: dynHeaderNamesWithColon,
272+
done: make(chan struct{}),
273+
flushCoordinator: fc,
274+
fastFailOnNonRetryableErrors: failFastOnNonRetryableErrors,
250275
}, nil
251276
}
252277

@@ -460,7 +485,7 @@ func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Messa
460485
"type": endpointType,
461486
})
462487

463-
post, err := hfh.constructPost(ctx, logger, hfh.apiEndpoint+endpoint, message, dynHeaderTags)
488+
postInstanceFn, err := hfh.constructPost(ctx, logger, hfh.apiEndpoint+endpoint, message, dynHeaderTags)
464489
if err != nil {
465490
atomic.AddUint64(&hfh.messagesInvalid, 1)
466491
logger.WithError(err).Error("failed to create request")
@@ -474,7 +499,7 @@ func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Messa
474499

475500
for {
476501
startTime := clock.Now(ctx)
477-
if err = post(); err == nil {
502+
if err = postInstanceFn(); err == nil {
478503
atomic.AddUint64(&hfh.messagesSent, 1)
479504
hfh.lastSuccessfulSend.Store(clock.Now(ctx).UnixNano())
480505

@@ -491,6 +516,15 @@ func (hfh *HttpForwarderHandlerV2) post(ctx context.Context, message proto.Messa
491516
return
492517
}
493518

519+
// All errors coming back from postInstanceFn() should be a RequestError
520+
var reqErr *RequestError
521+
ok := errors.As(err, &reqErr)
522+
if ok && !reqErr.Retryable && hfh.fastFailOnNonRetryableErrors {
523+
atomic.AddUint64(&hfh.messagesDroppedFastFail, 1)
524+
logger.WithError(err).Info("failed to send due to non-retryable error giving up")
525+
return
526+
}
527+
494528
next := b.NextBackOff()
495529
if next == backoff.Stop {
496530
atomic.AddUint64(&hfh.messagesDropped, 1)
@@ -593,7 +627,21 @@ func (hfh *HttpForwarderHandlerV2) constructPost(ctx context.Context, logger log
593627
req.Header.Set("Content-Encoding", encoding)
594628
resp, err := hfh.client.Do(req)
595629
if err != nil {
596-
return fmt.Errorf("error POSTing: %v", err)
630+
// Any error returned from client.Do is of type *url.Error
631+
// Documentation Ref: https://pkg.go.dev/net/http#Client.Do
632+
var urlErr *url.Error
633+
ok := errors.As(err, &urlErr)
634+
if ok {
635+
return &RequestError{
636+
Retryable: urlErr.Temporary(),
637+
Err: err,
638+
}
639+
}
640+
641+
return &RequestError{
642+
Retryable: false,
643+
Err: err,
644+
}
597645
}
598646
defer func() {
599647
_, _ = io.Copy(ioutil.Discard, resp.Body)
@@ -605,7 +653,11 @@ func (hfh *HttpForwarderHandlerV2) constructPost(ctx context.Context, logger log
605653
"status": resp.StatusCode,
606654
"body": string(bodyStart),
607655
}).Info("failed request")
608-
return fmt.Errorf("received bad status code %d", resp.StatusCode)
656+
return &RequestError{
657+
StatusCode: resp.StatusCode,
658+
Retryable: slices.Contains(RetryableErrorCodes, resp.StatusCode),
659+
Err: err,
660+
}
609661
}
610662
return nil
611663
}, nil

0 commit comments

Comments
 (0)