diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 6cb3cde..6ebf68d 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "log" + "log/slog" "net/http" "os" "syscall" @@ -86,7 +87,7 @@ func main() { if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil { + if err := metrics.RunRemoteWriting(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/go.mod b/go.mod index 8e4b988..e958be4 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,15 @@ module github.com/prometheus-community/avalanche -go 1.21 +go 1.22.0 + +toolchain go1.23.1 require ( - github.com/gogo/protobuf v1.3.2 - github.com/golang/snappy v0.0.4 github.com/google/go-cmp v0.6.0 github.com/nelkinda/health-go v0.0.1 github.com/oklog/run v1.1.0 - github.com/prometheus/client_golang v1.20.5 + github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 + github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.62.0 github.com/prometheus/prometheus v0.53.1 @@ -18,16 +19,17 @@ require ( require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect - github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect + github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nelkinda/http-go v0.0.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/procfs v0.15.1 // indirect golang.org/x/sys v0.28.0 // indirect - google.golang.org/protobuf v1.36.1 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6456a76..2e35ae0 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs= -github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= github.com/antchfx/xmlquery v1.2.4/go.mod h1:KQQuESaxSlqugE2ZBcM/qn+ebIpt+d+4Xx7YcSGAIrM= github.com/antchfx/xpath v1.1.6/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk= github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M= @@ -52,8 +52,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -67,8 +65,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -100,8 +98,10 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= -github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 h1:pvxJNIN8Tr8FleTHltMExyfFF2GOFBcYbgggNDKvWDw= +github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498/go.mod h1:SJYWANZZtDbfu6CSYeDvRDkzhGX71yepDKePKDTya3M= +github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468 h1:Lg8dMj9NGvHGy9ORtzyCkTvHpz1rsE78HT3S9W4ZC1U= +github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468/go.mod h1:QdwnzTHLXXx636iZ1pfTiCI1Bn/b/20AgMqkPQr4xfA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= @@ -122,10 +122,17 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -181,8 +188,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -194,5 +201,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics/write.go b/metrics/write.go index ab7d1e8..a36fcbe 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -14,33 +14,26 @@ package metrics import ( - "bufio" - "bytes" "context" "crypto/tls" "fmt" - "io" "log" + "log/slog" "net/http" "net/url" "sort" "sync" "time" - "github.com/prometheus-community/avalanche/pkg/errors" - - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" + "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "gopkg.in/alecthomas/kingpin.v2" -) -const maxErrMsgLen = 256 - -var userAgent = "avalanche" + "github.com/prometheus-community/avalanche/pkg/errors" +) // ConfigWrite for the remote write requests. type ConfigWrite struct { @@ -55,11 +48,12 @@ type ConfigWrite struct { TenantHeader string OutOfOrder bool Concurrency int + WriteV2 bool } func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { cfg := &ConfigWrite{} - flagReg("remote-url", "URL to send samples via remote_write API."). + flagReg("remote-url", "URL to send samples via remote_write API. By default, path is set to api/v1/write"). URLVar(&cfg.URL) flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20"). IntVar(&cfg.Concurrency) @@ -78,6 +72,8 @@ func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true"). BoolVar(&cfg.OutOfOrder) + flagReg("remote-write-v2", "Send remote write v2 format requests.").Default("false"). + BoolVar(&cfg.WriteV2) return cfg } @@ -99,30 +95,46 @@ func (c *ConfigWrite) Validate() error { return nil } -// Client for the remote write requests. -type Client struct { - client *http.Client - timeout time.Duration - config *ConfigWrite - gatherer prometheus.Gatherer +// Writer for remote write requests. +type Writer struct { + logger *slog.Logger + timeout time.Duration + config *ConfigWrite + gatherer prometheus.Gatherer + remoteAPI *remote.API } -// SendRemoteWrite initializes a http client and -// sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { +// RunRemoteWriting initializes a http client and starts a Writer for remote writing metrics to a prometheus compatible remote endpoint. +func RunRemoteWriting(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &cfg.TLSClientConfig, } rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt} + rt = &userAgentRoundTripper{userAgent: "avalanche", rt: rt} httpClient := &http.Client{Transport: rt} - client := Client{ - client: httpClient, - timeout: time.Minute, - config: cfg, - gatherer: gatherer, + remoteAPI, err := remote.NewAPI( + cfg.URL.String(), + remote.WithAPIHTTPClient(httpClient), + remote.WithAPILogger(logger.With("component", "remote_write_api")), + ) + if err != nil { + return err + } + + writer := Writer{ + logger: logger, + timeout: time.Minute, + config: cfg, + gatherer: gatherer, + remoteAPI: remoteAPI, + } + + if cfg.WriteV2 { + return writer.writeV2(ctx) } - return client.write(ctx) + + return writer.write(ctx) } // Add the tenant ID header @@ -138,6 +150,17 @@ type tenantRoundTripper struct { rt http.RoundTripper } +type userAgentRoundTripper struct { + userAgent string + rt http.RoundTripper +} + +func (rt *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req = cloneRequest(req) + req.Header.Set("User-Agent", rt.userAgent) + return rt.rt.RoundTrip(req) +} + // cloneRequest returns a clone of the provided *http.Request. // The clone is a shallow copy of the struct and its Header map. func cloneRequest(r *http.Request) *http.Request { @@ -152,41 +175,41 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } -func (c *Client) write(ctx context.Context) error { +func (w *Writer) write(ctx context.Context) error { select { // Wait for update first as write and collector.Run runs simultaneously. - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: case <-ctx.Done(): return ctx.Err() } - tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err := collectMetrics(w.gatherer, w.config.OutOfOrder) if err != nil { return err } var ( totalTime time.Duration - totalSamplesExp = len(tss) * c.config.RequestCount + totalSamplesExp = len(tss) * w.config.RequestCount totalSamplesAct int mtx sync.Mutex wgMetrics sync.WaitGroup merr = &errors.MultiError{} ) - shouldRunForever := c.config.RequestCount == -1 + shouldRunForever := w.config.RequestCount == -1 if shouldRunForever { log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.BatchSize, w.config.RequestInterval) } else { log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.RequestCount, w.config.BatchSize, w.config.RequestInterval) } - ticker := time.NewTicker(c.config.RequestInterval) + ticker := time.NewTicker(w.config.RequestInterval) defer ticker.Stop() - concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + concurrencyLimitCh := make(chan struct{}, w.config.Concurrency) for i := 0; ; { if ctx.Err() != nil { @@ -194,7 +217,7 @@ func (c *Client) write(ctx context.Context) error { } if !shouldRunForever { - if i >= c.config.RequestCount { + if i >= w.config.RequestCount { break } i++ @@ -202,9 +225,9 @@ func (c *Client) write(ctx context.Context) error { <-ticker.C select { - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: log.Println("updating remote write metrics") - tss, err = collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err = collectMetrics(w.gatherer, w.config.OutOfOrder) if err != nil { merr.Add(err) } @@ -213,7 +236,7 @@ func (c *Client) write(ctx context.Context) error { } start := time.Now() - for i := 0; i < len(tss); i += c.config.BatchSize { + for i := 0; i < len(tss); i += w.config.BatchSize { wgMetrics.Add(1) concurrencyLimitCh <- struct{}{} go func(i int) { @@ -221,17 +244,20 @@ func (c *Client) write(ctx context.Context) error { <-concurrencyLimitCh }() defer wgMetrics.Done() - end := i + c.config.BatchSize + end := i + w.config.BatchSize if end > len(tss) { end = len(tss) } req := &prompb.WriteRequest{ Timeseries: tss[i:end], } - if err := c.Store(context.TODO(), req); err != nil { + + if _, err := w.remoteAPI.Write(ctx, remote.WriteV1MessageType, req); err != nil { merr.Add(err) + w.logger.Error("error writing metrics", "error", err) return } + mtx.Lock() totalSamplesAct += len(tss[i:end]) mtx.Unlock() @@ -244,10 +270,14 @@ func (c *Client) write(ctx context.Context) error { return merr.Err() } } - if c.config.RequestCount*len(tss) != totalSamplesAct { + if w.config.RequestCount*len(tss) != totalSamplesAct { merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) } - log.Printf("Total request time: %v ; Total samples: %v; Samples/sec: %v\n", totalTime.Round(time.Second), totalSamplesAct, int(float64(totalSamplesAct)/totalTime.Seconds())) + w.logger.Info("metrics summary", + "total_time", totalTime.Round(time.Second), + "total_samples", totalSamplesAct, + "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), + "errors", merr.Count()) return merr.Err() } @@ -335,45 +365,3 @@ func prompbLabels(name string, label []*dto.LabelPair) []prompb.Label { }) return ret } - -// Store sends a batch of samples to the HTTP endpoint. -func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error { - data, err := proto.Marshal(req) - if err != nil { - return err - } - - compressed := snappy.Encode(nil, data) - httpReq, err := http.NewRequest("POST", c.config.URL.String(), bytes.NewReader(compressed)) - if err != nil { - // Errors from NewRequest are from unparseable URLs, so are not - // recoverable. - return err - } - httpReq.Header.Add("Content-Encoding", "snappy") - httpReq.Header.Set("Content-Type", "application/x-protobuf") - httpReq.Header.Set("User-Agent", userAgent) - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - httpReq = httpReq.WithContext(ctx) - - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - defer cancel() - - httpResp, err := c.client.Do(httpReq.WithContext(ctx)) - if err != nil { - return err - } - defer httpResp.Body.Close() - - if httpResp.StatusCode/100 != 2 { - scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) - line := "" - if scanner.Scan() { - line = scanner.Text() - } - err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) - log.Println(err) - } - - return err -} diff --git a/metrics/writev2.go b/metrics/writev2.go new file mode 100644 index 0000000..1a401d2 --- /dev/null +++ b/metrics/writev2.go @@ -0,0 +1,209 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/prometheus/client_golang/exp/api/remote" + writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" + + "github.com/prometheus-community/avalanche/pkg/errors" +) + +func (w *Writer) writeV2(ctx context.Context) error { + select { + // Wait for update first as write and collector.Run runs simultaneously. + case <-w.config.UpdateNotify: + case <-ctx.Done(): + return ctx.Err() + } + + tss, st, err := collectMetricsV2(w.gatherer, w.config.OutOfOrder) + if err != nil { + return err + } + + var ( + totalTime time.Duration + totalSamplesExp = len(tss) * w.config.RequestCount + totalSamplesAct int + mtx sync.Mutex + wgMetrics sync.WaitGroup + merr = &errors.MultiError{} + ) + + shouldRunForever := w.config.RequestCount == -1 + if shouldRunForever { + log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", + len(tss), w.config.BatchSize, w.config.RequestInterval) + } else { + log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", + len(tss), w.config.RequestCount, w.config.BatchSize, w.config.RequestInterval) + } + + ticker := time.NewTicker(w.config.RequestInterval) + defer ticker.Stop() + + concurrencyLimitCh := make(chan struct{}, w.config.Concurrency) + + for i := 0; ; { + if ctx.Err() != nil { + return ctx.Err() + } + + if !shouldRunForever { + if i >= w.config.RequestCount { + break + } + i++ + } + + <-ticker.C + select { + case <-w.config.UpdateNotify: + log.Println("updating remote write metrics") + tss, st, err = collectMetricsV2(w.gatherer, w.config.OutOfOrder) + if err != nil { + merr.Add(err) + } + default: + tss = updateTimestampsV2(tss) + } + + start := time.Now() + for i := 0; i < len(tss); i += w.config.BatchSize { + wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} + go func(i int) { + defer func() { + <-concurrencyLimitCh + }() + defer wgMetrics.Done() + end := i + w.config.BatchSize + if end > len(tss) { + end = len(tss) + } + req := &writev2.Request{ + Timeseries: tss[i:end], + Symbols: st.Symbols(), // We pass full symbols table to each request for now + } + + if _, err := w.remoteAPI.Write(ctx, remote.WriteV2MessageType, req); err != nil { + merr.Add(err) + w.logger.Error("error writing metrics", "error", err) + return + } + + mtx.Lock() + totalSamplesAct += len(tss[i:end]) + mtx.Unlock() + }(i) + } + wgMetrics.Wait() + totalTime += time.Since(start) + if merr.Count() > 20 { + merr.Add(fmt.Errorf("too many errors")) + return merr.Err() + } + } + if w.config.RequestCount*len(tss) != totalSamplesAct { + merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) + } + w.logger.Info("metrics summary", + "total_time", totalTime.Round(time.Second), + "total_samples", totalSamplesAct, + "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), + "errors", merr.Count()) + return merr.Err() +} + +func updateTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + for i := range tss { + tss[i].Samples[0].Timestamp = now + } + return tss +} + +func shuffleTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} + for i := range tss { + offset := offsets[i%len(offsets)] + tss[i].Samples[0].Timestamp = now + offset + } + return tss +} + +func collectMetricsV2(gatherer prometheus.Gatherer, outOfOrder bool) ([]*writev2.TimeSeries, writev2.SymbolsTable, error) { + metricFamilies, err := gatherer.Gather() + if err != nil { + return nil, writev2.SymbolsTable{}, err + } + tss, st := ToTimeSeriesSliceV2(metricFamilies) + if outOfOrder { + tss = shuffleTimestampsV2(tss) + } + return tss, st, nil +} + +// ToTimeSeriesSliceV2 converts a slice of metricFamilies containing samples into a slice of writev2.TimeSeries. +func ToTimeSeriesSliceV2(metricFamilies []*dto.MetricFamily) ([]*writev2.TimeSeries, writev2.SymbolsTable) { + st := writev2.NewSymbolTable() + timestamp := int64(model.Now()) + tss := make([]*writev2.TimeSeries, 0, len(metricFamilies)*10) + + skippedSamples := 0 + for _, metricFamily := range metricFamilies { + for _, metric := range metricFamily.Metric { + labels := prompbLabels(*metricFamily.Name, metric.Label) + labelRefs := make([]uint32, 0, len(labels)) + for _, label := range labels { + labelRefs = append(labelRefs, st.Symbolize(label.Name)) + labelRefs = append(labelRefs, st.Symbolize(label.Value)) + } + ts := &writev2.TimeSeries{ + LabelsRefs: labelRefs, + } + switch *metricFamily.Type { + case dto.MetricType_COUNTER: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Counter.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + case dto.MetricType_GAUGE: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Gauge.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + default: + skippedSamples++ + } + } + } + if skippedSamples > 0 { + log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss)) + } + return tss, st +}