Commit 7d140d1

[processor/spanpruning] Add histogram support for latency distribution on aggregated summary spans (#47604)
#### Description

The span pruning processor currently emits min/max/avg/total duration statistics on summary spans, but these aggregates hide the shape of the latency distribution. A flat average can mask bimodal distributions and tail latency problems that are critical for debugging.

This PR adds configurable cumulative histogram buckets to summary spans, giving users latency distribution visibility without requiring a separate metrics pipeline. Bucket boundaries default to `[5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s]` (the same values as the Prometheus client library default buckets, expressed as durations) and can be customized, or disabled entirely by setting `aggregation_histogram_buckets` to an empty list.

Summary spans gain two new slice attributes:

- `<prefix>histogram_bucket_bounds_s` -- upper bounds in seconds (excludes +Inf)
- `<prefix>histogram_bucket_counts` -- cumulative counts per bucket (includes +Inf)

Cumulative semantics were chosen to match Prometheus/OpenTelemetry histogram conventions, where each bucket count includes all observations at or below that boundary.

#### Testing

- Unit tests for cumulative bucket counting logic and the disabled-histogram path (`stats_test.go`)
- Config validation tests covering valid buckets, negative values, unsorted values, and empty list (`config_test.go`)
- Integration tests exercising the full processor pipeline with histogram attributes enabled and disabled, asserting exact bucket bounds and counts on output summary spans (`processor_test.go`)
- YAML unmarshal test verifying custom bucket configuration round-trips through `TestLoadConfig`

#### Documentation

- Added `aggregation_histogram_buckets` to the configuration table in README
- Added `histogram_bucket_bounds_s` and `histogram_bucket_counts` to the summary span attributes table
- Added a "Histogram Buckets" section with a worked example showing cumulative bucket semantics
1 parent a0eac20 commit 7d140d1

10 files changed

Lines changed: 334 additions & 11 deletions

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: processor/spanpruning
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add cumulative histogram bucket support for latency distribution on aggregated summary spans
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [47277]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

processor/spanpruningprocessor/README.md

Lines changed: 20 additions & 0 deletions
@@ -64,6 +64,11 @@ processors:
 # Prefix for aggregation statistics attributes
 # Default: "aggregation."
 aggregation_attribute_prefix: "batch."
+
+# Upper bounds for histogram buckets (latency distribution)
+# Default: [5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s]
+# Set to empty list to disable histogram attributes
+aggregation_histogram_buckets: [10ms, 50ms, 100ms, 500ms, 1s]
 ```
 
 ## Configuration Options
@@ -74,6 +79,7 @@ processors:
 | `min_spans_to_aggregate` | int | 5 | Minimum group size before aggregation occurs |
 | `max_parent_depth` | int | 1 | Max depth of parent aggregation (0=none, -1=unlimited) |
 | `aggregation_attribute_prefix` | string | "aggregation." | Prefix for aggregation statistics attributes |
+| `aggregation_histogram_buckets` | []time.Duration | `[5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2.5s, 5s, 10s]` | Upper bounds for latency histogram buckets |
 
 ### Glob Pattern Support
 
@@ -129,6 +135,20 @@ The following attributes are added to the summary span (shown with default `aggr
 | `<prefix>duration_max_ns` | int64 | Maximum duration in nanoseconds |
 | `<prefix>duration_avg_ns` | int64 | Average duration in nanoseconds |
 | `<prefix>duration_total_ns` | int64 | Total duration in nanoseconds |
+| `<prefix>histogram_bucket_bounds_s` | []float64 | Bucket upper bounds in seconds (excludes +Inf) |
+| `<prefix>histogram_bucket_counts` | []int64 | Cumulative count per bucket (includes +Inf bucket) |
+
+### Histogram Buckets
+
+When `aggregation_histogram_buckets` is configured, summary spans include latency distribution data as cumulative histogram buckets. Cumulative means each bucket count includes all spans with duration less than or equal to that bucket boundary.
+
+**Worked example** with buckets `[10ms, 50ms, 100ms]` and span durations `[5ms, 15ms, 25ms, 75ms, 150ms]`:
+- `histogram_bucket_bounds_s`: `[0.01, 0.05, 0.1]`
+- `histogram_bucket_counts`: `[1, 3, 4, 5]`
+- Bucket 0 (<=10ms): 1 span (5ms)
+- Bucket 1 (<=50ms): 3 spans (5ms, 15ms, 25ms)
+- Bucket 2 (<=100ms): 4 spans (5ms, 15ms, 25ms, 75ms)
+- Bucket 3 (+Inf): 5 spans (all spans)
 
 ## Pipeline Placement
 
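
The cumulative semantics in the README worked example can be reproduced with a few lines of standalone Go. This is only a sketch for illustration: the processor's own counting code is not part of the files shown here, and `cumulativeBucketCounts`, `exampleBounds`, and `exampleDurations` are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"time"
)

// cumulativeBucketCounts is a hypothetical helper, not the processor's code.
// bounds must be sorted in ascending order. The result has len(bounds)+1
// entries; the last entry is the +Inf bucket and equals len(durations).
func cumulativeBucketCounts(bounds, durations []time.Duration) []int64 {
	counts := make([]int64, len(bounds)+1)
	for _, d := range durations {
		// A duration is counted in every bucket whose upper bound it does not exceed.
		for i, b := range bounds {
			if d <= b {
				counts[i]++
			}
		}
		// The +Inf bucket counts every observation.
		counts[len(bounds)]++
	}
	return counts
}

// Inputs taken from the README worked example.
var (
	exampleBounds = []time.Duration{
		10 * time.Millisecond,
		50 * time.Millisecond,
		100 * time.Millisecond,
	}
	exampleDurations = []time.Duration{
		5 * time.Millisecond,
		15 * time.Millisecond,
		25 * time.Millisecond,
		75 * time.Millisecond,
		150 * time.Millisecond,
	}
)

func main() {
	fmt.Println(cumulativeBucketCounts(exampleBounds, exampleDurations)) // prints [1 3 4 5]
}
```

The nested loop keeps the sketch close to the definition. A real implementation would more likely find the first matching bucket once and accumulate afterwards.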

processor/spanpruningprocessor/aggregation.go

Lines changed: 16 additions & 0 deletions
@@ -7,6 +7,7 @@ import (
 	"encoding/binary"
 	"math/rand/v2"
 	"sort"
+	"time"
 
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/ptrace"
@@ -174,5 +175,20 @@ func (p *spanPruningProcessor) createSummarySpanWithParent(group aggregationGrou
 		newSpan.Attributes().PutInt(prefix+"duration_avg_ns", int64(data.sumDuration)/data.count)
 	}
 
+	// Add histogram attributes if enabled.
+	if len(p.config.AggregationHistogramBuckets) > 0 {
+		// Add bucket bounds in seconds.
+		bucketBoundsSlice := newSpan.Attributes().PutEmptySlice(prefix + "histogram_bucket_bounds_s")
+		for _, bucket := range p.config.AggregationHistogramBuckets {
+			bucketBoundsSlice.AppendEmpty().SetDouble(float64(bucket) / float64(time.Second))
+		}
+
+		// Add cumulative bucket counts.
+		bucketCountsSlice := newSpan.Attributes().PutEmptySlice(prefix + "histogram_bucket_counts")
+		for _, count := range data.bucketCounts {
+			bucketCountsSlice.AppendEmpty().SetInt(count)
+		}
+	}
+
 	return newSpan
 }
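
Because `histogram_bucket_counts` is cumulative, a consumer that wants the number of spans falling inside each individual bucket has to subtract neighbouring entries. The sketch below shows that conversion on a plain `[]int64`. `perBucketCounts` and `exampleCumulative` are hypothetical names, and reading the slice out of span attributes is deliberately left out.

```go
package main

import "fmt"

// perBucketCounts is a hypothetical consumer-side helper. It converts
// cumulative bucket counts (as emitted on summary spans, +Inf bucket last)
// into per-bucket counts by subtracting the previous cumulative value.
func perBucketCounts(cumulative []int64) []int64 {
	out := make([]int64, len(cumulative))
	var prev int64
	for i, c := range cumulative {
		out[i] = c - prev
		prev = c
	}
	return out
}

// Cumulative counts from the README worked example.
var exampleCumulative = []int64{1, 3, 4, 5}

func main() {
	// One span <=10ms, two in (10ms, 50ms], one in (50ms, 100ms], one above 100ms.
	fmt.Println(perBucketCounts(exampleCumulative)) // prints [1 2 1 1]
}
```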

processor/spanpruningprocessor/config.go

Lines changed: 15 additions & 0 deletions
@@ -7,6 +7,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/gobwas/glob"
 	"go.opentelemetry.io/collector/component"
@@ -40,6 +41,10 @@ type Config struct {
 	// are added to summary spans.
 	// Default: "aggregation."
 	AggregationAttributePrefix string `mapstructure:"aggregation_attribute_prefix"`
+
+	// AggregationHistogramBuckets lists cumulative histogram bucket upper bounds
+	// for latency tracking on aggregated spans. Empty slice disables histograms.
+	AggregationHistogramBuckets []time.Duration `mapstructure:"aggregation_histogram_buckets"`
 }
 
 var _ component.Config = (*Config)(nil)
@@ -75,5 +80,15 @@ func (cfg *Config) Validate() error {
 		}
 	}
 
+	// Validate histogram buckets
+	for i, bucket := range cfg.AggregationHistogramBuckets {
+		if bucket <= 0 {
+			return errors.New("histogram bucket values must be positive")
+		}
+		if i > 0 && bucket <= cfg.AggregationHistogramBuckets[i-1] {
+			return errors.New("histogram buckets must be sorted in ascending order")
+		}
+	}
+
 	return nil
 }
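
The validation loop above can be tried out in isolation. The sketch below copies the two checks into a free function so their behaviour on edge cases is easy to see. `validateBuckets` and the sample slices are hypothetical names, not part of the processor. Note that the `<=` comparison rejects duplicate bounds as well as descending ones, and that a zero bound fails the positivity check.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// validateBuckets is a hypothetical standalone copy of the checks added to
// Config.Validate in this commit: every bound must be positive and strictly
// greater than the one before it. An empty slice is valid (histograms off).
func validateBuckets(buckets []time.Duration) error {
	for i, b := range buckets {
		if b <= 0 {
			return errors.New("histogram bucket values must be positive")
		}
		if i > 0 && b <= buckets[i-1] {
			return errors.New("histogram buckets must be sorted in ascending order")
		}
	}
	return nil
}

// Sample inputs mirroring the cases in config_test.go, plus a duplicate case.
var (
	validBuckets     = []time.Duration{10 * time.Millisecond, 50 * time.Millisecond, 100 * time.Millisecond}
	negativeBuckets  = []time.Duration{10 * time.Millisecond, -1 * time.Millisecond}
	unsortedBuckets  = []time.Duration{100 * time.Millisecond, 50 * time.Millisecond}
	duplicateBuckets = []time.Duration{10 * time.Millisecond, 10 * time.Millisecond}
	emptyBuckets     = []time.Duration{}
)

func main() {
	fmt.Println("valid:", validateBuckets(validBuckets))
	fmt.Println("negative:", validateBuckets(negativeBuckets))
	fmt.Println("unsorted:", validateBuckets(unsortedBuckets))
	fmt.Println("duplicate:", validateBuckets(duplicateBuckets))
	fmt.Println("empty:", validateBuckets(emptyBuckets))
}
```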

processor/spanpruningprocessor/config_test.go

Lines changed: 83 additions & 8 deletions
@@ -6,6 +6,7 @@ package spanpruningprocessor
 import (
 	"path/filepath"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -15,6 +16,28 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanpruningprocessor/internal/metadata"
 )
 
+var defaultHistogramBuckets = []time.Duration{
+	5 * time.Millisecond,
+	10 * time.Millisecond,
+	25 * time.Millisecond,
+	50 * time.Millisecond,
+	100 * time.Millisecond,
+	250 * time.Millisecond,
+	500 * time.Millisecond,
+	time.Second,
+	2500 * time.Millisecond,
+	5 * time.Second,
+	10 * time.Second,
+}
+
+var customHistogramBuckets = []time.Duration{
+	10 * time.Millisecond,
+	50 * time.Millisecond,
+	100 * time.Millisecond,
+	500 * time.Millisecond,
+	time.Second,
+}
+
 func TestLoadConfig(t *testing.T) {
 	t.Parallel()
 
@@ -26,19 +49,21 @@ func TestLoadConfig(t *testing.T) {
 		{
 			id: component.NewIDWithName(metadata.Type, ""),
 			expected: &Config{
-				GroupByAttributes:          []string{"db.operation"},
-				MinSpansToAggregate:        5,
-				MaxParentDepth:             1,
-				AggregationAttributePrefix: "aggregation.",
+				GroupByAttributes:           []string{"db.operation"},
+				MinSpansToAggregate:         5,
+				MaxParentDepth:              1,
+				AggregationAttributePrefix:  "aggregation.",
+				AggregationHistogramBuckets: defaultHistogramBuckets,
 			},
 		},
 		{
 			id: component.NewIDWithName(metadata.Type, "custom"),
 			expected: &Config{
-				GroupByAttributes:          []string{"db.operation", "db.name"},
-				MinSpansToAggregate:        3,
-				MaxParentDepth:             1,
-				AggregationAttributePrefix: "batch.",
+				GroupByAttributes:           []string{"db.operation", "db.name"},
+				MinSpansToAggregate:         3,
+				MaxParentDepth:              1,
+				AggregationAttributePrefix:  "batch.",
+				AggregationHistogramBuckets: customHistogramBuckets,
 			},
 		},
 	}
@@ -155,6 +180,56 @@ func TestConfig_Validate(t *testing.T) {
 			},
 			expectError: false,
 		},
+		{
+			name: "valid histogram buckets",
+			config: &Config{
+				MinSpansToAggregate:        2,
+				AggregationAttributePrefix: "aggregation.",
+				GroupByAttributes:          []string{"db.operation"},
+				AggregationHistogramBuckets: []time.Duration{
+					10 * time.Millisecond,
+					50 * time.Millisecond,
+					100 * time.Millisecond,
+				},
+			},
+			expectError: false,
+		},
+		{
+			name: "negative histogram bucket value",
+			config: &Config{
+				MinSpansToAggregate:        2,
+				AggregationAttributePrefix: "aggregation.",
+				GroupByAttributes:          []string{"db.operation"},
+				AggregationHistogramBuckets: []time.Duration{
+					10 * time.Millisecond,
+					-1 * time.Millisecond,
+				},
+			},
+			expectError: true,
+		},
+		{
+			name: "unsorted histogram buckets",
+			config: &Config{
+				MinSpansToAggregate:        2,
+				AggregationAttributePrefix: "aggregation.",
+				GroupByAttributes:          []string{"db.operation"},
+				AggregationHistogramBuckets: []time.Duration{
+					100 * time.Millisecond,
+					50 * time.Millisecond,
+				},
+			},
+			expectError: true,
+		},
+		{
+			name: "empty histogram buckets",
+			config: &Config{
+				MinSpansToAggregate:         2,
+				AggregationAttributePrefix:  "aggregation.",
+				GroupByAttributes:           []string{"db.operation"},
+				AggregationHistogramBuckets: []time.Duration{},
+			},
+			expectError: false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {

processor/spanpruningprocessor/factory.go

Lines changed: 14 additions & 0 deletions
@@ -5,6 +5,7 @@ package spanpruningprocessor // import "github.com/open-telemetry/opentelemetry-
 
 import (
 	"context"
+	"time"
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/consumer"
@@ -29,6 +30,19 @@ func createDefaultConfig() component.Config {
 		MinSpansToAggregate:        5,
 		MaxParentDepth:             1,
 		AggregationAttributePrefix: "aggregation.",
+		AggregationHistogramBuckets: []time.Duration{
+			5 * time.Millisecond,
+			10 * time.Millisecond,
+			25 * time.Millisecond,
+			50 * time.Millisecond,
+			100 * time.Millisecond,
+			250 * time.Millisecond,
+			500 * time.Millisecond,
+			time.Second,
+			2500 * time.Millisecond,
+			5 * time.Second,
+			10 * time.Second,
+		},
 	}
 }
 
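
For reference, the default buckets translate into the following `histogram_bucket_bounds_s` values when converted with the same `float64(bucket) / float64(time.Second)` expression used in `aggregation.go`. This is a standalone sketch. `boundsInSeconds` and `defaultBuckets` are hypothetical names, and the slice is copied from the defaults above rather than read from the factory.

```go
package main

import (
	"fmt"
	"time"
)

// defaultBuckets mirrors the defaults added to createDefaultConfig in this commit.
var defaultBuckets = []time.Duration{
	5 * time.Millisecond,
	10 * time.Millisecond,
	25 * time.Millisecond,
	50 * time.Millisecond,
	100 * time.Millisecond,
	250 * time.Millisecond,
	500 * time.Millisecond,
	time.Second,
	2500 * time.Millisecond,
	5 * time.Second,
	10 * time.Second,
}

// boundsInSeconds is a hypothetical helper that converts duration bounds to
// float seconds, using the same expression as the summary span attribute code.
func boundsInSeconds(buckets []time.Duration) []float64 {
	out := make([]float64, 0, len(buckets))
	for _, b := range buckets {
		out = append(out, float64(b)/float64(time.Second))
	}
	return out
}

func main() {
	fmt.Println(boundsInSeconds(defaultBuckets))
	// prints [0.005 0.01 0.025 0.05 0.1 0.25 0.5 1 2.5 5 10]
}
```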

processor/spanpruningprocessor/processor_test.go

Lines changed: 83 additions & 0 deletions
@@ -6,6 +6,7 @@ package spanpruningprocessor
 import (
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -246,6 +247,88 @@ func TestLeafSpanPruning_DurationStats(t *testing.T) {
 	assert.Equal(t, int64(600), totalDuration.Int())
 }
 
+func TestLeafSpanPruningProcessorWithHistogram(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.MinSpansToAggregate = 2
+	cfg.AggregationHistogramBuckets = []time.Duration{
+		10 * time.Millisecond,
+		50 * time.Millisecond,
+		100 * time.Millisecond,
+	}
+
+	tp, err := factory.CreateTraces(t.Context(), processortest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
+	require.NoError(t, err)
+
+	td := createTestTraceWithKnownDurations(t, []int64{
+		int64(5 * time.Millisecond),
+		int64(15 * time.Millisecond),
+		int64(25 * time.Millisecond),
+		int64(75 * time.Millisecond),
+		int64(150 * time.Millisecond),
+	})
+
+	err = tp.ConsumeTraces(t.Context(), td)
+	require.NoError(t, err)
+
+	assert.Equal(t, 2, countSpans(td))
+
+	summarySpan, found := findSummarySpan(td)
+	require.True(t, found)
+
+	attrs := summarySpan.Attributes()
+
+	bounds, exists := attrs.Get("aggregation.histogram_bucket_bounds_s")
+	require.True(t, exists)
+	require.Equal(t, 3, bounds.Slice().Len())
+	expectedBounds := []float64{0.01, 0.05, 0.1}
+	for i, expected := range expectedBounds {
+		assert.InDelta(t, expected, bounds.Slice().At(i).Double(), 1e-9)
+	}
+
+	counts, exists := attrs.Get("aggregation.histogram_bucket_counts")
+	require.True(t, exists)
+	expectedCounts := []int64{1, 3, 4, 5}
+	require.Equal(t, len(expectedCounts), counts.Slice().Len())
+	for i, expected := range expectedCounts {
+		assert.Equal(t, expected, counts.Slice().At(i).Int())
+	}
+}
+
+func TestLeafSpanPruningProcessorWithHistogramDisabled(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig().(*Config)
+	cfg.MinSpansToAggregate = 2
+	cfg.AggregationHistogramBuckets = []time.Duration{}
+
+	tp, err := factory.CreateTraces(t.Context(), processortest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
+	require.NoError(t, err)
+
+	td := createTestTraceWithKnownDurations(t, []int64{
+		int64(5 * time.Millisecond),
+		int64(15 * time.Millisecond),
+		int64(25 * time.Millisecond),
+		int64(75 * time.Millisecond),
+		int64(150 * time.Millisecond),
+	})
+
+	err = tp.ConsumeTraces(t.Context(), td)
+	require.NoError(t, err)
+
+	assert.Equal(t, 2, countSpans(td))
+
+	summarySpan, found := findSummarySpan(td)
+	require.True(t, found)
+
+	attrs := summarySpan.Attributes()
+
+	_, exists := attrs.Get("aggregation.histogram_bucket_bounds_s")
+	assert.False(t, exists)
+
+	_, exists = attrs.Get("aggregation.histogram_bucket_counts")
+	assert.False(t, exists)
+}
+
 func TestLeafSpanPruning_GroupByNonStringAttributes(t *testing.T) {
 	factory := NewFactory()
 	cfg := factory.CreateDefaultConfig().(*Config)

0 commit comments
