Commits (22)
4ac2eee
Add benchmarks and tests for per-sample HA dedup
dimitarvdimitrov Oct 5, 2022
4c3e034
Implement partial HA dedup
dimitarvdimitrov Oct 23, 2022
6495815
Add comments
dimitarvdimitrov Oct 1, 2023
b8d4631
Optimize for single replica
dimitarvdimitrov Oct 1, 2023
b9f465a
optimize for single replica, improve findHALabels
dimitarvdimitrov Oct 1, 2023
db03fe0
Rebase and complete per-sample ha deduplication
julietteO Nov 26, 2025
04044a0
typo
vaxvms Nov 27, 2025
0b32a72
Restore costAttribution manager call
vaxvms Nov 27, 2025
ed6bb0b
Rebase and complete per-sample ha deduplication
julietteO Nov 26, 2025
0f6cf6b
Add updated benchmarks and stats
julietteO Dec 1, 2025
a13e021
Correctly count samples for multi replica case
vaxvms Dec 1, 2025
03b44ad
Clone string in haReplica to avoid gRPC buffer issues
julietteO Dec 1, 2025
1479197
Replace deprecated opentracing
julietteO Dec 1, 2025
097f6d5
Update documentation about ha tracker behavior
vaxvms Dec 3, 2025
07b5a7c
Refactor to remove useless switch in tests
julietteO Dec 3, 2025
0481f60
Add testcase on `TestHaDedupeMiddleware`
vaxvms Dec 3, 2025
2fa92f0
Move comment as godoc for replicaState
julietteO Dec 3, 2025
44cb3d1
loop is useless
vaxvms Dec 3, 2025
593243d
Set all seen clusters and replicas value as attribute of the trace
vaxvms Dec 3, 2025
456cfc0
Refactor findHALabels so it returns an haReplica
julietteO Dec 4, 2025
71a9d43
Break down prePushHaDedupeMiddleware to use helper functions
julietteO Dec 4, 2025
2581474
Only clone values if and when needed
julietteO Dec 4, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -79,6 +79,7 @@
* [ENHANCEMENT] Server: The `/metrics` endpoint now supports metrics filtering by providing one or more `name[]` query parameters. #13746
* [ENHANCEMENT] Ingester: Make sharded active-series requests matching all series faster. #13491
* [ENHANCEMENT] Partitions ring: Add support to forcefully lock a partition state through the web UI. #13811
* [ENHANCEMENT] HA: Deduplicate per sample instead of per batch. #13665
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
* [BUGFIX] Query-frontend: Fix issue where query stats, such as series read, did not include the parameters to the `histogram_quantile` and `histogram_fraction` functions if remote execution was enabled. #13084
@@ -113,6 +114,7 @@
* [BUGFIX] Ruler: Fixed `-ruler.max-rule-groups-per-tenant-by-namespace` to only count rule groups in the specified namespace instead of all namespaces. #13743
* [BUGFIX] Update to Go v1.25.5 to address [CVE-2025-61729](https://pkg.go.dev/vuln/GO-2025-4155). #13755
* [BUGFIX] Query-frontend: Fix race condition that could sometimes cause unnecessary resharding of queriers if querier shuffle sharding and remote execution is enabled. #13794
* [ENHANCEMENT] HA: Deduplication per sample instead of per batch. #13665

### Mixin

672 changes: 672 additions & 0 deletions benchmarks/after.txt
Contributor:
can you delete these before merging?

Large diffs are not rendered by default.

648 changes: 648 additions & 0 deletions benchmarks/before.txt

Large diffs are not rendered by default.

186 changes: 186 additions & 0 deletions benchmarks/bench_stat.txt
Contributor:
thanks for running the benchmarks. it looks like there's nothing to worry about, but imo worth running the cases with HA tracker without your diff too.

normally it's enough to post this as a comment, but committed works too. Just don't forget to delete it before this PR is merged

Large diffs are not rendered by default.

@@ -45,7 +45,7 @@ Incoming samples are considered duplicated (and thus dropped) if they are receiv

If the HA tracker is enabled but incoming samples contain only one or none of the cluster and replica labels, these samples are accepted by default and never deduplicated.

> Note: for performance reasons, the HA tracker only checks the cluster and replica label of the first series in the request to determine whether all series in the request should be deduplicated. This assumes that all series inside the request have the same cluster and replica labels, which is typically true when Prometheus is configured with external labels. Ensure this requirement is honored if you have a non-standard Prometheus setup (for example, you're using Prometheus federation or have a metrics proxy in between).
> Note: the HA tracker checks the cluster and replica labels of every series in the request to determine whether each series should be deduplicated.
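
To make the per-series decision concrete, the following is a minimal standalone sketch (simplified; the label names and the elected-replica lookup are stand-ins for the real HA tracker state):

```go
package main

import "fmt"

type series struct{ labels map[string]string }

// acceptSeries mimics the per-series rule: a series is deduplicated only when it
// carries both HA labels and its replica is not the elected one for its cluster.
func acceptSeries(s series, electedReplica map[string]string) bool {
	cluster, replica := s.labels["cluster"], s.labels["__replica__"]
	if cluster == "" || replica == "" {
		return true // missing one or both HA labels: accepted, never deduplicated
	}
	return electedReplica[cluster] == replica
}

func main() {
	elected := map[string]string{"prod": "replica-1"}
	batch := []series{
		{labels: map[string]string{"cluster": "prod", "__replica__": "replica-1"}},
		{labels: map[string]string{"cluster": "prod", "__replica__": "replica-2"}},
		{labels: map[string]string{"job": "node"}}, // no HA labels
	}
	for _, s := range batch {
		fmt.Println(acceptSeries(s, elected)) // true, false, true
	}
}
```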

## Configuration

310 changes: 263 additions & 47 deletions pkg/distributor/distributor.go
@@ -29,6 +29,7 @@ import (
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/mtime"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
@@ -1132,6 +1133,149 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {

}

type replicaState int

const (
// replicaRejectedUnknown sample is rejected due to an unknown error.
replicaRejectedUnknown replicaState = 0
// replicaIsPrimary sample is from the elected primary replica and should be accepted.
replicaIsPrimary replicaState = 1 << iota
// replicaNotHA sample doesn't have both HA labels and should be accepted.
replicaNotHA
// replicaDeduped sample is from a non-primary replica and should be deduplicated.
replicaDeduped
// replicaRejectedTooManyClusters sample is rejected because the tenant has too many HA clusters.
replicaRejectedTooManyClusters

replicaAccepted = replicaIsPrimary | replicaNotHA
)
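
For orientation, a standalone sketch of the flag values implied by the iota expression above and of the mask test used later in the middleware:

```go
package main

import "fmt"

type replicaState int

// Values as implied by the const block above: replicaRejectedUnknown = 0,
// replicaIsPrimary = 2, replicaNotHA = 4, replicaDeduped = 8,
// replicaRejectedTooManyClusters = 16, replicaAccepted = 2 | 4 = 6.
const (
	replicaRejectedUnknown replicaState = 0
	replicaIsPrimary       replicaState = 1 << iota // iota is 1 in this spec
	replicaNotHA
	replicaDeduped
	replicaRejectedTooManyClusters

	replicaAccepted = replicaIsPrimary | replicaNotHA
)

func main() {
	for _, s := range []replicaState{replicaIsPrimary, replicaNotHA, replicaDeduped, replicaRejectedUnknown} {
		// Only states carrying one of the accepted bits are forwarded downstream.
		fmt.Printf("state=%d accepted=%t\n", s, s&replicaAccepted != 0)
	}
}
```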

type haReplica struct {
cluster, replica string
}

type replicaInfo struct {
state replicaState
sampleCount int
}

// replicaObserved checks if a sample from a given replica should be accepted for ingestion based on HA deduplication rules.
func (d *Distributor) replicaObserved(ctx context.Context, userID string, replica haReplica, ts int64) (replicaState, error) {
span := trace.SpanFromContext(ctx)
span.SetAttributes(
// Make a copy of these, since they may be retained as tags
attribute.String("cluster", strings.Clone(replica.cluster)),
attribute.String("replica", strings.Clone(replica.replica)),
)

isAccepted, err := d.checkSample(ctx, userID, replica.cluster, replica.replica, ts)
if err != nil {
switch {
case errors.As(err, &replicasDidNotMatchError{}):
// These samples have been deduped.
return replicaDeduped, err
case errors.As(err, &tooManyClustersError{}):
return replicaRejectedTooManyClusters, err
default:
return replicaRejectedUnknown, err
}
}

if isAccepted {
return replicaIsPrimary, nil
}
// If there wasn't an error but isAccepted is false that means we didn't find both HA labels.
return replicaNotHA, nil
}

func getReplicasFromRequest(req *mimirpb.WriteRequest, haReplicaLabel, haClusterLabel string) []haReplica {
replicas := make([]haReplica, len(req.Timeseries))
for i, ts := range req.Timeseries {
// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
replicas[i] = findHALabels(haReplicaLabel, haClusterLabel, ts.Labels)
}
return replicas
}

func getEarliestSampleTimestamp(req *mimirpb.WriteRequest, defaultTimestamp int64) int64 {
earliestSampleTimestamp := defaultTimestamp
for _, ts := range req.Timeseries {
if len(ts.Samples) > 0 {
tsms := ts.Samples[0].TimestampMs
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
if len(ts.Histograms) > 0 {
tsms := ts.Histograms[0].Timestamp
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
}
return earliestSampleTimestamp
}

func (d *Distributor) processHaReplicas(ctx context.Context, userID string, sampleTimestamp int64, replicaInfos map[haReplica]*replicaInfo) (map[replicaState]int, error) {
var errs multierror.MultiError
samplesPerState := make(map[replicaState]int)
for replicaKey, info := range replicaInfos {
if info.state == replicaRejectedUnknown {
state, replicaErr := d.replicaObserved(ctx, userID, replicaKey, sampleTimestamp)
if replicaErr != nil {
errs.Add(replicaErr)
}
info.state = state
}
samplesPerState[info.state] += info.sampleCount
}
return samplesPerState, errs.Err()
}

func getReplicaInfos(req *mimirpb.WriteRequest, replicas []haReplica) map[haReplica]*replicaInfo {
replicaInfos := make(map[haReplica]*replicaInfo)
// Check if all timeseries belong to the same replica
firstReplica := replicas[0]
isOneReplica := true
for i := 1; i < len(req.Timeseries); i++ {
if replicas[i] != firstReplica {
isOneReplica = false
break
}
}

// Count samples per replica
if isOneReplica {
numSamples := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)
}
// The replica info is stored in a map where the key is the replica itself.
// The replica labels are references to the request buffer, which will be reused.
// To safely use the replica as map key, we need to clone its labels.
firstReplica.cluster = strings.Clone(firstReplica.cluster)
firstReplica.replica = strings.Clone(firstReplica.replica)
replicaInfos[firstReplica] = &replicaInfo{sampleCount: numSamples}
} else {
for i, ts := range req.Timeseries {
r := replicas[i]
info := replicaInfos[r]
if info == nil {
// The replica info is stored in a map where the key is the replica itself.
// The replica labels are references to the request buffer, which will be reused.
// To safely use the replica as map key, we need to clone its labels.
r.cluster = strings.Clone(r.cluster)
r.replica = strings.Clone(r.replica)

info = &replicaInfo{}
replicaInfos[r] = info
}
info.sampleCount += len(ts.Samples) + len(ts.Histograms)
}
}
return replicaInfos
}
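
A side note on the cloning above, as a minimal self-contained sketch (assuming, as the comments describe, that label strings alias a request buffer that gets reused): an aliased string silently changes under the map key, while strings.Clone detaches it.

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

func main() {
	buf := []byte("replica-1")
	// Alias the buffer as a string without copying, similar to how unmarshalling
	// can yield strings pointing into a reused request buffer.
	aliased := unsafe.String(unsafe.SliceData(buf), len(buf))
	cloned := strings.Clone(aliased)

	// Simulate the buffer being reused for the next request.
	copy(buf, "replica-2")

	fmt.Println(aliased) // prints "replica-2": the aliased value changed underneath
	fmt.Println(cloned)  // prints "replica-1": the cloned value stays stable as a map key
}
```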

func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
Contributor:
there's way too many things happening in this function already let's break it up. i'll try to leave some suggestions

return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error {
req, err := pushReq.WriteRequest()
@@ -1149,75 +1293,147 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc {
}

haReplicaLabel := d.limits.HAReplicaLabel(userID)
cluster, replica := findHALabels(haReplicaLabel, d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
// Make a copy of these, since they may be retained as labels on our metrics, e.g. dedupedSamples.
cluster, replica = strings.Clone(cluster), strings.Clone(replica)
haClusterLabel := d.limits.HAClusterLabel(userID)

replicas := getReplicasFromRequest(req, haReplicaLabel, haClusterLabel)

span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("cluster", cluster),
attribute.String("replica", replica),
)

numSamples := 0
now := time.Now()

group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now)
sampleTimestamp := timestamp.FromTime(now)
if d.limits.HATrackerUseSampleTimeForFailover(userID) {
earliestSampleTimestamp := sampleTimestamp
for _, ts := range req.Timeseries {
if len(ts.Samples) > 0 {
tsms := ts.Samples[0].TimestampMs
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
if len(ts.Histograms) > 0 {
tsms := ts.Histograms[0].Timestamp
if tsms < earliestSampleTimestamp {
earliestSampleTimestamp = tsms
}
}
}
sampleTimestamp = earliestSampleTimestamp
sampleTimestamp = getEarliestSampleTimestamp(req, sampleTimestamp)
}
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples) + len(ts.Histograms)

var errs multierror.MultiError
replicaInfos := getReplicaInfos(req, replicas)

var clusters []string
var replicasAsStrings []string
for replicaKey := range replicaInfos {
clusters = append(clusters, replicaKey.cluster)
replicasAsStrings = append(replicasAsStrings, replicaKey.replica)
}
span.SetAttributes(
attribute.StringSlice("clusters", clusters),
attribute.StringSlice("replicas", replicasAsStrings),
)

removeReplica, err := d.checkSample(ctx, userID, cluster, replica, sampleTimestamp)
if err != nil {
if errors.As(err, &replicasDidNotMatchError{}) {
// These samples have been deduped.
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
}
samplesPerState, processErr := d.processHaReplicas(ctx, userID, sampleTimestamp, replicaInfos)
if processErr != nil {
errs.Add(processErr)
}

if errors.As(err, &tooManyClustersError{}) {
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples))
d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(req.Timeseries[0].Labels, float64(numSamples), reasonTooManyHAClusters, now)
}
lastAccepted := sortByAccepted(req, replicaInfos, replicas)
removeHAReplicaLabels(req, lastAccepted, replicas, replicaInfos, haReplicaLabel)

return err
// We don't want to send samples beyond the last accepted sample - that was deduplicated
if samplesPerState[replicaRejectedTooManyClusters] > 0 {
d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, req.Timeseries[lastAccepted+1].Labels)
} else {
d.updateHADedupeMetrics(userID, group, replicaInfos, samplesPerState, nil)
}
pushReq.AddCleanup(sliceUnacceptedRequests(req, lastAccepted))

if removeReplica {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Mimir. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
for ix := range req.Timeseries {
req.Timeseries[ix].RemoveLabel(haReplicaLabel)
if len(req.Timeseries) > 0 {
if pushErr := next(ctx, pushReq); pushErr != nil {
errs.Add(pushErr)
}
} else {
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
d.nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
}

return next(ctx, pushReq)
return errs.Err()
})
}

func removeHAReplicaLabels(req *mimirpb.WriteRequest, lastAccepted int, replicas []haReplica, replicaInfos map[haReplica]*replicaInfo, haReplicaLabel string) {
for i := 0; i <= lastAccepted; i++ {
r := replicas[i]
s := replicaInfos[r].state
if s&replicaIsPrimary == 0 {
continue
}
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Mimir. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
req.Timeseries[i].RemoveLabel(haReplicaLabel)
}
}

func sliceUnacceptedRequests(req *mimirpb.WriteRequest, lastAccepted int) func() {
originalLen := len(req.Timeseries)
req.Timeseries = req.Timeseries[:lastAccepted+1]
return func() {
// Restore the length so that we can put back all the series in the request to the memory pool
req.Timeseries = req.Timeseries[:originalLen]
}

}
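
The shrink-then-restore idea used above, reduced to a standalone sketch with simplified types: downstream code sees only the accepted prefix, and the returned cleanup restores the original length so the whole slice can still be handed back to its pool.

```go
package main

import "fmt"

// truncateWithRestore temporarily truncates the slice to its accepted prefix and
// returns a cleanup that restores the original length.
func truncateWithRestore(items *[]int, keep int) (restore func()) {
	originalLen := len(*items)
	*items = (*items)[:keep]
	return func() { *items = (*items)[:originalLen] }
}

func main() {
	series := []int{1, 2, 3, 4, 5}
	restore := truncateWithRestore(&series, 3)
	fmt.Println(series) // [1 2 3]: only the accepted prefix is pushed downstream
	restore()
	fmt.Println(series) // [1 2 3 4 5]: full length restored for cleanup/pooling
}
```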

// updateHADedupeMetrics updates metrics related to HA deduplication.
func (d *Distributor) updateHADedupeMetrics(userID, group string, replicaInfos map[haReplica]*replicaInfo, samplesPerState map[replicaState]int, labels []mimirpb.LabelAdapter) {
for replica, info := range replicaInfos {
if info.state&replicaDeduped != 0 && info.sampleCount > 0 {
cluster := strings.Clone(replica.cluster) // Make a copy of this, since it may be retained as labels on our metrics
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(info.sampleCount))
}
}
if samplesPerState[replicaNotHA] > 0 {
d.nonHASamples.WithLabelValues(userID).Add(float64(samplesPerState[replicaNotHA]))
}
if samplesPerState[replicaRejectedTooManyClusters] > 0 {
d.costAttributionMgr.SampleTracker(userID).IncrementDiscardedSamples(labels, float64(samplesPerState[replicaRejectedTooManyClusters]), reasonTooManyHAClusters, time.Now())
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(samplesPerState[replicaRejectedTooManyClusters]))
}
}

// sortByAccepted returns the index of the last accepted timeseries in the write request based on the ha dedup states of the replicas
func sortByAccepted(req *mimirpb.WriteRequest, replicaInfos map[haReplica]*replicaInfo, replicas []haReplica) int {
numAcceptedReplicas := 0
for _, info := range replicaInfos {
if info.state&replicaAccepted != 0 {
numAcceptedReplicas++
}
}
if numAcceptedReplicas == len(replicaInfos) {
return len(req.Timeseries) - 1
}
if numAcceptedReplicas == 0 {
return -1
}
findPreviousAccepted := func(i int) int {
for i > 0 {
state := replicaInfos[replicas[i]].state
if state&replicaAccepted != 0 {
break
}
i--
}
return i
}
lastAccepted := findPreviousAccepted(len(req.Timeseries) - 1)
// next we shift all accepted samples to the front of the timeseries slice
for i := range req.Timeseries {
if i > lastAccepted {
break
}
state := replicaInfos[replicas[i]].state
if state&replicaAccepted == 0 {
req.Timeseries[i], req.Timeseries[lastAccepted] = req.Timeseries[lastAccepted], req.Timeseries[i]
replicas[i], replicas[lastAccepted] = replicas[lastAccepted], replicas[i]
lastAccepted--
lastAccepted = findPreviousAccepted(lastAccepted)
}

}

return lastAccepted
}
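
A worked, self-contained sketch of the same partitioning idea, simplified to strings and an accepted predicate: rejected entries are swapped behind the last accepted index, which is returned so callers can truncate to the accepted prefix.

```go
package main

import "fmt"

// partitionAccepted moves rejected items behind the last accepted index and
// returns that index (-1 if nothing is accepted). The invariant mirrors the
// function above: the element at the last index is always accepted before a swap.
func partitionAccepted(items []string, accepted func(string) bool) int {
	last := len(items) - 1
	for last >= 0 && !accepted(items[last]) {
		last--
	}
	for i := 0; i <= last; i++ {
		if !accepted(items[i]) {
			items[i], items[last] = items[last], items[i]
			last--
			for last >= 0 && !accepted(items[last]) {
				last--
			}
		}
	}
	return last
}

func main() {
	series := []string{"primary", "deduped", "non-ha", "deduped", "primary"}
	isAccepted := func(s string) bool { return s != "deduped" }
	last := partitionAccepted(series, isAccepted)
	fmt.Println(last, series[:last+1]) // 2 [primary primary non-ha]
}
```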

func (d *Distributor) prePushRelabelMiddleware(next PushFunc) PushFunc {
return WithCleanup(next, func(next PushFunc, ctx context.Context, pushReq *Request) error {

req, err := pushReq.WriteRequest()
if err != nil {
return err