Skip to content

Commit 91e7ad5

Browse files
committed
Track active spans per thread rather than trace ID to account for traces that span multiple threads.
1 parent 6292ec3 commit 91e7ad5

File tree

6 files changed

+104
-48
lines changed

6 files changed

+104
-48
lines changed

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ActiveSpanTracker.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import javax.annotation.Nullable;
2828

2929
class ActiveSpanTracker implements ContextStorage, SpanTracker {
30-
private final ConcurrentMap<String, SpanContext> activeSpans = new ConcurrentHashMap<>();
30+
private final ConcurrentMap<Long, SpanContext> activeSpansForThreads = new ConcurrentHashMap<>();
3131

3232
private final ContextStorage delegate;
3333
private final TraceRegistry registry;
@@ -45,15 +45,15 @@ public Scope attach(Context toAttach) {
4545
return scope;
4646
}
4747

48-
String traceId = spanContext.getTraceId();
49-
SpanContext current = activeSpans.get(traceId);
48+
long threadId = Thread.currentThread().getId();
49+
SpanContext current = activeSpansForThreads.get(threadId);
5050
if (current == spanContext) {
5151
return scope;
5252
}
5353

54-
SpanContext previous = activeSpans.put(traceId, spanContext);
54+
SpanContext previous = activeSpansForThreads.put(threadId, spanContext);
5555
return () -> {
56-
activeSpans.computeIfPresent(traceId, (id, sc) -> previous);
56+
activeSpansForThreads.computeIfPresent(threadId, (id, sc) -> previous);
5757
scope.close();
5858
};
5959
}
@@ -69,7 +69,7 @@ public Context current() {
6969
}
7070

7171
@Override
72-
public Optional<SpanContext> getActiveSpan(String traceId) {
73-
return Optional.ofNullable(activeSpans.get(traceId));
72+
public Optional<SpanContext> getActiveSpan(long threadId) {
73+
return Optional.ofNullable(activeSpansForThreads.get(threadId));
7474
}
7575
}

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void run() {
9191
try {
9292
Instant now = Instant.now();
9393
ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId, Integer.MAX_VALUE);
94-
SpanContext spanContext = retrieveActiveSpan(traceId);
94+
SpanContext spanContext = retrieveActiveSpan(threadId);
9595
StackTrace stackTrace =
9696
StackTrace.from(now, samplingPeriod, threadInfo, traceId, spanContext.getSpanId());
9797
stagingArea.stage(traceId, stackTrace);
@@ -100,8 +100,8 @@ public void run() {
100100
}
101101
}
102102

103-
private SpanContext retrieveActiveSpan(String traceId) {
104-
return spanTracker.get().getActiveSpan(traceId).orElse(SpanContext.getInvalid());
103+
private SpanContext retrieveActiveSpan(long threadId) {
104+
return spanTracker.get().getActiveSpan(threadId).orElse(SpanContext.getInvalid());
105105
}
106106

107107
private Supplier<String> samplerErrorMessage(String traceId, long threadId) {

profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SpanTracker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.Optional;
2121

2222
interface SpanTracker {
23-
SpanTracker NOOP = traceId -> Optional.empty();
23+
SpanTracker NOOP = threadId -> Optional.empty();
2424

25-
Optional<SpanContext> getActiveSpan(String traceId);
25+
Optional<SpanContext> getActiveSpan(long threadId);
2626
}

profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ActiveSpanTrackerTest.java

Lines changed: 86 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.opentelemetry.api.trace.Span;
2424
import io.opentelemetry.api.trace.SpanContext;
2525
import io.opentelemetry.api.trace.StatusCode;
26-
import io.opentelemetry.api.trace.TraceId;
2726
import io.opentelemetry.context.Context;
2827
import io.opentelemetry.context.ContextKey;
2928
import io.opentelemetry.context.ContextStorage;
@@ -49,9 +48,8 @@ void currentContextComesFromOpenTelemetryContextStorage() {
4948
}
5049

5150
@Test
52-
void noActiveSpanBeforeFirstSpanInTraceIsAttachedToContext() {
53-
var traceId = IdGenerator.random().generateTraceId();
54-
assertEquals(Optional.empty(), spanTracker.getActiveSpan(traceId));
51+
void noActiveSpanForThreadBeforeFirstSpanInTraceIsAttachedToContext() {
52+
assertEquals(Optional.empty(), spanTracker.getActiveSpan(currentThreadId()));
5553
}
5654

5755
@Test
@@ -62,8 +60,7 @@ void trackActiveSpanWhenNewContextAttached() {
6260
registry.register(spanContext);
6361

6462
try (var ignored = spanTracker.attach(context)) {
65-
var traceId = span.getSpanContext().getTraceId();
66-
assertEquals(Optional.of(spanContext), spanTracker.getActiveSpan(traceId));
63+
assertEquals(Optional.of(spanContext), spanTracker.getActiveSpan(currentThreadId()));
6764
}
6865
}
6966

@@ -77,7 +74,7 @@ void noActiveSpanForTraceAfterSpansScopeIsClosed() {
7774
var scope = spanTracker.attach(context);
7875
scope.close();
7976

80-
assertEquals(Optional.empty(), spanTracker.getActiveSpan(spanContext.getTraceId()));
77+
assertEquals(Optional.empty(), spanTracker.getActiveSpan(currentThreadId()));
8178
}
8279

8380
@Test
@@ -91,8 +88,7 @@ void trackActiveSpanAcrossMultipleContextChanges() {
9188
var spanContext = span.getSpanContext();
9289
context = context.with(span);
9390
try (var ignoredScope2 = spanTracker.attach(context)) {
94-
var traceId = span.getSpanContext().getTraceId();
95-
assertEquals(Optional.of(spanContext), spanTracker.getActiveSpan(traceId));
91+
assertEquals(Optional.of(spanContext), spanTracker.getActiveSpan(currentThreadId()));
9692
}
9793
}
9894
}
@@ -111,42 +107,100 @@ void restoreActiveSpanToPreviousSpanAfterScopeClosing() {
111107
scope.close();
112108

113109
var rootSpanContext = root.getSpanContext();
114-
assertEquals(
115-
Optional.of(rootSpanContext), spanTracker.getActiveSpan(rootSpanContext.getTraceId()));
110+
assertEquals(Optional.of(rootSpanContext), spanTracker.getActiveSpan(currentThreadId()));
116111
}
117112
}
118113

119114
@Test
120115
void trackActiveSpanForMultipleTraces() throws Exception {
121-
var executor = Executors.newSingleThreadExecutor();
122-
var root1 = FakeSpan.newSpan(Snapshotting.spanContext());
123-
var root2 = FakeSpan.newSpan(Snapshotting.spanContext());
124-
registry.register(root1.getSpanContext());
125-
registry.register(root2.getSpanContext());
126-
127-
try (var ignoredScope1 = executor.submit(attach(root1)).get();
128-
var ignoredScope2 = executor.submit(attach(root2)).get()) {
129-
var traceId1 = root1.getSpanContext().getTraceId();
130-
assertEquals(Optional.of(root1.getSpanContext()), spanTracker.getActiveSpan(traceId1));
131-
var traceId2 = root2.getSpanContext().getTraceId();
132-
assertEquals(Optional.of(root2.getSpanContext()), spanTracker.getActiveSpan(traceId2));
116+
var span1 = FakeSpan.newSpan(Snapshotting.spanContext());
117+
var span2 = FakeSpan.newSpan(Snapshotting.spanContext());
118+
registry.register(span1.getSpanContext());
119+
registry.register(span2.getSpanContext());
120+
121+
var executor = Executors.newFixedThreadPool(2);
122+
try (var scope1 = executor.submit(attach(span1)).get();
123+
var scope2 = executor.submit(attach(span2)).get()) {
124+
assertEquals(
125+
Optional.of(span1.getSpanContext()), spanTracker.getActiveSpan(scope1.getThreadId()));
126+
assertEquals(
127+
Optional.of(span2.getSpanContext()), spanTracker.getActiveSpan(scope2.getThreadId()));
133128
} finally {
134129
executor.shutdown();
135130
}
136131
}
137132

138-
private Callable<Scope> attach(Span span) {
133+
@Test
134+
void trackMultipleActiveSpansForSameTraceFromDifferentThreads() throws Exception {
135+
var traceId = IdGenerator.random().generateTraceId();
136+
var span1 = FakeSpan.newSpan(Snapshotting.spanContext().withTraceId(traceId));
137+
var span2 = FakeSpan.newSpan(Snapshotting.spanContext().withTraceId(traceId));
138+
registry.register(span1.getSpanContext());
139+
registry.register(span2.getSpanContext());
140+
141+
var executor = Executors.newFixedThreadPool(2);
142+
try (var scope1 = executor.submit(attach(span1)).get();
143+
var scope2 = executor.submit(attach(span2)).get()) {
144+
assertEquals(
145+
Optional.of(span1.getSpanContext()), spanTracker.getActiveSpan(scope1.getThreadId()));
146+
assertEquals(
147+
Optional.of(span2.getSpanContext()), spanTracker.getActiveSpan(scope2.getThreadId()));
148+
} finally {
149+
executor.shutdown();
150+
}
151+
}
152+
153+
private Callable<ThreadScope> attach(Span span) {
139154
return () -> {
140155
var context = Context.root().with(span);
141-
return spanTracker.attach(context);
156+
return new ThreadScope(spanTracker.attach(context), Thread.currentThread().getId());
142157
};
143158
}
144159

160+
private static class ThreadScope implements Scope {
161+
private final Scope scope;
162+
private final long threadId;
163+
164+
private ThreadScope(Scope scope, long threadId) {
165+
this.scope = scope;
166+
this.threadId = threadId;
167+
}
168+
169+
public long getThreadId() {
170+
return threadId;
171+
}
172+
173+
@Override
174+
public void close() {
175+
scope.close();
176+
}
177+
}
178+
179+
@Test
180+
void activeSpanForThreadIsUnchangedWhenTraceStartsSpanInAnotherThread() throws Exception {
181+
var traceId = IdGenerator.random().generateTraceId();
182+
var root = FakeSpan.newSpan(Snapshotting.spanContext().withTraceId(traceId));
183+
var child = FakeSpan.newSpan(Snapshotting.spanContext().withTraceId(traceId));
184+
registry.register(root.getSpanContext());
185+
registry.register(child.getSpanContext());
186+
187+
var executor = Executors.newSingleThreadExecutor();
188+
try (var scope1 = attach(root).call();
189+
var scope2 = executor.submit(attach(child)).get()) {
190+
assertEquals(
191+
Optional.of(root.getSpanContext()), spanTracker.getActiveSpan(scope1.getThreadId()));
192+
assertEquals(
193+
Optional.of(child.getSpanContext()), spanTracker.getActiveSpan(scope2.getThreadId()));
194+
} finally {
195+
executor.shutdown();
196+
}
197+
}
198+
145199
@Test
146200
void doNotTrackSpanWhenNoSpanPresentInContext() {
147201
var context = Context.root().with(ContextKey.named("test-key"), "value");
148202
try (var ignored = spanTracker.attach(context)) {
149-
assertEquals(Optional.empty(), spanTracker.getActiveSpan(TraceId.getInvalid()));
203+
assertEquals(Optional.empty(), spanTracker.getActiveSpan(currentThreadId()));
150204
}
151205
}
152206

@@ -157,8 +211,7 @@ void doNotTrackSpanWhenSpanIsNotSampled() {
157211
registry.register(span.getSpanContext());
158212

159213
try (var ignored = spanTracker.attach(context)) {
160-
var traceId = span.getSpanContext().getTraceId();
161-
assertEquals(Optional.empty(), spanTracker.getActiveSpan(traceId));
214+
assertEquals(Optional.empty(), spanTracker.getActiveSpan(currentThreadId()));
162215
}
163216
}
164217

@@ -170,8 +223,7 @@ void doNotTrackSpanTraceIsNotRegisteredForSnapshotting() {
170223
var context = Context.root().with(span);
171224

172225
try (var ignored = spanTracker.attach(context)) {
173-
var traceId = span.getSpanContext().getTraceId();
174-
assertEquals(Optional.empty(), spanTracker.getActiveSpan(traceId));
226+
assertEquals(Optional.empty(), spanTracker.getActiveSpan(currentThreadId()));
175227
}
176228
}
177229

@@ -191,6 +243,10 @@ void doNotTrackContinuallyTrackSameSpan() {
191243
}
192244
}
193245

246+
private long currentThreadId() {
247+
return Thread.currentThread().getId();
248+
}
249+
194250
private static class FakeSpan implements Span {
195251
static FakeSpan newSpan(Snapshotting.SpanContextBuilder spanContext) {
196252
return new FakeSpan(spanContext.build());

profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemorySpanTracker.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import java.util.Optional;
2323

2424
class InMemorySpanTracker implements SpanTracker {
25-
private final Map<String, SpanContext> stackTraces = new HashMap<>();
25+
private final Map<Long, SpanContext> activeSpans = new HashMap<>();
2626

27-
void store(String traceId, SpanContext spanContext) {
28-
stackTraces.put(traceId, spanContext);
27+
void store(long threadId, SpanContext spanContext) {
28+
activeSpans.put(threadId, spanContext);
2929
}
3030

3131
@Override
32-
public Optional<SpanContext> getActiveSpan(String traceId) {
33-
return Optional.ofNullable(stackTraces.get(traceId));
32+
public Optional<SpanContext> getActiveSpan(long threadId) {
33+
return Optional.ofNullable(activeSpans.get(threadId));
3434
}
3535
}

profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSamplerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void includeTraceIdOnStackTraces() {
216216
@Test
217217
void includeActiveSpanIdOnStackTraces() {
218218
var spanContext = Snapshotting.spanContext().build();
219-
spanTracker.store(spanContext.getTraceId(), spanContext);
219+
spanTracker.store(Thread.currentThread().getId(), spanContext);
220220

221221
try {
222222
sampler.start(spanContext);

0 commit comments

Comments
 (0)