Skip to content

Commit b45a6b5

Browse files
authored
Adding circuit breaker (#64)
* adding configurable http client timeout * added circuit breaker * added gobreaker license * vendored gobreaker * addressed all the pr suggestions * moved the circuit breaker from api to processor * added a test for the circuit breaker opening * changed the trip condition from consecutive failures to total failures
1 parent 140b289 commit b45a6b5

File tree

15 files changed

+652
-38
lines changed

15 files changed

+652
-38
lines changed

LICENSE-3rdparty.csv

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ go-spew,github.com/davecgh/go-spew,ISC,"Copyright (c) 2012-2016 Dave Collins <da
88
go-jmespath,github.com/jmespath/go-jmespath,Apache-2.0,"Copyright 2015 James Saryerwinnie"
99
go-difflib,github.com/pmezard/go-difflib,BSD-3-Clause,"Copyright (c) 2013, Patrick Mezard. All rights reserved."
1010
testify,github.com/stretchr/testify,MIT,"Copyright (c) 2012-2018 Mat Ryer and Tyler Bunnell"
11+
gobreaker,github.com/sony/gobreaker,MIT,"Copyright 2015 Sony Corporation"

ddlambda.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,21 @@ type (
5151
DDTraceEnabled bool
5252
// MergeXrayTraces will cause Datadog traces to be merged with traces from AWS X-Ray.
5353
MergeXrayTraces bool
54+
// HttpClientTimeout specifies a time limit for requests to the API. It defaults to 5s.
55+
HttpClientTimeout time.Duration
56+
// CircuitBreakerInterval is the cyclic period of the closed state
57+
// for the CircuitBreaker to clear the internal Counts.
58+
// default: 30s
59+
CircuitBreakerInterval time.Duration
60+
// CircuitBreakerTimeout is the period of the open state,
61+
// after which the state of the CircuitBreaker becomes half-open.
62+
// default: 60s
63+
CircuitBreakerTimeout time.Duration
64+
// CircuitBreakerTotalFailures is the number of failed requests tolerated in the closed state;
65+
// once the failure count exceeds this value, the state becomes open.
66+
// The failure counter is fully reset after every CircuitBreakerInterval.
67+
// default: 4
68+
CircuitBreakerTotalFailures uint32
5469
}
5570
)
5671

@@ -191,6 +206,7 @@ func (cfg *Config) toMetricsConfig() metrics.Config {
191206
mc.KMSAPIKey = cfg.KMSAPIKey
192207
mc.Site = cfg.Site
193208
mc.ShouldUseLogForwarder = cfg.ShouldUseLogForwarder
209+
mc.HttpClientTimeout = cfg.HttpClientTimeout
194210
}
195211

196212
if mc.Site == "" {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/google/uuid v1.1.2 // indirect
1414
github.com/opentracing/opentracing-go v1.2.0 // indirect
1515
github.com/philhofer/fwd v1.0.0 // indirect
16+
github.com/sony/gobreaker v0.4.1
1617
github.com/stretchr/testify v1.3.0
1718
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
1819
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
2828
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
2929
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3030
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
31+
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
3132
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
3233
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
3334
github.com/stretchr/testify v1.2.1/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

internal/metrics/api.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,11 @@ type (
3737

3838
// APIClientOptions contains instantiation options from creating an APIClient.
3939
APIClientOptions struct {
40-
baseAPIURL string
41-
apiKey string
42-
kmsAPIKey string
43-
decrypter Decrypter
40+
baseAPIURL string
41+
apiKey string
42+
kmsAPIKey string
43+
decrypter Decrypter
44+
httpClientTimeout time.Duration
4445
}
4546

4647
postMetricsModel struct {
@@ -51,7 +52,7 @@ type (
5152
// MakeAPIClient creates a new API client with the given api and app keys
5253
func MakeAPIClient(ctx context.Context, options APIClientOptions) *APIClient {
5354
httpClient := &http.Client{
54-
Timeout: time.Second * 5,
55+
Timeout: options.httpClientTimeout,
5556
}
5657
client := &APIClient{
5758
apiKey: options.apiKey,
@@ -113,7 +114,8 @@ func (cl *APIClient) SendMetrics(metrics []APIMetric) error {
113114
}
114115
return fmt.Errorf("Failed to send metrics to API. Status Code %d, Body %s", resp.StatusCode, body)
115116
}
116-
return nil
117+
118+
return err
117119
}
118120

119121
func (cl *APIClient) decryptAPIKey(decrypter Decrypter, kmsAPIKey string) <-chan string {

internal/metrics/constants.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@ package metrics
1111
import "time"
1212

1313
const (
14-
apiKeyParam = "api_key"
15-
appKeyParam = "application_key"
16-
defaultRetryInterval = time.Millisecond * 250
17-
defaultBatchInterval = time.Second * 15
14+
apiKeyParam = "api_key"
15+
appKeyParam = "application_key"
16+
defaultRetryInterval = time.Millisecond * 250
17+
defaultBatchInterval = time.Second * 15
18+
defaultHttpClientTimeout = time.Second * 5
19+
defaultCircuitBreakerInterval = time.Second * 30
20+
defaultCircuitBreakerTimeout = time.Second * 60
21+
defaultCircuitBreakerTotalFailures = 4
1822
)
1923

2024
// MetricType enumerates all the available metric types

internal/metrics/listener.go

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,17 @@ type (
3838

3939
// Config gives options for how the listener should work
4040
Config struct {
41-
APIKey string
42-
KMSAPIKey string
43-
Site string
44-
ShouldRetryOnFailure bool
45-
ShouldUseLogForwarder bool
46-
BatchInterval time.Duration
47-
EnhancedMetrics bool
41+
APIKey string
42+
KMSAPIKey string
43+
Site string
44+
ShouldRetryOnFailure bool
45+
ShouldUseLogForwarder bool
46+
BatchInterval time.Duration
47+
EnhancedMetrics bool
48+
HttpClientTimeout time.Duration
49+
CircuitBreakerInterval time.Duration
50+
CircuitBreakerTimeout time.Duration
51+
CircuitBreakerTotalFailures uint32
4852
}
4953

5054
logMetric struct {
@@ -66,11 +70,24 @@ const (
6670
func MakeListener(config Config) Listener {
6771

6872
apiClient := MakeAPIClient(context.Background(), APIClientOptions{
69-
baseAPIURL: config.Site,
70-
apiKey: config.APIKey,
71-
decrypter: MakeKMSDecrypter(),
72-
kmsAPIKey: config.KMSAPIKey,
73+
baseAPIURL: config.Site,
74+
apiKey: config.APIKey,
75+
decrypter: MakeKMSDecrypter(),
76+
kmsAPIKey: config.KMSAPIKey,
77+
httpClientTimeout: config.HttpClientTimeout,
7378
})
79+
if config.HttpClientTimeout <= 0 {
80+
config.HttpClientTimeout = defaultHttpClientTimeout
81+
}
82+
if config.CircuitBreakerInterval <= 0 {
83+
config.CircuitBreakerInterval = defaultCircuitBreakerInterval
84+
}
85+
if config.CircuitBreakerTimeout <= 0 {
86+
config.CircuitBreakerTimeout = defaultCircuitBreakerTimeout
87+
}
88+
if config.CircuitBreakerTotalFailures <= 0 {
89+
config.CircuitBreakerTotalFailures = defaultCircuitBreakerTotalFailures
90+
}
7491
if config.BatchInterval <= 0 {
7592
config.BatchInterval = defaultBatchInterval
7693
}
@@ -102,7 +119,7 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont
102119
}
103120

104121
ts := MakeTimeService()
105-
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure)
122+
pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure, l.config.CircuitBreakerInterval, l.config.CircuitBreakerTimeout, l.config.CircuitBreakerTotalFailures)
106123
l.processor = pr
107124

108125
ctx = AddListener(ctx, l)

internal/metrics/processor.go

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/DataDog/datadog-lambda-go/internal/logger"
1818
"github.com/cenkalti/backoff"
19+
"github.com/sony/gobreaker"
1920
)
2021

2122
type (
@@ -41,13 +42,16 @@ type (
4142
batcher *Batcher
4243
shouldRetryOnFail bool
4344
isProcessing bool
45+
breaker *gobreaker.CircuitBreaker
4446
}
4547
)
4648

4749
// MakeProcessor creates a new metrics context
48-
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor {
50+
func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool, circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32) Processor {
4951
batcher := MakeBatcher(batchInterval)
5052

53+
breaker := MakeCircuitBreaker(circuitBreakerInterval, circuitBreakerTimeout, circuitBreakerTotalFailures)
54+
5155
return &processor{
5256
context: ctx,
5357
metricsChan: make(chan Metric, 2000),
@@ -58,7 +62,22 @@ func MakeProcessor(ctx context.Context, client Client, timeService TimeService,
5862
shouldRetryOnFail: shouldRetryOnFail,
5963
timeService: timeService,
6064
isProcessing: false,
65+
breaker: breaker,
66+
}
67+
}
68+
69+
// MakeCircuitBreaker builds a gobreaker.CircuitBreaker named "post distribution_points".
//
// circuitBreakerInterval and circuitBreakerTimeout are passed through unchanged as the
// breaker's Interval and Timeout settings. circuitBreakerTotalFailures is the threshold
// used by the ReadyToTrip callback: the breaker is ready to trip once
// Counts.TotalFailures is strictly greater than this value.
func MakeCircuitBreaker(circuitBreakerInterval time.Duration, circuitBreakerTimeout time.Duration, circuitBreakerTotalFailures uint32) *gobreaker.CircuitBreaker {
70+
readyToTrip := func(counts gobreaker.Counts) bool {
71+
// Strict comparison: reaching the threshold is not enough, the failure count must exceed it.
return counts.TotalFailures > circuitBreakerTotalFailures
6172
}
73+
74+
st := gobreaker.Settings{
75+
Name: "post distribution_points",
76+
Interval: circuitBreakerInterval,
77+
Timeout: circuitBreakerTimeout,
78+
ReadyToTrip: readyToTrip,
79+
}
80+
return gobreaker.NewCircuitBreaker(st)
6281
}
6382

6483
func (p *processor) AddMetric(metric Metric) {
@@ -125,18 +144,24 @@ func (p *processor) processMetrics() {
125144
}
126145

127146
if shouldSendBatch {
128-
if shouldExit && p.shouldRetryOnFail {
129-
// If we are shutting down, and we just failed to send our last batch, do a retry
130-
bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2)
131-
err := backoff.Retry(p.sendMetricsBatch, bo)
132-
if err != nil {
133-
logger.Error(fmt.Errorf("failed to flush metrics to datadog API after retry: %v", err))
134-
}
135-
} else {
136-
err := p.sendMetricsBatch()
137-
if err != nil {
138-
logger.Error(fmt.Errorf("failed to flush metrics to datadog API: %v", err))
147+
_, err := p.breaker.Execute(func() (interface{}, error) {
148+
if shouldExit && p.shouldRetryOnFail {
149+
// If we are shutting down, and we just failed to send our last batch, do a retry
150+
bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2)
151+
err := backoff.Retry(p.sendMetricsBatch, bo)
152+
if err != nil {
153+
return nil, fmt.Errorf("after retry: %v", err)
154+
}
155+
} else {
156+
err := p.sendMetricsBatch()
157+
if err != nil {
158+
return nil, fmt.Errorf("with no retry: %v", err)
159+
}
139160
}
161+
return nil, nil
162+
})
163+
if err != nil {
164+
logger.Error(fmt.Errorf("failed to flush metrics to datadog API: %v", err))
140165
}
141166
}
142167
}

internal/metrics/processor_test.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package metrics
1111
import (
1212
"context"
1313
"errors"
14+
"math"
1415
"testing"
1516
"time"
1617

@@ -67,7 +68,7 @@ func TestProcessorBatches(t *testing.T) {
6768
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
6869
nowUnix := float64(mts.now.Unix())
6970

70-
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false)
71+
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)
7172

7273
d1 := Distribution{
7374
Name: "metric-1",
@@ -113,7 +114,7 @@ func TestProcessorBatchesPerTick(t *testing.T) {
113114
secondTimeUnix := float64(secondTime.Unix())
114115
mts.now = firstTime
115116

116-
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false)
117+
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, math.MaxUint32)
117118

118119
d1 := Distribution{
119120
Name: "metric-1",
@@ -188,7 +189,7 @@ func TestProcessorPerformsRetry(t *testing.T) {
188189
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
189190

190191
shouldRetry := true
191-
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry)
192+
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)
192193

193194
d1 := Distribution{
194195
Name: "metric-1",
@@ -213,7 +214,7 @@ func TestProcessorCancelsWithContext(t *testing.T) {
213214

214215
shouldRetry := true
215216
ctx, cancelFunc := context.WithCancel(context.Background())
216-
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry)
217+
processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry, time.Hour*1000, time.Hour*1000, math.MaxUint32)
217218

218219
d1 := Distribution{
219220
Name: "metric-1",
@@ -230,3 +231,29 @@ func TestProcessorCancelsWithContext(t *testing.T) {
230231

231232
assert.Equal(t, 0, mc.sendMetricsCalledCount)
232233
}
234+
235+
// TestProcessorBatchesWithOpeningCircuitBreaker pushes a single distribution through a
// processor whose mock client is configured with an error and a circuit-breaker failure
// threshold of 1, then checks how many send attempts the mock client recorded.
func TestProcessorBatchesWithOpeningCircuitBreaker(t *testing.T) {
236+
mc := makeMockClient()
237+
mts := makeMockTimeService()
238+
239+
mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
240+
241+
// The breaker becomes ready to trip once the number of total failures is > 1.
242+
circuitBreakerTotalFailures := uint32(1)
243+
// Retry-on-failure is disabled (false); the breaker interval and timeout are set very
// large so they cannot elapse during the test.
processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false, time.Hour*1000, time.Hour*1000, circuitBreakerTotalFailures)
244+
245+
d1 := Distribution{
246+
Name: "metric-1",
247+
Tags: []string{"a", "b", "c"},
248+
Values: []MetricValue{{Timestamp: mts.now, Value: 1}, {Timestamp: mts.now, Value: 2}, {Timestamp: mts.now, Value: 3}},
249+
}
250+
251+
// presumably makes every send attempt on the mock client fail — verify against the mock
mc.err = errors.New("Some error")
252+
253+
processor.AddMetric(&d1)
254+
255+
processor.FinishProcessing()
256+
257+
// Exactly one send attempt is expected.
// NOTE(review): shouldRetryOnFail is false above, so no retry is attempted, and a single
// failure does not exceed the threshold of 1 — this assertion may not actually exercise
// the breaker opening; confirm the intent.
258+
assert.Equal(t, 1, mc.sendMetricsCalledCount)
259+
}

vendor/github.com/sony/gobreaker/.travis.yml

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/sony/gobreaker/LICENSE

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)