-
Notifications
You must be signed in to change notification settings - Fork 107
/
Copy pathdelta_distribution.go
172 lines (142 loc) · 5.31 KB
/
delta_distribution.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package collectors
import (
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"google.golang.org/genproto/googleapis/api/metric"
)
type CollectedHistogram struct {
histogram *HistogramMetric
lastCollectedAt time.Time
}
// DeltaDistributionStore defines a set of functions which must be implemented in order to be used as a DeltaDistributionStore
// which accumulates DELTA histogram metrics over time
type DeltaDistributionStore interface {
// Increment will use the incoming metricDescriptor and currentValue to either create a new entry or add the incoming
// value to an existing entry in the underlying store
Increment(metricDescriptor *metric.MetricDescriptor, currentValue *HistogramMetric)
// ListMetrics will return all known entries in the store for a metricDescriptorName
ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram
}
type histogramEntry struct {
collected map[uint64]*CollectedHistogram
mutex *sync.RWMutex
}
type inMemoryDeltaDistributionStore struct {
store *sync.Map
ttl time.Duration
logger log.Logger
}
// NewInMemoryDeltaDistributionStore returns an implementation of DeltaDistributionStore which is persisted in-memory
func NewInMemoryDeltaDistributionStore(logger log.Logger, ttl time.Duration) DeltaDistributionStore {
return &inMemoryDeltaDistributionStore{
store: &sync.Map{},
logger: logger,
ttl: ttl,
}
}
func (s *inMemoryDeltaDistributionStore) Increment(metricDescriptor *metric.MetricDescriptor, currentValue *HistogramMetric) {
if currentValue == nil {
return
}
tmp, _ := s.store.LoadOrStore(metricDescriptor.Name, &histogramEntry{
collected: map[uint64]*CollectedHistogram{},
mutex: &sync.RWMutex{},
})
entry := tmp.(*histogramEntry)
key := toHistogramKey(currentValue)
entry.mutex.Lock()
defer entry.mutex.Unlock()
existing := entry.collected[key]
if existing == nil {
level.Debug(s.logger).Log("msg", "Tracking new histogram", "fqName", currentValue.fqName, "key", key, "incoming_time", currentValue.reportTime)
entry.collected[key] = &CollectedHistogram{histogram: currentValue, lastCollectedAt: time.Now()}
return
}
if existing.histogram.reportTime.Before(currentValue.reportTime) {
level.Debug(s.logger).Log("msg", "Incrementing existing histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime)
existing.histogram = mergeHistograms(existing.histogram, currentValue)
existing.lastCollectedAt = time.Now()
return
}
level.Debug(s.logger).Log("msg", "Ignoring old sample for histogram", "fqName", currentValue.fqName, "key", key, "last_reported_time", existing.histogram.reportTime, "incoming_time", currentValue.reportTime)
}
func toHistogramKey(hist *HistogramMetric) uint64 {
labels := make(map[string]string)
keysCopy := append([]string{}, hist.labelKeys...)
for i := range hist.labelKeys {
labels[hist.labelKeys[i]] = hist.labelValues[i]
}
sort.Strings(keysCopy)
var keyParts []string
for _, k := range keysCopy {
keyParts = append(keyParts, fmt.Sprintf("%s:%s", k, labels[k]))
}
hashText := fmt.Sprintf("%s|%s", hist.fqName, strings.Join(keyParts, "|"))
h := hashNew()
h = hashAdd(h, hashText)
return h
}
func mergeHistograms(existing *HistogramMetric, current *HistogramMetric) *HistogramMetric {
for key, value := range existing.buckets {
current.buckets[key] += value
}
// Calculate a new mean and overall count
mean := existing.dist.Mean
mean += current.dist.Mean
mean /= 2
var count uint64
for _, v := range current.buckets {
count += v
}
current.dist.Mean = mean
current.dist.Count = int64(count)
return current
}
func (s *inMemoryDeltaDistributionStore) ListMetrics(metricDescriptorName string) map[string][]*CollectedHistogram {
output := map[string][]*CollectedHistogram{}
now := time.Now()
ttlWindowStart := now.Add(-s.ttl)
tmp, exists := s.store.Load(metricDescriptorName)
if !exists {
return output
}
entry := tmp.(*histogramEntry)
entry.mutex.Lock()
defer entry.mutex.Unlock()
for key, collected := range entry.collected {
//Scan and remove metrics which are outside the TTL
if ttlWindowStart.After(collected.lastCollectedAt) {
level.Debug(s.logger).Log("msg", "Deleting histogram entry outside of TTL", "key", key, "fqName", collected.histogram.fqName)
delete(entry.collected, key)
continue
}
metrics, exists := output[collected.histogram.fqName]
if !exists {
metrics = make([]*CollectedHistogram, 0)
}
histCopy := *collected.histogram
outputEntry := CollectedHistogram{
histogram: &histCopy,
lastCollectedAt: collected.lastCollectedAt,
}
output[collected.histogram.fqName] = append(metrics, &outputEntry)
}
return output
}