Skip to content

Commit 6ccd729

Browse files
Merge pull request #200 from Workiva/add-sampling
add sampling support to span processors
2 parents 4b6145a + 441324e commit 6ccd729

14 files changed

+362
-149
lines changed

example/sampling.dart

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2021-2022 Workiva.
2+
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information
3+
4+
import 'package:collection/collection.dart';
5+
import 'package:opentelemetry/api.dart'
6+
show
7+
Attribute,
8+
Context,
9+
SpanKind,
10+
SpanLink,
11+
TraceId,
12+
registerGlobalTracerProvider,
13+
spanContextFromContext;
14+
import 'package:opentelemetry/sdk.dart'
15+
show
16+
ConsoleExporter,
17+
Decision,
18+
ParentBasedSampler,
19+
ReadOnlySpan,
20+
ReadWriteSpan,
21+
Sampler,
22+
SamplingResult,
23+
SimpleSpanProcessor,
24+
TracerProviderBase;
25+
26+
final Attribute samplingOffAttribute =
27+
Attribute.fromInt('sampling.priority', 0);
28+
29+
class SpanSamplingPrioritySampler implements Sampler {
30+
@override
31+
SamplingResult shouldSample(
32+
Context parentContext,
33+
TraceId traceId,
34+
String name,
35+
SpanKind spanKind,
36+
List<Attribute> attributes,
37+
List<SpanLink> links) {
38+
final decision = attributes.firstWhereOrNull((element) =>
39+
element.key == 'sampling.priority' && element.value == 0) !=
40+
null
41+
? Decision.recordOnly
42+
: Decision.recordAndSample;
43+
44+
return SamplingResult(
45+
decision, attributes, spanContextFromContext(parentContext).traceState);
46+
}
47+
48+
@override
49+
String get description => 'SpanSamplingPrioritySampler';
50+
}
51+
52+
class PrintingSpanProcessor extends SimpleSpanProcessor {
53+
PrintingSpanProcessor(super.exporter);
54+
55+
@override
56+
void onStart(ReadWriteSpan span, Context parentContext) {
57+
print('Span started: ${span.name}');
58+
super.onStart(span, parentContext);
59+
}
60+
61+
@override
62+
void onEnd(ReadOnlySpan span) {
63+
print('Span ended: ${span.name}');
64+
super.onEnd(span);
65+
}
66+
67+
@override
68+
void shutdown() {
69+
print('Shutting down');
70+
super.shutdown();
71+
}
72+
73+
@override
74+
void forceFlush() {}
75+
}
76+
77+
void main(List<String> args) async {
78+
final sampler = ParentBasedSampler(SpanSamplingPrioritySampler());
79+
final tp = TracerProviderBase(
80+
processors: [PrintingSpanProcessor(ConsoleExporter())], sampler: sampler);
81+
registerGlobalTracerProvider(tp);
82+
83+
final tracer = tp.getTracer('instrumentation-name');
84+
85+
tracer.startSpan('span-not-sampled', attributes: [
86+
samplingOffAttribute,
87+
]).end();
88+
tracer.startSpan('span-sampled').end();
89+
90+
tp.shutdown();
91+
}

lib/src/api/open_telemetry.dart

+4
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ Future<T> trace<T>(String name, Future<T> Function() fn,
8181
{api.Context? context,
8282
api.Tracer? tracer,
8383
bool newRoot = false,
84+
List<api.Attribute> spanAttributes = const [],
8485
api.SpanKind spanKind = api.SpanKind.internal,
8586
List<api.SpanLink> spanLinks = const []}) async {
8687
context ??= api.Context.current;
@@ -89,6 +90,7 @@ Future<T> trace<T>(String name, Future<T> Function() fn,
8990
final span = tracer.startSpan(name,
9091
// TODO: use start span option `newRoot` instead
9192
context: newRoot ? api.Context.root : context,
93+
attributes: spanAttributes,
9294
kind: spanKind,
9395
links: spanLinks);
9496
try {
@@ -116,6 +118,7 @@ T traceSync<T>(String name, T Function() fn,
116118
{api.Context? context,
117119
api.Tracer? tracer,
118120
bool newRoot = false,
121+
List<api.Attribute> spanAttributes = const [],
119122
api.SpanKind spanKind = api.SpanKind.internal,
120123
List<api.SpanLink> spanLinks = const []}) {
121124
context ??= api.Context.current;
@@ -124,6 +127,7 @@ T traceSync<T>(String name, T Function() fn,
124127
final span = tracer.startSpan(name,
125128
// TODO: use start span option `newRoot` instead
126129
context: newRoot ? api.Context.root : context,
130+
attributes: spanAttributes,
127131
kind: spanKind,
128132
links: spanLinks);
129133
try {

lib/src/api/span_processors/span_processor.dart

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import '../../../api.dart' as api;
55

66
@Deprecated(
7-
'This class will be moved to the SDK package in 0.19.0. Use [SpanExporter] from SDK instead.')
7+
'This class will be moved to the SDK package in 0.19.0. Use [SpanProcessor] from SDK instead.')
88
abstract class SpanProcessor {
99
void onStart(api.Span span, api.Context parentContext);
1010

lib/src/api/trace/span_context.dart

+16-26
Original file line numberDiff line numberDiff line change
@@ -5,39 +5,29 @@ import '../../../api.dart' as api;
55

66
/// Representation of the context of an individual span.
77
class SpanContext {
8-
final api.SpanId _spanId;
9-
final api.TraceId _traceId;
10-
final int _traceFlags;
11-
final api.TraceState _traceState;
12-
final bool _isRemote;
13-
14-
api.TraceId get traceId => _traceId;
15-
16-
api.SpanId get spanId => _spanId;
17-
18-
int get traceFlags => _traceFlags;
19-
20-
api.TraceState get traceState => _traceState;
8+
final api.TraceId traceId;
9+
final api.SpanId spanId;
10+
final int traceFlags;
11+
final api.TraceState traceState;
12+
final bool isRemote;
2113

2214
bool get isValid => spanId.isValid && traceId.isValid;
2315

2416
/// Construct a [SpanContext].
25-
SpanContext(this._traceId, this._spanId, this._traceFlags, this._traceState)
26-
: _isRemote = false;
17+
SpanContext(this.traceId, this.spanId, this.traceFlags, this.traceState)
18+
: isRemote = false;
2719

28-
/// Construct a [SpanContext] representing an operation which originated
29-
/// from a remote source.
20+
/// Construct a [SpanContext] representing an operation which originated from
21+
/// a remote source.
3022
SpanContext.remote(
31-
this._traceId, this._spanId, this._traceFlags, this._traceState)
32-
: _isRemote = true;
23+
this.traceId, this.spanId, this.traceFlags, this.traceState)
24+
: isRemote = true;
3325

3426
/// Construct an invalid [SpanContext].
3527
SpanContext.invalid()
36-
: _spanId = api.SpanId.invalid(),
37-
_traceId = api.TraceId.invalid(),
38-
_traceFlags = api.TraceFlags.none,
39-
_traceState = api.TraceState.empty(),
40-
_isRemote = false;
41-
42-
bool get isRemote => _isRemote;
28+
: spanId = api.SpanId.invalid(),
29+
traceId = api.TraceId.invalid(),
30+
traceFlags = api.TraceFlags.none,
31+
traceState = api.TraceState.empty(),
32+
isRemote = false;
4333
}

lib/src/sdk/trace/exporters/collector_exporter.dart

+2
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ class CollectorExporter implements sdk.SpanExporter {
268268
return pb_common.AnyValue();
269269
}
270270

271+
@Deprecated(
272+
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
271273
@override
272274
void forceFlush() {
273275
return;

lib/src/sdk/trace/exporters/console_exporter.dart

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class ConsoleExporter implements sdk.SpanExporter {
3333
_printSpans(spans);
3434
}
3535

36+
@Deprecated(
37+
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
3638
@override
3739
void forceFlush() {
3840
return;

lib/src/sdk/trace/exporters/span_exporter.dart

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
// Copyright 2021-2022 Workiva.
22
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information
33

4-
import '../../../../sdk.dart' as sdk;
4+
import '../read_only_span.dart';
55

66
abstract class SpanExporter {
7-
void export(List<sdk.ReadOnlySpan> spans);
7+
void export(List<ReadOnlySpan> spans);
88

9+
@Deprecated(
10+
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
911
void forceFlush();
1012

1113
void shutdown();

lib/src/sdk/trace/span_processors/batch_processor.dart

+18-11
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,23 @@ import 'dart:math';
66

77
import 'package:logging/logging.dart';
88

9-
import '../../../../api.dart' as api;
10-
import '../../../../sdk.dart' as sdk;
11-
12-
class BatchSpanProcessor implements sdk.SpanProcessor {
9+
import '../../../api/context/context.dart';
10+
import '../../../api/trace/trace_flags.dart';
11+
import '../exporters/span_exporter.dart';
12+
import '../read_only_span.dart';
13+
import '../read_write_span.dart';
14+
import 'span_processor.dart';
15+
16+
class BatchSpanProcessor implements SpanProcessor {
1317
static const int _DEFAULT_MAXIMUM_BATCH_SIZE = 512;
1418
static const int _DEFAULT_MAXIMUM_QUEUE_SIZE = 2048;
1519
static const int _DEFAULT_EXPORT_DELAY = 5000;
1620

17-
final sdk.SpanExporter _exporter;
21+
final SpanExporter _exporter;
1822
final Logger _log = Logger('opentelemetry.BatchSpanProcessor');
1923
final int _maxExportBatchSize;
2024
final int _maxQueueSize;
21-
final List<sdk.ReadOnlySpan> _spanBuffer = [];
25+
final List<ReadOnlySpan> _spanBuffer = [];
2226

2327
late final Timer _timer;
2428

@@ -41,19 +45,18 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
4145
while (_spanBuffer.isNotEmpty) {
4246
_exportBatch(_timer);
4347
}
44-
_exporter.forceFlush();
4548
}
4649

4750
@override
48-
void onEnd(sdk.ReadOnlySpan span) {
51+
void onEnd(ReadOnlySpan span) {
4952
if (_isShutdown) {
5053
return;
5154
}
5255
_addToBuffer(span);
5356
}
5457

5558
@override
56-
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
59+
void onStart(ReadWriteSpan span, Context parentContext) {}
5760

5861
@override
5962
void shutdown() {
@@ -63,15 +66,19 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
6366
_exporter.shutdown();
6467
}
6568

66-
void _addToBuffer(sdk.ReadOnlySpan span) {
69+
void _addToBuffer(ReadOnlySpan span) {
6770
if (_spanBuffer.length >= _maxQueueSize) {
6871
// Buffer is full, drop span.
6972
_log.warning(
7073
'Max queue size exceeded. Dropping ${_spanBuffer.length} spans.');
7174
return;
7275
}
7376

74-
_spanBuffer.add(span);
77+
final isSampled =
78+
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
79+
if (isSampled) {
80+
_spanBuffer.add(span);
81+
}
7582
}
7683

7784
void _exportBatch(Timer timer) {

lib/src/sdk/trace/span_processors/simple_processor.dart

+17-12
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,40 @@
11
// Copyright 2021-2022 Workiva.
22
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information
33

4-
import '../../../../api.dart' as api;
5-
import '../../../../sdk.dart' as sdk;
6-
7-
class SimpleSpanProcessor implements sdk.SpanProcessor {
8-
final sdk.SpanExporter _exporter;
4+
import '../../../api/context/context.dart';
5+
import '../../../api/trace/trace_flags.dart';
6+
import '../exporters/span_exporter.dart';
7+
import '../read_only_span.dart';
8+
import '../read_write_span.dart';
9+
import 'span_processor.dart';
10+
11+
class SimpleSpanProcessor implements SpanProcessor {
12+
final SpanExporter _exporter;
913
bool _isShutdown = false;
1014

1115
SimpleSpanProcessor(this._exporter);
1216

1317
@override
14-
void forceFlush() {
15-
_exporter.forceFlush();
16-
}
18+
void forceFlush() {}
1719

1820
@override
19-
void onEnd(sdk.ReadOnlySpan span) {
21+
void onEnd(ReadOnlySpan span) {
2022
if (_isShutdown) {
2123
return;
2224
}
2325

24-
_exporter.export([span]);
26+
final isSampled =
27+
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
28+
if (isSampled) {
29+
_exporter.export([span]);
30+
}
2531
}
2632

2733
@override
28-
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
34+
void onStart(ReadWriteSpan span, Context parentContext) {}
2935

3036
@override
3137
void shutdown() {
32-
forceFlush();
3338
_isShutdown = true;
3439
_exporter.shutdown();
3540
}

0 commit comments

Comments
 (0)