-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathduplicate_label.go
More file actions
120 lines (103 loc) · 2.39 KB
/
duplicate_label.go
File metadata and controls
120 lines (103 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright (c) The Thanos Community Authors.
// Licensed under the Apache License 2.0.
package exchange
import (
"context"
"sync"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/query"
"github.com/prometheus/prometheus/model/labels"
)
type pair struct{ a, b int }
type duplicateLabelCheckOperator struct {
once sync.Once
next model.VectorOperator
p []pair
c []uint64
lastTs int64
}
func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) model.VectorOperator {
oper := &duplicateLabelCheckOperator{
next: next,
}
return telemetry.NewOperator(telemetry.NewTelemetry(oper, opts), oper)
}
func (d *duplicateLabelCheckOperator) Next(ctx context.Context, buf []model.StepVector) (int, error) {
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
if err := d.init(ctx); err != nil {
return 0, err
}
n, err := d.next.Next(ctx, buf)
if err != nil {
return 0, err
}
if n == 0 {
return 0, nil
}
if len(d.p) > 0 {
ts := buf[0].T
if ts != d.lastTs {
d.lastTs = ts
for i := range d.p {
d.c[d.p[i].a] = 0
d.c[d.p[i].b] = 0
}
}
for i := range n {
for _, sid := range buf[i].SampleIDs {
d.c[sid] |= 1 << i
}
}
for i := range d.p {
if d.c[d.p[i].a]&d.c[d.p[i].b] > 0 {
return 0, extlabels.ErrDuplicateLabelSet
}
}
}
return n, nil
}
func (d *duplicateLabelCheckOperator) Series(ctx context.Context) ([]labels.Labels, error) {
if err := d.init(ctx); err != nil {
return nil, err
}
series, err := d.next.Series(ctx)
if err != nil {
return nil, err
}
return series, nil
}
func (d *duplicateLabelCheckOperator) Explain() (next []model.VectorOperator) {
return []model.VectorOperator{d.next}
}
func (d *duplicateLabelCheckOperator) String() string {
return "[duplicateLabelCheck]"
}
func (d *duplicateLabelCheckOperator) init(ctx context.Context) error {
var err error
d.once.Do(func() {
series, seriesErr := d.next.Series(ctx)
if seriesErr != nil {
err = seriesErr
return
}
m := make(map[uint64]int, len(series))
p := make([]pair, 0)
for i := range series {
h := series[i].Hash()
if j, ok := m[h]; ok {
p = append(p, pair{a: i, b: j})
} else {
m[h] = i
}
}
d.p = p
d.c = make([]uint64, len(series))
})
return err
}