diff --git a/README.md b/README.md index 774a1c6..f255c39 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ with other Go StatsD clients. ## Features -- Supports all StatsD metrics: counter, gauge, timing and set +- Supports all StatsD metrics: counter, gauge (absolute and relative), timing and set - Supports InfluxDB and Datadog tags - Fast and GC-friendly: all functions for sending metrics do not allocate - Efficient: metrics are buffered by default diff --git a/conn.go b/conn.go index 4dbda63..7739eeb 100644 --- a/conn.go +++ b/conn.go @@ -2,59 +2,59 @@ package statsd import ( "io" + "math" "math/rand" "net" "strconv" + "strings" "sync" "time" ) type conn struct { - // Fields settable with options at Client's creation. - addr string + // config + errorHandler func(error) flushPeriod time.Duration maxPacketSize int - network string tagFormat TagFormat + inlineFlush bool + + // state - mu sync.Mutex - // Fields guarded by the mutex. - closed bool - w io.WriteCloser - buf []byte - rateCache map[float32]string + mu sync.Mutex // mu synchronises internal state + closed bool // closed indicates if w has been closed (triggered by first client close) + w io.WriteCloser // w is the writer for the connection + buf []byte // buf is the buffer for the connection + rateCache map[float32]string // rateCache caches string representations of sampling rates + trimTrailingNewline bool // trimTrailingNewline is set only when running in UDP mode } func newConn(conf connConfig, muted bool) (*conn, error) { c := &conn{ - addr: conf.Addr, errorHandler: conf.ErrorHandler, flushPeriod: conf.FlushPeriod, maxPacketSize: conf.MaxPacketSize, - network: conf.Network, tagFormat: conf.TagFormat, + inlineFlush: conf.InlineFlush, + w: conf.WriteCloser, } + // exit if muted if muted { + // close and clear any provided writer + if c.w != nil { + _ = c.w.Close() + c.w = nil + } + // return muted client return c, nil } - var err error - c.w, err = dialTimeout(c.network, c.addr, 5*time.Second) - if err != nil { - return c, err - } - // When using UDP do a quick check to see if something is listening on the - // given port to return an error as soon as possible. - if c.network[:3] == "udp" { - for i := 0; i < 2; i++ { - _, err = c.w.Write(nil) - if err != nil { - _ = c.w.Close() - c.w = nil - return c, err - } + // initialise writer if not provided + if c.w == nil { + if err := c.connect(conf.Network, conf.Addr, conf.UDPCheck); err != nil { + return c, err } } @@ -62,23 +62,60 @@ func newConn(conf connConfig, muted bool) (*conn, error) { // an additional metric. c.buf = make([]byte, 0, c.maxPacketSize+200) - if c.flushPeriod > 0 { - go func() { - ticker := time.NewTicker(c.flushPeriod) - for _ = range ticker.C { - c.mu.Lock() - if c.closed { - ticker.Stop() - c.mu.Unlock() - return + // start the flush worker only if we have a rate and it's not unnecessary + if c.flushPeriod > 0 && !c.inlineFlush { + go c.flushWorker() + } + + return c, nil +} + +func (c *conn) flushWorker() { + ticker := time.NewTicker(c.flushPeriod) + defer ticker.Stop() + for range ticker.C { + if func() bool { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return true + } + c.flush(0) + return false + }() { + return + } + } +} + +func (c *conn) connect(network string, address string, UDPCheck bool) error { + var err error + c.w, err = dialTimeout(network, address, 5*time.Second) + if err != nil { + return err + } + + if strings.HasPrefix(network, "udp") { + // udp retains behavior from the original implementation where it would strip a trailing newline + c.trimTrailingNewline = true + + // When using UDP do a quick check to see if something is listening on the + // given port to return an error as soon as possible. + // + // See also doc for UDPCheck option (factory func) and https://github.com/alexcesaro/statsd/issues/6 + if UDPCheck { + for i := 0; i < 2; i++ { + _, err = c.w.Write(nil) + if err != nil { + _ = c.w.Close() + c.w = nil + return err } - c.flush(0) - c.mu.Unlock() } - }() + } } - return c, nil + return nil } func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate float32, tags string) { @@ -89,7 +126,23 @@ func (c *conn) metric(prefix, bucket string, n interface{}, typ string, rate flo c.appendType(typ) c.appendRate(rate) c.closeMetric(tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) + c.mu.Unlock() +} + +func (c *conn) gaugeRelative(prefix, bucket string, value interface{}, tags string) { + c.mu.Lock() + l := len(c.buf) + c.appendBucket(prefix, bucket, tags) + // add a (positive) sign if necessary (if there's no negative sign) + // this is complicated by the special case of negative zero (IEEE-754 floating point thing) + // note that NaN ends up "+NaN" and invalid values end up "+" (both probably going to do nothing / error) + if f, ok := floatValue(value); (!ok && !isNegativeInteger(value)) || + (ok && (f != f || (f == 0 && !math.Signbit(f)) || (f > 0 && f <= math.MaxFloat64))) { + c.appendByte('+') + } + c.appendGauge(value, tags) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -98,13 +151,20 @@ func (c *conn) gauge(prefix, bucket string, value interface{}, tags string) { l := len(c.buf) // To set a gauge to a negative value we must first set it to 0. // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges - if isNegative(value) { + // the presence of a sign (/^[-+]/) requires the special case handling + // https://github.com/statsd/statsd/blob/2041f6fb5e64bbf779a8bcb3e9729e63fe207e2f/stats.js#L307 + // +Inf doesn't get this special case, no particular reason, it's just existing behavior + if f, ok := floatValue(value); ok && f == 0 { + // special case to handle negative zero (IEEE-754 floating point thing) + value = 0 + } else if (ok && f < 0) || (!ok && isNegativeInteger(value)) { + // note this case includes -Inf, which is just existing behavior that's been retained c.appendBucket(prefix, bucket, tags) c.appendGauge(0, tags) } c.appendBucket(prefix, bucket, tags) c.appendGauge(value, tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -121,7 +181,7 @@ func (c *conn) unique(prefix, bucket string, value string, tags string) { c.appendString(value) c.appendType("s") c.closeMetric(tags) - c.flushIfBufferFull(l) + c.flushIfNecessary(l) c.mu.Unlock() } @@ -162,34 +222,32 @@ func (c *conn) appendNumber(v interface{}) { } } -func isNegative(v interface{}) bool { - switch n := v.(type) { +func isNegativeInteger(n interface{}) bool { + switch n := n.(type) { case int: return n < 0 - case uint: - return n < 0 case int64: return n < 0 - case uint64: - return n < 0 case int32: return n < 0 - case uint32: - return n < 0 case int16: return n < 0 - case uint16: - return n < 0 case int8: return n < 0 - case uint8: - return n < 0 + default: + return false + } +} + +func floatValue(n interface{}) (float64, bool) { + switch n := n.(type) { case float64: - return n < 0 + return n, true case float32: - return n < 0 + return float64(n), true + default: + return 0, false } - return false } func (c *conn) appendBucket(prefix, bucket string, tags string) { @@ -231,8 +289,21 @@ func (c *conn) closeMetric(tags string) { c.appendByte('\n') } -func (c *conn) flushIfBufferFull(lastSafeLen int) { +func (c *conn) flushNecessary() bool { + if c.inlineFlush { + return true + } if len(c.buf) > c.maxPacketSize { + return true + } + return false +} + +func (c *conn) flushIfNecessary(lastSafeLen int) { + if c.inlineFlush { + lastSafeLen = 0 + } + if c.flushNecessary() { c.flush(lastSafeLen) } } @@ -247,9 +318,17 @@ func (c *conn) flush(n int) { n = len(c.buf) } - // Trim the last \n, StatsD does not like it. - _, err := c.w.Write(c.buf[:n-1]) + // write + buffer := c.buf[:n] + if c.trimTrailingNewline { + // https://github.com/cactus/go-statsd-client/issues/17 + // Trim the last \n, StatsD does not like it. + buffer = buffer[:len(buffer)-1] + } + _, err := c.w.Write(buffer) c.handleError(err) + + // consume if n < len(c.buf) { copy(c.buf, c.buf[n:]) } diff --git a/examples_test.go b/examples_test.go index dc6c2f3..89bc9af 100644 --- a/examples_test.go +++ b/examples_test.go @@ -5,7 +5,7 @@ import ( "runtime" "time" - "gopkg.in/alexcesaro/statsd.v2" + "github.com/joeycumines/statsd" ) var ( diff --git a/options.go b/options.go index ef95bb8..a757662 100644 --- a/options.go +++ b/options.go @@ -2,6 +2,7 @@ package statsd import ( "bytes" + "io" "strings" "time" ) @@ -18,6 +19,7 @@ type clientConfig struct { Tags []tag } +// connConfig is used by New, to initialise a conn type connConfig struct { Addr string ErrorHandler func(error) @@ -25,6 +27,9 @@ type connConfig struct { MaxPacketSize int Network string TagFormat TagFormat + WriteCloser io.WriteCloser + InlineFlush bool + UDPCheck bool } // An Option represents an option for a Client. It must be used as an @@ -52,7 +57,7 @@ func ErrorHandler(h func(error)) Option { } // FlushPeriod sets how often the Client's buffer is flushed. If p is 0, the -// goroutine that periodically flush the buffer is not lauched and the buffer +// goroutine that periodically flush the buffer is not launched and the buffer // is only flushed when it is full. // // By default, the flush period is 100 ms. This option is ignored in @@ -84,8 +89,49 @@ func Network(network string) Option { }) } +// WriteCloser sets the connection writer used by the client. If this option is +// present it will take precedence over the Network and Address options. If the +// client is muted then the writer will be closed before returning. The writer +// will be closed on Client.Close. Multiples of this option will cause the last +// writer to be used (if any), and previously provided writers to be closed. +// +// This option is ignored in Client.Clone(). +func WriteCloser(writer io.WriteCloser) Option { + return func(c *config) { + if c.Conn.WriteCloser != nil { + _ = c.Conn.WriteCloser.Close() + } + c.Conn.WriteCloser = writer + } +} + +// InlineFlush enables or disables (default disabled) forced flushing, inline +// with recording each stat. This option takes precedence over FlushPeriod, +// which would be redundant if always flushing after each write. Note that +// this DOES NOT guarantee exactly one line per write. +// +// This option is ignored in Client.Clone(). +func InlineFlush(enabled bool) Option { + return func(c *config) { + c.Conn.InlineFlush = enabled + } +} + +// UDPCheck enables or disables (default enabled) checking UDP connections, as +// part of New. This behavior is useful, as it makes it easier to quickly +// identify misconfigured services. Disabling this option removes the need to +// explicitly manage the connection state, at the cost of error visibility. +// Using an error handler may mitigate some of this cost. +// +// This option is ignored in Client.Clone(). +func UDPCheck(enabled bool) Option { + return func(c *config) { + c.Conn.UDPCheck = enabled + } +} + // Mute sets whether the Client is muted. All methods of a muted Client do -// nothing and return immedialtly. +// nothing and return immediately. // // This option can be used in Client.Clone() only if the parent Client is not // muted. The clones of a muted Client are always muted. @@ -135,33 +181,19 @@ func Tags(tags ...string) Option { if len(tags)%2 != 0 { panic("statsd: Tags only accepts an even number of arguments") } - - return Option(func(c *config) { - if len(tags) == 0 { - return - } - - newTags := make([]tag, len(tags)/2) + return func(c *config) { + UpdateLoop: for i := 0; i < len(tags)/2; i++ { - newTags[i] = tag{K: tags[2*i], V: tags[2*i+1]} - } - - for _, newTag := range newTags { - exists := false - for _, oldTag := range c.Client.Tags { + newTag := tag{K: tags[2*i], V: tags[2*i+1]} + for i, oldTag := range c.Client.Tags { if newTag.K == oldTag.K { - exists = true - oldTag.V = newTag.V + c.Client.Tags[i] = newTag + continue UpdateLoop } } - if !exists { - c.Client.Tags = append(c.Client.Tags, tag{ - K: newTag.K, - V: newTag.V, - }) - } + c.Client.Tags = append(c.Client.Tags, newTag) } - }) + } } type tag struct { diff --git a/safeconn.go b/safeconn.go new file mode 100644 index 0000000..f1058ab --- /dev/null +++ b/safeconn.go @@ -0,0 +1,68 @@ +package statsd + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + defaultReadTimeout = 10 * time.Millisecond + defaultConnTimeout = 5 * time.Second +) + +// SafeConn is an implementation of the io.WriteCloser that wraps a net.Conn type +// its purpose is to perform a guard as a part of each Write call to first check if +// the connection is still up by performing a small read. The use case of this is to +// protect against the case where a TCP connection comes disconnected and the Write +// continues to retry for up to 15 minutes before determining that the connection has +// been broken off. +type SafeConn struct { + netConn net.Conn + connTimeout time.Duration + readTimeout time.Duration +} + +func NewSafeConn(network, address string, connTimeout, readTimeout time.Duration) (*SafeConn, error) { + newConn, err := dialTimeout(network, address, connTimeout) + if err != nil { + return nil, err + } + + c := &SafeConn{ + netConn: newConn, + connTimeout: connTimeout, + readTimeout: readTimeout, + } + + return c, nil +} + +func NewSafeConnWithDefaultTimeouts(network string, address string) (*SafeConn, error) { + return NewSafeConn(network, address, defaultConnTimeout, defaultReadTimeout) +} + +func (s *SafeConn) Write(p []byte) (n int, err error) { + // check if connection is closed + if s.connIsClosed() { + return 0, fmt.Errorf("connection is closed") + } + + return s.netConn.Write(p) +} + +func (s *SafeConn) Close() error { + return s.netConn.Close() +} + +func (s *SafeConn) connIsClosed() bool { + err := s.netConn.SetReadDeadline(time.Now().Add(s.readTimeout)) + if err != nil { + return true + } + + one := make([]byte, 1) + _, err = s.netConn.Read(one) + return err == io.EOF +} diff --git a/safeconn_test.go b/safeconn_test.go new file mode 100644 index 0000000..3cf5ffe --- /dev/null +++ b/safeconn_test.go @@ -0,0 +1,224 @@ +package statsd + +import ( + "errors" + "io" + "net" + "testing" + "time" +) + +type mockNetConn struct { + read func(p []byte) (n int, err error) + write func(p []byte) (n int, err error) + close func() error + localAddr func() net.Addr + remoteAddr func() net.Addr + setDeadline func(t time.Time) error + setReadDeadline func(t time.Time) error + setWriteDeadline func(t time.Time) error +} + +func (m *mockNetConn) Read(p []byte) (n int, err error) { + if m.read != nil { + return m.read(p) + } + panic("implement me") +} + +func (m *mockNetConn) Write(p []byte) (n int, err error) { + if m.write != nil { + return m.write(p) + } + panic("implement me") +} + +func (m *mockNetConn) Close() error { + if m.close != nil { + return m.close() + } + panic("implement me") +} + +func (m *mockNetConn) LocalAddr() net.Addr { + if m.localAddr != nil { + return m.localAddr() + } + panic("implement me") +} + +func (m *mockNetConn) RemoteAddr() net.Addr { + if m.remoteAddr != nil { + return m.remoteAddr() + } + panic("implement me") +} + +func (m *mockNetConn) SetDeadline(t time.Time) error { + if m.setDeadline != nil { + return m.setDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetReadDeadline(t time.Time) error { + if m.setReadDeadline != nil { + return m.setReadDeadline(t) + } + panic("implement me") +} + +func (m *mockNetConn) SetWriteDeadline(t time.Time) error { + if m.setWriteDeadline != nil { + return m.setWriteDeadline(t) + } + panic("implement me") +} + +func TestSafeConn_FailsToWriteIfCannotRead(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 0, io.EOF + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + n, err := s.Write(p) + if n != 0 { + t.Error("Write() did not return 0 bytes when it failed") + } + if err == nil { + t.Error("Error should have been connection is closed") + } +} + +func TestSafeConn_SuccessfullyWritesWhenConnectionOpen(t *testing.T) { + c := &mockNetConn{ + setReadDeadline: func(t time.Time) error { + return nil + }, + read: func(b []byte) (int, error) { + return 1, nil + }, + write: func(b []byte) (int, error) { + return len(b), nil + }, + } + + s := SafeConn{ + netConn: c, + } + + p := []byte("test_key:1|c\n") + _, err := s.Write(p) + if err != nil { + t.Errorf("Error should have been nil, but instead it was: %v", err) + } +} + +func TestNewSafeConnWithDefaultTimeouts(t *testing.T) { + for _, tc := range [...]struct { + Name string + Conn net.Conn + Err error + }{ + { + Name: `failure`, + Err: errors.New(`some error`), + }, + { + Name: `success`, + Conn: new(mockNetConn), + }, + } { + t.Run(tc.Name, func(t *testing.T) { + type ( + DialIn struct { + Network string + Address string + Timeout time.Duration + } + DialOut struct { + Conn net.Conn + Err error + } + ) + var ( + dialIn = make(chan DialIn) + dialOut = make(chan DialOut) + ) + defer close(dialIn) + defer close(dialOut) + defer func() func() { + old := dialTimeout + dialTimeout = func(network, address string, timeout time.Duration) (net.Conn, error) { + dialIn <- DialIn{network, address, timeout} + v := <-dialOut + return v.Conn, v.Err + } + return func() { dialTimeout = old } + }()() + + const ( + expectedNetwork = `tcp` + expectedAddress = `127.0.0.1:21969` + ) + + done := make(chan struct{}) + go func() { + defer close(done) + if v := <-dialIn; v != (DialIn{Network: expectedNetwork, Address: expectedAddress, Timeout: defaultConnTimeout}) { + t.Errorf("%+v", v) + } + dialOut <- DialOut{tc.Conn, tc.Err} + }() + + conn, err := NewSafeConnWithDefaultTimeouts(expectedNetwork, expectedAddress) + if err != tc.Err { + t.Error(conn, err) + } + if (tc.Conn == nil) != (conn == nil) { + t.Error(conn) + } else if conn != nil { + if conn.netConn != tc.Conn { + t.Error(conn.netConn) + } + if conn.connTimeout != defaultConnTimeout { + t.Error(conn.connTimeout) + } + if conn.readTimeout != defaultReadTimeout { + t.Error(conn.readTimeout) + } + } + + <-done + }) + } +} + +func TestSafeConn_Close(t *testing.T) { + a, b := net.Pipe() + conn := SafeConn{ + netConn: a, + readTimeout: 1, + } + if conn.connIsClosed() { + t.Error() + } + if err := (&SafeConn{netConn: b}).Close(); err != nil { + t.Error(err) + } + if !conn.connIsClosed() { + t.Error() + } + if err := conn.Close(); err != nil { + t.Error(err) + } +} diff --git a/statsd.go b/statsd.go index f19204d..c86724c 100644 --- a/statsd.go +++ b/statsd.go @@ -11,7 +11,9 @@ type Client struct { tags string } -// New returns a new Client. +// New returns a new Client, which will always be non-nil, but will be muted +// (permanently inert / stubbed) if returned with an error, or if the Mute +// option was set. func New(opts ...Option) (*Client, error) { // The default configuration. conf := &config{ @@ -25,6 +27,7 @@ func New(opts ...Option) (*Client, error) { // Ethernet MTU - IPv6 Header - TCP Header = 1500 - 40 - 20 = 1440 MaxPacketSize: 1440, Network: "udp", + UDPCheck: true, }, } for _, o := range opts { @@ -87,7 +90,7 @@ func (c *Client) skip() bool { return c.muted || (c.rate != 1 && randFloat() > c.rate) } -// Increment increment the given bucket. It is equivalent to Count(bucket, 1). +// Increment increments the given bucket. It is equivalent to Count(bucket, 1). func (c *Client) Increment(bucket string) { c.Count(bucket, 1) } @@ -100,6 +103,14 @@ func (c *Client) Gauge(bucket string, value interface{}) { c.conn.gauge(c.prefix, bucket, value, c.tags) } +// GaugeRelative records a relative value for the given bucket. +func (c *Client) GaugeRelative(bucket string, value interface{}) { + if c.skip() { + return + } + c.conn.gaugeRelative(c.prefix, bucket, value, c.tags) +} + // Timing sends a timing value to a bucket. func (c *Client) Timing(bucket string, value interface{}) { if c.skip() { @@ -108,7 +119,7 @@ func (c *Client) Timing(bucket string, value interface{}) { c.conn.metric(c.prefix, bucket, value, "ms", c.rate, c.tags) } -// Histogram sends an histogram value to a bucket. +// Histogram sends a histogram value to a bucket. func (c *Client) Histogram(bucket string, value interface{}) { if c.skip() { return @@ -116,7 +127,7 @@ func (c *Client) Histogram(bucket string, value interface{}) { c.conn.metric(c.prefix, bucket, value, "h", c.rate, c.tags) } -// A Timing is an helper object that eases sending timing values. +// Timing is a helper object that eases sending timing values. type Timing struct { start time.Time c *Client diff --git a/statsd_test.go b/statsd_test.go index c97650c..fb976df 100644 --- a/statsd_test.go +++ b/statsd_test.go @@ -5,7 +5,10 @@ import ( "errors" "io" "io/ioutil" + "math" "net" + "runtime" + "strings" "sync" "testing" "time" @@ -31,12 +34,184 @@ func TestIncrement(t *testing.T) { } func TestGauge(t *testing.T) { - testOutput(t, "test_key:5|g\ntest_key:0|g\ntest_key:-10|g", func(c *Client) { + testOutput(t, "test_key:5|g\ntest_key:0|g\ntest_key:-10|g\ntest_key:5|g\ntest_key:0|g\ntest_key:-10|g\ntest_key:0|g\ntest_key:0|g", func(c *Client) { c.Gauge(testKey, 5) c.Gauge(testKey, -10) + c.Gauge(testKey, 5.0) + c.Gauge(testKey, -10.0) + c.Gauge(testKey, 0.0) + c.Gauge(testKey, math.Copysign(0, -1)) }) } +func TestClient_GaugeRelative(t *testing.T) { + for _, tc := range [...]struct { + Name string + Output string + Input func(c *Client) + }{ + { + Name: "inc then dec", + Output: "test_key:+5|g\ntest_key:-10|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 5) + c.GaugeRelative(testKey, -10) + }, + }, + { + Name: "neg", + Output: "test_key:-3.123|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, -3.123) + }, + }, + { + Name: "pos", + Output: "test_key:+3.123|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 3.123) + }, + }, + { + Name: "zero", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, 0) + }, + }, + { + Name: "zero float64", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float64(0)) + }, + }, + { + Name: "zero float32", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(0)) + }, + }, + { + Name: "neg zero float64", + Output: "test_key:-0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Copysign(0, -1)) + }, + }, + { + Name: "neg zero float32", + Output: "test_key:-0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Copysign(0, -1))) + }, + }, + { + Name: "pos large", + Output: "test_key:+5932443000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(59324.4289e23)) + }, + }, + { + Name: "pos small", + Output: "test_key:+0.00000000000000000059324427|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(59324.4289e-23)) + }, + }, + { + Name: "uint8", + Output: "test_key:+252|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, byte(252)) + }, + }, + { + Name: "neg int64", + Output: "test_key:-9942423|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, int64(-9942423)) + }, + }, + { + Name: "uint 0", + Output: "test_key:+0|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, uint(0)) + }, + }, + { + Name: "pos max float64", + Output: "test_key:+179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.MaxFloat64) + }, + }, + { + Name: "neg max float64", + Output: "test_key:-179769313486231570000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, -math.MaxFloat64) + }, + }, + { + Name: "pos inf float64", + Output: "test_key:+Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Inf(1)) + }, + }, + { + Name: "neg inf float64", + Output: "test_key:-Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.Inf(-1)) + }, + }, + { + Name: "pos inf float32", + Output: "test_key:+Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Inf(1))) + }, + }, + { + Name: "neg inf float32", + Output: "test_key:-Inf|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.Inf(-1))) + }, + }, + { + Name: "nan float64", + Output: "test_key:+NaN|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, math.NaN()) + }, + }, + { + Name: "nan float32", + Output: "test_key:+NaN|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, float32(math.NaN())) + }, + }, + { + Name: "unsupported", + Output: "test_key:+|g", + Input: func(c *Client) { + c.GaugeRelative(testKey, struct{}{}) + }, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + testOutput(t, tc.Output, tc.Input) + }) + } +} + func TestTiming(t *testing.T) { testOutput(t, "test_key:6|ms", func(c *Client) { c.Timing(testKey, 6) @@ -132,6 +307,7 @@ func TestMute(t *testing.T) { } c.Increment(testKey) c.Gauge(testKey, 1) + c.GaugeRelative(testKey, 1) c.Timing(testKey, 1) c.Histogram(testKey, 1) c.Unique(testKey, "1") @@ -233,7 +409,7 @@ func TestFlush(t *testing.T) { func TestFlushPeriod(t *testing.T) { testClient(t, func(c *Client) { c.Increment(testKey) - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond * 400) c.conn.mu.Lock() got := getOutput(c) want := "test_key:1|c" @@ -245,6 +421,27 @@ func TestFlushPeriod(t *testing.T) { }, FlushPeriod(time.Nanosecond)) } +func TestFlushPeriod_writeCloser(t *testing.T) { + c, err := New( + ErrorHandler(expectNoError(t)), + FlushPeriod(time.Nanosecond), + WriteCloser(&testBuffer{}), + ) + if err != nil { + t.Fatalf("New: %v", err) + } + c.Increment(testKey) + time.Sleep(time.Millisecond * 400) + c.conn.mu.Lock() + got := getOutput(c) + want := "test_key:1|c\n" + if got != want { + t.Errorf("Invalid output, got %q, want %q", got, want) + } + c.conn.mu.Unlock() + c.Close() +} + func TestMaxPacketSize(t *testing.T) { testClient(t, func(c *Client) { c.Increment(testKey) @@ -317,17 +514,17 @@ func TestCloneRate(t *testing.T) { } func TestCloneInfluxDBTags(t *testing.T) { - testOutput(t, "test_key,tag1=value1,tag2=value2:5|c", func(c *Client) { - clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) + testOutput(t, "test_key,tag1=value3,tag3=value4,tag4=value9,tag5=value6,tag2=value2:5|c", func(c *Client) { + clone := c.Clone(Tags("tag2", "value2", "tag1", "value3", "tag4", "value8", "tag4", "value9")) clone.Count(testKey, 5) - }, TagsFormat(InfluxDB), Tags("tag1", "value1")) + }, TagsFormat(InfluxDB), Tags("tag1", "value1", "tag3", "value4", "tag4", "value5", "tag5", "value6", "tag4", "value7")) } func TestCloneDatadogTags(t *testing.T) { - testOutput(t, "test_key:5|c|#tag1:value1,tag2:value2", func(c *Client) { + testOutput(t, "test_key:5|c|#tag1:value3,tag3:value4,tag2:value2", func(c *Client) { clone := c.Clone(Tags("tag1", "value3", "tag2", "value2")) clone.Count(testKey, 5) - }, TagsFormat(Datadog), Tags("tag1", "value1")) + }, TagsFormat(Datadog), Tags("tag1", "value1", "tag3", "value4")) } func TestDialError(t *testing.T) { @@ -359,19 +556,188 @@ func TestConcurrency(t *testing.T) { }) } -func TestUDPNotListening(t *testing.T) { - dialTimeout = mockUDPClosed - defer func() { dialTimeout = net.DialTimeout }() +func TestNew_udpNotListening(t *testing.T) { + for _, tc := range [...]struct { + Name string + Options []Option + Muted bool + Errored bool + }{ + { + Name: `default`, + Muted: true, + Errored: true, + }, + { + Name: `true`, + Options: []Option{UDPCheck(true)}, + Muted: true, + Errored: true, + }, + { + Name: `false`, + Options: []Option{UDPCheck(false)}, + Muted: false, + Errored: false, + }, + } { + t.Run(tc.Name, func(t *testing.T) { + dialTimeout = mockUDPClosed + defer func() { dialTimeout = net.DialTimeout }() + c, err := New(tc.Options...) + if c == nil { + t.Fatal(`client should never be nil`) + } + if c.muted != tc.Muted { + t.Error(c.muted) + } + if (err != nil) != tc.Errored { + t.Error(err) + } + }) + } +} - c, err := New() - if c == nil || !c.muted { - t.Error("New() did not return a muted client") +func TestWriteCloser(t *testing.T) { + count := 0 + writer := &mockWriteCloser{ + close: func() error { + count++ + return nil + }, } - if err == nil { - t.Error("New should return an error") + config := new(config) + WriteCloser(writer)(config) + if v := config.Conn.WriteCloser; v != writer { + t.Fatal(v) + } + if count != 0 { + t.Fatal(writer) + } + WriteCloser(nil)(config) + if v := config.Conn.WriteCloser; v != nil { + t.Fatal(v) + } + if count != 1 { + t.Fatal(writer) } } +func TestNew_writeCloserClosesOnMute(t *testing.T) { + count := 0 + writer := &mockWriteCloser{ + close: func() error { + count++ + return nil + }, + } + client, err := New( + FlushPeriod(0), + ErrorHandler(expectNoError(t)), + WriteCloser(writer), + Mute(true), + ) + if client == nil || err != nil { + t.Fatal(client, err) + } + if client.conn.w != nil { + t.Error(client.conn.w) + } + if count != 1 { + t.Error(count) + } +} + +func TestNew_inlineFlush(t *testing.T) { + defer func() func() { + startGoroutines := runtime.NumGoroutine() + return func() { + endGoroutines := runtime.NumGoroutine() + if startGoroutines < endGoroutines { + t.Error(startGoroutines, endGoroutines) + } + } + }()() + client, err := New( + FlushPeriod(0), + ErrorHandler(expectNoError(t)), + WriteCloser(&mockWriteCloser{}), + InlineFlush(true), + ) + if client == nil || err != nil || client.muted { + t.Fatal(client, err) + } + if !client.conn.inlineFlush { + t.Error(client.conn.inlineFlush) + } +} + +func TestInlineFlush(t *testing.T) { + config := new(config) + InlineFlush(true)(config) + if !config.Conn.InlineFlush { + t.Error(config.Conn.InlineFlush) + } +} + +func TestConn_flushNecessary_inlineFlush(t *testing.T) { + if !(&conn{inlineFlush: true}).flushNecessary() { + t.Error(`expected always true if always flush is enabled`) + } +} + +func TestConn_flushIfNecessary_inlineFlush(t *testing.T) { + var ( + called bool + flushed string + c = &conn{ + buf: []byte("test_key:1|c\n"), + inlineFlush: true, + maxPacketSize: 100, + w: &mockWriteCloser{ + write: func(b []byte) (int, error) { + if called { + t.Error(`called more than once`) + } + called = true + flushed = string(b) + return len(b), nil + }, + }, + } + ) + // will actually flush everything + c.flushIfNecessary(2) + if !called { + t.Error(called) + } + if flushed != "test_key:1|c\n" { + t.Error(flushed) + } + if len(c.buf) != 0 { + t.Error(c.buf) + } +} + +type mockWriteCloser struct { + write func(p []byte) (n int, err error) + close func() error +} + +func (m *mockWriteCloser) Write(p []byte) (n int, err error) { + if m.write != nil { + return m.write(p) + } + panic("implement me") +} + +func (m *mockWriteCloser) Close() error { + if m.close != nil { + return m.close() + } + panic("implement me") +} + type mockClosedUDPConn struct { i int net.Conn @@ -394,6 +760,7 @@ func mockUDPClosed(string, string, time.Duration) (net.Conn, error) { } func testClient(t *testing.T, f func(*Client), options ...Option) { + t.Helper() dialTimeout = mockDial defer func() { dialTimeout = net.DialTimeout }() @@ -410,7 +777,9 @@ func testClient(t *testing.T, f func(*Client), options ...Option) { } func testOutput(t *testing.T, want string, f func(*Client), options ...Option) { + t.Helper() testClient(t, func(c *Client) { + t.Helper() f(c) c.Close() @@ -474,6 +843,13 @@ func testNetwork(t *testing.T, network string) { received := make(chan bool) server := newServer(t, network, testAddr, func(p []byte) { s := string(p) + if network != "udp" { + if !strings.HasSuffix(s, "\n") { + t.Error(s) + } else { + s = s[:len(s)-1] + } + } if s != "test_key:1|c" { t.Errorf("invalid output: %q", s) }