Skip to content

Commit 833ad62

Browse files
committed
Fix stats computation pipeline
1 parent e50f749 commit 833ad62

File tree

7 files changed

+113
-52
lines changed

7 files changed

+113
-52
lines changed

tracer/src/Datadog.Trace/Agent/AgentWriter.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -433,34 +433,36 @@ private void SerializeTrace(in SpanCollection spans)
433433
var chunk = spans;
434434
if (CanComputeStats)
435435
{
436-
chunk = _statsAggregator.ProcessTrace(in chunk);
437-
bool shouldSendTrace = _statsAggregator.ShouldKeepTrace(in chunk);
436+
var dropReason = _statsAggregator.ProcessTrace(ref chunk);
438437
_statsAggregator.AddRange(in chunk);
439-
var singleSpanSamplingSpans = new List<Span>(); // TODO maybe we can store this from above?
438+
List<Span>? singleSpanSamplingSpans = null; // TODO maybe we can store this from above?
440439

441440
foreach (var span in chunk)
442441
{
443442
if (span.GetMetric(Metrics.SingleSpanSampling.SamplingMechanism) is not null)
444443
{
444+
singleSpanSamplingSpans ??= new();
445445
singleSpanSamplingSpans.Add(span);
446446
}
447447
}
448448

449-
if (shouldSendTrace)
449+
if (dropReason is null)
450450
{
451451
TelemetryFactory.Metrics.RecordCountTraceChunkEnqueued(MetricTags.TraceChunkEnqueueReason.P0Keep);
452452
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.P0Keep, chunk.Count);
453453
}
454454
else
455455
{
456-
// If stats computation determined that we can drop the P0 Trace,
456+
// If stats computation determined that we can drop the trace,
457457
// skip all other processing
458-
TelemetryFactory.Metrics.RecordCountTraceChunkDropped(MetricTags.DropReason.P0Drop);
459-
if (singleSpanSamplingSpans.Count == 0)
458+
var reasonTag = dropReason.Value.ToTagReason();
459+
460+
TelemetryFactory.Metrics.RecordCountTraceChunkDropped(reasonTag);
461+
if (singleSpanSamplingSpans is null)
460462
{
461463
Interlocked.Increment(ref _droppedP0Traces);
462464
Interlocked.Add(ref _droppedP0Spans, chunk.Count);
463-
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.P0Drop, chunk.Count);
465+
TelemetryFactory.Metrics.RecordCountSpanDropped(reasonTag, chunk.Count);
464466
return;
465467
}
466468
else
@@ -472,7 +474,7 @@ private void SerializeTrace(in SpanCollection spans)
472474
var spansDropped = chunk.Count - singleSpanSamplingSpans.Count;
473475
Interlocked.Add(ref _droppedP0Spans, spansDropped);
474476
chunk = new SpanCollection(singleSpanSamplingSpans.ToArray(), singleSpanSamplingSpans.Count);
475-
TelemetryFactory.Metrics.RecordCountSpanDropped(MetricTags.DropReason.P0Drop, spansDropped);
477+
TelemetryFactory.Metrics.RecordCountSpanDropped(reasonTag, spansDropped);
476478
TelemetryFactory.Metrics.RecordCountSpanEnqueuedForSerialization(MetricTags.SpanEnqueueReason.SingleSpanSampling, chunk.Count);
477479
TelemetryFactory.Metrics.RecordCountTracePartialFlush(MetricTags.PartialFlushReason.SingleSpanIngestion);
478480
}

tracer/src/Datadog.Trace/Agent/IStatsAggregator.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,12 @@ internal interface IStatsAggregator
4242
void AddRange(in SpanCollection spans);
4343

4444
/// <summary>
45-
/// Runs a series of samplers over the entire trace chunk
45+
/// Apply normalization, filtering, obfuscation, and sampling, to understand if the
46+
/// trace should be kept
4647
/// </summary>
47-
/// <param name="spans">The trace chunk to sample</param>
48-
/// <returns>True if the trace chunk should be sampled, false otherwise.</returns>
49-
bool ShouldKeepTrace(in SpanCollection spans);
50-
51-
SpanCollection ProcessTrace(in SpanCollection trace);
48+
/// <param name="spans">The spans chunk to process</param>
49+
/// <returns>An optional trace drop reason, or <c>null</c> if the trace should _not_ be dropped</returns>
50+
TraceDropReason? ProcessTrace(ref SpanCollection spans);
5251

5352
Task DisposeAsync();
5453

tracer/src/Datadog.Trace/Agent/NullStatsAggregator.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@ public void AddRange(in SpanCollection spans)
2121
{
2222
}
2323

24-
public bool ShouldKeepTrace(in SpanCollection spans) => true;
25-
26-
public SpanCollection ProcessTrace(in SpanCollection trace) => trace;
24+
public TraceDropReason? ProcessTrace(ref SpanCollection spans) => null;
2725

2826
public Task DisposeAsync()
2927
{

tracer/src/Datadog.Trace/Agent/StatsAggregator.cs

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,6 @@ public void Add(params Span[] spans)
134134

135135
public void AddRange(in SpanCollection spans)
136136
{
137-
// Trace-level filters from the agent must be applied before stats computation.
138-
// Rejected traces should not contribute to stats.
139-
if (IsTraceFiltered(in spans))
140-
{
141-
return;
142-
}
143-
144137
// Contention around this lock is expected to be very small:
145138
// AddRange is called from the serialization thread, and concurrent serialization
146139
// of traces is a rare corner-case (happening only during shutdown).
@@ -154,21 +147,72 @@ public void AddRange(in SpanCollection spans)
154147
}
155148
}
156149

157-
public bool ShouldKeepTrace(in SpanCollection trace)
150+
public TraceDropReason? ProcessTrace(ref SpanCollection spans)
158151
{
159-
// For OTLP, align with the OpenTelemetry SDK behavior to export a trace based
160-
// solely on its sampling decision.
161-
if (_isOtlp)
152+
// Follow the same processing steps as the Go tracer
153+
spans = NormalizeTrace(in spans);
154+
if (ShouldFilterTrace(in spans))
162155
{
163-
return _prioritySampler.Sample(in trace);
156+
return TraceDropReason.TraceFilter;
157+
}
158+
159+
spans = ObfuscateTrace(in spans);
160+
if (!ShouldKeepTrace(in spans))
161+
{
162+
return TraceDropReason.Unsampled;
163+
}
164+
165+
return null; // keep
166+
}
167+
168+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
169+
internal SpanCollection NormalizeTrace(in SpanCollection trace)
170+
{
171+
try
172+
{
173+
return _normalizerProcessor.Process(in trace);
164174
}
175+
catch (Exception e)
176+
{
177+
Log.Error(e, "Error executing normalizer trace processor");
178+
return trace;
179+
}
180+
}
165181

166-
// Trace-level filters from the agent must reject traces before sampling.
167-
if (IsTraceFiltered(in trace))
182+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
183+
internal bool ShouldFilterTrace(in SpanCollection trace)
184+
{
185+
var filter = Volatile.Read(ref _traceFilter);
186+
if (filter is null)
168187
{
169188
return false;
170189
}
171190

191+
// Find the local root span, searching from the last span, as that's where it normally is
192+
var traceContext = TraceContext.GetTraceContext(in trace);
193+
var localRoot = traceContext?.RootSpan;
194+
195+
if (localRoot is not null && trace.ContainsSpanId(localRoot.SpanId, trace.Count - 1))
196+
{
197+
// localRoot is in the trace chunk, so we can apply the filter directly
198+
return filter.ShouldKeepTrace(localRoot);
199+
}
200+
201+
// local root isn't in the trace chunk (can happen with partial flushing)
202+
// or we don't have a local root (I don't know when that happens!)
203+
return false;
204+
}
205+
206+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
207+
internal bool ShouldKeepTrace(in SpanCollection trace)
208+
{
209+
// For OTLP, align with the OpenTelemetry SDK behavior to export a trace based
210+
// solely on its sampling decision.
211+
if (_isOtlp)
212+
{
213+
return _prioritySampler.Sample(in trace);
214+
}
215+
172216
// Note: The RareSampler must be run before all other samplers so that
173217
// the first rare span in the trace chunk (if any) is marked with "_dd.rare".
174218
// The sampling decision is only used if no other samplers choose to keep the trace chunk.
@@ -181,33 +225,23 @@ public bool ShouldKeepTrace(in SpanCollection trace)
181225
}
182226

183227
[MethodImpl(MethodImplOptions.AggressiveInlining)]
184-
public SpanCollection ProcessTrace(in SpanCollection trace)
228+
internal SpanCollection ObfuscateTrace(in SpanCollection trace)
185229
{
186-
var spans = trace;
187-
try
188-
{
189-
spans = _normalizerProcessor.Process(in spans);
190-
}
191-
catch (Exception e)
192-
{
193-
Log.Error(e, "Error executing normalizer trace processor");
194-
}
195-
196230
// Only obfuscate resources when the tracer has negotiated obfuscation responsibility.
197231
// The tracer currently only supports obfuscation version 1 (SQL, Cassandra, Redis).
198232
if (Volatile.Read(ref _tracerObfuscationVersion) == 1)
199233
{
200234
try
201235
{
202-
spans = _obfuscatorProcessor.Process(in spans);
236+
return _obfuscatorProcessor.Process(in trace);
203237
}
204238
catch (Exception e)
205239
{
206240
Log.Error(e, "Error executing obfuscator trace processor");
207241
}
208242
}
209243

210-
return spans;
244+
return trace;
211245
}
212246

213247
public StatsAggregationKey BuildKey(Span span, out List<byte[]> utf8PeerTags)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// <copyright file="TraceDropReason.cs" company="Datadog">
2+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
4+
// </copyright>
5+
6+
using Datadog.Trace.Telemetry.Metrics;
7+
8+
namespace Datadog.Trace.Agent;
9+
10+
internal enum TraceDropReason
11+
{
12+
TraceFilter,
13+
Unsampled,
14+
}
15+
16+
#pragma warning disable SA1649 // File name must match first type name
17+
internal static class TraceDropReasonExtensions
18+
{
19+
public static MetricTags.DropReason ToTagReason(this TraceDropReason reason) => reason switch
20+
{
21+
TraceDropReason.TraceFilter => MetricTags.DropReason.TraceFilter,
22+
_ => MetricTags.DropReason.P0Drop,
23+
};
24+
}

tracer/test/Datadog.Trace.Tests/Agent/AgentWriterTests.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,9 +601,11 @@ public void AddRange(in SpanCollection spans)
601601
AddedSpans.Add(spans);
602602
}
603603

604-
public bool ShouldKeepTrace(in SpanCollection spans) => shouldKeepTrace;
605-
606-
public SpanCollection ProcessTrace(in SpanCollection trace) => processTrace(trace);
604+
public TraceDropReason? ProcessTrace(ref SpanCollection spans)
605+
{
606+
spans = processTrace(spans);
607+
return shouldKeepTrace ? null : TraceDropReason.Unsampled;
608+
}
607609

608610
public Task DisposeAsync() => Task.CompletedTask;
609611

tracer/test/Datadog.Trace.Tests/Agent/StatsAggregatorTests.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ public void CreateStatsAggregator_Otlp_AlwaysComputesStats()
499499
}
500500

501501
[Fact]
502-
public async Task Otlp_ShouldKeepTraces_TrueWhenTraceSampled()
502+
public async Task Otlp_ProcessTrace_WhenTraceSampled()
503503
{
504504
var aggregator = StatsAggregator.Create(Mock.Of<IApi>(), GetSettings(), NullDiscoveryService.Instance, isOtlp: true);
505505
await using var tracer = TracerHelper.CreateWithFakeAgent();
@@ -511,11 +511,12 @@ public async Task Otlp_ShouldKeepTraces_TrueWhenTraceSampled()
511511
traceContext.SetSamplingPriority(priority: SamplingPriorityValues.AutoKeep, mechanism: SamplingMechanism.LocalTraceSamplingRule, rate: null, limiterRate: null);
512512

513513
var traceChunk = new SpanCollection([span]);
514-
aggregator.ShouldKeepTrace(traceChunk).Should().BeTrue();
514+
var dropReason = aggregator.ProcessTrace(ref traceChunk);
515+
dropReason.Should().BeNull("sampled trace should be kept");
515516
}
516517

517518
[Fact]
518-
public async Task Otlp_ShouldKeepTraces_FalseWhenTraceNotSampled()
519+
public async Task Otlp_ProcessTrace_WhenTraceNotSampled()
519520
{
520521
var aggregator = StatsAggregator.Create(Mock.Of<IApi>(), GetSettings(), NullDiscoveryService.Instance, isOtlp: true);
521522
await using var tracer = TracerHelper.CreateWithFakeAgent();
@@ -527,7 +528,8 @@ public async Task Otlp_ShouldKeepTraces_FalseWhenTraceNotSampled()
527528
traceContext.SetSamplingPriority(priority: SamplingPriorityValues.AutoReject, mechanism: SamplingMechanism.LocalTraceSamplingRule, rate: null, limiterRate: null);
528529

529530
var traceChunk = new SpanCollection([span]);
530-
aggregator.ShouldKeepTrace(traceChunk).Should().BeFalse();
531+
var dropReason = aggregator.ProcessTrace(ref traceChunk);
532+
dropReason.Should().Be(TraceDropReason.Unsampled);
531533
}
532534

533535
[Fact]

0 commit comments

Comments
 (0)