Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions pkg/backends/otlp/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"runtime/debug"
"strconv"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/tilinna/clock"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"

"github.com/atlassian/gostatsd/pkg/stats"

"github.com/atlassian/gostatsd"
"github.com/atlassian/gostatsd/pkg/backends/otlp/internal/data"
"github.com/atlassian/gostatsd/pkg/stats"
"github.com/atlassian/gostatsd/pkg/transport"
)

Expand Down Expand Up @@ -54,7 +56,8 @@ type Backend struct {
CompressPayload bool

// metricsPerBatch is the maximum number of metrics to send in a single batch.
metricsPerBatch int
metricsPerBatch int
maxRequestElapsedTime time.Duration
}

var _ gostatsd.Backend = (*Backend)(nil)
Expand Down Expand Up @@ -86,6 +89,7 @@ func NewClientFromViper(v *viper.Viper, logger logrus.FieldLogger, pool *transpo
logger: logger,
requestsBufferSem: make(chan struct{}, cfg.MaxRequests),
maxRetries: cfg.MaxRetries,
maxRequestElapsedTime: cfg.MaxRequestElapsedTime,
CompressPayload: cfg.CompressPayload,
metricsPerBatch: cfg.MetricsPerBatch,
}, nil
Expand Down Expand Up @@ -337,6 +341,12 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
return err
}

b := backoff.NewExponentialBackOff()
clck := clock.FromContext(ctx)
b.Clock = clck
b.Reset()
b.MaxElapsedTime = c.maxRequestElapsedTime

for {
var dropped int64
c.requestsBufferSem <- struct{}{}
Expand All @@ -354,13 +364,27 @@ func (c *Backend) postMetrics(ctx context.Context, batch group) error {
}
}

if retries >= c.maxRetries {
break
next := b.NextBackOff()
if next == backoff.Stop || retries >= c.maxRetries {
atomic.AddUint64(&c.batchesDropped, 1)
return err
}

c.logger.WithFields(logrus.Fields{
"sleep": next,
"error": err,
}).Warn("failed to send metrics, will retry")

retries++

timer := clck.NewTimer(next)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}

atomic.AddUint64(&c.batchesRetried.Cur, 1)
}

return err
}
39 changes: 33 additions & 6 deletions pkg/backends/otlp/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,13 @@ func TestBackendSendAsyncMetrics(t *testing.T) {
func TestRetrySendMetrics(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
name string
numUntilSuccess int
maxRetries int
wantAttempts int
numErrs int
name string
numUntilSuccess int
maxRetries int
maxRequestElapseTime string
wantAttempts int
approxAttempts bool // because of randomness of the retry interval
numErrs int
}{
{
name: "should retry sending metrics if it fails for the first time",
Expand Down Expand Up @@ -503,6 +505,23 @@ func TestRetrySendMetrics(t *testing.T) {
wantAttempts: 1,
numErrs: 1,
},
{
name: "should not retry if maxRetries reached",
numUntilSuccess: 5,
maxRetries: 3,
maxRequestElapseTime: "100s",
wantAttempts: 4,
numErrs: 1,
},
{
name: "should stop retry if maxRequestElapseTime reached",
numUntilSuccess: 5,
maxRetries: 100,
maxRequestElapseTime: "1s",
wantAttempts: 3,
approxAttempts: true,
numErrs: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
attempts := 0
Expand All @@ -520,6 +539,9 @@ func TestRetrySendMetrics(t *testing.T) {
v.Set("otlp.metrics_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/metrics"))
v.Set("otlp.logs_endpoint", fmt.Sprintf("%s/%s", s.URL, "v1/logs"))
v.Set("otlp.max_retries", tc.maxRetries)
if tc.maxRequestElapseTime != "" {
v.Set("otlp.max_request_elapsed_time", tc.maxRequestElapseTime)
}

logger := fixtures.NewTestLogger(t)

Expand All @@ -532,7 +554,12 @@ func TestRetrySendMetrics(t *testing.T) {

b.SendMetricsAsync(context.Background(), gostatsd.NewMetricMap(false), func(errs []error) {
assert.Equal(t, tc.numErrs, len(errs))
assert.Equal(t, tc.wantAttempts, attempts, "Must retry sending metrics")
if tc.approxAttempts {
assert.InDelta(t, tc.wantAttempts, attempts, 1, "Must retry sending metrics")
} else {
assert.Equal(t, tc.wantAttempts, attempts, "Must retry sending metrics")
}

})
})
}
Expand Down
22 changes: 15 additions & 7 deletions pkg/backends/otlp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package otlp
import (
"errors"
"runtime"
"time"

"github.com/spf13/viper"
"go.uber.org/multierr"
Expand All @@ -29,6 +30,9 @@ type Config struct {
MaxRequests int `mapstructure:"max_requests"`
// MaxRetries (Optional, default: 3) is the maximum number of retries to send a batch
MaxRetries int `mapstructure:"max_retries"`
// MaxRequestElapsedTime (Optional, default: 15) is the maximum time in seconds to wait for a request to complete
// 0 means it never stop until reaches MaxRetries
MaxRequestElapsedTime time.Duration `mapstructure:"max_request_elapsed_time"`
// CompressPayload (Optional, default: true) is used to enable payload compression
CompressPayload bool `mapstructure:"compress_payload"`
// MetricsPerBatch (Optional, default: 1000) is the maximum number of metrics to send in a single batch.
Expand Down Expand Up @@ -61,13 +65,14 @@ type Config struct {

func newDefaultConfig() *Config {
return &Config{
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
MaxRetries: 3,
CompressPayload: true,
MetricsPerBatch: defaultMetricsPerBatch,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
Transport: "default",
MaxRequests: runtime.NumCPU() * 2,
MaxRetries: 3,
MaxRequestElapsedTime: time.Second * 15,
CompressPayload: true,
MetricsPerBatch: defaultMetricsPerBatch,
Conversion: ConversionAsGauge,
UserAgent: "gostatsd",
}
}

Expand Down Expand Up @@ -97,6 +102,9 @@ func (c *Config) Validate() (errs error) {
if c.MaxRetries < 0 {
errs = multierr.Append(errs, errors.New("max retries must be a positive value"))
}
if c.MaxRequestElapsedTime < 0 {
errs = multierr.Append(errs, errors.New("max request elapsed time must >= 0"))
}
if c.MetricsPerBatch <= 0 {
errs = multierr.Append(errs, errors.New("metrics per batch must be a positive value"))
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/backends/otlp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package otlp

import (
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -30,20 +31,22 @@ func TestNewConfig(t *testing.T) {
v.SetDefault("otlp.logs_endpoint", "http://local/v1/logs")
v.SetDefault("otlp.max_requests", 1)
v.SetDefault("otlp.max_retries", 3)
v.SetDefault("otlp.max_request_elapsed_time", "15s")
v.SetDefault("otlp.compress_payload", true)
v.SetDefault("otlp.metrics_per_batch", 999)
return v
}(),
expect: &Config{
MetricsEndpoint: "http://local/v1/metrics",
LogsEndpoint: "http://local/v1/logs",
MaxRequests: 1,
MaxRetries: 3,
CompressPayload: true,
MetricsPerBatch: 999,
Conversion: "AsGauge",
Transport: "default",
UserAgent: "gostatsd",
MetricsEndpoint: "http://local/v1/metrics",
LogsEndpoint: "http://local/v1/logs",
MaxRequests: 1,
MaxRetries: 3,
MaxRequestElapsedTime: time.Second * 15,
CompressPayload: true,
MetricsPerBatch: 999,
Conversion: "AsGauge",
Transport: "default",
UserAgent: "gostatsd",
},
errVal: "",
},
Expand Down
Loading