From d0ba7745901ff8b6746b3a42547bd772752aedb2 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Sun, 19 Jan 2025 13:56:25 +0000 Subject: [PATCH 1/7] Add ability to send remote write 2.0 requests Signed-off-by: Saswata Mukherjee --- cmd/avalanche/avalanche.go | 3 +- go.mod | 15 +- go.sum | 20 ++- metrics/write.go | 297 +++++++++++++++++++++++++++++-------- 4 files changed, 257 insertions(+), 78 deletions(-) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index 6cb3cde..b0a5244 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "log" + "log/slog" "net/http" "os" "syscall" @@ -86,7 +87,7 @@ func main() { if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil { + if err := metrics.SendRemoteWrite(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/go.mod b/go.mod index 8e4b988..dfc30e6 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,14 @@ module github.com/prometheus-community/avalanche -go 1.21 +go 1.22.0 + +toolchain go1.23.1 require ( - github.com/gogo/protobuf v1.3.2 - github.com/golang/snappy v0.0.4 github.com/google/go-cmp v0.6.0 github.com/nelkinda/health-go v0.0.1 github.com/oklog/run v1.1.0 - github.com/prometheus/client_golang v1.20.5 + github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.62.0 github.com/prometheus/prometheus v0.53.1 @@ -18,11 +18,12 @@ require ( require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect - github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect + github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nelkinda/http-go v0.0.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -31,3 +32,5 @@ require ( google.golang.org/protobuf v1.36.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/prometheus/client_golang => ../client_golang diff --git a/go.sum b/go.sum index 6456a76..08f5c3d 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= -github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs= -github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg= +github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs= github.com/antchfx/xmlquery v1.2.4/go.mod h1:KQQuESaxSlqugE2ZBcM/qn+ebIpt+d+4Xx7YcSGAIrM= github.com/antchfx/xpath v1.1.6/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk= github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M= @@ -52,8 +52,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -67,8 +65,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -100,8 +98,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y= -github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= @@ -122,10 +118,17 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -194,5 +197,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/metrics/write.go b/metrics/write.go index ab7d1e8..3eac182 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -14,13 +14,11 @@ package metrics import ( - "bufio" - "bytes" "context" "crypto/tls" "fmt" - "io" "log" + "log/slog" "net/http" "net/url" "sort" @@ -28,9 +26,9 @@ import ( "time" "github.com/prometheus-community/avalanche/pkg/errors" + "github.com/prometheus/client_golang/exp/api/remote" + writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" @@ -38,10 +36,6 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) -const maxErrMsgLen = 256 - -var userAgent = "avalanche" - // ConfigWrite for the remote write requests. type ConfigWrite struct { URL *url.URL @@ -55,11 +49,12 @@ type ConfigWrite struct { TenantHeader string OutOfOrder bool Concurrency int + WriteV2 bool } func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { cfg := &ConfigWrite{} - flagReg("remote-url", "URL to send samples via remote_write API."). + flagReg("remote-url", "URL to send samples via remote_write API. By default, path is set to api/v1/write"). URLVar(&cfg.URL) flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20"). IntVar(&cfg.Concurrency) @@ -78,6 +73,8 @@ func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true"). BoolVar(&cfg.OutOfOrder) + flagReg("remote-write-v2", "Send remote write v2 format requests.").Default("false"). + BoolVar(&cfg.WriteV2) return cfg } @@ -101,27 +98,46 @@ func (c *ConfigWrite) Validate() error { // Client for the remote write requests. type Client struct { - client *http.Client - timeout time.Duration - config *ConfigWrite - gatherer prometheus.Gatherer + client *http.Client + logger *slog.Logger + timeout time.Duration + config *ConfigWrite + gatherer prometheus.Gatherer + remoteAPI *remote.API } // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { +func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &cfg.TLSClientConfig, } rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt} + rt = &userAgentRoundTripper{userAgent: "avalanche", rt: rt} httpClient := &http.Client{Transport: rt} + remoteAPI, err := remote.NewAPI( + httpClient, + cfg.URL.String(), + remote.WithAPILogger(logger.With("component", "remote_write_api")), + ) + if err != nil { + return err + } + client := Client{ - client: httpClient, - timeout: time.Minute, - config: cfg, - gatherer: gatherer, + client: httpClient, + logger: logger, + timeout: time.Minute, + config: cfg, + gatherer: gatherer, + remoteAPI: remoteAPI, + } + + if cfg.WriteV2 { + return client.writeV2(ctx) } + return client.write(ctx) } @@ -138,6 +154,18 @@ type tenantRoundTripper struct { rt http.RoundTripper } +// User agent round tripper +type userAgentRoundTripper struct { + userAgent string + rt http.RoundTripper +} + +func (rt *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req = cloneRequest(req) + req.Header.Set("User-Agent", rt.userAgent) + return rt.rt.RoundTrip(req) +} + // cloneRequest returns a clone of the provided *http.Request. // The clone is a shallow copy of the struct and its Header map. func cloneRequest(r *http.Request) *http.Request { @@ -152,6 +180,113 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } +func (c *Client) writeV2(ctx context.Context) error { + select { + // Wait for update first as write and collector.Run runs simultaneously. + case <-c.config.UpdateNotify: + case <-ctx.Done(): + return ctx.Err() + } + + tss, st, err := collectMetricsV2(c.gatherer, c.config.OutOfOrder) + if err != nil { + return err + } + + var ( + totalTime time.Duration + totalSamplesExp = len(tss) * c.config.RequestCount + totalSamplesAct int + mtx sync.Mutex + wgMetrics sync.WaitGroup + merr = &errors.MultiError{} + ) + + shouldRunForever := c.config.RequestCount == -1 + if shouldRunForever { + log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.BatchSize, c.config.RequestInterval) + } else { + log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + } + + ticker := time.NewTicker(c.config.RequestInterval) + defer ticker.Stop() + + concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + + for i := 0; ; { + if ctx.Err() != nil { + return ctx.Err() + } + + if !shouldRunForever { + if i >= c.config.RequestCount { + break + } + i++ + } + + <-ticker.C + select { + case <-c.config.UpdateNotify: + log.Println("updating remote write metrics") + tss, st, err = collectMetricsV2(c.gatherer, c.config.OutOfOrder) + if err != nil { + merr.Add(err) + } + default: + tss = updateTimestampsV2(tss) + } + + start := time.Now() + for i := 0; i < len(tss); i += c.config.BatchSize { + wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} + go func(i int) { + defer func() { + <-concurrencyLimitCh + }() + defer wgMetrics.Done() + end := i + c.config.BatchSize + if end > len(tss) { + end = len(tss) + } + req := &writev2.Request{ + Timeseries: tss[i:end], + Symbols: st.Symbols(), // We pass full symbols table to each request for now + } + + if _, err := c.remoteAPI.Write(ctx, req); err != nil { + merr.Add(err) + c.logger.Error("error writing metrics", "error", err) + return + } + + mtx.Lock() + totalSamplesAct += len(tss[i:end]) + mtx.Unlock() + }(i) + } + wgMetrics.Wait() + totalTime += time.Since(start) + if merr.Count() > 20 { + merr.Add(fmt.Errorf("too many errors")) + return merr.Err() + } + } + if c.config.RequestCount*len(tss) != totalSamplesAct { + merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) + } + c.logger.Info("metrics summary", + "total_time", totalTime.Round(time.Second), + "total_samples", totalSamplesAct, + "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), + "errors", merr.Count()) + return merr.Err() +} + func (c *Client) write(ctx context.Context) error { select { // Wait for update first as write and collector.Run runs simultaneously. @@ -228,10 +363,13 @@ func (c *Client) write(ctx context.Context) error { req := &prompb.WriteRequest{ Timeseries: tss[i:end], } - if err := c.Store(context.TODO(), req); err != nil { + + if _, err := c.remoteAPI.Write(ctx, req); err != nil { merr.Add(err) + c.logger.Error("error writing metrics", "error", err) return } + mtx.Lock() totalSamplesAct += len(tss[i:end]) mtx.Unlock() @@ -247,10 +385,22 @@ func (c *Client) write(ctx context.Context) error { if c.config.RequestCount*len(tss) != totalSamplesAct { merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) } - log.Printf("Total request time: %v ; Total samples: %v; Samples/sec: %v\n", totalTime.Round(time.Second), totalSamplesAct, int(float64(totalSamplesAct)/totalTime.Seconds())) + c.logger.Info("metrics summary", + "total_time", totalTime.Round(time.Second), + "total_samples", totalSamplesAct, + "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), + "errors", merr.Count()) return merr.Err() } +func updateTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + for i := range tss { + tss[i].Samples[0].Timestamp = now + } + return tss +} + func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { t := int64(model.Now()) for i := range tss { @@ -259,6 +409,18 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { return tss } +func collectMetricsV2(gatherer prometheus.Gatherer, outOfOrder bool) ([]*writev2.TimeSeries, writev2.SymbolsTable, error) { + metricFamilies, err := gatherer.Gather() + if err != nil { + return nil, writev2.SymbolsTable{}, err + } + tss, st := ToTimeSeriesSliceV2(metricFamilies) + if outOfOrder { + tss = shuffleTimestampsV2(tss) + } + return tss, st, nil +} + func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.TimeSeries, error) { metricFamilies, err := gatherer.Gather() if err != nil { @@ -271,6 +433,16 @@ func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.Tim return tss, nil } +func shuffleTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} + for i := range tss { + offset := offsets[i%len(offsets)] + tss[i].Samples[0].Timestamp = now + offset + } + return tss +} + func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries { now := time.Now().UnixMilli() offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} @@ -318,6 +490,47 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries { return tss } +func ToTimeSeriesSliceV2(metricFamilies []*dto.MetricFamily) ([]*writev2.TimeSeries, writev2.SymbolsTable) { + st := writev2.NewSymbolTable() + timestamp := int64(model.Now()) + tss := make([]*writev2.TimeSeries, 0, len(metricFamilies)*10) + + skippedSamples := 0 + for _, metricFamily := range metricFamilies { + for _, metric := range metricFamily.Metric { + labels := prompbLabels(*metricFamily.Name, metric.Label) + labelRefs := make([]uint32, 0, len(labels)) + for _, label := range labels { + labelRefs = append(labelRefs, st.Symbolize(label.Name)) + labelRefs = append(labelRefs, st.Symbolize(label.Value)) + } + ts := &writev2.TimeSeries{ + LabelsRefs: labelRefs, + } + switch *metricFamily.Type { + case dto.MetricType_COUNTER: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Counter.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + case dto.MetricType_GAUGE: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Gauge.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + default: + skippedSamples++ + } + } + } + if skippedSamples > 0 { + log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss)) + } + return tss, st +} + func prompbLabels(name string, label []*dto.LabelPair) []prompb.Label { ret := make([]prompb.Label, 0, len(label)+1) ret = append(ret, prompb.Label{ @@ -335,45 +548,3 @@ func prompbLabels(name string, label []*dto.LabelPair) []prompb.Label { }) return ret } - -// Store sends a batch of samples to the HTTP endpoint. -func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error { - data, err := proto.Marshal(req) - if err != nil { - return err - } - - compressed := snappy.Encode(nil, data) - httpReq, err := http.NewRequest("POST", c.config.URL.String(), bytes.NewReader(compressed)) - if err != nil { - // Errors from NewRequest are from unparseable URLs, so are not - // recoverable. - return err - } - httpReq.Header.Add("Content-Encoding", "snappy") - httpReq.Header.Set("Content-Type", "application/x-protobuf") - httpReq.Header.Set("User-Agent", userAgent) - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - httpReq = httpReq.WithContext(ctx) - - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) - defer cancel() - - httpResp, err := c.client.Do(httpReq.WithContext(ctx)) - if err != nil { - return err - } - defer httpResp.Body.Close() - - if httpResp.StatusCode/100 != 2 { - scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) - line := "" - if scanner.Scan() { - line = scanner.Text() - } - err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) - log.Println(err) - } - - return err -} From dc00280912458bda22ece1b447ed65b8a467c2cd Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 3 Feb 2025 07:36:54 +0000 Subject: [PATCH 2/7] Use exp package, don't use generics, split into separate file Signed-off-by: Saswata Mukherjee --- go.mod | 3 +- go.sum | 4 + metrics/write.go | 179 --------------------------------------- metrics/writev2.go | 207 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 212 insertions(+), 181 deletions(-) create mode 100644 metrics/writev2.go diff --git a/go.mod b/go.mod index dfc30e6..34c4d6c 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/nelkinda/health-go v0.0.1 github.com/oklog/run v1.1.0 github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 + github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.62.0 github.com/prometheus/prometheus v0.53.1 @@ -32,5 +33,3 @@ require ( google.golang.org/protobuf v1.36.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/prometheus/client_golang => ../client_golang diff --git a/go.sum b/go.sum index 08f5c3d..da21c8e 100644 --- a/go.sum +++ b/go.sum @@ -98,6 +98,10 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 h1:pvxJNIN8Tr8FleTHltMExyfFF2GOFBcYbgggNDKvWDw= +github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498/go.mod h1:SJYWANZZtDbfu6CSYeDvRDkzhGX71yepDKePKDTya3M= +github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b h1:SeZR11qcDl6QeY27jlrC3dNggRyImBlKw3g2g4GJ7LA= +github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b/go.mod h1:0HIB0F0XkTu3oZ6jBaF7CMLznKdaOuFCTOq8lxgsN9Q= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= diff --git a/metrics/write.go b/metrics/write.go index 3eac182..0654daf 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus-community/avalanche/pkg/errors" "github.com/prometheus/client_golang/exp/api/remote" - writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -180,113 +179,6 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } -func (c *Client) writeV2(ctx context.Context) error { - select { - // Wait for update first as write and collector.Run runs simultaneously. - case <-c.config.UpdateNotify: - case <-ctx.Done(): - return ctx.Err() - } - - tss, st, err := collectMetricsV2(c.gatherer, c.config.OutOfOrder) - if err != nil { - return err - } - - var ( - totalTime time.Duration - totalSamplesExp = len(tss) * c.config.RequestCount - totalSamplesAct int - mtx sync.Mutex - wgMetrics sync.WaitGroup - merr = &errors.MultiError{} - ) - - shouldRunForever := c.config.RequestCount == -1 - if shouldRunForever { - log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.BatchSize, c.config.RequestInterval) - } else { - log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) - } - - ticker := time.NewTicker(c.config.RequestInterval) - defer ticker.Stop() - - concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) - - for i := 0; ; { - if ctx.Err() != nil { - return ctx.Err() - } - - if !shouldRunForever { - if i >= c.config.RequestCount { - break - } - i++ - } - - <-ticker.C - select { - case <-c.config.UpdateNotify: - log.Println("updating remote write metrics") - tss, st, err = collectMetricsV2(c.gatherer, c.config.OutOfOrder) - if err != nil { - merr.Add(err) - } - default: - tss = updateTimestampsV2(tss) - } - - start := time.Now() - for i := 0; i < len(tss); i += c.config.BatchSize { - wgMetrics.Add(1) - concurrencyLimitCh <- struct{}{} - go func(i int) { - defer func() { - <-concurrencyLimitCh - }() - defer wgMetrics.Done() - end := i + c.config.BatchSize - if end > len(tss) { - end = len(tss) - } - req := &writev2.Request{ - Timeseries: tss[i:end], - Symbols: st.Symbols(), // We pass full symbols table to each request for now - } - - if _, err := c.remoteAPI.Write(ctx, req); err != nil { - merr.Add(err) - c.logger.Error("error writing metrics", "error", err) - return - } - - mtx.Lock() - totalSamplesAct += len(tss[i:end]) - mtx.Unlock() - }(i) - } - wgMetrics.Wait() - totalTime += time.Since(start) - if merr.Count() > 20 { - merr.Add(fmt.Errorf("too many errors")) - return merr.Err() - } - } - if c.config.RequestCount*len(tss) != totalSamplesAct { - merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) - } - c.logger.Info("metrics summary", - "total_time", totalTime.Round(time.Second), - "total_samples", totalSamplesAct, - "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), - "errors", merr.Count()) - return merr.Err() -} - func (c *Client) write(ctx context.Context) error { select { // Wait for update first as write and collector.Run runs simultaneously. @@ -393,14 +285,6 @@ func (c *Client) write(ctx context.Context) error { return merr.Err() } -func updateTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { - now := time.Now().UnixMilli() - for i := range tss { - tss[i].Samples[0].Timestamp = now - } - return tss -} - func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { t := int64(model.Now()) for i := range tss { @@ -409,18 +293,6 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { return tss } -func collectMetricsV2(gatherer prometheus.Gatherer, outOfOrder bool) ([]*writev2.TimeSeries, writev2.SymbolsTable, error) { - metricFamilies, err := gatherer.Gather() - if err != nil { - return nil, writev2.SymbolsTable{}, err - } - tss, st := ToTimeSeriesSliceV2(metricFamilies) - if outOfOrder { - tss = shuffleTimestampsV2(tss) - } - return tss, st, nil -} - func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.TimeSeries, error) { metricFamilies, err := gatherer.Gather() if err != nil { @@ -433,16 +305,6 @@ func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.Tim return tss, nil } -func shuffleTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { - now := time.Now().UnixMilli() - offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} - for i := range tss { - offset := offsets[i%len(offsets)] - tss[i].Samples[0].Timestamp = now + offset - } - return tss -} - func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries { now := time.Now().UnixMilli() offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} @@ -490,47 +352,6 @@ func ToTimeSeriesSlice(metricFamilies []*dto.MetricFamily) []prompb.TimeSeries { return tss } -func ToTimeSeriesSliceV2(metricFamilies []*dto.MetricFamily) ([]*writev2.TimeSeries, writev2.SymbolsTable) { - st := writev2.NewSymbolTable() - timestamp := int64(model.Now()) - tss := make([]*writev2.TimeSeries, 0, len(metricFamilies)*10) - - skippedSamples := 0 - for _, metricFamily := range metricFamilies { - for _, metric := range metricFamily.Metric { - labels := prompbLabels(*metricFamily.Name, metric.Label) - labelRefs := make([]uint32, 0, len(labels)) - for _, label := range labels { - labelRefs = append(labelRefs, st.Symbolize(label.Name)) - labelRefs = append(labelRefs, st.Symbolize(label.Value)) - } - ts := &writev2.TimeSeries{ - LabelsRefs: labelRefs, - } - switch *metricFamily.Type { - case dto.MetricType_COUNTER: - ts.Samples = []*writev2.Sample{{ - Value: *metric.Counter.Value, - Timestamp: timestamp, - }} - tss = append(tss, ts) - case dto.MetricType_GAUGE: - ts.Samples = []*writev2.Sample{{ - Value: *metric.Gauge.Value, - Timestamp: timestamp, - }} - tss = append(tss, ts) - default: - skippedSamples++ - } - } - } - if skippedSamples > 0 { - log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss)) - } - return tss, st -} - func prompbLabels(name string, label []*dto.LabelPair) []prompb.Label { ret := make([]prompb.Label, 0, len(label)+1) ret = append(ret, prompb.Label{ diff --git a/metrics/writev2.go b/metrics/writev2.go new file mode 100644 index 0000000..c4880fd --- /dev/null +++ b/metrics/writev2.go @@ -0,0 +1,207 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/prometheus-community/avalanche/pkg/errors" + writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/model" +) + +func (c *Client) writeV2(ctx context.Context) error { + select { + // Wait for update first as write and collector.Run runs simultaneously. + case <-c.config.UpdateNotify: + case <-ctx.Done(): + return ctx.Err() + } + + tss, st, err := collectMetricsV2(c.gatherer, c.config.OutOfOrder) + if err != nil { + return err + } + + var ( + totalTime time.Duration + totalSamplesExp = len(tss) * c.config.RequestCount + totalSamplesAct int + mtx sync.Mutex + wgMetrics sync.WaitGroup + merr = &errors.MultiError{} + ) + + shouldRunForever := c.config.RequestCount == -1 + if shouldRunForever { + log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.BatchSize, c.config.RequestInterval) + } else { + log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", + len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + } + + ticker := time.NewTicker(c.config.RequestInterval) + defer ticker.Stop() + + concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + + for i := 0; ; { + if ctx.Err() != nil { + return ctx.Err() + } + + if !shouldRunForever { + if i >= c.config.RequestCount { + break + } + i++ + } + + <-ticker.C + select { + case <-c.config.UpdateNotify: + log.Println("updating remote write metrics") + tss, st, err = collectMetricsV2(c.gatherer, c.config.OutOfOrder) + if err != nil { + merr.Add(err) + } + default: + tss = updateTimestampsV2(tss) + } + + start := time.Now() + for i := 0; i < len(tss); i += c.config.BatchSize { + wgMetrics.Add(1) + concurrencyLimitCh <- struct{}{} + go func(i int) { + defer func() { + <-concurrencyLimitCh + }() + defer wgMetrics.Done() + end := i + c.config.BatchSize + if end > len(tss) { + end = len(tss) + } + req := &writev2.Request{ + Timeseries: tss[i:end], + Symbols: st.Symbols(), // We pass full symbols table to each request for now + } + + if _, err := c.remoteAPI.Write(ctx, req); err != nil { + merr.Add(err) + c.logger.Error("error writing metrics", "error", err) + return + } + + mtx.Lock() + totalSamplesAct += len(tss[i:end]) + mtx.Unlock() + }(i) + } + wgMetrics.Wait() + totalTime += time.Since(start) + if merr.Count() > 20 { + merr.Add(fmt.Errorf("too many errors")) + return merr.Err() + } + } + if c.config.RequestCount*len(tss) != totalSamplesAct { + merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) + } + c.logger.Info("metrics summary", + "total_time", totalTime.Round(time.Second), + "total_samples", totalSamplesAct, + "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), + "errors", merr.Count()) + return merr.Err() +} + +func updateTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + for i := range tss { + tss[i].Samples[0].Timestamp = now + } + return tss +} + +func shuffleTimestampsV2(tss []*writev2.TimeSeries) []*writev2.TimeSeries { + now := time.Now().UnixMilli() + offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} + for i := range tss { + offset := offsets[i%len(offsets)] + tss[i].Samples[0].Timestamp = now + offset + } + return tss +} + +func collectMetricsV2(gatherer prometheus.Gatherer, outOfOrder bool) ([]*writev2.TimeSeries, writev2.SymbolsTable, error) { + metricFamilies, err := gatherer.Gather() + if err != nil { + return nil, writev2.SymbolsTable{}, err + } + tss, st := ToTimeSeriesSliceV2(metricFamilies) + if outOfOrder { + tss = shuffleTimestampsV2(tss) + } + return tss, st, nil +} + +// ToTimeSeriesSliceV2 converts a slice of metricFamilies containing samples into a slice of writev2.TimeSeries. +func ToTimeSeriesSliceV2(metricFamilies []*dto.MetricFamily) ([]*writev2.TimeSeries, writev2.SymbolsTable) { + st := writev2.NewSymbolTable() + timestamp := int64(model.Now()) + tss := make([]*writev2.TimeSeries, 0, len(metricFamilies)*10) + + skippedSamples := 0 + for _, metricFamily := range metricFamilies { + for _, metric := range metricFamily.Metric { + labels := prompbLabels(*metricFamily.Name, metric.Label) + labelRefs := make([]uint32, 0, len(labels)) + for _, label := range labels { + labelRefs = append(labelRefs, st.Symbolize(label.Name)) + labelRefs = append(labelRefs, st.Symbolize(label.Value)) + } + ts := &writev2.TimeSeries{ + LabelsRefs: labelRefs, + } + switch *metricFamily.Type { + case dto.MetricType_COUNTER: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Counter.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + case dto.MetricType_GAUGE: + ts.Samples = []*writev2.Sample{{ + Value: *metric.Gauge.Value, + Timestamp: timestamp, + }} + tss = append(tss, ts) + default: + skippedSamples++ + } + } + } + if skippedSamples > 0 { + log.Printf("WARN: Skipping %v samples; sending only %v samples, given only gauge and counters are currently implemented\n", skippedSamples, len(tss)) + } + return tss, st +} From 752f8cc2acd63cfd6e8cce55fba970de7636ce8f Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 3 Feb 2025 07:40:03 +0000 Subject: [PATCH 3/7] Lint fix Signed-off-by: Saswata Mukherjee --- metrics/write.go | 4 ++-- metrics/writev2.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/metrics/write.go b/metrics/write.go index 0654daf..580ca1e 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -25,14 +25,14 @@ import ( "sync" "time" - "github.com/prometheus-community/avalanche/pkg/errors" "github.com/prometheus/client_golang/exp/api/remote" - "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" "gopkg.in/alecthomas/kingpin.v2" + + "github.com/prometheus-community/avalanche/pkg/errors" ) // ConfigWrite for the remote write requests. diff --git a/metrics/writev2.go b/metrics/writev2.go index c4880fd..7a2dfdc 100644 --- a/metrics/writev2.go +++ b/metrics/writev2.go @@ -20,11 +20,12 @@ import ( "sync" "time" - "github.com/prometheus-community/avalanche/pkg/errors" writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" + + "github.com/prometheus-community/avalanche/pkg/errors" ) func (c *Client) writeV2(ctx context.Context) error { From 9fb82c160eaf04c5075b7cea58c03b25bfe0f64d Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 24 Feb 2025 08:04:46 +0000 Subject: [PATCH 4/7] Rm unused client Signed-off-by: Saswata Mukherjee --- metrics/write.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/metrics/write.go b/metrics/write.go index 580ca1e..7479c7a 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -97,7 +97,6 @@ func (c *ConfigWrite) Validate() error { // Client for the remote write requests. type Client struct { - client *http.Client logger *slog.Logger timeout time.Duration config *ConfigWrite @@ -125,7 +124,6 @@ func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, } client := Client{ - client: httpClient, logger: logger, timeout: time.Minute, config: cfg, From dab4dae69eafca9f513f03a0702e6e6c4e35db9b Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Tue, 25 Feb 2025 16:42:54 +0000 Subject: [PATCH 5/7] Update exp Signed-off-by: Saswata Mukherjee --- go.mod | 4 ++-- go.sum | 8 ++++---- metrics/write.go | 4 ++-- metrics/writev2.go | 3 ++- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 34c4d6c..0e20c04 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/nelkinda/health-go v0.0.1 github.com/oklog/run v1.1.0 github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 - github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b + github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.62.0 github.com/prometheus/prometheus v0.53.1 @@ -30,6 +30,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/procfs v0.15.1 // indirect golang.org/x/sys v0.28.0 // indirect - google.golang.org/protobuf v1.36.1 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index da21c8e..27b3c74 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 h1:pvxJNIN8Tr8FleTHltMExyfFF2GOFBcYbgggNDKvWDw= github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498/go.mod h1:SJYWANZZtDbfu6CSYeDvRDkzhGX71yepDKePKDTya3M= -github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b h1:SeZR11qcDl6QeY27jlrC3dNggRyImBlKw3g2g4GJ7LA= -github.com/prometheus/client_golang/exp v0.0.0-20250131124507-433de6814d4b/go.mod h1:0HIB0F0XkTu3oZ6jBaF7CMLznKdaOuFCTOq8lxgsN9Q= +github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b h1:hKmTbsCQrXLTwcv27uQfkkxvFO82hBf92RkucD8VEDY= +github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b/go.mod h1:QdwnzTHLXXx636iZ1pfTiCI1Bn/b/20AgMqkPQr4xfA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= @@ -188,8 +188,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= -google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/metrics/write.go b/metrics/write.go index 7479c7a..a1f5ab5 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -115,8 +115,8 @@ func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, httpClient := &http.Client{Transport: rt} remoteAPI, err := remote.NewAPI( - httpClient, cfg.URL.String(), + remote.WithAPIHTTPClient(httpClient), remote.WithAPILogger(logger.With("component", "remote_write_api")), ) if err != nil { @@ -254,7 +254,7 @@ func (c *Client) write(ctx context.Context) error { Timeseries: tss[i:end], } - if _, err := c.remoteAPI.Write(ctx, req); err != nil { + if _, err := c.remoteAPI.Write(ctx, remote.WriteV1MessageType, req); err != nil { merr.Add(err) c.logger.Error("error writing metrics", "error", err) return diff --git a/metrics/writev2.go b/metrics/writev2.go index 7a2dfdc..02a9b09 100644 --- a/metrics/writev2.go +++ b/metrics/writev2.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/exp/api/remote" writev2 "github.com/prometheus/client_golang/exp/api/remote/genproto/v2" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -106,7 +107,7 @@ func (c *Client) writeV2(ctx context.Context) error { Symbols: st.Symbols(), // We pass full symbols table to each request for now } - if _, err := c.remoteAPI.Write(ctx, req); err != nil { + if _, err := c.remoteAPI.Write(ctx, remote.WriteV2MessageType, req); err != nil { merr.Add(err) c.logger.Error("error writing metrics", "error", err) return From a5556cae2237e7625aa2c518b18e55c693f9dc83 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Thu, 27 Feb 2025 17:09:19 +0000 Subject: [PATCH 6/7] Rename to NewRemoteWriter and Writer Signed-off-by: Saswata Mukherjee --- cmd/avalanche/avalanche.go | 2 +- go.mod | 2 +- go.sum | 4 +-- metrics/write.go | 51 +++++++++++++++++++------------------- metrics/writev2.go | 36 +++++++++++++-------------- 5 files changed, 47 insertions(+), 48 deletions(-) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index b0a5244..bd4f31f 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -87,7 +87,7 @@ func main() { if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.SendRemoteWrite(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { + if err := metrics.NewRemoteWriter(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/go.mod b/go.mod index 0e20c04..e958be4 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/nelkinda/health-go v0.0.1 github.com/oklog/run v1.1.0 github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 - github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b + github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.62.0 github.com/prometheus/prometheus v0.53.1 diff --git a/go.sum b/go.sum index 27b3c74..2e35ae0 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 h1:pvxJNIN8Tr8FleTHltMExyfFF2GOFBcYbgggNDKvWDw= github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498/go.mod h1:SJYWANZZtDbfu6CSYeDvRDkzhGX71yepDKePKDTya3M= -github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b h1:hKmTbsCQrXLTwcv27uQfkkxvFO82hBf92RkucD8VEDY= -github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b/go.mod h1:QdwnzTHLXXx636iZ1pfTiCI1Bn/b/20AgMqkPQr4xfA= +github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468 h1:Lg8dMj9NGvHGy9ORtzyCkTvHpz1rsE78HT3S9W4ZC1U= +github.com/prometheus/client_golang/exp v0.0.0-20250227122456-ad23ad6d5468/go.mod h1:QdwnzTHLXXx636iZ1pfTiCI1Bn/b/20AgMqkPQr4xfA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= diff --git a/metrics/write.go b/metrics/write.go index a1f5ab5..0405ee8 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -95,8 +95,8 @@ func (c *ConfigWrite) Validate() error { return nil } -// Client for the remote write requests. -type Client struct { +// Writer for remote write requests. +type Writer struct { logger *slog.Logger timeout time.Duration config *ConfigWrite @@ -104,9 +104,8 @@ type Client struct { remoteAPI *remote.API } -// SendRemoteWrite initializes a http client and -// sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { +// NewRemoteWriter initializes a http client and starts a Writer for remote writing metrics to a prometheus compatible remote endpoint. +func NewRemoteWriter(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &cfg.TLSClientConfig, } @@ -123,7 +122,7 @@ func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, return err } - client := Client{ + writer := Writer{ logger: logger, timeout: time.Minute, config: cfg, @@ -132,10 +131,10 @@ func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, } if cfg.WriteV2 { - return client.writeV2(ctx) + return writer.writeV2(ctx) } - return client.write(ctx) + return writer.write(ctx) } // Add the tenant ID header @@ -177,41 +176,41 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } -func (c *Client) write(ctx context.Context) error { +func (w *Writer) write(ctx context.Context) error { select { // Wait for update first as write and collector.Run runs simultaneously. - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: case <-ctx.Done(): return ctx.Err() } - tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err := collectMetrics(w.gatherer, w.config.OutOfOrder) if err != nil { return err } var ( totalTime time.Duration - totalSamplesExp = len(tss) * c.config.RequestCount + totalSamplesExp = len(tss) * w.config.RequestCount totalSamplesAct int mtx sync.Mutex wgMetrics sync.WaitGroup merr = &errors.MultiError{} ) - shouldRunForever := c.config.RequestCount == -1 + shouldRunForever := w.config.RequestCount == -1 if shouldRunForever { log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.BatchSize, w.config.RequestInterval) } else { log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.RequestCount, w.config.BatchSize, w.config.RequestInterval) } - ticker := time.NewTicker(c.config.RequestInterval) + ticker := time.NewTicker(w.config.RequestInterval) defer ticker.Stop() - concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + concurrencyLimitCh := make(chan struct{}, w.config.Concurrency) for i := 0; ; { if ctx.Err() != nil { @@ -219,7 +218,7 @@ func (c *Client) write(ctx context.Context) error { } if !shouldRunForever { - if i >= c.config.RequestCount { + if i >= w.config.RequestCount { break } i++ @@ -227,9 +226,9 @@ func (c *Client) write(ctx context.Context) error { <-ticker.C select { - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: log.Println("updating remote write metrics") - tss, err = collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err = collectMetrics(w.gatherer, w.config.OutOfOrder) if err != nil { merr.Add(err) } @@ -238,7 +237,7 @@ func (c *Client) write(ctx context.Context) error { } start := time.Now() - for i := 0; i < len(tss); i += c.config.BatchSize { + for i := 0; i < len(tss); i += w.config.BatchSize { wgMetrics.Add(1) concurrencyLimitCh <- struct{}{} go func(i int) { @@ -246,7 +245,7 @@ func (c *Client) write(ctx context.Context) error { <-concurrencyLimitCh }() defer wgMetrics.Done() - end := i + c.config.BatchSize + end := i + w.config.BatchSize if end > len(tss) { end = len(tss) } @@ -254,9 +253,9 @@ func (c *Client) write(ctx context.Context) error { Timeseries: tss[i:end], } - if _, err := c.remoteAPI.Write(ctx, remote.WriteV1MessageType, req); err != nil { + if _, err := w.remoteAPI.Write(ctx, remote.WriteV1MessageType, req); err != nil { merr.Add(err) - c.logger.Error("error writing metrics", "error", err) + w.logger.Error("error writing metrics", "error", err) return } @@ -272,10 +271,10 @@ func (c *Client) write(ctx context.Context) error { return merr.Err() } } - if c.config.RequestCount*len(tss) != totalSamplesAct { + if w.config.RequestCount*len(tss) != totalSamplesAct { merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) } - c.logger.Info("metrics summary", + w.logger.Info("metrics summary", "total_time", totalTime.Round(time.Second), "total_samples", totalSamplesAct, "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), diff --git a/metrics/writev2.go b/metrics/writev2.go index 02a9b09..1a401d2 100644 --- a/metrics/writev2.go +++ b/metrics/writev2.go @@ -29,41 +29,41 @@ import ( "github.com/prometheus-community/avalanche/pkg/errors" ) -func (c *Client) writeV2(ctx context.Context) error { +func (w *Writer) writeV2(ctx context.Context) error { select { // Wait for update first as write and collector.Run runs simultaneously. - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: case <-ctx.Done(): return ctx.Err() } - tss, st, err := collectMetricsV2(c.gatherer, c.config.OutOfOrder) + tss, st, err := collectMetricsV2(w.gatherer, w.config.OutOfOrder) if err != nil { return err } var ( totalTime time.Duration - totalSamplesExp = len(tss) * c.config.RequestCount + totalSamplesExp = len(tss) * w.config.RequestCount totalSamplesAct int mtx sync.Mutex wgMetrics sync.WaitGroup merr = &errors.MultiError{} ) - shouldRunForever := c.config.RequestCount == -1 + shouldRunForever := w.config.RequestCount == -1 if shouldRunForever { log.Printf("Sending: %v timeseries infinitely, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.BatchSize, w.config.RequestInterval) } else { log.Printf("Sending: %v timeseries, %v times, %v timeseries per request, %v delay between requests\n", - len(tss), c.config.RequestCount, c.config.BatchSize, c.config.RequestInterval) + len(tss), w.config.RequestCount, w.config.BatchSize, w.config.RequestInterval) } - ticker := time.NewTicker(c.config.RequestInterval) + ticker := time.NewTicker(w.config.RequestInterval) defer ticker.Stop() - concurrencyLimitCh := make(chan struct{}, c.config.Concurrency) + concurrencyLimitCh := make(chan struct{}, w.config.Concurrency) for i := 0; ; { if ctx.Err() != nil { @@ -71,7 +71,7 @@ func (c *Client) writeV2(ctx context.Context) error { } if !shouldRunForever { - if i >= c.config.RequestCount { + if i >= w.config.RequestCount { break } i++ @@ -79,9 +79,9 @@ func (c *Client) writeV2(ctx context.Context) error { <-ticker.C select { - case <-c.config.UpdateNotify: + case <-w.config.UpdateNotify: log.Println("updating remote write metrics") - tss, st, err = collectMetricsV2(c.gatherer, c.config.OutOfOrder) + tss, st, err = collectMetricsV2(w.gatherer, w.config.OutOfOrder) if err != nil { merr.Add(err) } @@ -90,7 +90,7 @@ func (c *Client) writeV2(ctx context.Context) error { } start := time.Now() - for i := 0; i < len(tss); i += c.config.BatchSize { + for i := 0; i < len(tss); i += w.config.BatchSize { wgMetrics.Add(1) concurrencyLimitCh <- struct{}{} go func(i int) { @@ -98,7 +98,7 @@ func (c *Client) writeV2(ctx context.Context) error { <-concurrencyLimitCh }() defer wgMetrics.Done() - end := i + c.config.BatchSize + end := i + w.config.BatchSize if end > len(tss) { end = len(tss) } @@ -107,9 +107,9 @@ func (c *Client) writeV2(ctx context.Context) error { Symbols: st.Symbols(), // We pass full symbols table to each request for now } - if _, err := c.remoteAPI.Write(ctx, remote.WriteV2MessageType, req); err != nil { + if _, err := w.remoteAPI.Write(ctx, remote.WriteV2MessageType, req); err != nil { merr.Add(err) - c.logger.Error("error writing metrics", "error", err) + w.logger.Error("error writing metrics", "error", err) return } @@ -125,10 +125,10 @@ func (c *Client) writeV2(ctx context.Context) error { return merr.Err() } } - if c.config.RequestCount*len(tss) != totalSamplesAct { + if w.config.RequestCount*len(tss) != totalSamplesAct { merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct)) } - c.logger.Info("metrics summary", + w.logger.Info("metrics summary", "total_time", totalTime.Round(time.Second), "total_samples", totalSamplesAct, "samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()), From 23b79befe773a9a81bafc056cac65c39aefd4c60 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Fri, 28 Feb 2025 09:47:42 +0000 Subject: [PATCH 7/7] Rename to RunRemoteWriting Signed-off-by: Saswata Mukherjee --- cmd/avalanche/avalanche.go | 2 +- metrics/write.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/avalanche/avalanche.go b/cmd/avalanche/avalanche.go index bd4f31f..6ebf68d 100644 --- a/cmd/avalanche/avalanche.go +++ b/cmd/avalanche/avalanche.go @@ -87,7 +87,7 @@ func main() { if writeCfg.URL != nil { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - if err := metrics.NewRemoteWriter(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { + if err := metrics.RunRemoteWriting(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil { return err } return nil // One-off. diff --git a/metrics/write.go b/metrics/write.go index 0405ee8..a36fcbe 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -104,8 +104,8 @@ type Writer struct { remoteAPI *remote.API } -// NewRemoteWriter initializes a http client and starts a Writer for remote writing metrics to a prometheus compatible remote endpoint. -func NewRemoteWriter(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { +// RunRemoteWriting initializes a http client and starts a Writer for remote writing metrics to a prometheus compatible remote endpoint. +func RunRemoteWriting(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &cfg.TLSClientConfig, } @@ -150,7 +150,6 @@ type tenantRoundTripper struct { rt http.RoundTripper } -// User agent round tripper type userAgentRoundTripper struct { userAgent string rt http.RoundTripper