Skip to content

Commit 77bff71

Browse files
authored
[processor/spanpruning] Optimize executeAggregations by reusing trace tree (#47771)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Eliminate the parentReplacements and spansToRemove maps from executeAggregations by leveraging the existing traceTree structure. Parent replacement lookups now walk the tree's parent pointers via a new replacementSpanID field on spanNode, and span removal uses the tree's markedForRemoval flags in a single pass per ScopeSpans. There are two benefits to this change: 1. Fix OOMs when a trace is fragmented across many scope spans. Right now there is a bug where for each scope span a map of `len(spans-in-aggregation)` is pre-allocated and added to `spansToRemove`. By removing `spansToRemove` this is no longer done. However, this could also be fixed by avoiding the pre-allocation. 2. Using the existing tree instead of the two maps is also 10-20% faster, and reduces the amount of data allocated to process each trace. ``` │ sec/op │ sec/op vs base │ ProcessTrace_SmallTrace-8 8.219µ ± 7% 8.028µ ± 1% -2.32% (p=0.000 n=10) ProcessTrace_MediumTrace-8 72.36µ ± 7% 64.70µ ± 2% -10.59% (p=0.000 n=10) ProcessTrace_LargeTrace-8 739.1µ ± 1% 690.5µ ± 4% -6.58% (p=0.000 n=10) ProcessTrace_SparseAggregation-8 480.2µ ± 3% 488.4µ ± 1% +1.72% (p=0.019 n=10) DeepTrace_Depth1-8 441.0µ ± 5% 389.1µ ± 4% -11.75% (p=0.000 n=10) DeepTrace_Depth5-8 488.0µ ± 0% 428.6µ ± 4% -12.18% (p=0.000 n=10) DeepTrace_Depth10-8 487.8µ ± 0% 417.2µ ± 8% -14.48% (p=0.000 n=10) ExecuteAggregations-8 16.97µ ± 2% 13.58µ ± 2% -19.99% (p=0.000 n=10) │ B/op │ B/op vs base │ ProcessTrace_SmallTrace-8 13.92Ki ± 0% 13.80Ki ± 0% -0.90% (p=0.000 n=10) ProcessTrace_MediumTrace-8 119.9Ki ± 0% 113.5Ki ± 0% -5.32% (p=0.000 n=10) ProcessTrace_LargeTrace-8 1.184Mi ± 0% 1.075Mi ± 0% -9.19% (p=0.000 n=10) ProcessTrace_SparseAggregation-8 860.7Ki ± 0% 860.1Ki ± 0% -0.07% (p=0.000 n=10) DeepTrace_Depth1-8 730.6Ki ± 0% 675.1Ki ± 0% -7.59% (p=0.000 n=10) DeepTrace_Depth5-8 814.8Ki ± 0% 701.9Ki ± 0% -13.86% (p=0.000 n=10) DeepTrace_Depth10-8 814.8Ki ± 0% 701.9Ki ± 0% -13.86% (p=0.000 n=10) ExecuteAggregations-8 20.24Ki ± 0% 18.97Ki ± 0% -6.26% (p=0.000 n=10) ¹ all samples are equal │ allocs/op │ allocs/op vs base │ ProcessTrace_SmallTrace-8 204.0 ± 0% 202.0 ± 0% -0.98% (p=0.000 n=10) ProcessTrace_MediumTrace-8 1.508k ± 0% 1.493k ± 0% -0.99% (p=0.000 n=10) ProcessTrace_LargeTrace-8 13.80k ± 0% 13.77k ± 0% -0.24% (p=0.000 n=10) ProcessTrace_SparseAggregation-8 10.63k ± 0% 10.62k ± 0% -0.07% (p=0.000 n=10) DeepTrace_Depth1-8 8.230k ± 0% 8.204k ± 0% -0.32% (p=0.000 n=10) DeepTrace_Depth5-8 8.895k ± 0% 8.855k ± 0% -0.45% (p=0.000 n=10) DeepTrace_Depth10-8 8.895k ± 0% 8.855k ± 0% -0.45% (p=0.000 n=10) ExecuteAggregations-8 247.0 ± 0% 237.0 ± 0% -4.05% (p=0.000 n=10) ``` <!--Describe what testing was performed and which tests were added.--> #### Testing All the existing tests pass, and we have verified the memory improvement on a fragmented trace that was collected locally. This PR no longer OOMs when we try to process that trace.
1 parent f52b9a9 commit 77bff71

5 files changed

Lines changed: 60 additions & 37 deletions

File tree

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: processor/spanpruning
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Avoid excessive memory usage on large and fragmented traces
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [47771]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

processor/spanpruningprocessor/aggregation.go

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -77,52 +77,47 @@ func (*spanPruningProcessor) buildAggregationPlan(groups map[string]aggregationG
7777
return aggregationPlan{groups: groupSlice}
7878
}
7979

80-
// executeAggregations performs the top-down creation of summary spans, batch
81-
// removes originals, and returns the number of pruned spans.
82-
func (p *spanPruningProcessor) executeAggregations(plan aggregationPlan) int {
83-
// Track which parent SpanID should map to which summary SpanID
84-
parentReplacements := make(map[pcommon.SpanID]pcommon.SpanID, len(plan.groups)*4)
85-
86-
// Track spans to remove per ScopeSpans for batch removal
87-
spansToRemove := make(map[ptrace.ScopeSpans]map[pcommon.SpanID]struct{}, len(plan.groups))
80+
// executeAggregations performs the top-down creation of summary spans, removes
81+
// originals using the tree's markedForRemoval flags, and returns the number of
82+
// pruned spans.
83+
func (p *spanPruningProcessor) executeAggregations(plan aggregationPlan, tree *traceTree) int {
8884
prunedCount := 0
8985

9086
for i := range plan.groups {
9187
group := &plan.groups[i]
9288
// Calculate statistics and time range in single pass
9389
data := p.calculateAggregationData(group.nodes)
9490

95-
// Determine the parent SpanID for the summary span
96-
// Use the first node's parent as template
97-
originalParentID := group.nodes[0].span.ParentSpanID()
98-
99-
// Check if the parent is being replaced by a summary span
100-
summaryParentID := originalParentID
101-
if replacementID, exists := parentReplacements[originalParentID]; exists {
102-
summaryParentID = replacementID
91+
// Determine the parent SpanID for the summary span.
92+
// Walk the tree: if the parent node was already replaced by a summary
93+
// span (from a higher-depth group), use that replacement ID.
94+
summaryParentID := group.nodes[0].span.ParentSpanID()
95+
if parentNode := group.nodes[0].parent; parentNode != nil && !parentNode.replacementSpanID.IsEmpty() {
96+
summaryParentID = parentNode.replacementSpanID
10397
}
10498

10599
// Create summary span with correct parent
106100
p.createSummarySpanWithParent(*group, data, summaryParentID)
107101

108-
// Record that these original span IDs should be replaced by the summary span ID
102+
// Record replacement span ID on each node so child groups can find it
109103
for _, node := range group.nodes {
110-
spanID := node.span.SpanID()
111-
parentReplacements[spanID] = group.summarySpanID
112-
scopeSpans := node.scopeSpans
113-
if spansToRemove[scopeSpans] == nil {
114-
spansToRemove[scopeSpans] = make(map[pcommon.SpanID]struct{}, len(group.nodes))
115-
}
116-
spansToRemove[scopeSpans][spanID] = struct{}{}
104+
node.replacementSpanID = group.summarySpanID
117105
}
118106
prunedCount += len(group.nodes)
119107
}
120108

121-
// Batch remove all marked spans in a single pass per ScopeSpans
122-
for scopeSpans, spanIDs := range spansToRemove {
109+
// Collect unique ScopeSpans that contain marked nodes, then remove in a
110+
// single pass per ScopeSpans using the tree's flags set during analysis.
111+
seen := make(map[ptrace.ScopeSpans]struct{})
112+
for _, node := range tree.nodeByID {
113+
if node.markedForRemoval {
114+
seen[node.scopeSpans] = struct{}{}
115+
}
116+
}
117+
for scopeSpans := range seen {
123118
scopeSpans.Spans().RemoveIf(func(span ptrace.Span) bool {
124-
_, shouldRemove := spanIDs[span.SpanID()]
125-
return shouldRemove
119+
n, ok := tree.nodeByID[span.SpanID()]
120+
return ok && n.markedForRemoval
126121
})
127122
}
128123

processor/spanpruningprocessor/processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func (p *spanPruningProcessor) processTrace(ctx context.Context, spans []spanInf
144144
plan := p.buildAggregationPlan(aggregationGroups)
145145

146146
// Phase 3: Execute aggregations (top-down) and record pruned spans
147-
prunedCount := p.executeAggregations(plan)
147+
prunedCount := p.executeAggregations(plan, tree)
148148

149149
// Record telemetry after aggregation is complete
150150
p.telemetryBuilder.ProcessorSpanpruningSpansPruned.Add(ctx, int64(prunedCount))

processor/spanpruningprocessor/processor_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func BenchmarkExecuteAggregations(b *testing.B) {
132132
plan := proc.buildAggregationPlan(groups)
133133

134134
b.StartTimer()
135-
proc.executeAggregations(plan)
135+
proc.executeAggregations(plan, tree)
136136
b.StopTimer()
137137
}
138138
}

processor/spanpruningprocessor/tree.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import (
1212
// spanNode models a span in the trace tree with cached relationships and
1313
// aggregation bookkeeping.
1414
type spanNode struct {
15-
span ptrace.Span
16-
scopeSpans ptrace.ScopeSpans
17-
parent *spanNode
18-
children []*spanNode
19-
groupKey string // cached group key for leaf spans
20-
isLeaf bool // true if node has no children
21-
markedForRemoval bool // true if node will be aggregated
15+
span ptrace.Span
16+
scopeSpans ptrace.ScopeSpans
17+
parent *spanNode
18+
children []*spanNode
19+
groupKey string // cached group key for leaf spans
20+
replacementSpanID pcommon.SpanID // summary span ID that replaced this node's group
21+
isLeaf bool // true if node has no children
22+
markedForRemoval bool // true if node will be aggregated
2223
}
2324

2425
// traceTree holds span nodes indexed by ID plus quick leaf/orphan lists for

0 commit comments

Comments
 (0)