Commit 158ba59

Add downsampling for profiling events
1 parent ed02380 commit 158ba59

2 files changed: +147 -23 lines


exporter/elasticsearchexporter/internal/serializer/otelserializer/profile.go

Lines changed: 16 additions & 23 deletions
@@ -22,52 +22,45 @@ const (
 
 // SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
 func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error {
+    pushDataAsJSON := func(data any, id, index string) error {
+        c, err := toJSON(data)
+        if err != nil {
+            return err
+        }
+        return pushData(c, id, index)
+    }
+
     data, err := serializeprofiles.Transform(resource, scope, profile)
     if err != nil {
         return err
     }
 
     for _, payload := range data {
-        if payload.StackTraceEvent.StackTraceID != "" {
-            c, err := toJSON(payload.StackTraceEvent)
-            if err != nil {
+        event := payload.StackTraceEvent
+
+        if event.StackTraceID != "" {
+            if err = pushDataAsJSON(event, "", allEventsIndex); err != nil {
                 return err
             }
-            err = pushData(c, "", allEventsIndex)
-            if err != nil {
+            if err = serializeprofiles.IndexDownsampledEvent(event, pushDataAsJSON); err != nil {
                 return err
             }
         }
 
         if payload.StackTrace.DocID != "" {
-            c, err := toJSON(payload.StackTrace)
-            if err != nil {
-                return err
-            }
-            err = pushData(c, payload.StackTrace.DocID, stackTraceIndex)
-            if err != nil {
+            if err = pushDataAsJSON(payload.StackTrace, payload.StackTrace.DocID, stackTraceIndex); err != nil {
                 return err
             }
         }
 
         for _, stackFrame := range payload.StackFrames {
-            c, err := toJSON(stackFrame)
-            if err != nil {
-                return err
-            }
-            err = pushData(c, stackFrame.DocID, stackFrameIndex)
-            if err != nil {
+            if err = pushDataAsJSON(stackFrame, stackFrame.DocID, stackFrameIndex); err != nil {
                 return err
             }
         }
 
         for _, executable := range payload.Executables {
-            c, err := toJSON(executable)
-            if err != nil {
-                return err
-            }
-            err = pushData(c, executable.DocID, executablesIndex)
-            if err != nil {
+            if err = pushDataAsJSON(executable, executable.DocID, executablesIndex); err != nil {
                 return err
             }
         }
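
For illustration, here is a minimal, self-contained sketch of the pushDataAsJSON pattern this commit introduces: a closure that JSON-encodes an arbitrary value into a bytes.Buffer and forwards it to the existing pushData callback. jsonEncode stands in for the exporter's toJSON helper (not shown in this diff), and the index name is invented for the example.

// Standalone sketch of the pushDataAsJSON pattern, assuming a pushData callback with the
// signature used above: (document, docID, index).
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// jsonEncode is a stand-in for the exporter's toJSON helper.
func jsonEncode(data any) (*bytes.Buffer, error) {
    var buf bytes.Buffer
    if err := json.NewEncoder(&buf).Encode(data); err != nil {
        return nil, err
    }
    return &buf, nil
}

func main() {
    // pushData mimics the exporter callback: it would normally hand the document to the bulk indexer.
    pushData := func(doc *bytes.Buffer, id, index string) error {
        fmt.Printf("index=%s id=%q doc=%s", index, id, doc.String())
        return nil
    }

    // The closure hides the encode-then-push boilerplate, so each document type needs only
    // a single call site, and the pushData signature stays untouched.
    pushDataAsJSON := func(data any, id, index string) error {
        doc, err := jsonEncode(data)
        if err != nil {
            return err
        }
        return pushData(doc, id, index)
    }

    _ = pushDataAsJSON(map[string]any{"Stacktrace": "abc", "Count": 3}, "", "profiling-events-all")
}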
New file in package serializeprofiles - Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
+package serializeprofiles
+
+import (
+    "fmt"
+    "math/rand/v2"
+)
+
+// ## Why do we need downsampling?
+// For every (virtual) CPU core, the host agent (HA) retrieves 20 stacktrace events which are
+// stored as a timeseries in an ES index (name 'profiling-events'). With an increasing number
+// of hosts and/or an increasing number of cores, the number of stored events per second
+// becomes high quickly. E.g. data from 10000 cores generates 846 million events per day.
+// Since users want to drill down into e.g. single hosts and/or single applications, we can't
+// reduce the amount of data in advance. Querying such amounts of data is costly, even when using
+// highly specialised database backends - costly in terms of I/O and CPU. And this results in
+// increased latency - the user may have to wait a long time for the query results.
+// In order to reduce the costs and to keep the latency as low as possible, we add 'helper'
+// indexes with downsampled subsets of the stacktrace events.
+//
+// ## How does our downsampling approach work?
+// The idea is to create downsampled indexes with factors of 5^N (5, 25, 125, 625, ...).
+// In the 5^1 index we would store 1/5th of the original events, in the 5^2 index we store
+// 1/25th of the original events and so on.
+// So each event has a probability of p=1/5=0.2 to also be stored in the next downsampled index.
+// Since we aggregate identical stacktrace events by timestamp when reported and stored, we have
+// a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to each single
+// event independently and not just to the aggregate. We can do so by looping over 'Count' and
+// applying p=0.2 on every iteration to generate a new 'Count' value for the next downsampled index.
+// We only store aggregates with 'Count' > 0.
+//
+// At some point we decided that 20k events per query is good enough. With 5^N it means that we
+// possibly can end up with 5x more events (100k) from an index. As of this writing, retrieving
+// and processing 100k events is still fast enough. While in ClickHouse we could further
+// downsample on-the-fly to get 20k, ES currently doesn't allow this (may change in the future).
+//
+// At query time we have to find the index that has enough data to be statistically sound,
+// without having too much data, to avoid costs and latency. The code for that is implemented on
+// the read path (Kibana profiler plugin) and described there in detail.
+//
+// ## Example of a query / calculation
+// Let's imagine a given query spans a time range of 7 days and would result in 100 million
+// events without downsampling. But we only really need 20k events for a good enough result.
+// In the 5^1 downsampled index we have 5x less data - this still results in 20 million events.
+// Going deeper, we end up in the 5^5 downsampled index with 32k results - 5^4 would give us 160k
+// (too many) and 5^6 would give us 6.4k events (not enough).
+// We now read and process all 32k events from the 5^5 index. The counts for any aggregation
+// (TopN, Flamegraph, ...) need to be multiplied by 5^5, which is an estimate of what we would
+// have found in the full events index (the non-downsampled index).
+//
+// ## How deep do we have to downsample?
+// The current code assumes an arbitrary upper limit of 100k CPU cores and a query time range
+// of 7 days. (Please be aware that we get 20 events per core per second only if the core is
+// 100% busy.)
+//
+// The condition is
+//
+// (100k * 86400 * 7 * 20) / 5^N in [20k, 100k-1]
+//                     ^-- max number of events per second
+//                 ^------ number of days
+//         ^-------------- seconds per day
+//  ^--------------------- number of cores
+//
+// For N=11 the condition is satisfied with a value of 24772.
+// In numbers, the 5^11 downsampled index holds 48828125x fewer entries than the full events table.
+//
+// ## What is the cost of downsampling?
+// The additional cost in terms of storage size is
+//
+// 1/5^1 + 1/5^2 + ... + 1/5^11 ≈ 25%
+//
+// The same goes for the additional CPU cost on the write path.
+//
+// The average benefit on the read/query path depends on the query. But it seems that on average
+// a factor of a few hundred to a few thousand in terms of I/O, CPU and latency can be achieved.
+const (
+    MaxEventsIndexes = 11
+    SamplingFactor   = 5
+    SamplingRatio    = 1.0 / float64(SamplingFactor)
+
+    eventsIndexPrefix = "profiling-events"
+)
+
+var eventIndices = initEventIndexes(MaxEventsIndexes)
+
+// initEventIndexes initializes eventIndices up front to avoid recalculating the index names
+// for every TraceEvent later.
+func initEventIndexes(count int) []string {
+    indices := make([]string, 0, count)
+
+    for i := range count {
+        indices = append(indices, fmt.Sprintf("%s-%dpow%02d",
+            eventsIndexPrefix, SamplingFactor, i+1))
+    }
+
+    return indices
+}
+
+// IndexDownsampledEvent pushes the given event into each downsampled index with a statistically
+// thinned 'Count', stopping as soon as the thinned count reaches zero.
+func IndexDownsampledEvent(event StackTraceEvent, pushData func(any, string, string) error) error {
+    origCount := event.Count
+    defer func() { event.Count = origCount }()
+
+    // Each event has a probability of p=1/5=0.2 to go from one index into the next downsampled
+    // index. Since we aggregate identical stacktrace events by timestamp when reported and stored,
+    // we have a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to
+    // each single stacktrace event independently and not just to the aggregate. We can do so by
+    // looping over 'Count' and applying p=0.2 on every iteration to generate a new 'Count' value
+    // for the next downsampled index.
+    // We only store aggregates with 'Count' > 0. If 'Count' becomes 0, we are done and can
+    // continue with the next stacktrace event.
+    for _, index := range eventIndices {
+        var count uint16
+        for range event.Count {
+            // SamplingRatio is the probability p=0.2 for an event to be copied into the next
+            // downsampled index.
+            if rand.Float64() < SamplingRatio {
+                count++
+            }
+        }
+        if count == 0 {
+            return nil
+        }
+
+        // Store the event with its new downsampled count in the downsampled index.
+        event.Count = count
+
+        if err := pushData(event, "", index); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
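
A hypothetical in-package test sketch of how IndexDownsampledEvent could be exercised: a stub pushData records the thinned count per downsampled index. It assumes only the uint16 Count field used above; the test name and the starting count are invented for the example.

// Hypothetical test sketch, assuming it lives next to the code above in package serializeprofiles.
package serializeprofiles

import "testing"

func TestIndexDownsampledEventSketch(t *testing.T) {
    perIndex := map[string]uint16{}
    push := func(data any, _ string, index string) error {
        ev := data.(StackTraceEvent) // IndexDownsampledEvent pushes the event value itself
        perIndex[index] = ev.Count
        return nil
    }

    // Only Count matters for the downsampling logic; all other fields are left zero.
    ev := StackTraceEvent{Count: 10_000}
    if err := IndexDownsampledEvent(ev, push); err != nil {
        t.Fatal(err)
    }

    // Each successive index should hold roughly a fifth of the previous count
    // (e.g. ~2000 in profiling-events-5pow01, ~400 in profiling-events-5pow02, ...),
    // and the fan-out stops early once a thinned count reaches zero.
    for index, count := range perIndex {
        t.Logf("%s: %d", index, count)
    }
}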
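To sanity-check the sizing figures in the comment block (24772 events at N=11, 48828125x reduction, roughly 25% extra storage), here is a small standalone calculation that is not part of this commit:

// Standalone sketch: evaluates (100k cores * 86400 s * 7 days * 20 events/s) / 5^N for
// N=1..11 and sums the storage overhead 1/5 + 1/25 + ... + 1/5^11.
package main

import (
    "fmt"
    "math"
)

func main() {
    const totalEvents = 100_000.0 * 86_400 * 7 * 20 // worst-case events in a 7-day window

    overhead := 0.0
    for n := 1; n <= 11; n++ {
        factor := math.Pow(5, float64(n))
        fmt.Printf("5^%-2d -> %12d events\n", n, int64(totalEvents/factor))
        overhead += 1 / factor
    }
    // Prints 24772 events for N=11, the only value inside [20k, 100k-1],
    // and an overall storage overhead of ~0.25 (25%).
    fmt.Printf("storage overhead: %.4f\n", overhead)
}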
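And a rough sketch, also not from this commit, of the index-selection rule the comments attribute to the read path (Kibana profiler plugin): query the deepest downsampled index that still yields roughly 20k events, then scale the counts back up by 5^N. Function and variable names here are invented.

// Sketch of the read-path selection described above, under the stated assumptions.
package main

import (
    "fmt"
    "math"
)

// pickDownsampledIndex returns the exponent N (0 = full events index) to query for an
// estimated full-resolution event count, plus the multiplier to apply to the counts.
func pickDownsampledIndex(estimatedEvents, wantEvents float64, maxIndexes int) (n int, multiplier float64) {
    for n = maxIndexes; n > 0; n-- {
        if estimatedEvents/math.Pow(5, float64(n)) >= wantEvents {
            return n, math.Pow(5, float64(n))
        }
    }
    return 0, 1 // fall back to the full events index
}

func main() {
    // 100 million events over 7 days, as in the comment's example.
    n, mult := pickDownsampledIndex(100e6, 20_000, 11)
    fmt.Printf("query the 5^%d index, multiply counts by %.0f\n", n, mult) // 5^5, x3125
}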

0 commit comments