Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,23 @@ func runReceive(
})
}

// Start the disk probe if configured.
if probeInterval := time.Duration(*conf.diskProbeInterval); probeInterval > 0 {
level.Info(logger).Log("msg", "setting up disk I/O probe", "interval", probeInterval, "write_size", conf.diskProbeWriteSize)
diskProbe := receive.NewDiskProbe(log.With(logger, "component", "disk-probe"), receive.DiskProbeOptions{
Dir: conf.dataDir,
Interval: probeInterval,
WriteSize: conf.diskProbeWriteSize,
}, reg)
stop := make(chan struct{})
g.Add(func() error {
diskProbe.Run(stop)
return nil
}, func(err error) {
close(stop)
})
}

if receiveMode == receive.IngestorOnly {
level.Debug(logger).Log("msg", "setting up periodic top metrics collection")
topMetricNumSeries := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -1047,6 +1064,10 @@ type receiveConfig struct {
poolingEnabled bool
maxPooledCompressedCap int
maxPooledDecompressedCap int

// Disk probe configuration.
diskProbeInterval *model.Duration
diskProbeWriteSize int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -1096,10 +1117,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("<content>").StringVar(&rc.hashringsFileContent)

hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ")
hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous)}, ", ")
cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext+". Will be overwritten by the tenant-specific algorithm in the hashring config.").
Default(string(receive.AlgorithmHashmod)).
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmAlignedKetama))
EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama), string(receive.AlgorithmRendezvous))

rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))
Expand Down Expand Up @@ -1263,6 +1284,14 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("receive.max-pooled-decompressed-cap", "Maximum capacity (bytes) of a decompressed buffer that will be returned to the pool. Buffers larger than this are discarded to prevent pool ballooning.").
Default(fmt.Sprintf("%d", receive.DefaultMaxPooledDecompressedCap)).
IntVar(&rc.maxPooledDecompressedCap)

rc.diskProbeInterval = extkingpin.ModelDuration(cmd.Flag("receive.disk-probe-interval",
"Interval between synthetic disk write probes. The probe writes a small payload to the TSDB data directory "+
"and calls fsync to measure actual disk I/O latency. Set to 0s to disable.").
Default("0s"))
cmd.Flag("receive.disk-probe-write-size",
"Number of bytes written per disk probe operation.").
Default("1024").IntVar(&rc.diskProbeWriteSize)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
153 changes: 0 additions & 153 deletions pkg/receive/aligned_hashring.go

This file was deleted.

Loading
Loading