Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,23 @@ func runReceive(
})
}

// Start the disk probe — writes 1KB + fsync to the TSDB data directory once per 5 seconds
// to measure actual disk I/O latency.
{
diskProbe := receive.NewDiskProbe(log.With(logger, "component", "disk-probe"), receive.DiskProbeOptions{
Dir: conf.dataDir,
Interval: 5 * time.Second,
WriteSize: 1024,
}, reg)
stop := make(chan struct{})
g.Add(func() error {
diskProbe.Run(stop)
return nil
}, func(err error) {
close(stop)
})
}

if receiveMode == receive.IngestorOnly {
level.Debug(logger).Log("msg", "setting up periodic top metrics collection")
topMetricNumSeries := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -1096,10 +1113,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config.").
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama))
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -1263,6 +1280,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("receive.max-pooled-decompressed-cap", "Maximum capacity (bytes) of a decompressed buffer that will be returned to the pool. Buffers larger than this are discarded to prevent pool ballooning.").
Default(fmt.Sprintf("%d", receive.DefaultMaxPooledDecompressedCap)).
IntVar(&rc.maxPooledDecompressedCap)

}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
153 changes: 0 additions & 153 deletions pkg/receive/aligned_hashring.go

This file was deleted.

Loading
Loading