From c295f48493037e2e9a7774419ef1400393463403 Mon Sep 17 00:00:00 2001 From: svc-excavator-bot Date: Wed, 24 Sep 2025 16:47:21 +0000 Subject: [PATCH] Excavator: Bump go dependency github.com/DataDog/datadog-go/v5 --- go.mod | 2 +- go.sum | 4 +- .../datadog-go/v5/statsd/aggregator.go | 75 +- .../DataDog/datadog-go/v5/statsd/buffer.go | 45 +- .../v5/statsd/buffered_metric_context.go | 15 +- .../datadog-go/v5/statsd/external_env.go | 40 + .../DataDog/datadog-go/v5/statsd/format.go | 74 +- .../DataDog/datadog-go/v5/statsd/metrics.go | 103 +- .../DataDog/datadog-go/v5/statsd/options.go | 11 + .../DataDog/datadog-go/v5/statsd/statsd.go | 701 ++----------- .../datadog-go/v5/statsd/statsd_direct.go | 8 +- .../DataDog/datadog-go/v5/statsd/statsdex.go | 950 ++++++++++++++++++ .../datadog-go/v5/statsd/tag_cardinality.go | 99 ++ .../DataDog/datadog-go/v5/statsd/telemetry.go | 8 +- .../DataDog/datadog-go/v5/statsd/uds.go | 31 +- .../DataDog/datadog-go/v5/statsd/worker.go | 18 +- vendor/modules.txt | 2 +- 17 files changed, 1396 insertions(+), 790 deletions(-) create mode 100644 vendor/github.com/DataDog/datadog-go/v5/statsd/external_env.go create mode 100644 vendor/github.com/DataDog/datadog-go/v5/statsd/statsdex.go create mode 100644 vendor/github.com/DataDog/datadog-go/v5/statsd/tag_cardinality.go diff --git a/go.mod b/go.mod index 4e6a864a7..ae0b59ea2 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( ) require ( - github.com/DataDog/datadog-go/v5 v5.7.1 // indirect + github.com/DataDog/datadog-go/v5 v5.8.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bluekeyes/hatpear v0.1.2 // indirect diff --git a/go.sum b/go.sum index f3ed25b3c..6d6afb031 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/DataDog/datadog-go/v5 v5.7.1 h1:dNhEwKaO3LJhGYKajl2DjobArfa5R9YF72z3Dy+PH3k= -github.com/DataDog/datadog-go/v5 v5.7.1/go.mod h1:CA9Ih6tb3jtxk+ps1xvTnxmhjr7ldE8TiwrZyrm31ss= +github.com/DataDog/datadog-go/v5 v5.8.0 h1:pKZtux5CfqkqGYGvKCM3wV5i8sYAzcddK7nkrChUtxo= +github.com/DataDog/datadog-go/v5 v5.8.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/aggregator.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/aggregator.go index 33eb930ae..e8178f272 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/aggregator.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/aggregator.go @@ -32,7 +32,7 @@ type aggregator struct { closed chan struct{} - client *Client + client *ClientEx // aggregator implements channelMode mechanism to receive histograms, // distributions and timings. Since they need sampling they need to @@ -43,7 +43,7 @@ type aggregator struct { wg sync.WaitGroup } -func newAggregator(c *Client, maxSamplesPerContext int64) *aggregator { +func newAggregator(c *ClientEx, maxSamplesPerContext int64) *aggregator { return &aggregator{ client: c, counts: countsMap{}, @@ -96,11 +96,11 @@ func (a *aggregator) pullMetric() { case m := <-a.inputMetrics: switch m.metricType { case histogram: - a.histogram(m.name, m.fvalue, m.tags, m.rate) + a.histogram(m.name, m.fvalue, m.tags, m.rate, m.overrideCard) case distribution: - a.distribution(m.name, m.fvalue, m.tags, m.rate) + a.distribution(m.name, m.fvalue, m.tags, m.rate, m.overrideCard) case timing: - a.timing(m.name, m.fvalue, m.tags, m.rate) + a.timing(m.name, m.fvalue, m.tags, m.rate, m.overrideCard) } case <-a.stopChannelMode: a.wg.Done() @@ -172,32 +172,46 @@ func (a *aggregator) flushMetrics() []metric { return metrics } -// getContext returns the context for a metric name and tags. +// getContext returns the context for a metric name, tags, and cardinality. // -// The context is the metric name and tags separated by a separator symbol. +// The context is the metric name, tags, and cardinality separated by separator symbols. // It is not intended to be used as a metric name but as a unique key to aggregate -func getContext(name string, tags []string) string { - c, _ := getContextAndTags(name, tags) +func getContext(name string, tags []string, cardinality Cardinality) string { + c, _ := getContextAndTags(name, tags, cardinality) return c } -// getContextAndTags returns the context and tags for a metric name and tags. +// getContextAndTags returns the context and tags for a metric name, tags, and cardinality. // // See getContext for usage for context // The tags are the tags separated by a separator symbol and can be re-used to pass down to the writer -func getContextAndTags(name string, tags []string) (string, string) { +func getContextAndTags(name string, tags []string, cardinality Cardinality) (string, string) { + cardString := cardinality.String() if len(tags) == 0 { - return name, "" + if cardString == "" { + return name, "" + } + return name + nameSeparatorSymbol + cardinality.String(), "" } + n := len(name) + len(nameSeparatorSymbol) + len(tagSeparatorSymbol)*(len(tags)-1) for _, s := range tags { n += len(s) } + var cardStringLen = 0 + if cardString != "" { + n += len(cardString) + len(cardSeparatorSymbol) + cardStringLen = len(cardString) + len(cardSeparatorSymbol) + } var sb strings.Builder sb.Grow(n) sb.WriteString(name) sb.WriteString(nameSeparatorSymbol) + if cardString != "" { + sb.WriteString(cardString) + sb.WriteString(cardSeparatorSymbol) + } sb.WriteString(tags[0]) for _, s := range tags[1:] { sb.WriteString(tagSeparatorSymbol) @@ -206,11 +220,12 @@ func getContextAndTags(name string, tags []string) (string, string) { s := sb.String() - return s, s[len(name)+len(nameSeparatorSymbol):] + return s, s[len(name)+len(nameSeparatorSymbol)+cardStringLen:] } -func (a *aggregator) count(name string, value int64, tags []string) error { - context := getContext(name, tags) +func (a *aggregator) count(name string, value int64, tags []string, cardinality Cardinality) error { + resolvedCardinality := resolveCardinality(cardinality) + context := getContext(name, tags, resolvedCardinality) a.countsM.RLock() if count, found := a.counts[context]; found { count.sample(value) @@ -227,13 +242,14 @@ func (a *aggregator) count(name string, value int64, tags []string) error { return nil } - a.counts[context] = newCountMetric(name, value, tags) + a.counts[context] = newCountMetric(name, value, tags, resolvedCardinality) a.countsM.Unlock() return nil } -func (a *aggregator) gauge(name string, value float64, tags []string) error { - context := getContext(name, tags) +func (a *aggregator) gauge(name string, value float64, tags []string, cardinality Cardinality) error { + resolvedCardinality := resolveCardinality(cardinality) + context := getContext(name, tags, resolvedCardinality) a.gaugesM.RLock() if gauge, found := a.gauges[context]; found { gauge.sample(value) @@ -242,7 +258,7 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { } a.gaugesM.RUnlock() - gauge := newGaugeMetric(name, value, tags) + gauge := newGaugeMetric(name, value, tags, resolvedCardinality) a.gaugesM.Lock() // Check if another goroutines hasn't created the value betwen the 'RUnlock' and 'Lock' @@ -256,8 +272,9 @@ func (a *aggregator) gauge(name string, value float64, tags []string) error { return nil } -func (a *aggregator) set(name string, value string, tags []string) error { - context := getContext(name, tags) +func (a *aggregator) set(name string, value string, tags []string, cardinality Cardinality) error { + resolvedCardinality := resolveCardinality(cardinality) + context := getContext(name, tags, resolvedCardinality) a.setsM.RLock() if set, found := a.sets[context]; found { set.sample(value) @@ -273,7 +290,7 @@ func (a *aggregator) set(name string, value string, tags []string) error { a.setsM.Unlock() return nil } - a.sets[context] = newSetMetric(name, value, tags) + a.sets[context] = newSetMetric(name, value, tags, resolvedCardinality) a.setsM.Unlock() return nil } @@ -283,16 +300,16 @@ func (a *aggregator) set(name string, value string, tags []string) error { // sample rate will have impacts on the CPU and memory usage of the Agent. // type alias for Client.sendToAggregator -type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error +type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64, cardinality Cardinality) error -func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error { - return a.histograms.sample(name, value, tags, rate) +func (a *aggregator) histogram(name string, value float64, tags []string, rate float64, cardinality Cardinality) error { + return a.histograms.sample(name, value, tags, rate, cardinality) } -func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error { - return a.distributions.sample(name, value, tags, rate) +func (a *aggregator) distribution(name string, value float64, tags []string, rate float64, cardinality Cardinality) error { + return a.distributions.sample(name, value, tags, rate, cardinality) } -func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error { - return a.timings.sample(name, value, tags, rate) +func (a *aggregator) timing(name string, value float64, tags []string, rate float64, cardinality Cardinality) error { + return a.timings.sample(name, value, tags, rate, cardinality) } diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/buffer.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/buffer.go index 91f2e32b9..ad05a398a 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/buffer.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/buffer.go @@ -39,40 +39,44 @@ func newStatsdBuffer(maxSize, maxElements int) *statsdBuffer { } } -func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64) error { +func (b *statsdBuffer) writeGauge(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, timestamp int64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendGauge(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) b.buffer = appendTimestamp(b.buffer, timestamp) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64) error { + +func (b *statsdBuffer) writeCount(namespace string, globalTags []string, name string, value int64, tags []string, rate float64, timestamp int64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendCount(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) b.buffer = appendTimestamp(b.buffer, timestamp) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeHistogram(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendHistogram(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } // writeAggregated serialized as many values as possible in the current buffer and return the position in values where it stopped. -func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int, rate float64) (int, error) { +func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, globalTags []string, name string, values []float64, tags string, tagSize int, precision int, rate float64, originDetection bool, overrideCard Cardinality) (int, error) { if b.elementCount >= b.maxElements { return 0, errBufferFull } @@ -115,6 +119,8 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl b.buffer = appendRate(b.buffer, rate) b.buffer = appendTagsAggregated(b.buffer, globalTags, tags) b.buffer = appendContainerID(b.buffer) + b.buffer = appendExternalEnv(b.buffer, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() b.elementCount++ @@ -125,52 +131,57 @@ func (b *statsdBuffer) writeAggregated(metricSymbol []byte, namespace string, gl } -func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeDistribution(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendDistribution(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64) error { +func (b *statsdBuffer) writeSet(namespace string, globalTags []string, name string, value string, tags []string, rate float64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendSet(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64) error { +func (b *statsdBuffer) writeTiming(namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate) + b.buffer = appendTiming(b.buffer, namespace, globalTags, name, value, tags, rate, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeEvent(event *Event, globalTags []string) error { +func (b *statsdBuffer) writeEvent(event *Event, globalTags []string, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendEvent(b.buffer, event, globalTags) + b.buffer = appendEvent(b.buffer, event, globalTags, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } -func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string) error { +func (b *statsdBuffer) writeServiceCheck(serviceCheck *ServiceCheck, globalTags []string, originDetection bool, overrideCard Cardinality) error { if b.elementCount >= b.maxElements { return errBufferFull } originalBuffer := b.buffer - b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags) + b.buffer = appendServiceCheck(b.buffer, serviceCheck, globalTags, originDetection) + b.buffer = appendTagCardinality(b.buffer, overrideCard) b.writeSeparator() return b.validateNewElement(originalBuffer) } diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/buffered_metric_context.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/buffered_metric_context.go index 94b31fe5b..9f28d9a52 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/buffered_metric_context.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/buffered_metric_context.go @@ -14,7 +14,7 @@ type bufferedMetricContexts struct { nbContext uint64 mutex sync.RWMutex values bufferedMetricMap - newMetric func(string, float64, string, float64) *bufferedMetric + newMetric func(string, float64, string, float64, Cardinality) *bufferedMetric // Each bufferedMetricContexts uses its own random source and random // lock to prevent goroutines from contending for the lock on the @@ -25,11 +25,11 @@ type bufferedMetricContexts struct { randomLock sync.Mutex } -func newBufferedContexts(newMetric func(string, float64, string, int64, float64) *bufferedMetric, maxSamples int64) bufferedMetricContexts { +func newBufferedContexts(newMetric func(string, float64, string, int64, float64, Cardinality) *bufferedMetric, maxSamples int64) bufferedMetricContexts { return bufferedMetricContexts{ values: bufferedMetricMap{}, - newMetric: func(name string, value float64, stringTags string, rate float64) *bufferedMetric { - return newMetric(name, value, stringTags, maxSamples, rate) + newMetric: func(name string, value float64, stringTags string, rate float64, cardinality Cardinality) *bufferedMetric { + return newMetric(name, value, stringTags, maxSamples, rate, cardinality) }, // Note that calling "time.Now().UnixNano()" repeatedly quickly may return // very similar values. That's fine for seeding the worker-specific random @@ -54,7 +54,7 @@ func (bc *bufferedMetricContexts) flush(metrics []metric) []metric { return metrics } -func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error { +func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64, cardinality Cardinality) error { keepingSample := shouldSample(rate, bc.random, &bc.randomLock) // If we don't keep the sample, return early. If we do keep the sample @@ -67,7 +67,8 @@ func (bc *bufferedMetricContexts) sample(name string, value float64, tags []stri return nil } - context, stringTags := getContextAndTags(name, tags) + resolvedCardinality := resolveCardinality(cardinality) + context, stringTags := getContextAndTags(name, tags, resolvedCardinality) var v *bufferedMetric bc.mutex.RLock() @@ -81,7 +82,7 @@ func (bc *bufferedMetricContexts) sample(name string, value float64, tags []stri v, _ = bc.values[context] if v == nil { // If we might keep a sample that we should have skipped, but that should not drastically affect performances. - bc.values[context] = bc.newMetric(name, value, stringTags, rate) + bc.values[context] = bc.newMetric(name, value, stringTags, rate, resolvedCardinality) // We added a new value, we need to unlock the mutex and quit bc.mutex.Unlock() return nil diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/external_env.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/external_env.go new file mode 100644 index 000000000..7ea9a54d7 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/external_env.go @@ -0,0 +1,40 @@ +package statsd + +import ( + "os" + "unicode" +) + +// ddExternalEnvVarName specifies the env var to inject the environment name. +const ddExternalEnvVarName = "DD_EXTERNAL_ENV" + +var ( + externalEnv = "" +) + +// initExternalEnv initializes the external environment name. +func initExternalEnv() { + var value = os.Getenv(ddExternalEnvVarName) + if value != "" { + externalEnv = sanitizeExternalEnv(value) + } +} + +// sanitizeExternalEnv removes non-printable characters and pipe characters from the external environment name. +func sanitizeExternalEnv(externalEnv string) string { + if externalEnv == "" { + return "" + } + var output string + for _, r := range externalEnv { + if unicode.IsPrint(r) && r != '|' { + output += string(r) + } + } + + return output +} + +func getExternalEnv() string { + return externalEnv +} diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/format.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/format.go index f3ab9231f..089c373f3 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/format.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/format.go @@ -6,14 +6,18 @@ import ( ) var ( - gaugeSymbol = []byte("g") - countSymbol = []byte("c") - histogramSymbol = []byte("h") - distributionSymbol = []byte("d") - setSymbol = []byte("s") - timingSymbol = []byte("ms") + gaugeSymbol = []byte("g") + countSymbol = []byte("c") + histogramSymbol = []byte("h") + distributionSymbol = []byte("d") + setSymbol = []byte("s") + timingSymbol = []byte("ms") +) + +const ( tagSeparatorSymbol = "," nameSeparatorSymbol = ":" + cardSeparatorSymbol = "|" ) func appendHeader(buffer []byte, namespace string, name string) []byte { @@ -95,7 +99,7 @@ func appendTagsAggregated(buffer []byte, globalTags []string, tags string) []byt return buffer } -func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, precision int) []byte { +func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, precision int, originDetection bool) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendFloat(buffer, value, 'f', precision, 64) buffer = append(buffer, '|') @@ -103,10 +107,11 @@ func appendFloatMetric(buffer []byte, typeSymbol []byte, namespace string, globa buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) buffer = appendContainerID(buffer) + buffer = appendExternalEnv(buffer, originDetection) return buffer } -func appendIntegerMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64) []byte { +func appendIntegerMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64, originDetection bool) []byte { buffer = appendHeader(buffer, namespace, name) buffer = strconv.AppendInt(buffer, value, 10) buffer = append(buffer, '|') @@ -114,10 +119,11 @@ func appendIntegerMetric(buffer []byte, typeSymbol []byte, namespace string, glo buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) buffer = appendContainerID(buffer) + buffer = appendExternalEnv(buffer, originDetection) return buffer } -func appendStringMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64) []byte { +func appendStringMetric(buffer []byte, typeSymbol []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64, originDetection bool) []byte { buffer = appendHeader(buffer, namespace, name) buffer = append(buffer, value...) buffer = append(buffer, '|') @@ -125,31 +131,32 @@ func appendStringMetric(buffer []byte, typeSymbol []byte, namespace string, glob buffer = appendRate(buffer, rate) buffer = appendTags(buffer, globalTags, tags) buffer = appendContainerID(buffer) + buffer = appendExternalEnv(buffer, originDetection) return buffer } -func appendGauge(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, gaugeSymbol, namespace, globalTags, name, value, tags, rate, -1) +func appendGauge(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool) []byte { + return appendFloatMetric(buffer, gaugeSymbol, namespace, globalTags, name, value, tags, rate, -1, originDetection) } -func appendCount(buffer []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64) []byte { - return appendIntegerMetric(buffer, countSymbol, namespace, globalTags, name, value, tags, rate) +func appendCount(buffer []byte, namespace string, globalTags []string, name string, value int64, tags []string, rate float64, originDetection bool) []byte { + return appendIntegerMetric(buffer, countSymbol, namespace, globalTags, name, value, tags, rate, originDetection) } -func appendHistogram(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, histogramSymbol, namespace, globalTags, name, value, tags, rate, -1) +func appendHistogram(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool) []byte { + return appendFloatMetric(buffer, histogramSymbol, namespace, globalTags, name, value, tags, rate, -1, originDetection) } -func appendDistribution(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, distributionSymbol, namespace, globalTags, name, value, tags, rate, -1) +func appendDistribution(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool) []byte { + return appendFloatMetric(buffer, distributionSymbol, namespace, globalTags, name, value, tags, rate, -1, originDetection) } -func appendSet(buffer []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64) []byte { - return appendStringMetric(buffer, setSymbol, namespace, globalTags, name, value, tags, rate) +func appendSet(buffer []byte, namespace string, globalTags []string, name string, value string, tags []string, rate float64, originDetection bool) []byte { + return appendStringMetric(buffer, setSymbol, namespace, globalTags, name, value, tags, rate, originDetection) } -func appendTiming(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64) []byte { - return appendFloatMetric(buffer, timingSymbol, namespace, globalTags, name, value, tags, rate, 6) +func appendTiming(buffer []byte, namespace string, globalTags []string, name string, value float64, tags []string, rate float64, originDetection bool) []byte { + return appendFloatMetric(buffer, timingSymbol, namespace, globalTags, name, value, tags, rate, 6, originDetection) } func escapedEventTextLen(text string) int { @@ -167,7 +174,7 @@ func appendEscapedEventText(buffer []byte, text string) []byte { return buffer } -func appendEvent(buffer []byte, event *Event, globalTags []string) []byte { +func appendEvent(buffer []byte, event *Event, globalTags []string, originDetection bool) []byte { escapedTextLen := escapedEventTextLen(event.Text) buffer = append(buffer, "_e{"...) @@ -215,6 +222,7 @@ func appendEvent(buffer []byte, event *Event, globalTags []string) []byte { buffer = appendTags(buffer, globalTags, event.Tags) buffer = appendContainerID(buffer) + buffer = appendExternalEnv(buffer, originDetection) return buffer } @@ -232,7 +240,7 @@ func appendEscapedServiceCheckText(buffer []byte, text string) []byte { return buffer } -func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags []string) []byte { +func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags []string, originDetection bool) []byte { buffer = append(buffer, "_sc|"...) buffer = append(buffer, serviceCheck.Name...) buffer = append(buffer, '|') @@ -256,6 +264,7 @@ func appendServiceCheck(buffer []byte, serviceCheck *ServiceCheck, globalTags [] } buffer = appendContainerID(buffer) + buffer = appendExternalEnv(buffer, originDetection) return buffer } @@ -278,3 +287,22 @@ func appendTimestamp(buffer []byte, timestamp int64) []byte { } return buffer } + +func appendExternalEnv(buffer []byte, originDetection bool) []byte { + if externalEnv := getExternalEnv(); externalEnv != "" && originDetection { + buffer = append(buffer, "|e:"...) + buffer = append(buffer, externalEnv...) + } + return buffer +} + +func appendTagCardinality(buffer []byte, overrideCard Cardinality) []byte { + // Check if the user has provided a valid cardinality parameter. If not, use the global setting. + cardString := resolveCardinality(overrideCard).String() + + if cardString != "" { + buffer = append(buffer, "|card:"...) + buffer = append(buffer, cardString...) + } + return buffer +} diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/metrics.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/metrics.go index 3d243b7a6..b59de473d 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/metrics.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/metrics.go @@ -15,16 +15,18 @@ Those are metrics type that can be aggregated on the client side: */ type countMetric struct { - value int64 - name string - tags []string + value int64 + name string + tags []string + overrideCard Cardinality } -func newCountMetric(name string, value int64, tags []string) *countMetric { +func newCountMetric(name string, value int64, tags []string, cardinality Cardinality) *countMetric { return &countMetric{ - value: value, - name: name, - tags: copySlice(tags), + value: value, + name: name, + tags: copySlice(tags), + overrideCard: cardinality, } } @@ -34,27 +36,30 @@ func (c *countMetric) sample(v int64) { func (c *countMetric) flushUnsafe() metric { return metric{ - metricType: count, - name: c.name, - tags: c.tags, - rate: 1, - ivalue: c.value, + metricType: count, + name: c.name, + tags: c.tags, + rate: 1, + ivalue: c.value, + overrideCard: c.overrideCard, } } // Gauge type gaugeMetric struct { - value uint64 - name string - tags []string + value uint64 + name string + tags []string + overrideCard Cardinality } -func newGaugeMetric(name string, value float64, tags []string) *gaugeMetric { +func newGaugeMetric(name string, value float64, tags []string, cardinality Cardinality) *gaugeMetric { return &gaugeMetric{ - value: math.Float64bits(value), - name: name, - tags: copySlice(tags), + value: math.Float64bits(value), + name: name, + tags: copySlice(tags), + overrideCard: cardinality, } } @@ -64,28 +69,31 @@ func (g *gaugeMetric) sample(v float64) { func (g *gaugeMetric) flushUnsafe() metric { return metric{ - metricType: gauge, - name: g.name, - tags: g.tags, - rate: 1, - fvalue: math.Float64frombits(g.value), + metricType: gauge, + name: g.name, + tags: g.tags, + rate: 1, + fvalue: math.Float64frombits(g.value), + overrideCard: g.overrideCard, } } // Set type setMetric struct { - data map[string]struct{} - name string - tags []string + data map[string]struct{} + name string + tags []string + overrideCard Cardinality sync.Mutex } -func newSetMetric(name string, value string, tags []string) *setMetric { +func newSetMetric(name string, value string, tags []string, cardinality Cardinality) *setMetric { set := &setMetric{ - data: map[string]struct{}{}, - name: name, - tags: copySlice(tags), + data: map[string]struct{}{}, + name: name, + tags: copySlice(tags), + overrideCard: cardinality, } set.data[value] = struct{}{} return set @@ -108,11 +116,12 @@ func (s *setMetric) flushUnsafe() []metric { i := 0 for value := range s.data { metrics[i] = metric{ - metricType: set, - name: s.name, - tags: s.tags, - rate: 1, - svalue: value, + metricType: set, + name: s.name, + tags: s.tags, + rate: 1, + svalue: value, + overrideCard: s.overrideCard, } i++ } @@ -144,6 +153,8 @@ type bufferedMetric struct { // The first observed user-specified sample rate. When specified // it is used because we don't know better. specifiedRate float64 + + overrideCard Cardinality } func (s *bufferedMetric) sample(v float64) { @@ -201,17 +212,18 @@ func (s *bufferedMetric) flushUnsafe() metric { } return metric{ - metricType: s.mtype, - name: s.name, - stags: s.tags, - rate: rate, - fvalues: s.data[:s.storedSamples], + metricType: s.mtype, + name: s.name, + stags: s.tags, + rate: rate, + fvalues: s.data[:s.storedSamples], + overrideCard: resolveCardinality(s.overrideCard), } } type histogramMetric = bufferedMetric -func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *histogramMetric { +func newHistogramMetric(name string, value float64, stringTags string, maxSamples int64, rate float64, cardinality Cardinality) *histogramMetric { return &histogramMetric{ data: newData(value, maxSamples), totalSamples: 1, @@ -221,12 +233,13 @@ func newHistogramMetric(name string, value float64, stringTags string, maxSample mtype: histogramAggregated, maxSamples: maxSamples, specifiedRate: rate, + overrideCard: resolveCardinality(cardinality), } } type distributionMetric = bufferedMetric -func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *distributionMetric { +func newDistributionMetric(name string, value float64, stringTags string, maxSamples int64, rate float64, cardinality Cardinality) *distributionMetric { return &distributionMetric{ data: newData(value, maxSamples), totalSamples: 1, @@ -236,12 +249,13 @@ func newDistributionMetric(name string, value float64, stringTags string, maxSam mtype: distributionAggregated, maxSamples: maxSamples, specifiedRate: rate, + overrideCard: resolveCardinality(cardinality), } } type timingMetric = bufferedMetric -func newTimingMetric(name string, value float64, stringTags string, maxSamples int64, rate float64) *timingMetric { +func newTimingMetric(name string, value float64, stringTags string, maxSamples int64, rate float64, cardinality Cardinality) *timingMetric { return &timingMetric{ data: newData(value, maxSamples), totalSamples: 1, @@ -251,6 +265,7 @@ func newTimingMetric(name string, value float64, stringTags string, maxSamples i mtype: timingAggregated, maxSamples: maxSamples, specifiedRate: rate, + overrideCard: resolveCardinality(cardinality), } } diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/options.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/options.go index e007505a6..fd72bc0aa 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/options.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/options.go @@ -28,6 +28,7 @@ var ( defaultOriginDetection = true defaultChannelModeErrorsWhenFull = false defaultErrorHandler = func(error) {} + defaultTagCardinality = CardinalityNotSet ) // Options contains the configuration options for a client. @@ -54,6 +55,7 @@ type Options struct { containerID string channelModeErrorsWhenFull bool errorHandler ErrorHandler + tagCardinality Cardinality } func resolveOptions(options []Option) (*Options, error) { @@ -78,6 +80,7 @@ func resolveOptions(options []Option) (*Options, error) { originDetection: defaultOriginDetection, channelModeErrorsWhenFull: defaultChannelModeErrorsWhenFull, errorHandler: defaultErrorHandler, + tagCardinality: defaultTagCardinality, } for _, option := range options { @@ -412,3 +415,11 @@ func WithContainerID(id string) Option { return nil } } + +// WithCardinality sets the tag cardinality of the metric. +func WithCardinality(card Cardinality) Option { + return func(o *Options) error { + o.tagCardinality = card + return nil + } +} diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd.go index c0137b523..1f09ec79a 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd.go @@ -13,178 +13,10 @@ package statsd //go:generate mockgen -source=statsd.go -destination=mocks/statsd.go import ( - "errors" - "fmt" "io" - "net/url" - "os" - "strconv" - "strings" - "sync" - "sync/atomic" "time" ) -/* -OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes -is optimal for regular networks with an MTU of 1500 so datagrams don't get -fragmented. It's generally recommended not to fragment UDP datagrams as losing -a single fragment will cause the entire datagram to be lost. -*/ -const OptimalUDPPayloadSize = 1432 - -/* -MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. -Its value comes from the calculation: 65535 bytes Max UDP datagram size - -8byte UDP header - 60byte max IP headers -any number greater than that will see frames being cut out. -*/ -const MaxUDPPayloadSize = 65467 - -// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients. -const DefaultUDPBufferPoolSize = 2048 - -// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients. -const DefaultUDSBufferPoolSize = 512 - -/* -DefaultMaxAgentPayloadSize is the default maximum payload size the agent -can receive. This can be adjusted by changing dogstatsd_buffer_size in the -agent configuration file datadog.yaml. This is also used as the optimal payload size -for UDS datagrams. -*/ -const DefaultMaxAgentPayloadSize = 8192 - -/* -UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket -traffic instead of UDP. The type of the socket will be guessed. -*/ -const UnixAddressPrefix = "unix://" - -/* -UnixDatagramAddressPrefix holds the prefix to use to enable Unix Domain Socket -datagram traffic instead of UDP. -*/ -const UnixAddressDatagramPrefix = "unixgram://" - -/* -UnixAddressStreamPrefix holds the prefix to use to enable Unix Domain Socket -stream traffic instead of UDP. -*/ -const UnixAddressStreamPrefix = "unixstream://" - -/* -WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes -traffic instead of UDP. -*/ -const WindowsPipeAddressPrefix = `\\.\pipe\` - -var ( - AddressPrefixes = []string{UnixAddressPrefix, UnixAddressDatagramPrefix, UnixAddressStreamPrefix, WindowsPipeAddressPrefix} -) - -const ( - agentHostEnvVarName = "DD_AGENT_HOST" - agentPortEnvVarName = "DD_DOGSTATSD_PORT" - agentURLEnvVarName = "DD_DOGSTATSD_URL" - defaultUDPPort = "8125" -) - -const ( - // ddEntityID specifies client-side user-specified entity ID injection. - // This env var can be set to the Pod UID on Kubernetes via the downward API. - // Docs: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp - ddEntityID = "DD_ENTITY_ID" - - // ddEntityIDTag specifies the tag name for the client-side entity ID injection - // The Agent expects this tag to contain a non-prefixed Kubernetes Pod UID. - ddEntityIDTag = "dd.internal.entity_id" - - // originDetectionEnabled specifies the env var to enable/disable sending the container ID field. - originDetectionEnabled = "DD_ORIGIN_DETECTION_ENABLED" -) - -/* -ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable -to a specific tag name. We use a slice to keep the order and simplify tests. -*/ -var ddEnvTagsMapping = []struct{ envName, tagName string }{ - {ddEntityID, ddEntityIDTag}, // Client-side entity ID injection for container tagging. - {"DD_ENV", "env"}, // The name of the env in which the service runs. - {"DD_SERVICE", "service"}, // The name of the running service. - {"DD_VERSION", "version"}, // The current version of the running service. -} - -type metricType int - -const ( - gauge metricType = iota - count - histogram - histogramAggregated - distribution - distributionAggregated - set - timing - timingAggregated - event - serviceCheck -) - -type receivingMode int - -const ( - mutexMode receivingMode = iota - channelMode -) - -const ( - writerNameUDP string = "udp" - writerNameUDS string = "uds" - writerNameUDSStream string = "uds-stream" - writerWindowsPipe string = "pipe" - writerNameCustom string = "custom" -) - -// noTimestamp is used as a value for metric without a given timestamp. -const noTimestamp = int64(0) - -type metric struct { - metricType metricType - namespace string - globalTags []string - name string - fvalue float64 - fvalues []float64 - ivalue int64 - svalue string - evalue *Event - scvalue *ServiceCheck - tags []string - stags string - rate float64 - timestamp int64 -} - -type noClientErr string - -// ErrNoClient is returned if statsd reporting methods are invoked on -// a nil client. -const ErrNoClient = noClientErr("statsd client is nil") - -func (e noClientErr) Error() string { - return string(e) -} - -type invalidTimestampErr string - -// InvalidTimestamp is returned if a provided timestamp is invalid. -const InvalidTimestamp = invalidTimestampErr("invalid timestamp") - -func (e invalidTimestampErr) Error() string { - return string(e) -} - // ClientInterface is an interface that exposes the common client functions for the // purpose of being able to provide a no-op client or even mocking. This can aid // downstream users' with their testing. @@ -261,175 +93,40 @@ type ClientInterface interface { GetTelemetry() Telemetry } -type ErrorHandler func(error) - // A Client is a handle for sending messages to dogstatsd. It is safe to // use one Client from multiple goroutines simultaneously. type Client struct { - // Sender handles the underlying networking protocol - sender *sender - // namespace to prepend to all statsd calls - namespace string - // tags are global tags to be added to every statsd call - tags []string - flushTime time.Duration - telemetry *statsdTelemetry - telemetryClient *telemetryClient - stop chan struct{} - wg sync.WaitGroup - workers []*worker - closerLock sync.Mutex - workersMode receivingMode - aggregatorMode receivingMode - agg *aggregator - aggExtended *aggregator - options []Option - addrOption string - isClosed bool - errorOnBlockedChannel bool - errorHandler ErrorHandler -} - -// statsdTelemetry contains telemetry metrics about the client -type statsdTelemetry struct { - totalMetricsGauge uint64 - totalMetricsCount uint64 - totalMetricsHistogram uint64 - totalMetricsDistribution uint64 - totalMetricsSet uint64 - totalMetricsTiming uint64 - totalEvents uint64 - totalServiceChecks uint64 - totalDroppedOnReceive uint64 + clientEx *ClientEx } // Verify that Client implements the ClientInterface. // https://golang.org/doc/faq#guarantee_satisfies_interface var _ ClientInterface = &Client{} -func resolveAddr(addr string) string { - envPort := "" - - if addr == "" { - addr = os.Getenv(agentHostEnvVarName) - envPort = os.Getenv(agentPortEnvVarName) - agentURL, _ := os.LookupEnv(agentURLEnvVarName) - agentURL = parseAgentURL(agentURL) - - // agentURLEnvVarName has priority over agentHostEnvVarName - if agentURL != "" { - return agentURL - } - } - - if addr == "" { - return "" - } - - for _, prefix := range AddressPrefixes { - if strings.HasPrefix(addr, prefix) { - return addr - } - } - // TODO: How does this work for IPv6? - if strings.Contains(addr, ":") { - return addr - } - if envPort != "" { - addr = fmt.Sprintf("%s:%s", addr, envPort) - } else { - addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort) - } - return addr -} - -func parseAgentURL(agentURL string) string { - if agentURL != "" { - if strings.HasPrefix(agentURL, WindowsPipeAddressPrefix) { - return agentURL - } - - parsedURL, err := url.Parse(agentURL) - if err != nil { - return "" - } - - if parsedURL.Scheme == "udp" { - if strings.Contains(parsedURL.Host, ":") { - return parsedURL.Host - } - return fmt.Sprintf("%s:%s", parsedURL.Host, defaultUDPPort) - } - - if parsedURL.Scheme == "unix" { - return agentURL - } - } - return "" -} - -func createWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration) (Transport, string, error) { - if addr == "" { - return nil, "", errors.New("No address passed and autodetection from environment failed") - } - - switch { - case strings.HasPrefix(addr, WindowsPipeAddressPrefix): - w, err := newWindowsPipeWriter(addr, writeTimeout) - return w, writerWindowsPipe, err - case strings.HasPrefix(addr, UnixAddressPrefix): - w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout, connectTimeout, "") - return w, writerNameUDS, err - case strings.HasPrefix(addr, UnixAddressDatagramPrefix): - w, err := newUDSWriter(addr[len(UnixAddressDatagramPrefix):], writeTimeout, connectTimeout, "unixgram") - return w, writerNameUDS, err - case strings.HasPrefix(addr, UnixAddressStreamPrefix): - w, err := newUDSWriter(addr[len(UnixAddressStreamPrefix):], writeTimeout, connectTimeout, "unix") - return w, writerNameUDS, err - default: - w, err := newUDPWriter(addr, writeTimeout) - return w, writerNameUDP, err - } -} - // New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP, // "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes. func New(addr string, options ...Option) (*Client, error) { - o, err := resolveOptions(options) + clientEx, err := NewEx(addr, options...) if err != nil { return nil, err } - addr = resolveAddr(addr) - w, writerType, err := createWriter(addr, o.writeTimeout, o.connectTimeout) - if err != nil { - return nil, err - } - - client, err := newWithWriter(w, o, writerType) - if err == nil { - client.options = append(client.options, options...) - client.addrOption = addr - } - return client, err -} - -type customWriter struct { - io.WriteCloser -} - -func (w *customWriter) GetTransportName() string { - return writerNameCustom + return &Client{ + clientEx: clientEx, + }, nil } // NewWithWriter creates a new Client with given writer. Writer is a // io.WriteCloser func NewWithWriter(w io.WriteCloser, options ...Option) (*Client, error) { - o, err := resolveOptions(options) + clientEx, err := NewWithWriterEx(w, options...) if err != nil { return nil, err } - return newWithWriter(&customWriter{w}, o, writerNameCustom) + + return &Client{ + clientEx: clientEx, + }, nil } // CloneWithExtraOptions create a new Client with extra options @@ -438,128 +135,14 @@ func CloneWithExtraOptions(c *Client, options ...Option) (*Client, error) { return nil, ErrNoClient } - if c.addrOption == "" { - return nil, fmt.Errorf("can't clone client with no addrOption") - } - opt := append(c.options, options...) - return New(c.addrOption, opt...) -} - -func newWithWriter(w Transport, o *Options, writerName string) (*Client, error) { - c := Client{ - namespace: o.namespace, - tags: o.tags, - telemetry: &statsdTelemetry{}, - errorOnBlockedChannel: o.channelModeErrorsWhenFull, - errorHandler: o.errorHandler, - } - - // Inject values of DD_* environment variables as global tags. - for _, mapping := range ddEnvTagsMapping { - if value := os.Getenv(mapping.envName); value != "" { - c.tags = append(c.tags, fmt.Sprintf("%s:%s", mapping.tagName, value)) - } - } - - initContainerID(o.containerID, isOriginDetectionEnabled(o), isHostCgroupNamespace()) - isUDS := writerName == writerNameUDS - - if o.maxBytesPerPayload == 0 { - if isUDS { - o.maxBytesPerPayload = DefaultMaxAgentPayloadSize - } else { - o.maxBytesPerPayload = OptimalUDPPayloadSize - } - } - if o.bufferPoolSize == 0 { - if isUDS { - o.bufferPoolSize = DefaultUDSBufferPoolSize - } else { - o.bufferPoolSize = DefaultUDPBufferPoolSize - } - } - if o.senderQueueSize == 0 { - if isUDS { - o.senderQueueSize = DefaultUDSBufferPoolSize - } else { - o.senderQueueSize = DefaultUDPBufferPoolSize - } - } - - bufferPool := newBufferPool(o.bufferPoolSize, o.maxBytesPerPayload, o.maxMessagesPerPayload) - c.sender = newSender(w, o.senderQueueSize, bufferPool, o.errorHandler) - c.aggregatorMode = o.receiveMode - - c.workersMode = o.receiveMode - // channelMode mode at the worker level is not enabled when - // ExtendedAggregation is since the user app will not directly - // use the worker (the aggregator sit between the app and the - // workers). - if o.extendedAggregation { - c.workersMode = mutexMode - } - - if o.aggregation || o.extendedAggregation || o.maxBufferedSamplesPerContext > 0 { - c.agg = newAggregator(&c, int64(o.maxBufferedSamplesPerContext)) - c.agg.start(o.aggregationFlushInterval) - - if o.extendedAggregation { - c.aggExtended = c.agg - - if c.aggregatorMode == channelMode { - c.agg.startReceivingMetric(o.channelModeBufferSize, o.workersCount) - } - } - } - - for i := 0; i < o.workersCount; i++ { - w := newWorker(bufferPool, c.sender) - c.workers = append(c.workers, w) - - if c.workersMode == channelMode { - w.startReceivingMetric(o.channelModeBufferSize) - } - } - - c.flushTime = o.bufferFlushInterval - c.stop = make(chan struct{}, 1) - - c.wg.Add(1) - go func() { - defer c.wg.Done() - c.watch() - }() - - if o.telemetry { - if o.telemetryAddr == "" { - c.telemetryClient = newTelemetryClient(&c, c.agg != nil) - } else { - var err error - c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout, o.connectTimeout) - if err != nil { - return nil, err - } - } - c.telemetryClient.run(&c.wg, c.stop) + clientEx, err := CloneWithExtraOptionsEx(c.clientEx, options...) + if err != nil { + return nil, err } - return &c, nil -} - -func (c *Client) watch() { - ticker := time.NewTicker(c.flushTime) - - for { - select { - case <-ticker.C: - for _, w := range c.workers { - w.flush() - } - case <-c.stop: - ticker.Stop() - return - } - } + return &Client{ + clientEx: clientEx, + }, nil } // Flush forces a flush of all the queued dogstatsd payloads This method is @@ -570,112 +153,22 @@ func (c *Client) Flush() error { if c == nil { return ErrNoClient } - if c.agg != nil { - c.agg.flush() - } - for _, w := range c.workers { - w.pause() - defer w.unpause() - w.flushUnsafe() - } - // Now that the worker are pause the sender can flush the queue between - // worker and senders - c.sender.flush() - return nil + return c.clientEx.Flush() } // IsClosed returns if the client has been closed. func (c *Client) IsClosed() bool { - c.closerLock.Lock() - defer c.closerLock.Unlock() - return c.isClosed -} - -func (c *Client) flushTelemetryMetrics(t *Telemetry) { - t.TotalMetricsGauge = atomic.LoadUint64(&c.telemetry.totalMetricsGauge) - t.TotalMetricsCount = atomic.LoadUint64(&c.telemetry.totalMetricsCount) - t.TotalMetricsSet = atomic.LoadUint64(&c.telemetry.totalMetricsSet) - t.TotalMetricsHistogram = atomic.LoadUint64(&c.telemetry.totalMetricsHistogram) - t.TotalMetricsDistribution = atomic.LoadUint64(&c.telemetry.totalMetricsDistribution) - t.TotalMetricsTiming = atomic.LoadUint64(&c.telemetry.totalMetricsTiming) - t.TotalEvents = atomic.LoadUint64(&c.telemetry.totalEvents) - t.TotalServiceChecks = atomic.LoadUint64(&c.telemetry.totalServiceChecks) - t.TotalDroppedOnReceive = atomic.LoadUint64(&c.telemetry.totalDroppedOnReceive) + return c.clientEx.IsClosed() } // GetTelemetry return the telemetry metrics for the client since it started. func (c *Client) GetTelemetry() Telemetry { - return c.telemetryClient.getTelemetry() + return c.clientEx.GetTelemetry() } // GetTransport return the name of the transport used. func (c *Client) GetTransport() string { - if c.sender == nil { - return "" - } - return c.sender.getTransportName() -} - -type ErrorInputChannelFull struct { - Metric metric - ChannelSize int - Msg string -} - -func (e ErrorInputChannelFull) Error() string { - return e.Msg -} - -func (c *Client) send(m metric) error { - h := hashString32(m.name) - worker := c.workers[h%uint32(len(c.workers))] - - if c.workersMode == channelMode { - select { - case worker.inputMetrics <- m: - default: - atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1) - err := &ErrorInputChannelFull{m, len(worker.inputMetrics), "Worker input channel full"} - if c.errorHandler != nil { - c.errorHandler(err) - } - if c.errorOnBlockedChannel { - return err - } - } - return nil - } - return worker.processMetric(m) -} - -// sendBlocking is used by the aggregator to inject aggregated metrics. -func (c *Client) sendBlocking(m metric) error { - m.globalTags = c.tags - m.namespace = c.namespace - - h := hashString32(m.name) - worker := c.workers[h%uint32(len(c.workers))] - return worker.processMetric(m) -} - -func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error { - if c.aggregatorMode == channelMode { - m := metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate} - select { - case c.aggExtended.inputMetrics <- m: - default: - atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1) - err := &ErrorInputChannelFull{m, len(c.aggExtended.inputMetrics), "Aggregator input channel full"} - if c.errorHandler != nil { - c.errorHandler(err) - } - if c.errorOnBlockedChannel { - return err - } - } - return nil - } - return f(name, value, tags, rate) + return c.clientEx.GetTransport() } // Gauge measures the value of a metric at a particular time. @@ -683,11 +176,7 @@ func (c *Client) Gauge(name string, value float64, tags []string, rate float64) if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) - if c.agg != nil { - return c.agg.gauge(name, value, tags) - } - return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Gauge(name, value, tags, rate) } // GaugeWithTimestamp measures the value of a metric at a given time. @@ -700,13 +189,7 @@ func (c *Client) GaugeWithTimestamp(name string, value float64, tags []string, r if c == nil { return ErrNoClient } - - if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { - return InvalidTimestamp - } - - atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) - return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) + return c.clientEx.GaugeWithTimestamp(name, value, tags, rate, timestamp) } // Count tracks how many times something happened per second. @@ -714,11 +197,7 @@ func (c *Client) Count(name string, value int64, tags []string, rate float64) er if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) - if c.agg != nil { - return c.agg.count(name, value, tags) - } - return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Count(name, value, tags, rate) } // CountWithTimestamp tracks how many times something happened at the given second. @@ -731,13 +210,7 @@ func (c *Client) CountWithTimestamp(name string, value int64, tags []string, rat if c == nil { return ErrNoClient } - - if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { - return InvalidTimestamp - } - - atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) - return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix()}) + return c.clientEx.CountWithTimestamp(name, value, tags, rate, timestamp) } // Histogram tracks the statistical distribution of a set of values on each host. @@ -745,11 +218,7 @@ func (c *Client) Histogram(name string, value float64, tags []string, rate float if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsHistogram, 1) - if c.aggExtended != nil { - return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram) - } - return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Histogram(name, value, tags, rate) } // Distribution tracks the statistical distribution of a set of values across your infrastructure. @@ -757,21 +226,23 @@ func (c *Client) Distribution(name string, value float64, tags []string, rate fl if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsDistribution, 1) - if c.aggExtended != nil { - return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution) - } - return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Distribution(name, value, tags, rate) } // Decr is just Count of -1 func (c *Client) Decr(name string, tags []string, rate float64) error { - return c.Count(name, -1, tags, rate) + if c == nil { + return ErrNoClient + } + return c.clientEx.Decr(name, tags, rate) } // Incr is just Count of 1 func (c *Client) Incr(name string, tags []string, rate float64) error { - return c.Count(name, 1, tags, rate) + if c == nil { + return ErrNoClient + } + return c.clientEx.Incr(name, tags, rate) } // Set counts the number of unique elements in a group. @@ -779,16 +250,16 @@ func (c *Client) Set(name string, value string, tags []string, rate float64) err if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsSet, 1) - if c.agg != nil { - return c.agg.set(name, value, tags) - } - return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Set(name, value, tags, rate) + } // Timing sends timing information, it is an alias for TimeInMilliseconds func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error { - return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate) + if c == nil { + return ErrNoClient + } + return c.clientEx.Timing(name, value, tags, rate) } // TimeInMilliseconds sends timing information in milliseconds. @@ -797,11 +268,7 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsTiming, 1) - if c.aggExtended != nil { - return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing) - } - return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.TimeInMilliseconds(name, value, tags, rate) } // Event sends the provided Event. @@ -809,14 +276,15 @@ func (c *Client) Event(e *Event) error { if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalEvents, 1) - return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.Event(e) } // SimpleEvent sends an event with the provided title and text. func (c *Client) SimpleEvent(title, text string) error { - e := NewEvent(title, text) - return c.Event(e) + if c == nil { + return ErrNoClient + } + return c.clientEx.SimpleEvent(title, text) } // ServiceCheck sends the provided ServiceCheck. @@ -824,14 +292,16 @@ func (c *Client) ServiceCheck(sc *ServiceCheck) error { if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalServiceChecks, 1) - return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.tags, namespace: c.namespace}) + return c.clientEx.ServiceCheck(sc) } // SimpleServiceCheck sends an serviceCheck with the provided name and status. func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { - sc := NewServiceCheck(name, status) - return c.ServiceCheck(sc) + if c == nil { + return ErrNoClient + } + return c.clientEx.SimpleServiceCheck(name, status) + } // Close the client connection. @@ -839,69 +309,10 @@ func (c *Client) Close() error { if c == nil { return ErrNoClient } - - // Acquire closer lock to ensure only one thread can close the stop channel - c.closerLock.Lock() - defer c.closerLock.Unlock() - - if c.isClosed { - return nil - } - - // Notify all other threads that they should stop - select { - case <-c.stop: - return nil - default: - } - close(c.stop) - - if c.workersMode == channelMode { - for _, w := range c.workers { - w.stopReceivingMetric() - } - } - - // flush the aggregator first - if c.agg != nil { - if c.aggExtended != nil && c.aggregatorMode == channelMode { - c.agg.stopReceivingMetric() - } - c.agg.stop() - } - - // Wait for the threads to stop - c.wg.Wait() - - c.Flush() - - c.isClosed = true - return c.sender.close() + return c.clientEx.Close() } -// isOriginDetectionEnabled returns whether the clients should fill the container field. -// -// Disable origin detection only in one of the following cases: -// - DD_ORIGIN_DETECTION_ENABLED is explicitly set to false -// - o.originDetection is explicitly set to false, which is true by default -func isOriginDetectionEnabled(o *Options) bool { - if !o.originDetection || o.containerID != "" { - return false - } - - envVarValue := os.Getenv(originDetectionEnabled) - if envVarValue == "" { - // DD_ORIGIN_DETECTION_ENABLED is not set - // default to true - return true - } - - enabled, err := strconv.ParseBool(envVarValue) - if err != nil { - // Error due to an unsupported DD_ORIGIN_DETECTION_ENABLED value - // default to true - return true - } - - return enabled +// sendBlocking is used by the aggregator to inject aggregated metrics. +func (c *Client) sendBlocking(m metric) error { + return c.clientEx.sendBlocking(m) } diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd_direct.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd_direct.go index af66517cb..150ee2c81 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd_direct.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsd_direct.go @@ -51,16 +51,16 @@ func (c *ClientDirect) DistributionSamples(name string, values []float64, tags [ if c == nil { return ErrNoClient } - atomic.AddUint64(&c.telemetry.totalMetricsDistribution, uint64(len(values))) - return c.send(metric{ + atomic.AddUint64(&c.clientEx.telemetry.totalMetricsDistribution, uint64(len(values))) + return c.clientEx.send(metric{ metricType: distributionAggregated, name: name, fvalues: values, tags: tags, stags: strings.Join(tags, tagSeparatorSymbol), rate: rate, - globalTags: c.tags, - namespace: c.namespace, + globalTags: c.clientEx.tags, + namespace: c.clientEx.namespace, }) } diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/statsdex.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsdex.go new file mode 100644 index 000000000..d9e9b4ed0 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/statsdex.go @@ -0,0 +1,950 @@ +// Copyright 2013 Ooyala, Inc. + +/* +Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, +adding tags and histograms and pushing upstream to Datadog. + +Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. + +statsd is based on go-statsd-client. +*/ +package statsd + +//go:generate mockgen -source=statsd.go -destination=mocks/statsd.go + +import ( + "errors" + "fmt" + "io" + "net/url" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +/* +OptimalUDPPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes +is optimal for regular networks with an MTU of 1500 so datagrams don't get +fragmented. It's generally recommended not to fragment UDP datagrams as losing +a single fragment will cause the entire datagram to be lost. +*/ +const OptimalUDPPayloadSize = 1432 + +/* +MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. +Its value comes from the calculation: 65535 bytes Max UDP datagram size - +8byte UDP header - 60byte max IP headers +any number greater than that will see frames being cut out. +*/ +const MaxUDPPayloadSize = 65467 + +// DefaultUDPBufferPoolSize is the default size of the buffer pool for UDP clients. +const DefaultUDPBufferPoolSize = 2048 + +// DefaultUDSBufferPoolSize is the default size of the buffer pool for UDS clients. +const DefaultUDSBufferPoolSize = 512 + +/* +DefaultMaxAgentPayloadSize is the default maximum payload size the agent +can receive. This can be adjusted by changing dogstatsd_buffer_size in the +agent configuration file datadog.yaml. This is also used as the optimal payload size +for UDS datagrams. +*/ +const DefaultMaxAgentPayloadSize = 8192 + +/* +UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket +traffic instead of UDP. The type of the socket will be guessed. +*/ +const UnixAddressPrefix = "unix://" + +/* +UnixDatagramAddressPrefix holds the prefix to use to enable Unix Domain Socket +datagram traffic instead of UDP. +*/ +const UnixAddressDatagramPrefix = "unixgram://" + +/* +UnixAddressStreamPrefix holds the prefix to use to enable Unix Domain Socket +stream traffic instead of UDP. +*/ +const UnixAddressStreamPrefix = "unixstream://" + +/* +WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes +traffic instead of UDP. +*/ +const WindowsPipeAddressPrefix = `\\.\pipe\` + +var ( + AddressPrefixes = []string{UnixAddressPrefix, UnixAddressDatagramPrefix, UnixAddressStreamPrefix, WindowsPipeAddressPrefix} +) + +const ( + agentHostEnvVarName = "DD_AGENT_HOST" + agentPortEnvVarName = "DD_DOGSTATSD_PORT" + agentURLEnvVarName = "DD_DOGSTATSD_URL" + defaultUDPPort = "8125" +) + +const ( + // ddEntityID specifies client-side user-specified entity ID injection. + // This env var can be set to the Pod UID on Kubernetes via the downward API. + // Docs: https://docs.datadoghq.com/developers/dogstatsd/?tab=kubernetes#origin-detection-over-udp + ddEntityID = "DD_ENTITY_ID" + + // ddEntityIDTag specifies the tag name for the client-side entity ID injection + // The Agent expects this tag to contain a non-prefixed Kubernetes Pod UID. + ddEntityIDTag = "dd.internal.entity_id" + + // originDetectionEnabled specifies the env var to enable/disable sending the container ID field. + originDetectionEnabled = "DD_ORIGIN_DETECTION_ENABLED" +) + +/* +ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable +to a specific tag name. We use a slice to keep the order and simplify tests. +*/ +var ddEnvTagsMapping = []struct{ envName, tagName string }{ + {ddEntityID, ddEntityIDTag}, // Client-side entity ID injection for container tagging. + {"DD_ENV", "env"}, // The name of the env in which the service runs. + {"DD_SERVICE", "service"}, // The name of the running service. + {"DD_VERSION", "version"}, // The current version of the running service. +} + +type metricType int + +const ( + gauge metricType = iota + count + histogram + histogramAggregated + distribution + distributionAggregated + set + timing + timingAggregated + event + serviceCheck +) + +type receivingMode int + +const ( + mutexMode receivingMode = iota + channelMode +) + +const ( + writerNameUDP string = "udp" + writerNameUDS string = "uds" + writerNameUDSStream string = "uds-stream" + writerWindowsPipe string = "pipe" + writerNameCustom string = "custom" +) + +// noTimestamp is used as a value for metric without a given timestamp. +const noTimestamp = int64(0) + +type metric struct { + metricType metricType + namespace string + globalTags []string + name string + fvalue float64 + fvalues []float64 + ivalue int64 + svalue string + evalue *Event + scvalue *ServiceCheck + tags []string + stags string + rate float64 + timestamp int64 + originDetection bool + overrideCard Cardinality +} + +type noClientErr string + +// ErrNoClient is returned if statsd reporting methods are invoked on +// a nil client. +const ErrNoClient = noClientErr("statsd client is nil") + +func (e noClientErr) Error() string { + return string(e) +} + +type invalidTimestampErr string + +// InvalidTimestamp is returned if a provided timestamp is invalid. +const InvalidTimestamp = invalidTimestampErr("invalid timestamp") + +func (e invalidTimestampErr) Error() string { + return string(e) +} + +// ClientInterfaceEx is an temporary interface that is similar to ClientInterface +// but with the addition of a `...Parameter` for the telemetry functions. This is currently +// just used to specify the tag cardinality. We want to avoid changing ClientInterface +// at present as that would require a new major release. +// Users should avoid implementing this interface as it will be deprecated in the next version. +type ClientInterfaceEx interface { + // Gauge measures the value of a metric at a particular time. + Gauge(name string, value float64, tags []string, rate float64, parameters ...Parameter) error + + // GaugeWithTimestamp measures the value of a metric at a given time. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // + // Minimum Datadog Agent version: 7.40.0 + GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time, parameters ...Parameter) error + + // Count tracks how many times something happened per second. + Count(name string, value int64, tags []string, rate float64, parameters ...Parameter) error + + // CountWithTimestamp tracks how many times something happened at the given second. + // BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ + // The value will bypass any aggregation on the client side and agent side, this is + // useful when sending points in the past. + // + // Minimum Datadog Agent version: 7.40.0 + CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time, parameters ...Parameter) error + + // Histogram tracks the statistical distribution of a set of values on each host. + Histogram(name string, value float64, tags []string, rate float64, parameters ...Parameter) error + + // Distribution tracks the statistical distribution of a set of values across your infrastructure. + // + // It is recommended to use `WithMaxBufferedMetricsPerContext` to avoid dropping metrics at high throughput, `rate` can + // also be used to limit the load. Both options can *not* be used together. + Distribution(name string, value float64, tags []string, rate float64, parameters ...Parameter) error + + // Decr is just Count of -1 + Decr(name string, tags []string, rate float64, parameters ...Parameter) error + + // Incr is just Count of 1 + Incr(name string, tags []string, rate float64, parameters ...Parameter) error + + // Set counts the number of unique elements in a group. + Set(name string, value string, tags []string, rate float64, parameters ...Parameter) error + + // Timing sends timing information, it is an alias for TimeInMilliseconds + Timing(name string, value time.Duration, tags []string, rate float64, parameters ...Parameter) error + + // TimeInMilliseconds sends timing information in milliseconds. + // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) + TimeInMilliseconds(name string, value float64, tags []string, rate float64, parameters ...Parameter) error + + // Event sends the provided Event. + Event(e *Event, parameters ...Parameter) error + + // SimpleEvent sends an event with the provided title and text. + SimpleEvent(title, text string, parameters ...Parameter) error + + // ServiceCheck sends the provided ServiceCheck. + ServiceCheck(sc *ServiceCheck, parameters ...Parameter) error + + // SimpleServiceCheck sends an serviceCheck with the provided name and status. + SimpleServiceCheck(name string, status ServiceCheckStatus, parameters ...Parameter) error + + // Close the client connection. + Close() error + + // Flush forces a flush of all the queued dogstatsd payloads. + Flush() error + + // IsClosed returns if the client has been closed. + IsClosed() bool + + // GetTelemetry return the telemetry metrics for the client since it started. + GetTelemetry() Telemetry + + // Ensure this interface can't be implemented outside of this package. + // ClientInterfaceEx is a temporary measure to allow us to release a version of the library with the + // extra `...Parameter` parameter (currently used to specify the tag cardinality) in the metric functions + // without having to release a new major version. + // This interface will be deprecated with the next release. + private() +} + +type ErrorHandler func(error) + +// A Client is a handle for sending messages to dogstatsd. It is safe to +// use one Client from multiple goroutines simultaneously. +type ClientEx struct { + // Sender handles the underlying networking protocol + sender *sender + // namespace to prepend to all statsd calls + namespace string + // tags are global tags to be added to every statsd call + tags []string + flushTime time.Duration + telemetry *statsdTelemetry + telemetryClient *telemetryClient + stop chan struct{} + wg sync.WaitGroup + workers []*worker + closerLock sync.Mutex + workersMode receivingMode + aggregatorMode receivingMode + agg *aggregator + aggExtended *aggregator + options []Option + addrOption string + isClosed bool + errorOnBlockedChannel bool + errorHandler ErrorHandler + originDetection bool +} + +// statsdTelemetry contains telemetry metrics about the client +type statsdTelemetry struct { + totalMetricsGauge uint64 + totalMetricsCount uint64 + totalMetricsHistogram uint64 + totalMetricsDistribution uint64 + totalMetricsSet uint64 + totalMetricsTiming uint64 + totalEvents uint64 + totalServiceChecks uint64 + totalDroppedOnReceive uint64 +} + +// Verify that ClientEx implements the ClientInterfaceEx interface. +// https://golang.org/doc/faq#guarantee_satisfies_interface +var _ ClientInterfaceEx = &ClientEx{} + +func resolveAddr(addr string) string { + envPort := "" + + if addr == "" { + addr = os.Getenv(agentHostEnvVarName) + envPort = os.Getenv(agentPortEnvVarName) + agentURL, _ := os.LookupEnv(agentURLEnvVarName) + agentURL = parseAgentURL(agentURL) + + // agentURLEnvVarName has priority over agentHostEnvVarName + if agentURL != "" { + return agentURL + } + } + + if addr == "" { + return "" + } + + for _, prefix := range AddressPrefixes { + if strings.HasPrefix(addr, prefix) { + return addr + } + } + // TODO: How does this work for IPv6? + if strings.Contains(addr, ":") { + return addr + } + if envPort != "" { + addr = fmt.Sprintf("%s:%s", addr, envPort) + } else { + addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort) + } + return addr +} + +func parseAgentURL(agentURL string) string { + if agentURL != "" { + if strings.HasPrefix(agentURL, WindowsPipeAddressPrefix) { + return agentURL + } + + parsedURL, err := url.Parse(agentURL) + if err != nil { + return "" + } + + if parsedURL.Scheme == "udp" { + if strings.Contains(parsedURL.Host, ":") { + return parsedURL.Host + } + return fmt.Sprintf("%s:%s", parsedURL.Host, defaultUDPPort) + } + + if parsedURL.Scheme == "unix" { + return agentURL + } + } + return "" +} + +func createWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration) (Transport, string, error) { + if addr == "" { + return nil, "", errors.New("No address passed and autodetection from environment failed") + } + + switch { + case strings.HasPrefix(addr, WindowsPipeAddressPrefix): + w, err := newWindowsPipeWriter(addr, writeTimeout) + return w, writerWindowsPipe, err + case strings.HasPrefix(addr, UnixAddressPrefix): + w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout, connectTimeout, "") + return w, writerNameUDS, err + case strings.HasPrefix(addr, UnixAddressDatagramPrefix): + w, err := newUDSWriter(addr[len(UnixAddressDatagramPrefix):], writeTimeout, connectTimeout, "unixgram") + return w, writerNameUDS, err + case strings.HasPrefix(addr, UnixAddressStreamPrefix): + w, err := newUDSWriter(addr[len(UnixAddressStreamPrefix):], writeTimeout, connectTimeout, "unix") + return w, writerNameUDS, err + default: + w, err := newUDPWriter(addr, writeTimeout) + return w, writerNameUDP, err + } +} + +// New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP, +// "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes. +func NewEx(addr string, options ...Option) (*ClientEx, error) { + o, err := resolveOptions(options) + if err != nil { + return nil, err + } + + addr = resolveAddr(addr) + w, writerType, err := createWriter(addr, o.writeTimeout, o.connectTimeout) + if err != nil { + return nil, err + } + + client, err := newWithWriter(w, o, writerType) + if err == nil { + client.options = append(client.options, options...) + client.addrOption = addr + } + return client, err +} + +type customWriter struct { + io.WriteCloser +} + +func (w *customWriter) GetTransportName() string { + return writerNameCustom +} + +// NewWithWriter creates a new ClientEx with given writer. Writer is a +// io.WriteCloser +func NewWithWriterEx(w io.WriteCloser, options ...Option) (*ClientEx, error) { + o, err := resolveOptions(options) + if err != nil { + return nil, err + } + return newWithWriter(&customWriter{w}, o, writerNameCustom) +} + +// CloneWithExtraOptions create a new ClientEx with extra options +func CloneWithExtraOptionsEx(c *ClientEx, options ...Option) (*ClientEx, error) { + if c == nil { + return nil, ErrNoClient + } + + if c.addrOption == "" { + return nil, fmt.Errorf("can't clone client with no addrOption") + } + opt := append(c.options, options...) + return NewEx(c.addrOption, opt...) +} + +func newWithWriter(w Transport, o *Options, writerName string) (*ClientEx, error) { + c := ClientEx{ + namespace: o.namespace, + tags: o.tags, + telemetry: &statsdTelemetry{}, + errorOnBlockedChannel: o.channelModeErrorsWhenFull, + errorHandler: o.errorHandler, + originDetection: isOriginDetectionEnabled(o), + } + + // Inject values of DD_* environment variables as global tags. + for _, mapping := range ddEnvTagsMapping { + if value := os.Getenv(mapping.envName); value != "" { + c.tags = append(c.tags, fmt.Sprintf("%s:%s", mapping.tagName, value)) + } + } + // Whether origin detection is enabled or not for this client, we need to initialize the global + // external environment variable in case another client has enabled it and needs to access it. + initExternalEnv() + + // Initializes the global tag cardinality with either the value passed in by the user or the value from the DD_CARDINALITY/DATADOG_CARDINALITY environment variable. + initTagCardinality(o.tagCardinality) + + initContainerID(o.containerID, fillInContainerID(o), isHostCgroupNamespace()) + isUDS := writerName == writerNameUDS + + if o.maxBytesPerPayload == 0 { + if isUDS { + o.maxBytesPerPayload = DefaultMaxAgentPayloadSize + } else { + o.maxBytesPerPayload = OptimalUDPPayloadSize + } + } + if o.bufferPoolSize == 0 { + if isUDS { + o.bufferPoolSize = DefaultUDSBufferPoolSize + } else { + o.bufferPoolSize = DefaultUDPBufferPoolSize + } + } + if o.senderQueueSize == 0 { + if isUDS { + o.senderQueueSize = DefaultUDSBufferPoolSize + } else { + o.senderQueueSize = DefaultUDPBufferPoolSize + } + } + + bufferPool := newBufferPool(o.bufferPoolSize, o.maxBytesPerPayload, o.maxMessagesPerPayload) + c.sender = newSender(w, o.senderQueueSize, bufferPool, o.errorHandler) + c.aggregatorMode = o.receiveMode + + c.workersMode = o.receiveMode + // channelMode mode at the worker level is not enabled when + // ExtendedAggregation is since the user app will not directly + // use the worker (the aggregator sit between the app and the + // workers). + if o.extendedAggregation { + c.workersMode = mutexMode + } + + if o.aggregation || o.extendedAggregation || o.maxBufferedSamplesPerContext > 0 { + c.agg = newAggregator(&c, int64(o.maxBufferedSamplesPerContext)) + c.agg.start(o.aggregationFlushInterval) + + if o.extendedAggregation { + c.aggExtended = c.agg + + if c.aggregatorMode == channelMode { + c.agg.startReceivingMetric(o.channelModeBufferSize, o.workersCount) + } + } + } + + for i := 0; i < o.workersCount; i++ { + w := newWorker(bufferPool, c.sender) + c.workers = append(c.workers, w) + + if c.workersMode == channelMode { + w.startReceivingMetric(o.channelModeBufferSize) + } + } + + c.flushTime = o.bufferFlushInterval + c.stop = make(chan struct{}, 1) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.watch() + }() + + if o.telemetry { + if o.telemetryAddr == "" { + c.telemetryClient = newTelemetryClient(&c, c.agg != nil) + } else { + var err error + c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout, o.connectTimeout) + if err != nil { + return nil, err + } + } + c.telemetryClient.run(&c.wg, c.stop) + } + + return &c, nil +} + +func (c *ClientEx) watch() { + ticker := time.NewTicker(c.flushTime) + + for { + select { + case <-ticker.C: + for _, w := range c.workers { + w.flush() + } + case <-c.stop: + ticker.Stop() + return + } + } +} + +// Flush forces a flush of all the queued dogstatsd payloads This method is +// blocking and will not return until everything is sent through the network. +// In mutexMode, this will also block sampling new data to the client while the +// workers and sender are flushed. +func (c *ClientEx) Flush() error { + if c == nil { + return ErrNoClient + } + if c.agg != nil { + c.agg.flush() + } + for _, w := range c.workers { + w.pause() + defer w.unpause() + w.flushUnsafe() + } + // Now that the worker are pause the sender can flush the queue between + // worker and senders + c.sender.flush() + return nil +} + +// IsClosed returns if the client has been closed. +func (c *ClientEx) IsClosed() bool { + c.closerLock.Lock() + defer c.closerLock.Unlock() + return c.isClosed +} + +func (c *ClientEx) flushTelemetryMetrics(t *Telemetry) { + t.TotalMetricsGauge = atomic.LoadUint64(&c.telemetry.totalMetricsGauge) + t.TotalMetricsCount = atomic.LoadUint64(&c.telemetry.totalMetricsCount) + t.TotalMetricsSet = atomic.LoadUint64(&c.telemetry.totalMetricsSet) + t.TotalMetricsHistogram = atomic.LoadUint64(&c.telemetry.totalMetricsHistogram) + t.TotalMetricsDistribution = atomic.LoadUint64(&c.telemetry.totalMetricsDistribution) + t.TotalMetricsTiming = atomic.LoadUint64(&c.telemetry.totalMetricsTiming) + t.TotalEvents = atomic.LoadUint64(&c.telemetry.totalEvents) + t.TotalServiceChecks = atomic.LoadUint64(&c.telemetry.totalServiceChecks) + t.TotalDroppedOnReceive = atomic.LoadUint64(&c.telemetry.totalDroppedOnReceive) +} + +// GetTelemetry return the telemetry metrics for the client since it started. +func (c *ClientEx) GetTelemetry() Telemetry { + return c.telemetryClient.getTelemetry() +} + +// GetTransport return the name of the transport used. +func (c *ClientEx) GetTransport() string { + if c.sender == nil { + return "" + } + return c.sender.getTransportName() +} + +type ErrorInputChannelFull struct { + Metric metric + ChannelSize int + Msg string +} + +func (e ErrorInputChannelFull) Error() string { + return e.Msg +} + +func (c *ClientEx) send(m metric) error { + h := hashString32(m.name) + worker := c.workers[h%uint32(len(c.workers))] + + if c.workersMode == channelMode { + select { + case worker.inputMetrics <- m: + default: + atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1) + err := &ErrorInputChannelFull{m, len(worker.inputMetrics), "Worker input channel full"} + if c.errorHandler != nil { + c.errorHandler(err) + } + if c.errorOnBlockedChannel { + return err + } + } + return nil + } + return worker.processMetric(m) +} + +// sendBlocking is used by the aggregator to inject aggregated metrics. +func (c *ClientEx) sendBlocking(m metric) error { + m.globalTags = c.tags + m.namespace = c.namespace + + h := hashString32(m.name) + worker := c.workers[h%uint32(len(c.workers))] + return worker.processMetric(m) +} + +func (c *ClientEx) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc, cardinality Cardinality) error { + if c.aggregatorMode == channelMode { + m := metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate, overrideCard: cardinality} + select { + case c.aggExtended.inputMetrics <- m: + default: + atomic.AddUint64(&c.telemetry.totalDroppedOnReceive, 1) + err := &ErrorInputChannelFull{m, len(c.aggExtended.inputMetrics), "Aggregator input channel full"} + if c.errorHandler != nil { + c.errorHandler(err) + } + if c.errorOnBlockedChannel { + return err + } + } + return nil + } + return f(name, value, tags, rate, cardinality) +} + +// Gauge measures the value of a metric at a particular time. +func (c *ClientEx) Gauge(name string, value float64, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) + + cardinality := parseTagCardinality(parameters) + + if c.agg != nil { + return c.agg.gauge(name, value, tags, cardinality) + } + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// GaugeWithTimestamp measures the value of a metric at a given time. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 +func (c *ClientEx) GaugeWithTimestamp(name string, value float64, tags []string, rate float64, timestamp time.Time, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { + return InvalidTimestamp + } + + atomic.AddUint64(&c.telemetry.totalMetricsGauge, 1) + cardinality := parseTagCardinality(parameters) + return c.send(metric{metricType: gauge, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix(), originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Count tracks how many times something happened per second. +func (c *ClientEx) Count(name string, value int64, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) + cardinality := parseTagCardinality(parameters) + if c.agg != nil { + return c.agg.count(name, value, tags, cardinality) + } + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// CountWithTimestamp tracks how many times something happened at the given second. +// BETA - Please contact our support team for more information to use this feature: https://www.datadoghq.com/support/ +// The value will bypass any aggregation on the client side and agent side, this is +// useful when sending points in the past. +// +// Minimum Datadog Agent version: 7.40.0 +func (c *ClientEx) CountWithTimestamp(name string, value int64, tags []string, rate float64, timestamp time.Time, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + + if timestamp.IsZero() || timestamp.Unix() <= noTimestamp { + return InvalidTimestamp + } + + atomic.AddUint64(&c.telemetry.totalMetricsCount, 1) + cardinality := parseTagCardinality(parameters) + return c.send(metric{metricType: count, name: name, ivalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, timestamp: timestamp.Unix(), originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Histogram tracks the statistical distribution of a set of values on each host. +func (c *ClientEx) Histogram(name string, value float64, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsHistogram, 1) + cardinality := parseTagCardinality(parameters) + if c.aggExtended != nil { + return c.sendToAggregator(histogram, name, value, tags, rate, c.aggExtended.histogram, cardinality) + } + return c.send(metric{metricType: histogram, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Distribution tracks the statistical distribution of a set of values across your infrastructure. +func (c *ClientEx) Distribution(name string, value float64, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsDistribution, 1) + cardinality := parseTagCardinality(parameters) + if c.aggExtended != nil { + return c.sendToAggregator(distribution, name, value, tags, rate, c.aggExtended.distribution, cardinality) + } + return c.send(metric{metricType: distribution, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Decr is just Count of -1 +func (c *ClientEx) Decr(name string, tags []string, rate float64, parameters ...Parameter) error { + return c.Count(name, -1, tags, rate, parameters...) +} + +// Incr is just Count of 1 +func (c *ClientEx) Incr(name string, tags []string, rate float64, parameters ...Parameter) error { + return c.Count(name, 1, tags, rate, parameters...) +} + +// Set counts the number of unique elements in a group. +func (c *ClientEx) Set(name string, value string, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsSet, 1) + cardinality := parseTagCardinality(parameters) + + if c.agg != nil { + return c.agg.set(name, value, tags, cardinality) + } + return c.send(metric{metricType: set, name: name, svalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Timing sends timing information, it is an alias for TimeInMilliseconds +func (c *ClientEx) Timing(name string, value time.Duration, tags []string, rate float64, parameters ...Parameter) error { + return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate, parameters...) +} + +// TimeInMilliseconds sends timing information in milliseconds. +// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) +func (c *ClientEx) TimeInMilliseconds(name string, value float64, tags []string, rate float64, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalMetricsTiming, 1) + cardinality := parseTagCardinality(parameters) + if c.aggExtended != nil { + return c.sendToAggregator(timing, name, value, tags, rate, c.aggExtended.timing, cardinality) + } + return c.send(metric{metricType: timing, name: name, fvalue: value, tags: tags, rate: rate, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// Event sends the provided Event. +func (c *ClientEx) Event(e *Event, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalEvents, 1) + cardinality := parseTagCardinality(parameters) + return c.send(metric{metricType: event, evalue: e, rate: 1, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// SimpleEvent sends an event with the provided title and text. +func (c *ClientEx) SimpleEvent(title, text string, parameters ...Parameter) error { + e := NewEvent(title, text) + return c.Event(e, parameters...) +} + +// ServiceCheck sends the provided ServiceCheck. +func (c *ClientEx) ServiceCheck(sc *ServiceCheck, parameters ...Parameter) error { + if c == nil { + return ErrNoClient + } + atomic.AddUint64(&c.telemetry.totalServiceChecks, 1) + cardinality := parseTagCardinality(parameters) + return c.send(metric{metricType: serviceCheck, scvalue: sc, rate: 1, globalTags: c.tags, namespace: c.namespace, originDetection: c.originDetection, overrideCard: cardinality}) +} + +// SimpleServiceCheck sends an serviceCheck with the provided name and status. +func (c *ClientEx) SimpleServiceCheck(name string, status ServiceCheckStatus, parameters ...Parameter) error { + sc := NewServiceCheck(name, status) + return c.ServiceCheck(sc, parameters...) +} + +// Close the client connection. +func (c *ClientEx) Close() error { + if c == nil { + return ErrNoClient + } + + // Acquire closer lock to ensure only one thread can close the stop channel + c.closerLock.Lock() + defer c.closerLock.Unlock() + + if c.isClosed { + return nil + } + + // Notify all other threads that they should stop + select { + case <-c.stop: + return nil + default: + } + close(c.stop) + + if c.workersMode == channelMode { + for _, w := range c.workers { + w.stopReceivingMetric() + } + } + + // flush the aggregator first + if c.agg != nil { + if c.aggExtended != nil && c.aggregatorMode == channelMode { + c.agg.stopReceivingMetric() + } + c.agg.stop() + } + + // Wait for the threads to stop + c.wg.Wait() + + c.Flush() + + c.isClosed = true + return c.sender.close() +} + +func (*ClientEx) private() { +} + +// isOriginDetectionEnabled returns whether origin detection is enabled. +// +// Disable origin detection only in one of the following cases: +// - DD_ORIGIN_DETECTION_ENABLED is explicitly set to false +// - o.originDetection is explicitly set to false, which is true by default +func isOriginDetectionEnabled(o *Options) bool { + if !o.originDetection { + return false + } + + envVarValue := os.Getenv(originDetectionEnabled) + if envVarValue == "" { + // DD_ORIGIN_DETECTION_ENABLED is not set + // default to true + return true + } + + enabled, err := strconv.ParseBool(envVarValue) + if err != nil { + // Error due to an unsupported DD_ORIGIN_DETECTION_ENABLED value + // default to true + return true + } + + return enabled +} + +// fillInContainerID returns whether the clients should fill the container field. +func fillInContainerID(o *Options) bool { + if o.containerID != "" { + return false + } + return isOriginDetectionEnabled(o) +} diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/tag_cardinality.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/tag_cardinality.go new file mode 100644 index 000000000..1289e4729 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/tag_cardinality.go @@ -0,0 +1,99 @@ +package statsd + +import ( + "os" + "strings" + "sync" +) + +type Parameter interface{} + +type Cardinality int + +const ( + CardinalityNotSet Cardinality = -1 + CardinalityNone Cardinality = iota + CardinalityLow + CardinalityOrchestrator + CardinalityHigh +) + +func (c Cardinality) String() string { + switch c { + case CardinalityNone: + return "none" + case CardinalityLow: + return "low" + case CardinalityOrchestrator: + return "orchestrator" + case CardinalityHigh: + return "high" + } + return "" +} + +// validateCardinality converts a string to Cardinality +func validateCardinality(card string) Cardinality { + card = strings.ToLower(card) + switch card { + case "none": + return CardinalityNone + case "low": + return CardinalityLow + case "orchestrator": + return CardinalityOrchestrator + case "high": + return CardinalityHigh + default: + return CardinalityNotSet + } +} + +var ( + // Global setting of the tag cardinality. + tagCardinality Cardinality = CardinalityNotSet + tagCardinalityMutex sync.RWMutex +) + +// initTagCardinality initializes the tag cardinality. +func initTagCardinality(card Cardinality) { + tagCardinalityMutex.Lock() + defer tagCardinalityMutex.Unlock() + + tagCardinality = card + + // If the user has not provided a valid value, read the value from the DD_CARDINALITY environment variable. + if tagCardinality.String() == "" { + tagCardinality = validateCardinality(os.Getenv("DD_CARDINALITY")) + } + // If DD_CARDINALITY is not set or valid, read the value from the DATADOG_CARDINALITY environment variable. + if tagCardinality.String() == "" { + tagCardinality = validateCardinality(os.Getenv("DATADOG_CARDINALITY")) + } +} + +func getTagCardinality() Cardinality { + tagCardinalityMutex.RLock() + defer tagCardinalityMutex.RUnlock() + return tagCardinality +} + +func parseTagCardinality(parameters []Parameter) Cardinality { + cardinality := CardinalityNotSet + for _, o := range parameters { + c, ok := o.(Cardinality) + if ok { + cardinality = c + } + } + return resolveCardinality(cardinality) +} + +// resolveCardinality returns the cardinality to use, prioritizing the metric-level cardinality over the global setting. +// This function validates the cardinality and falls back to the global setting if invalid. +func resolveCardinality(card Cardinality) Cardinality { + if card.String() == "" { + return getTagCardinality() + } + return card +} diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/telemetry.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/telemetry.go index feda764b5..db34f0bd1 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/telemetry.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/telemetry.go @@ -19,7 +19,7 @@ var clientTelemetryTag = "client:go" /* clientVersionTelemetryTag is a tag identifying this specific client version. */ -var clientVersionTelemetryTag = "client_version:5.4.0" +var clientVersionTelemetryTag = "client_version:5.8.0" // Telemetry represents internal metrics about the client behavior since it started. type Telemetry struct { @@ -115,7 +115,7 @@ type Telemetry struct { type telemetryClient struct { sync.RWMutex // used mostly to change the transport tag. - c *Client + c *ClientEx aggEnabled bool // is aggregation enabled and should we sent aggregation telemetry. transport string tags []string @@ -126,7 +126,7 @@ type telemetryClient struct { lastSample Telemetry // The previous sample of telemetry sent } -func newTelemetryClient(c *Client, aggregationEnabled bool) *telemetryClient { +func newTelemetryClient(c *ClientEx, aggregationEnabled bool) *telemetryClient { t := &telemetryClient{ c: c, aggEnabled: aggregationEnabled, @@ -138,7 +138,7 @@ func newTelemetryClient(c *Client, aggregationEnabled bool) *telemetryClient { return t } -func newTelemetryClientWithCustomAddr(c *Client, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, +func newTelemetryClientWithCustomAddr(c *ClientEx, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration, connectTimeout time.Duration, ) (*telemetryClient, error) { telemetryAddr = resolveAddr(telemetryAddr) diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/uds.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/uds.go index 09518992a..ed26f3ea2 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/uds.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/uds.go @@ -113,11 +113,34 @@ func (w *udsWriter) tryToDial(network string) (net.Conn, error) { if err != nil { return nil, err } - newConn, err := net.DialTimeout(udsAddr.Network(), udsAddr.String(), w.connectTimeout) - if err != nil { - return nil, err + + // Try to gracefully reconnect to the socket when we encounter "connection refused", as it's likely that the Agent + // is restarting and the socket is not yet available. + connectAttemptsLeft := 3 + connectDeadline := time.Now().Add(w.connectTimeout) + + // Calculate the backoff time for connection refused errors, but don't exceed one second: this means we won't waste + // longer than 1 seconds worth of time if the socket becomes available immediately after our last connect attempt + connRefusedBackoff := w.connectTimeout / time.Duration(connectAttemptsLeft+1) + if connRefusedBackoff > time.Second { + connRefusedBackoff = time.Second + } + + for { + connectAttemptsLeft-- + + perCallTimeout := time.Until(connectDeadline) + newConn, err := net.DialTimeout(udsAddr.Network(), udsAddr.String(), perCallTimeout) + if err != nil { + if strings.HasSuffix(err.Error(), "connection refused") && connectAttemptsLeft > 0 { + // If we get a connection refused error, we need to wait a bit before trying again. + time.Sleep(connRefusedBackoff) + continue + } + return nil, err + } + return newConn, nil } - return newConn, nil } func (w *udsWriter) ensureConnection() (net.Conn, error) { diff --git a/vendor/github.com/DataDog/datadog-go/v5/statsd/worker.go b/vendor/github.com/DataDog/datadog-go/v5/statsd/worker.go index 19dccd339..b1bf6cf23 100644 --- a/vendor/github.com/DataDog/datadog-go/v5/statsd/worker.go +++ b/vendor/github.com/DataDog/datadog-go/v5/statsd/worker.go @@ -92,7 +92,7 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec } for { - pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, extraSize, precision, rate) + pos, err := w.buffer.writeAggregated(metricSymbol, m.namespace, m.globalTags, m.name, m.fvalues[globalPos:], m.stags, extraSize, precision, rate, m.originDetection, m.overrideCard) if err == errPartialWrite { // We successfully wrote part of the histogram metrics. // We flush the current buffer and finish the histogram @@ -108,21 +108,21 @@ func (w *worker) writeAggregatedMetricUnsafe(m metric, metricSymbol []byte, prec func (w *worker) writeMetricUnsafe(m metric) error { switch m.metricType { case gauge: - return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp) + return w.buffer.writeGauge(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.timestamp, m.originDetection, m.overrideCard) case count: - return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp) + return w.buffer.writeCount(m.namespace, m.globalTags, m.name, m.ivalue, m.tags, m.rate, m.timestamp, m.originDetection, m.overrideCard) case histogram: - return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeHistogram(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.originDetection, m.overrideCard) case distribution: - return w.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeDistribution(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.originDetection, m.overrideCard) case set: - return w.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate) + return w.buffer.writeSet(m.namespace, m.globalTags, m.name, m.svalue, m.tags, m.rate, m.originDetection, m.overrideCard) case timing: - return w.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate) + return w.buffer.writeTiming(m.namespace, m.globalTags, m.name, m.fvalue, m.tags, m.rate, m.originDetection, m.overrideCard) case event: - return w.buffer.writeEvent(m.evalue, m.globalTags) + return w.buffer.writeEvent(m.evalue, m.globalTags, m.originDetection, m.overrideCard) case serviceCheck: - return w.buffer.writeServiceCheck(m.scvalue, m.globalTags) + return w.buffer.writeServiceCheck(m.scvalue, m.globalTags, m.originDetection, m.overrideCard) case histogramAggregated: return w.writeAggregatedMetricUnsafe(m, histogramSymbol, -1, m.rate) case distributionAggregated: diff --git a/vendor/modules.txt b/vendor/modules.txt index 3758a05d9..8765656d1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# github.com/DataDog/datadog-go/v5 v5.7.1 +# github.com/DataDog/datadog-go/v5 v5.8.0 ## explicit; go 1.13 github.com/DataDog/datadog-go/v5/statsd # github.com/Microsoft/go-winio v0.6.2