Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,50 @@ logger_provider:

`FilteringLogRecordProcessor` is a `LogRecordProcessor` that only keep logs based on a predicate

## Filtering Span Exporter

`FilteringSpanExporter` is a `SpanExporter` wrapper that filters spans within each export batch before delegating to the underlying exporter. Filtering is composable via two interfaces:

- `SpanFilter` - evaluates individual spans (e.g., error status, slow duration)
- `TraceFilter` - evaluates all spans belonging to a trace within the batch (e.g., overall trace wall-clock duration)

Within a batch, if any `SpanFilter` matches any span or any `TraceFilter` matches a trace's span group, all spans sharing that trace ID in the batch are exported together.

**Note:** Filtering decisions are scoped to a single `export()` call. Spans from the same trace arriving in different batches are evaluated independently, so a trace split across batches may be partially exported.

Built-in filters:

- `ErrorSpanFilter` - matches spans with error status
- `DurationSpanFilter` - matches spans exceeding a duration threshold
- `TraceDurationFilter` - matches when a trace's wall-clock duration (max end - min start) in the batch exceeds a threshold

Usage:

```java
SpanExporter delegate = OtlpGrpcSpanExporter.getDefault();

// Export spans whose batch-colocated trace has errors, individual spans > 2s, or trace duration > 10s
SpanExporter filtering = new FilteringSpanExporter(
delegate,
Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(Duration.ofSeconds(2))),
Collections.singletonList(new TraceDurationFilter(Duration.ofSeconds(10))));

// Custom filters
SpanFilter nameFilter = span -> span.getName().contains("important");
SpanExporter custom = new FilteringSpanExporter(
delegate,
Collections.singletonList(nameFilter),
Collections.emptyList());

// Optionally pass a Meter to emit dropped-span metrics
Meter meter = openTelemetry.getMeter("my-service");
SpanExporter withMetrics = new FilteringSpanExporter(
delegate,
Arrays.asList(new ErrorSpanFilter(), new DurationSpanFilter(Duration.ofSeconds(2))),
Collections.singletonList(new TraceDurationFilter(Duration.ofSeconds(10))),
meter);
```

## Component owners

- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;

/**
* A {@link SpanFilter} that matches spans whose duration exceeds a configurable threshold, causing
* all batch-colocated spans sharing the same trace ID to be exported.
*/
public final class DurationSpanFilter implements SpanFilter {

private final long thresholdNanos;

/**
* Creates a new {@code DurationSpanFilter}.
*
* @param threshold the duration threshold; spans with duration strictly greater than this are
* considered interesting
*/
public DurationSpanFilter(Duration threshold) {
if (threshold.isNegative()) {
throw new IllegalArgumentException("threshold must be non-negative, got: " + threshold);
}
this.thresholdNanos = threshold.toNanos();
}

@Override
public boolean shouldKeep(SpanData spanData) {
long durationNanos = spanData.getEndEpochNanos() - spanData.getStartEpochNanos();
return durationNanos > thresholdNanos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;

/**
* A {@link SpanFilter} that matches spans with {@link StatusCode#ERROR}, causing all
* batch-colocated spans sharing the same trace ID to be exported.
*/
public final class ErrorSpanFilter implements SpanFilter {

@Override
public boolean shouldKeep(SpanData spanData) {
return spanData.getStatus().getStatusCode() == StatusCode.ERROR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;

/**
* A {@link SpanExporter} wrapper that filters spans before delegating to the underlying exporter.
* Filtering operates at the trace level within each export batch: if any filter matches, all spans
* sharing that trace ID <em>in that batch</em> are exported together.
*
* <p>Two types of filters are supported:
*
* <ul>
* <li>{@link SpanFilter} - evaluates individual spans (e.g., error status, slow duration)
* <li>{@link TraceFilter} - evaluates all spans belonging to a trace within the batch (e.g.,
* overall trace wall-clock duration)
* </ul>
*
* <p>A trace's spans are kept if any {@code SpanFilter} matches any span in the batch, OR any
* {@code TraceFilter} matches the trace's span group within the batch.
*
* <p><strong>Important:</strong> Filtering decisions are scoped to a single {@link
* #export(Collection)} call. Spans from the same trace that arrive in different batches are
* evaluated independently, so a trace split across batches may be partially exported.
*/
public final class FilteringSpanExporter implements SpanExporter {

private static final AttributeKey<String> REASON_KEY = AttributeKey.stringKey("reason");

private final SpanExporter delegate;
private final List<SpanFilter> spanFilters;
private final List<TraceFilter> traceFilters;
@Nullable private final LongCounter droppedSpansCounter;

/**
* Creates a new {@code FilteringSpanExporter}.
*
* @param delegate the exporter to delegate to for spans that pass filtering
* @param spanFilters per-span filters; a trace's spans in the batch are kept if any filter
* matches
* @param traceFilters batch-level filters; a trace's spans in the batch are kept if any filter
* matches
* @param meter optional {@link Meter} for emitting dropped-span metrics; pass {@code null} to
* disable metrics
*/
public FilteringSpanExporter(
SpanExporter delegate,
List<SpanFilter> spanFilters,
List<TraceFilter> traceFilters,
@Nullable Meter meter) {
this.delegate = Objects.requireNonNull(delegate, "delegate");
Objects.requireNonNull(spanFilters, "spanFilters");
Objects.requireNonNull(traceFilters, "traceFilters");
this.spanFilters = Collections.unmodifiableList(new ArrayList<>(spanFilters));
this.traceFilters = Collections.unmodifiableList(new ArrayList<>(traceFilters));
if (meter != null) {
this.droppedSpansCounter =
meter
.counterBuilder("otel.contrib.processor.span.filtered")
.setDescription("Number of spans dropped by the filtering span exporter")
Comment thread
jaydeluca marked this conversation as resolved.
.setUnit("{span}")
.build();
} else {
this.droppedSpansCounter = null;
}
}

/**
* Creates a new {@code FilteringSpanExporter} without metrics.
*
* @param delegate the exporter to delegate to for spans that pass filtering
* @param spanFilters per-span filters
* @param traceFilters batch-level filters
*/
public FilteringSpanExporter(
SpanExporter delegate, List<SpanFilter> spanFilters, List<TraceFilter> traceFilters) {
this(delegate, spanFilters, traceFilters, null);
}

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
// Group spans by trace ID and evaluate span-level filters in a single pass
Set<String> interestingTraceIds = new HashSet<>();
Map<String, List<SpanData>> spansByTrace = new HashMap<>();

for (SpanData span : spans) {
String traceId = span.getSpanContext().getTraceId();

List<SpanData> traceSpans = spansByTrace.get(traceId);
if (traceSpans == null) {
traceSpans = new ArrayList<>();
spansByTrace.put(traceId, traceSpans);
}
traceSpans.add(span);

// Check span-level filters
if (!interestingTraceIds.contains(traceId)) {
for (SpanFilter filter : spanFilters) {
if (filter.shouldKeep(span)) {
interestingTraceIds.add(traceId);
break;
}
}
}
}

// Evaluate trace-level filters
if (!traceFilters.isEmpty()) {
for (Map.Entry<String, List<SpanData>> entry : spansByTrace.entrySet()) {
String traceId = entry.getKey();
if (!interestingTraceIds.contains(traceId)) {
for (TraceFilter filter : traceFilters) {
if (filter.shouldKeep(traceId, entry.getValue())) {
interestingTraceIds.add(traceId);
break;
}
}
}
}
}

// Collect filtered spans
List<SpanData> filtered = new ArrayList<>();
long droppedCount = 0;

for (SpanData span : spans) {
if (interestingTraceIds.contains(span.getSpanContext().getTraceId())) {
filtered.add(span);
} else {
droppedCount++;
}
}

if (droppedSpansCounter != null && droppedCount > 0) {
droppedSpansCounter.add(droppedCount, Attributes.of(REASON_KEY, "not_interesting"));
}

if (filtered.isEmpty()) {
return CompletableResultCode.ofSuccess();
}

return delegate.export(filtered);
}

@Override
public CompletableResultCode flush() {
return delegate.flush();
}

@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;

/**
* A filter that evaluates individual spans to determine if their containing trace's spans (within
* the same export batch) should be exported. Used by {@link FilteringSpanExporter} to make per-span
* keep/drop decisions.
*
* <p>If any {@code SpanFilter} returns {@code true} for any span in a batch, all spans sharing that
* trace ID within the same batch are exported.
*/
public interface SpanFilter {

/**
* Evaluates whether the given span is interesting enough to keep its trace's spans in the batch.
*
* @param spanData the span to evaluate
* @return {@code true} if this span should cause its trace's batch-colocated spans to be exported
*/
boolean shouldKeep(SpanData spanData);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.Collection;

/**
* A {@link TraceFilter} that matches when the wall-clock duration (max end - min start across all
* spans in the batch sharing a trace ID) exceeds a configurable threshold.
*/
public final class TraceDurationFilter implements TraceFilter {

private final long thresholdNanos;

/**
* Creates a new {@code TraceDurationFilter}.
*
* @param threshold the trace duration threshold; traces with wall-clock duration strictly greater
* than this are considered interesting
*/
public TraceDurationFilter(Duration threshold) {
if (threshold.isNegative()) {
throw new IllegalArgumentException("threshold must be non-negative, got: " + threshold);
}
this.thresholdNanos = threshold.toNanos();
}

@Override
public boolean shouldKeep(String traceId, Collection<SpanData> spans) {
Comment thread
jaydeluca marked this conversation as resolved.
if (spans.isEmpty()) {
return false;
}
long minStart = Long.MAX_VALUE;
long maxEnd = Long.MIN_VALUE;
for (SpanData span : spans) {
if (span.getStartEpochNanos() < minStart) {
minStart = span.getStartEpochNanos();
}
if (span.getEndEpochNanos() > maxEnd) {
maxEnd = span.getEndEpochNanos();
}
}
return (maxEnd - minStart) > thresholdNanos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.filter;

import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;

/**
* A filter that evaluates all spans belonging to a single trace within an export batch to determine
* if those spans should be exported. Used by {@link FilteringSpanExporter} for decisions that
* require batch-level context (e.g., overall trace wall-clock duration).
*
* <p>If any {@code TraceFilter} returns {@code true} for a trace, all spans sharing that trace ID
* within the same batch are exported.
*/
public interface TraceFilter {

/**
* Evaluates whether the given trace (represented as all spans sharing a trace ID within the
* current batch) should be exported.
*
* @param traceId the trace ID
* @param spans all spans in the current batch belonging to this trace
* @return {@code true} if this trace should be exported
*/
boolean shouldKeep(String traceId, Collection<SpanData> spans);
}
Loading
Loading