From f6c6408f4d4c3485ac3840fa954152c1261b1917 Mon Sep 17 00:00:00 2001 From: amanycodes Date: Sun, 2 Feb 2025 18:47:27 +0530 Subject: [PATCH] added min-time in remote-out-of-order flag Signed-off-by: amanycodes --- metrics/write.go | 36 ++++++++++++++++++------------------ metrics/write_test.go | 28 +++++++++++++--------------- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/metrics/write.go b/metrics/write.go index ab7d1e8..d22972a 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -48,13 +48,13 @@ type ConfigWrite struct { RequestInterval time.Duration BatchSize, RequestCount int - UpdateNotify chan struct{} - PprofURLs []*url.URL - Tenant string - TLSClientConfig tls.Config - TenantHeader string - OutOfOrder bool - Concurrency int + UpdateNotify chan struct{} + PprofURLs []*url.URL + Tenant string + TLSClientConfig tls.Config + TenantHeader string + OutOfOrderMinTime time.Duration + Concurrency int } func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite { @@ -76,8 +76,8 @@ func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause flagReg("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID"). StringVar(&cfg.TenantHeader) // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). - flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true"). - BoolVar(&cfg.OutOfOrder) + flagReg("remote-out-of-order.min-time", "Specifies the minimum duration by which out-of-order timestamps can be accepted in remote write requests.").Default("0"). + DurationVar(&cfg.OutOfOrderMinTime) return cfg } @@ -160,7 +160,7 @@ func (c *Client) write(ctx context.Context) error { return ctx.Err() } - tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err := collectMetrics(c.gatherer, c.config.OutOfOrderMinTime) if err != nil { return err } @@ -204,7 +204,7 @@ func (c *Client) write(ctx context.Context) error { select { case <-c.config.UpdateNotify: log.Println("updating remote write metrics") - tss, err = collectMetrics(c.gatherer, c.config.OutOfOrder) + tss, err = collectMetrics(c.gatherer, c.config.OutOfOrderMinTime) if err != nil { merr.Add(err) } @@ -259,25 +259,25 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { return tss } -func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.TimeSeries, error) { +func collectMetrics(gatherer prometheus.Gatherer, outOfOrderMinTime time.Duration) ([]prompb.TimeSeries, error) { metricFamilies, err := gatherer.Gather() if err != nil { return nil, err } tss := ToTimeSeriesSlice(metricFamilies) - if outOfOrder { - tss = shuffleTimestamps(tss) + if outOfOrderMinTime != 0 { + tss = shuffleTimestamps(outOfOrderMinTime, tss) } return tss, nil } -func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries { +func shuffleTimestamps(minTime time.Duration, tss []prompb.TimeSeries) []prompb.TimeSeries { now := time.Now().UnixMilli() - offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} + interval := minTime.Milliseconds() / int64(len(tss)) for i := range tss { - offset := offsets[i%len(offsets)] - tss[i].Samples[0].Timestamp = now + offset + offset := int64(i) * interval + tss[i].Samples[0].Timestamp = now - offset } return tss } diff --git a/metrics/write_test.go b/metrics/write_test.go index dbec7f7..370799c 100644 --- a/metrics/write_test.go +++ b/metrics/write_test.go @@ -22,27 +22,25 @@ import ( func TestShuffleTimestamps(t *testing.T) { now := time.Now().UnixMilli() - + minTime := 5 * time.Minute tss := []prompb.TimeSeries{ {Samples: []prompb.Sample{{Timestamp: now}}}, {Samples: []prompb.Sample{{Timestamp: now}}}, {Samples: []prompb.Sample{{Timestamp: now}}}, } - shuffledTSS := shuffleTimestamps(tss) - - offsets := []int64{0, -60 * 1000, -5 * 60 * 1000} - for _, ts := range shuffledTSS { - timestampValid := false - for _, offset := range offsets { - expectedTimestamp := now + offset - if ts.Samples[0].Timestamp == expectedTimestamp { - timestampValid = true - break - } - } - if !timestampValid { - t.Errorf("Timestamp %v is not in the expected offsets: %v", ts.Samples[0].Timestamp, offsets) + shuffledTSS := shuffleTimestamps(minTime, tss) + interval := minTime.Milliseconds() / int64(len(tss)) + + expectedTimestamps := []int64{ + now, + now - interval, + now - 2*interval, + } + + for i, ts := range shuffledTSS { + if ts.Samples[0].Timestamp != expectedTimestamps[i] { + t.Errorf("Expected timestamp %d, but got %d", expectedTimestamps[i], ts.Samples[0].Timestamp) } }