Skip to content

Commit 5b3e9b3

Browse files
authored
[processor/tailsampling] Remove internally used fields from samplingpolicy.TraceData (open-telemetry#44435)
#### Description These fields are only used by the internals of the tail sampling processor to track metrics or the decision that was made. They are not needed for any samplers, and therefore should not be used by any sampler extensions either.
1 parent 8c99a07 commit 5b3e9b3

File tree

4 files changed

+53
-21
lines changed

4 files changed

+53
-21
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: processor/tail_sampling
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove only internally relevant fields from samplingpolicy.TraceData.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [44435]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [api]

processor/tailsamplingprocessor/pkg/samplingpolicy/samplingpolicy.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,17 @@ package samplingpolicy // import "github.com/open-telemetry/opentelemetry-collec
55

66
import (
77
"context"
8-
"time"
98

109
"go.opentelemetry.io/collector/pdata/pcommon"
1110
"go.opentelemetry.io/collector/pdata/ptrace"
1211
)
1312

1413
// TraceData stores the sampling related trace data.
1514
type TraceData struct {
16-
// Arrival time the first span for the trace was received.
17-
ArrivalTime time.Time
18-
// DecisionTime time when sampling decision was taken.
19-
DecisionTime time.Time
2015
// SpanCount track the number of spans on the trace.
2116
SpanCount int64
2217
// ReceivedBatches stores all the batches received for the trace.
2318
ReceivedBatches ptrace.Traces
24-
// FinalDecision.
25-
FinalDecision Decision
2619
}
2720

2821
// Decision gives the status of sampling decision.

processor/tailsamplingprocessor/processor.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ type policy struct {
4141
attribute metric.MeasurementOption
4242
}
4343

44+
// traceData is a wrapper around the publically used samplingpolicy.TraceData
45+
// that tracks information related to the decision making process but not
46+
// needed by any sampler implementations.
47+
type traceData struct {
48+
samplingpolicy.TraceData
49+
50+
arrivalTime time.Time
51+
decisionTime time.Time
52+
finalDecision samplingpolicy.Decision
53+
}
54+
4455
type tailSamplingSpanProcessor struct {
4556
ctx context.Context
4657

@@ -51,7 +62,7 @@ type tailSamplingSpanProcessor struct {
5162
deleteTraceQueue *list.List
5263
nextConsumer consumer.Traces
5364
policies []*policy
54-
idToTrace map[pcommon.TraceID]*samplingpolicy.TraceData
65+
idToTrace map[pcommon.TraceID]*traceData
5566
tickerFrequency time.Duration
5667
decisionBatcher idbatcher.Batcher
5768
sampledIDCache cache.Cache[bool]
@@ -103,7 +114,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
103114
sampledIDCache: sampledDecisions,
104115
nonSampledIDCache: nonSampledDecisions,
105116
logger: set.Logger,
106-
idToTrace: make(map[pcommon.TraceID]*samplingpolicy.TraceData),
117+
idToTrace: make(map[pcommon.TraceID]*traceData),
107118
deleteTraceQueue: list.New(),
108119
sampleOnFirstMatch: cfg.SampleOnFirstMatch,
109120
blockOnOverflow: cfg.BlockOnOverflow,
@@ -536,14 +547,14 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() bool {
536547
metrics.idNotFoundOnMapCount++
537548
continue
538549
}
539-
trace.DecisionTime = time.Now()
550+
trace.decisionTime = time.Now()
540551

541-
decision := tsp.makeDecision(id, trace, metrics)
552+
decision := tsp.makeDecision(id, &trace.TraceData, metrics)
542553
globalTracesSampledByDecision[decision]++
543554

544555
// Sampled or not, remove the batches
545556
allSpans := trace.ReceivedBatches
546-
trace.FinalDecision = decision
557+
trace.finalDecision = decision
547558
trace.ReceivedBatches = ptrace.NewTraces()
548559

549560
if decision == samplingpolicy.Sampled {
@@ -691,10 +702,12 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac
691702

692703
actualData, ok := tsp.idToTrace[id]
693704
if !ok {
694-
actualData = &samplingpolicy.TraceData{
695-
ArrivalTime: currTime,
696-
SpanCount: spanCount,
697-
ReceivedBatches: ptrace.NewTraces(),
705+
actualData = &traceData{
706+
arrivalTime: currTime,
707+
TraceData: samplingpolicy.TraceData{
708+
SpanCount: spanCount,
709+
ReceivedBatches: ptrace.NewTraces(),
710+
},
698711
}
699712

700713
tsp.idToTrace[id] = actualData
@@ -709,7 +722,7 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac
709722
actualData.SpanCount += spanCount
710723
}
711724

712-
finalDecision := actualData.FinalDecision
725+
finalDecision := actualData.finalDecision
713726

714727
if finalDecision == samplingpolicy.Unspecified {
715728
// If the final decision hasn't been made, add the new spans to the
@@ -729,8 +742,8 @@ func (tsp *tailSamplingSpanProcessor) processTrace(id pcommon.TraceID, rss ptrac
729742
tsp.logger.Warn("Unexpected sampling decision", zap.Int("decision", int(finalDecision)))
730743
}
731744

732-
if !actualData.DecisionTime.IsZero() {
733-
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
745+
if !actualData.decisionTime.IsZero() {
746+
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.decisionTime)/time.Second))
734747
}
735748
}
736749

@@ -767,7 +780,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
767780
}
768781

769782
delete(tsp.idToTrace, traceID)
770-
tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))
783+
tsp.telemetry.ProcessorTailSamplingSamplingTraceRemovalAge.Record(tsp.ctx, int64(deletionTime.Sub(trace.arrivalTime)/time.Second))
771784
}
772785

773786
// forwardSpans sends the trace data to the next consumer. it is different from

processor/tailsamplingprocessor/processor_benchmarks_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ func BenchmarkSampling(b *testing.B) {
3939

4040
for _, batch := range batches {
4141
sampleBatches = append(sampleBatches, &samplingpolicy.TraceData{
42-
ArrivalTime: time.Now(),
4342
SpanCount: 0,
4443
ReceivedBatches: batch,
4544
})

0 commit comments

Comments
 (0)