Skip to content

Commit 3b43f3b

Browse files
committed
Convert OTel histograms to values/counts
1 parent f1dd213 commit 3b43f3b

File tree

5 files changed

+807
-0
lines changed

5 files changed

+807
-0
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Histograms
2+
3+
This package holds common CloudWatch histogram functionality for AWS owned OpenTelemetry components.
4+
5+
## Visualize histogram mappings
6+
7+
1. Remove `t.Skip(...)` from `TestWriteInputHistograms` and run the test to generate json files for the input histograms.
8+
1. Remove `t.Skip(...)` from `TestWriteConvertedHistograms` and run the test to generate json files for the converted histograms.
9+
1. Run `histogram_mappings.py` to generate visualizations
10+
11+
```bash
12+
pip install matplotlib numpy
13+
python histogram_mappings.py
14+
```
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package histograms // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/aws/cloudwatch/histograms"
5+
6+
import (
7+
"math"
8+
9+
"go.opentelemetry.io/collector/pdata/pmetric"
10+
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/aws/cloudwatch"
12+
)
13+
14+
// ExponentialMapping is a CloudWatch-compatible view of an OpenTelemetry
// histogram datapoint: the summary statistics plus sub-sampled value/count
// pairs produced by ConvertOTelToCloudWatch.
type ExponentialMapping struct {
	maximum     float64   // largest value (taken from the datapoint, or inferred from bucket bounds)
	minimum     float64   // smallest value (taken from the datapoint, or inferred from bucket bounds)
	sampleCount float64   // total number of samples (dp.Count())
	sum         float64   // sum of all samples (dp.Sum())
	values      []float64 // representative sample values; parallel to counts
	counts      []float64 // number of samples attributed to each entry in values
}

// Compile-time assertion that *ExponentialMapping implements cloudwatch.HistogramDataPoint.
var _ (cloudwatch.HistogramDataPoint) = (*ExponentialMapping)(nil)
24+
25+
// ConvertOTelToCloudWatch converts an OpenTelemetry histogram datapoint to a CloudWatch histogram datapoint using
// exponential mapping: each explicit-bounds bucket is sub-sampled into up to maximumInnerBucketCount "inner buckets"
// of value/count pairs, with the per-bucket distribution shape chosen from the density trend of neighboring buckets.
func ConvertOTelToCloudWatch(dp pmetric.HistogramDataPoint) cloudwatch.HistogramDataPoint {
	// maximumInnerBucketCount is the maximum number of inner buckets that each outer bucket can be represented with
	//
	// A larger value increases the resolution at which the data is sub-sampled while also incurring additional memory
	// allocation, processing time, and the maximum number of value/count pairs that are sent to CloudWatch which could
	// cause a CloudWatch PutMetricData / PutLogEvent request to be split into multiple requests due to the 100/150
	// metric datapoint limit.
	const maximumInnerBucketCount = 10

	// No validations - assuming valid input histogram

	em := &ExponentialMapping{
		maximum:     dp.Max(),
		minimum:     dp.Min(),
		sampleCount: float64(dp.Count()),
		sum:         dp.Sum(),
	}

	// bounds specifies the boundaries between buckets
	// bucketCounts specifies the number of datapoints in each bucket
	// there is always 1 more bucket count than there are boundaries
	// len(bucketCounts) = len(bounds) + 1
	bounds := dp.ExplicitBounds()
	lenBounds := bounds.Len()
	bucketCounts := dp.BucketCounts()
	lenBucketCounts := bucketCounts.Len()

	// Special case: no boundaries implies a single bucket, so emit a single value/count pair
	if lenBounds == 0 {
		em.counts = append(em.counts, float64(bucketCounts.At(0))) // recall that len(bucketCounts) = len(bounds)+1
		switch {
		case dp.HasMax() && dp.HasMin():
			// midpoint, written as half-sums to avoid overflow of (min+max)
			em.values = append(em.values, em.minimum/2.0+em.maximum/2.0)
		case dp.HasMax():
			em.values = append(em.values, em.maximum) // only data point we have is the maximum
		case dp.HasMin():
			em.values = append(em.values, em.minimum) // only data point we have is the minimum
		default:
			em.values = append(em.values, 0) // arbitrary value
		}
		return em
	}

	// To create inner buckets, all outer buckets need to have defined boundaries. The first and last bucket use the
	// min and max as their lower and upper bounds respectively. The min and max are optional on the OTel datapoint.
	// When min and max are not defined, make some reasonable assumptions about what the min/max could be.
	if !dp.HasMin() {
		// Find the first bucket which contains some data points. The min must be in that bucket
		minBucketIdx := 0
		for i := 0; i < lenBucketCounts; i++ {
			if bucketCounts.At(i) > 0 {
				minBucketIdx = i
				break
			}
		}

		// take the lower bound of the bucket. lower bound of bucket index n is boundary index n-1
		if minBucketIdx != 0 {
			em.minimum = bounds.At(minBucketIdx - 1)
		} else {
			// first bucket has no lower boundary; synthesize one by extending the first bucket's width below it
			bucketWidth := 0.001 // arbitrary width - there's no information about this histogram to make an inference with if there are no bounds
			if lenBounds > 1 {
				bucketWidth = bounds.At(1) - bounds.At(0)
			}
			em.minimum = bounds.At(0) - bucketWidth

			// if all boundaries are positive, assume all data is positive. this covers use cases where Prometheus
			// histogram metrics for non-zero values like request durations have their first bucket start at 0. for
			// these metrics, a negative minimum would cause percentile metrics to be unavailable
			if bounds.At(0) >= 0 {
				em.minimum = max(em.minimum, 0.0)
			}
		}
	}

	if !dp.HasMax() {
		// Find the last bucket with some data in it. The max must be in that bucket
		maxBucketIdx := lenBounds - 1
		for i := lenBucketCounts - 1; i >= 0; i-- {
			if bucketCounts.At(i) > 0 {
				maxBucketIdx = i
				break
			}
		}

		// we want the upper bound of the bucket. the upper bound of bucket index n is boundary index n
		if maxBucketIdx <= lenBounds-1 {
			em.maximum = bounds.At(maxBucketIdx)
		} else {
			// last bucket has no upper boundary; synthesize one by extending the last bucket's width above it
			bucketWidth := 0.01 // arbitrary width - there's no information about this histogram to make an inference with
			if lenBounds > 1 {
				bucketWidth = bounds.At(lenBounds-1) - bounds.At(lenBounds-2)
			}
			em.maximum = bounds.At(lenBounds-1) + bucketWidth
		}
	}

	// Pre-calculate total output size to avoid dynamic growth.
	// Each non-empty bucket yields at most min(count, maximumInnerBucketCount) value/count pairs.
	totalOutputSize := 0
	for i := 0; i < lenBucketCounts; i++ {
		sampleCount := bucketCounts.At(i)
		if sampleCount > 0 {
			totalOutputSize += int(min(sampleCount, maximumInnerBucketCount))
		}
	}
	if totalOutputSize == 0 {
		// No samples in any bucket
		return em
	}

	em.values = make([]float64, 0, totalOutputSize)
	em.counts = make([]float64, 0, totalOutputSize)

	for i := 0; i < lenBucketCounts; i++ {
		sampleCount := int(bucketCounts.At(i))
		if sampleCount == 0 {
			// No need to operate on a bucket with no samples
			continue
		}

		// Lower/upper bounds for bucket i; the outermost buckets use the (possibly inferred) min/max.
		lowerBound := em.minimum
		if i > 0 {
			lowerBound = bounds.At(i - 1)
		}
		upperBound := em.maximum
		if i < lenBucketCounts-1 {
			upperBound = bounds.At(i)
		}

		// This algorithm creates "inner buckets" between user-defined buckets based on the sample count, up to a
		// maximum. A logarithmic ratio (named "magnitude") compares the density between the current bucket and the
		// next bucket. This logarithmic ratio is used to decide how to spread samples amongst inner buckets.
		//
		// case 1: magnitude < 0
		// * What this means: Current bucket is denser than the next bucket -> density is decreasing.
		// * What we do: Use inverse quadratic distribution to spread the samples. This allocates more samples towards
		//   the lower bound of the bucket.
		// case 2: 0 <= magnitude < 1
		// * What this means: Current bucket and next bucket have similar densities -> density is not changing much.
		// * What we do: Use uniform distribution to spread the samples. Extra samples that can't be spread evenly are
		//   (arbitrarily) allocated towards the start of the bucket.
		// case 3: 1 <= magnitude
		// * What this means: Current bucket is less dense than the next bucket -> density is increasing.
		// * What we do: Use quadratic distribution to spread the samples. This allocates more samples toward the end
		//   of the bucket.
		//
		// As a small optimization, we omit the logarithm invocation and change the thresholds
		// (magnitude = ln(ratio), so the case boundaries become ratio < 1/e and ratio < e).
		ratio := 0.0
		if i < lenBucketCounts-1 {
			nextSampleCount := bucketCounts.At(i + 1)
			// If next bucket is empty, then density is surely decreasing
			if nextSampleCount == 0 {
				ratio = 0.0
			} else {
				var nextUpperBound float64
				if i+1 == lenBucketCounts-1 {
					nextUpperBound = em.maximum
				} else {
					nextUpperBound = bounds.At(i + 1)
				}

				// original calculations for reference
				// currentBucketDensity := float64(sampleCount) / (upperBound - lowerBound)
				// nextBucketDensity := float64(nextSampleCount) / (nextUpperBound - upperBound)
				// ratio = nextBucketDensity / currentBucketDensity
				//
				// the following calculations are the same but improves speed by ~1% in benchmark tests
				denom := (nextUpperBound - upperBound) * float64(sampleCount)
				numerator := (upperBound - lowerBound) * float64(nextSampleCount)
				ratio = numerator / denom
			}
		}

		// innerBucketCount is how many "inner buckets" to spread the sample count amongst
		innerBucketCount := min(sampleCount, maximumInnerBucketCount)
		// delta is the width of one inner bucket
		delta := (upperBound - lowerBound) / float64(innerBucketCount)

		switch {
		case ratio < 1.0/math.E: // magnitude < 0: Use -yx^2 (inverse quadratic)
			// epsilon scales the squared weights so they sum to (approximately) sampleCount
			sigma := float64(sumOfSquares(innerBucketCount))
			epsilon := float64(sampleCount) / sigma
			entryStart := len(em.counts)

			runningSum := 0
			for j := 0; j < innerBucketCount; j++ {
				// weight (innerBucketCount-j)^2 is largest at the bucket's lower bound
				innerBucketSampleCount := epsilon * float64((j-innerBucketCount)*(j-innerBucketCount))
				innerBucketSampleCountAdjusted := int(math.Floor(innerBucketSampleCount))
				if innerBucketSampleCountAdjusted > 0 {
					runningSum += innerBucketSampleCountAdjusted
					em.values = append(em.values, lowerBound+delta*float64(j+1))
					em.counts = append(em.counts, float64(innerBucketSampleCountAdjusted))
				}
			}

			// distribute the remainder (lost to flooring) towards the front
			remainder := sampleCount - runningSum
			// make sure there's room for the remainder
			if len(em.counts) < entryStart+remainder {
				em.counts = append(em.counts, make([]float64, remainder)...)
				em.values = append(em.values, make([]float64, remainder)...)
			}
			for j := 0; j < remainder; j++ {
				em.counts[entryStart]++
				entryStart++
			}
		case ratio < math.E: // 0 <= magnitude < 1: Use x (uniform)
			// Distribute samples evenly with integer counts
			baseCount := sampleCount / innerBucketCount
			remainder := sampleCount % innerBucketCount
			for j := 1; j <= innerBucketCount; j++ {
				count := baseCount

				// Distribute remainder to first few buckets
				if j <= remainder {
					count++
				}
				em.values = append(em.values, lowerBound+delta*float64(j))
				em.counts = append(em.counts, float64(count))
			}
		default: // magnitude >= 1: Use yx^2 (quadratic)
			// epsilon scales the squared weights so they sum to (approximately) sampleCount
			sigma := float64(sumOfSquares(innerBucketCount))
			epsilon := float64(sampleCount) / sigma
			entryStart := len(em.counts)

			runningSum := 0
			for j := 1; j <= innerBucketCount; j++ {
				// weight j^2 is largest at the bucket's upper bound
				innerBucketSampleCount := epsilon * float64(j*j)
				innerBucketSampleCountAdjusted := int(math.Floor(innerBucketSampleCount))
				if innerBucketSampleCountAdjusted > 0 {
					runningSum += innerBucketSampleCountAdjusted
					em.values = append(em.values, lowerBound+delta*float64(j))
					em.counts = append(em.counts, float64(innerBucketSampleCountAdjusted))
				}
			}

			// distribute the remainder (lost to flooring) towards the end
			remainder := sampleCount - runningSum
			// make sure there's room for the remainder
			if len(em.counts) < entryStart+remainder {
				em.counts = append(em.counts, make([]float64, remainder)...)
				em.values = append(em.values, make([]float64, remainder)...)
			}
			entryStart = len(em.counts) - 1
			for j := 0; j < remainder; j++ {
				em.counts[entryStart]++
				entryStart--
			}
		}
	}

	// Move last non-empty entry to the true maximum if the datapoint provided one,
	// and truncate any trailing zero-count entries after it
	if dp.HasMax() && len(em.values) > 0 {
		lastIdx := len(em.values) - 1
		for i := lastIdx; i >= 0; i-- {
			if em.counts[i] > 0 {
				lastIdx = i
				break
			}
		}
		em.values[lastIdx] = em.maximum
		em.values = em.values[:lastIdx+1]
		em.counts = em.counts[:lastIdx+1]
	}

	return em
}
293+
294+
// ValuesAndCounts returns the sub-sampled value/count pairs; the two slices
// are parallel and equal in length.
func (em *ExponentialMapping) ValuesAndCounts() ([]float64, []float64) {
	return em.values, em.counts
}
297+
298+
// Minimum returns the smallest value of the histogram (taken from the
// datapoint, or inferred from bucket bounds during conversion).
func (em *ExponentialMapping) Minimum() float64 {
	return em.minimum
}
301+
302+
// Maximum returns the largest value of the histogram (taken from the
// datapoint, or inferred from bucket bounds during conversion).
func (em *ExponentialMapping) Maximum() float64 {
	return em.maximum
}
305+
306+
// SampleCount returns the total number of samples in the histogram.
func (em *ExponentialMapping) SampleCount() float64 {
	return em.sampleCount
}
309+
310+
// Sum returns the sum of all samples in the histogram.
func (em *ExponentialMapping) Sum() float64 {
	return em.sum
}
313+
314+
// sumOfSquares computes Σx², x from 1 to n, in closed form:
// n(n+1)(2n+1)/6, factored through the triangular number n(n+1)/2
// so each intermediate division is exact.
func sumOfSquares(n int) int {
	triangular := n * (n + 1) / 2
	return triangular * (2*n + 1) / 3
}

0 commit comments

Comments
 (0)