Skip to content

[maintained fork] Arbitrary stream support, relative gauges, fixes, etc #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
23802da
Add GaugeRelative to support Telegraf statsd implementation
stevenh Jan 30, 2017
6444453
Merge pull request #1 from stevenh/gauge-relative
stevenh Jan 30, 2017
a51b818
Update client to not require a successful ping
heedson Apr 6, 2018
7c0659f
Merge pull request #2 from heedson/patch-1
stevenh Apr 13, 2018
f706d6d
Change references to multiplay project
Apr 6, 2018
06d149b
Remove old references from README.md
Apr 13, 2018
687c8cb
refactor(conn): remove unused fields that were being populated from t…
joeycumines May 20, 2019
510fd89
fix(TestFlush): increase sleep to chance of race causing test failure
joeycumines May 21, 2019
4004ac8
feat(WriteCloser): support arbitrary output streams
joeycumines May 21, 2019
a7101e1
update(readme): maintainer notes
joeycumines May 21, 2019
88faa5e
feat(InlineFlush): support flushing after metrics
joeycumines May 21, 2019
23eed68
fix(tcp-flush): write trailing newlines for all non-udp cases
joeycumines May 26, 2019
c94382d
revert(readme): remove maintainer notes for pr to upstream
joeycumines Feb 14, 2020
9a056b6
fix: Close relative gauges (#4)
stevenh Feb 23, 2020
111b368
fix(Tags): tags option now updates existing tags correctly
joeycumines Aug 26, 2020
3bf2bb4
Create a new WriteCloser that will check before writing if connection…
pabowers Nov 17, 2020
d6baccd
Change the repository to support go modules (#3)
pabowers Nov 17, 2020
807810e
Update README.md
joeycumines Nov 17, 2020
13ed52b
Create a new WriteCloser that will check before writing if connection…
pabowers Nov 17, 2020
bb35aa9
fix(fileperms): correct source files erroneously made executable
joeycumines Nov 17, 2020
1390b97
fix(Tags): tags option now updates existing tags correctly
joeycumines Aug 26, 2020
3f10036
doc(changelogs): link and clarify old and new changelogs
joeycumines Dec 31, 2021
88a3203
Remove defunct .travis.yml
joeycumines Dec 31, 2021
02b390d
Document New/connect mute behavior (fix 1/2 for alexcesaro/statsd#6)
joeycumines Dec 31, 2021
3f725ad
feat(UDPCheck): option to disable initial udp connection check
joeycumines Dec 31, 2021
63f17cc
Add github issue templates
joeycumines Dec 31, 2021
e99aaac
Fix minor staticcheck issues
joeycumines Dec 31, 2021
bb5eb9d
Add github workflow to build test and lint
joeycumines Dec 31, 2021
e0b3289
Change readme links from upstream to this fork
joeycumines Dec 31, 2021
ead05d6
Add missing unit tests
joeycumines Dec 31, 2021
23446bf
Minor documentation typo fixes
joeycumines Dec 31, 2021
a8e6f80
Update changelog
joeycumines Dec 31, 2021
de3db42
Add relative gauge support per alexcesaro/statsd#12
joeycumines Dec 31, 2021
3c8f8ba
fix(GaugeRelative): remove fmt.Sprintf and fix handling of -0.0
joeycumines Dec 31, 2021
2179950
Update changelog
joeycumines Dec 31, 2021
78fe5e2
Readme typo
joeycumines Dec 31, 2021
848634a
Merge branch 'master' into upstream-pr
joeycumines Jan 1, 2022
0ded5b1
Remove .github directory that was erroneously included
joeycumines Jan 1, 2022
efc594e
Restore .travis.yml, reset CHANGELOG.md, remove go.mod
joeycumines Jan 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ with other Go StatsD clients.

## Features

- Supports all StatsD metrics: counter, gauge, timing and set
- Supports all StatsD metrics: counter, gauge (absolute and relative), timing and set
- Supports InfluxDB and Datadog tags
- Fast and GC-friendly: all functions for sending metrics do not allocate
- Efficient: metrics are buffered by default
Expand Down
201 changes: 140 additions & 61 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,120 @@ package statsd

import (
"io"
"math"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)

type conn struct {
// Fields settable with options at Client's creation.
addr string
// config

errorHandler func(error)
flushPeriod time.Duration
maxPacketSize int
network string
tagFormat TagFormat
inlineFlush bool

// state

mu sync.Mutex
// Fields guarded by the mutex.
closed bool
w io.WriteCloser
buf []byte
rateCache map[float32]string
mu sync.Mutex // mu synchronises internal state
closed bool // closed indicates if w has been closed (triggered by first client close)
w io.WriteCloser // w is the writer for the connection
buf []byte // buf is the buffer for the connection
rateCache map[float32]string // rateCache caches string representations of sampling rates
trimTrailingNewline bool // trimTrailingNewline is set only when running in UDP mode
}

func newConn(conf connConfig, muted bool) (*conn, error) {
c := &conn{
addr: conf.Addr,
errorHandler: conf.ErrorHandler,
flushPeriod: conf.FlushPeriod,
maxPacketSize: conf.MaxPacketSize,
network: conf.Network,
tagFormat: conf.TagFormat,
inlineFlush: conf.InlineFlush,
w: conf.WriteCloser,
}

// exit if muted
if muted {
// close and clear any provided writer
if c.w != nil {
_ = c.w.Close()
c.w = nil
}
// return muted client
return c, nil
}

var err error
c.w, err = dialTimeout(c.network, c.addr, 5*time.Second)
if err != nil {
return c, err
}
// When using UDP do a quick check to see if something is listening on the
// given port to return an error as soon as possible.
if c.network[:3] == "udp" {
for i := 0; i < 2; i++ {
_, err = c.w.Write(nil)
if err != nil {
_ = c.w.Close()
c.w = nil
return c, err
}
// initialise writer if not provided
if c.w == nil {
if err := c.connect(conf.Network, conf.Addr, conf.UDPCheck); err != nil {
return c, err
}
}

// To prevent a buffer overflow add some capacity to the buffer to allow for
// an additional metric.
c.buf = make([]byte, 0, c.maxPacketSize+200)

if c.flushPeriod > 0 {
go func() {
ticker := time.NewTicker(c.flushPeriod)
for _ = range ticker.C {
c.mu.Lock()
if c.closed {
ticker.Stop()
c.mu.Unlock()
return
// start the flush worker only if we have a rate and it's not unnecessary
if c.flushPeriod > 0 && !c.inlineFlush {
go c.flushWorker()
}

return c, nil
}

func (c *conn) flushWorker() {
ticker := time.NewTicker(c.flushPeriod)
defer ticker.Stop()
for range ticker.C {
if func() bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return true
}
c.flush(0)
return false
}() {
return
}
}
}

func (c *conn) connect(network string, address string, UDPCheck bool) error {
var err error
c.w, err = dialTimeout(network, address, 5*time.Second)
if err != nil {
return err
}

if strings.HasPrefix(network, "udp") {
// udp retains behavior from the original implementation where it would strip a trailing newline
c.trimTrailingNewline = true

// When using UDP do a quick check to see if something is listening on the
// given port to return an error as soon as possible.
//
// See also doc for UDPCheck option (factory func) and https://github.com/alexcesaro/statsd/issues/6
if UDPCheck {
for i := 0; i < 2; i++ {
_, err = c.w.Write(nil)
if err != nil {
_ = c.w.Close()
c.w = nil
return err
}
c.flush(0)
c.mu.Unlock()
}
}()
}
}

return c, nil
return nil
}

func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) {
Expand All @@ -89,7 +126,23 @@ func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate flo
c.appendType(typ)
c.appendRate(rate)
c.closeMetric(tags)
c.flushIfBufferFull(l)
c.flushIfNecessary(l)
c.mu.Unlock()
}

func (c *conn) gaugeRelative(prefix, bucket string, value interface{}, tags string) {
c.mu.Lock()
l := len(c.buf)
c.appendBucket(prefix, bucket, tags)
// add a (positive) sign if necessary (if there's no negative sign)
// this is complicated by the special case of negative zero (IEEE-754 floating point thing)
// note that NaN ends up "+NaN" and invalid values end up "+" (both probably going to do nothing / error)
if f, ok := floatValue(value); (!ok && !isNegativeInteger(value)) ||
(ok && (f != f || (f == 0 && !math.Signbit(f)) || (f > 0 && f <= math.MaxFloat64))) {
c.appendByte('+')
}
c.appendGauge(value, tags)
c.flushIfNecessary(l)
c.mu.Unlock()
}

Expand All @@ -98,13 +151,20 @@ func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) {
l := len(c.buf)
// To set a gauge to a negative value we must first set it to 0.
// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
if isNegative(value) {
// the presence of a sign (/^[-+]/) requires the special case handling
// https://github.com/statsd/statsd/blob/2041f6fb5e64bbf779a8bcb3e9729e63fe207e2f/stats.js#L307
// +Inf doesn't get this special case, no particular reason, it's just existing behavior
if f, ok := floatValue(value); ok && f == 0 {
// special case to handle negative zero (IEEE-754 floating point thing)
value = 0
} else if (ok && f < 0) || (!ok && isNegativeInteger(value)) {
// note this case includes -Inf, which is just existing behavior that's been retained
c.appendBucket(prefix, bucket, tags)
c.appendGauge(0, tags)
}
c.appendBucket(prefix, bucket, tags)
c.appendGauge(value, tags)
c.flushIfBufferFull(l)
c.flushIfNecessary(l)
c.mu.Unlock()
}

Expand All @@ -121,7 +181,7 @@ func (c *conn) unique(prefix, bucket string, value string, tags string) {
c.appendString(value)
c.appendType("s")
c.closeMetric(tags)
c.flushIfBufferFull(l)
c.flushIfNecessary(l)
c.mu.Unlock()
}

Expand Down Expand Up @@ -162,34 +222,32 @@ func (c *conn) appendNumber(v interface{}) {
}
}

func isNegative(v interface{}) bool {
switch n := v.(type) {
func isNegativeInteger(n interface{}) bool {
switch n := n.(type) {
case int:
return n < 0
case uint:
return n < 0
case int64:
return n < 0
case uint64:
return n < 0
case int32:
return n < 0
case uint32:
return n < 0
case int16:
return n < 0
case uint16:
return n < 0
case int8:
return n < 0
case uint8:
return n < 0
default:
return false
}
}

func floatValue(n interface{}) (float64, bool) {
switch n := n.(type) {
case float64:
return n < 0
return n, true
case float32:
return n < 0
return float64(n), true
default:
return 0, false
}
return false
}

func (c *conn) appendBucket(prefix, bucket string, tags string) {
Expand Down Expand Up @@ -231,8 +289,21 @@ func (c *conn) closeMetric(tags string) {
c.appendByte('\n')
}

func (c *conn) flushIfBufferFull(lastSafeLen int) {
func (c *conn) flushNecessary() bool {
if c.inlineFlush {
return true
}
if len(c.buf) > c.maxPacketSize {
return true
}
return false
}

func (c *conn) flushIfNecessary(lastSafeLen int) {
if c.inlineFlush {
lastSafeLen = 0
}
if c.flushNecessary() {
c.flush(lastSafeLen)
}
}
Expand All @@ -247,9 +318,17 @@ func (c *conn) flush(n int) {
n = len(c.buf)
}

// Trim the last \n, StatsD does not like it.
_, err := c.w.Write(c.buf[:n-1])
// write
buffer := c.buf[:n]
if c.trimTrailingNewline {
// https://github.com/cactus/go-statsd-client/issues/17
// Trim the last \n, StatsD does not like it.
buffer = buffer[:len(buffer)-1]
}
_, err := c.w.Write(buffer)
c.handleError(err)

// consume
if n < len(c.buf) {
copy(c.buf, c.buf[n:])
}
Expand Down
2 changes: 1 addition & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"runtime"
"time"

"gopkg.in/alexcesaro/statsd.v2"
"github.com/joeycumines/statsd"
)

var (
Expand Down
Loading