Skip to content

add sampling support to span processors #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions example/sampling.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import 'package:collection/collection.dart';
import 'package:opentelemetry/api.dart'
show
Attribute,
Context,
SpanKind,
SpanLink,
TraceId,
registerGlobalTracerProvider,
spanContextFromContext;
import 'package:opentelemetry/sdk.dart'
show
ConsoleExporter,
Decision,
ParentBasedSampler,
ReadOnlySpan,
ReadWriteSpan,
Sampler,
SamplingResult,
SimpleSpanProcessor,
TracerProviderBase;

final Attribute samplingOffAttribute =
Attribute.fromInt('sampling.priority', 0);

class SpanSamplingPrioritySampler implements Sampler {
@override
SamplingResult shouldSample(
Context parentContext,
TraceId traceId,
String name,
SpanKind spanKind,
List<Attribute> attributes,
List<SpanLink> links) {
final decision = attributes.firstWhereOrNull((element) =>
element.key == 'sampling.priority' && element.value == 0) !=
null
? Decision.recordOnly
: Decision.recordAndSample;

return SamplingResult(
decision, attributes, spanContextFromContext(parentContext).traceState);
}

@override
String get description => 'SpanSamplingPrioritySampler';
}

class PrintingSpanProcessor extends SimpleSpanProcessor {
PrintingSpanProcessor(super.exporter);

@override
void onStart(ReadWriteSpan span, Context parentContext) {
print('Span started: ${span.name}');
super.onStart(span, parentContext);
}

@override
void onEnd(ReadOnlySpan span) {
print('Span ended: ${span.name}');
super.onEnd(span);
}

@override
void shutdown() {
print('Shutting down');
super.shutdown();
}

@override
void forceFlush() {}
}

void main(List<String> args) async {
final sampler = ParentBasedSampler(SpanSamplingPrioritySampler());
final tp = TracerProviderBase(
processors: [PrintingSpanProcessor(ConsoleExporter())], sampler: sampler);
registerGlobalTracerProvider(tp);

final tracer = tp.getTracer('instrumentation-name');

tracer.startSpan('span-not-sampled', attributes: [
samplingOffAttribute,
]).end();
tracer.startSpan('span-sampled').end();

tp.shutdown();
}
4 changes: 4 additions & 0 deletions lib/src/api/open_telemetry.dart
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Future<T> trace<T>(String name, Future<T> Function() fn,
{api.Context? context,
api.Tracer? tracer,
bool newRoot = false,
List<api.Attribute> spanAttributes = const [],
api.SpanKind spanKind = api.SpanKind.internal,
List<api.SpanLink> spanLinks = const []}) async {
context ??= api.Context.current;
Expand All @@ -89,6 +90,7 @@ Future<T> trace<T>(String name, Future<T> Function() fn,
final span = tracer.startSpan(name,
// TODO: use start span option `newRoot` instead
context: newRoot ? api.Context.root : context,
attributes: spanAttributes,
kind: spanKind,
links: spanLinks);
try {
Expand Down Expand Up @@ -116,6 +118,7 @@ T traceSync<T>(String name, T Function() fn,
{api.Context? context,
api.Tracer? tracer,
bool newRoot = false,
List<api.Attribute> spanAttributes = const [],
api.SpanKind spanKind = api.SpanKind.internal,
List<api.SpanLink> spanLinks = const []}) {
context ??= api.Context.current;
Expand All @@ -124,6 +127,7 @@ T traceSync<T>(String name, T Function() fn,
final span = tracer.startSpan(name,
// TODO: use start span option `newRoot` instead
context: newRoot ? api.Context.root : context,
attributes: spanAttributes,
kind: spanKind,
links: spanLinks);
try {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/api/span_processors/span_processor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import '../../../api.dart' as api;

@Deprecated(
'This class will be moved to the SDK package in 0.19.0. Use [SpanExporter] from SDK instead.')
'This class will be moved to the SDK package in 0.19.0. Use [SpanProcessor] from SDK instead.')
abstract class SpanProcessor {
void onStart(api.Span span, api.Context parentContext);

Expand Down
42 changes: 16 additions & 26 deletions lib/src/api/trace/span_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,29 @@ import '../../../api.dart' as api;

/// Representation of the context of an individual span.
class SpanContext {
final api.SpanId _spanId;
final api.TraceId _traceId;
final int _traceFlags;
final api.TraceState _traceState;
final bool _isRemote;

api.TraceId get traceId => _traceId;

api.SpanId get spanId => _spanId;

int get traceFlags => _traceFlags;

api.TraceState get traceState => _traceState;
final api.TraceId traceId;
final api.SpanId spanId;
final int traceFlags;
final api.TraceState traceState;
final bool isRemote;

bool get isValid => spanId.isValid && traceId.isValid;

/// Construct a [SpanContext].
SpanContext(this._traceId, this._spanId, this._traceFlags, this._traceState)
: _isRemote = false;
SpanContext(this.traceId, this.spanId, this.traceFlags, this.traceState)
: isRemote = false;

/// Construct a [SpanContext] representing an operation which originated
/// from a remote source.
/// Construct a [SpanContext] representing an operation which originated from
/// a remote source.
SpanContext.remote(
this._traceId, this._spanId, this._traceFlags, this._traceState)
: _isRemote = true;
this.traceId, this.spanId, this.traceFlags, this.traceState)
: isRemote = true;

/// Construct an invalid [SpanContext].
SpanContext.invalid()
: _spanId = api.SpanId.invalid(),
_traceId = api.TraceId.invalid(),
_traceFlags = api.TraceFlags.none,
_traceState = api.TraceState.empty(),
_isRemote = false;

bool get isRemote => _isRemote;
: spanId = api.SpanId.invalid(),
traceId = api.TraceId.invalid(),
traceFlags = api.TraceFlags.none,
traceState = api.TraceState.empty(),
isRemote = false;
}
2 changes: 2 additions & 0 deletions lib/src/sdk/trace/exporters/collector_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ class CollectorExporter implements sdk.SpanExporter {
return pb_common.AnyValue();
}

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
@override
void forceFlush() {
return;
Expand Down
2 changes: 2 additions & 0 deletions lib/src/sdk/trace/exporters/console_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class ConsoleExporter implements sdk.SpanExporter {
_printSpans(spans);
}

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
@override
void forceFlush() {
return;
Expand Down
6 changes: 4 additions & 2 deletions lib/src/sdk/trace/exporters/span_exporter.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import '../../../../sdk.dart' as sdk;
import '../read_only_span.dart';

abstract class SpanExporter {
void export(List<sdk.ReadOnlySpan> spans);
void export(List<ReadOnlySpan> spans);

@Deprecated(
'This method will be removed in 0.19.0. Use [SpanProcessor] instead.')
void forceFlush();

void shutdown();
Expand Down
29 changes: 18 additions & 11 deletions lib/src/sdk/trace/span_processors/batch_processor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ import 'dart:math';

import 'package:logging/logging.dart';

import '../../../../api.dart' as api;
import '../../../../sdk.dart' as sdk;

class BatchSpanProcessor implements sdk.SpanProcessor {
import '../../../api/context/context.dart';
import '../../../api/trace/trace_flags.dart';
import '../exporters/span_exporter.dart';
import '../read_only_span.dart';
import '../read_write_span.dart';
import 'span_processor.dart';

class BatchSpanProcessor implements SpanProcessor {
static const int _DEFAULT_MAXIMUM_BATCH_SIZE = 512;
static const int _DEFAULT_MAXIMUM_QUEUE_SIZE = 2048;
static const int _DEFAULT_EXPORT_DELAY = 5000;

final sdk.SpanExporter _exporter;
final SpanExporter _exporter;
final Logger _log = Logger('opentelemetry.BatchSpanProcessor');
final int _maxExportBatchSize;
final int _maxQueueSize;
final List<sdk.ReadOnlySpan> _spanBuffer = [];
final List<ReadOnlySpan> _spanBuffer = [];

late final Timer _timer;

Expand All @@ -41,19 +45,18 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
while (_spanBuffer.isNotEmpty) {
_exportBatch(_timer);
}
_exporter.forceFlush();
}

@override
void onEnd(sdk.ReadOnlySpan span) {
void onEnd(ReadOnlySpan span) {
if (_isShutdown) {
return;
}
_addToBuffer(span);
}

@override
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
void onStart(ReadWriteSpan span, Context parentContext) {}

@override
void shutdown() {
Expand All @@ -63,15 +66,19 @@ class BatchSpanProcessor implements sdk.SpanProcessor {
_exporter.shutdown();
}

void _addToBuffer(sdk.ReadOnlySpan span) {
void _addToBuffer(ReadOnlySpan span) {
if (_spanBuffer.length >= _maxQueueSize) {
// Buffer is full, drop span.
_log.warning(
'Max queue size exceeded. Dropping ${_spanBuffer.length} spans.');
return;
}

_spanBuffer.add(span);
final isSampled =
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
if (isSampled) {
_spanBuffer.add(span);
}
}

void _exportBatch(Timer timer) {
Expand Down
29 changes: 17 additions & 12 deletions lib/src/sdk/trace/span_processors/simple_processor.dart
Original file line number Diff line number Diff line change
@@ -1,35 +1,40 @@
// Copyright 2021-2022 Workiva.
// Licensed under the Apache License, Version 2.0. Please see https://github.com/Workiva/opentelemetry-dart/blob/master/LICENSE for more information

import '../../../../api.dart' as api;
import '../../../../sdk.dart' as sdk;

class SimpleSpanProcessor implements sdk.SpanProcessor {
final sdk.SpanExporter _exporter;
import '../../../api/context/context.dart';
import '../../../api/trace/trace_flags.dart';
import '../exporters/span_exporter.dart';
import '../read_only_span.dart';
import '../read_write_span.dart';
import 'span_processor.dart';

class SimpleSpanProcessor implements SpanProcessor {
final SpanExporter _exporter;
bool _isShutdown = false;

SimpleSpanProcessor(this._exporter);

@override
void forceFlush() {
_exporter.forceFlush();
}
void forceFlush() {}

@override
void onEnd(sdk.ReadOnlySpan span) {
void onEnd(ReadOnlySpan span) {
if (_isShutdown) {
return;
}

_exporter.export([span]);
final isSampled =
span.spanContext.traceFlags & TraceFlags.sampled == TraceFlags.sampled;
if (isSampled) {
_exporter.export([span]);
}
}

@override
void onStart(sdk.ReadWriteSpan span, api.Context parentContext) {}
void onStart(ReadWriteSpan span, Context parentContext) {}

@override
void shutdown() {
forceFlush();
_isShutdown = true;
_exporter.shutdown();
}
Expand Down
Loading
Loading