Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.splunk.opentelemetry.profiler.exporter;

import com.splunk.opentelemetry.profiler.context.StackToSpanLinkage;
import java.time.Duration;
import java.time.Instant;

public interface CpuEventExporter {
Expand All @@ -30,7 +31,8 @@ default void export(
StackTraceElement[] stackTrace,
Instant eventTime,
String traceId,
String spanId) {}
String spanId,
Duration duration) {}

default void flush() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public void export(
StackTraceElement[] stackTrace,
Instant eventTime,
String traceId,
String spanId) {
String spanId,
Duration duration) {
Sample.Builder sample = Sample.newBuilder();

pprof.addLabel(sample, THREAD_ID, threadId);
Expand All @@ -127,7 +128,7 @@ public void export(
pprof.incFrameCount();
}

pprof.addLabel(sample, SOURCE_EVENT_PERIOD, period.toMillis());
pprof.addLabel(sample, SOURCE_EVENT_PERIOD, duration.toMillis());
pprof.addLabel(sample, SOURCE_EVENT_TIME, eventTime.toEpochMilli());

if (TraceId.isValid(traceId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.splunk.opentelemetry.profiler.exporter.CpuEventExporter;
import com.splunk.opentelemetry.profiler.exporter.PprofCpuEventExporter;
import io.opentelemetry.api.logs.Logger;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -32,12 +31,10 @@ class AsyncStackTraceExporter implements StackTraceExporter {

private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Logger otelLogger;
private final Duration samplingPeriod;
private final int maxDepth;

AsyncStackTraceExporter(Logger logger, Duration samplingPeriod, int maxDepth) {
AsyncStackTraceExporter(Logger logger, int maxDepth) {
this.otelLogger = logger;
this.samplingPeriod = samplingPeriod;
this.maxDepth = maxDepth;
}

Expand All @@ -53,7 +50,6 @@ private Runnable pprofExporter(Logger otelLogger, List<StackTrace> stackTraces)
PprofCpuEventExporter.builder()
.otelLogger(otelLogger)
.stackDepth(maxDepth)
.period(samplingPeriod)
.instrumentationSource(InstrumentationSource.SNAPSHOT)
.build();

Expand All @@ -65,7 +61,8 @@ private Runnable pprofExporter(Logger otelLogger, List<StackTrace> stackTraces)
stackTrace.getStackFrames(),
stackTrace.getTimestamp(),
stackTrace.getTraceId(),
stackTrace.getSpanId());
stackTrace.getSpanId(),
stackTrace.getDuration());
}
cpuEventExporter.flush();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class ScheduledExecutorStackTraceSampler implements StackTraceSampler {
Logger.getLogger(ScheduledExecutorStackTraceSampler.class.getName());
private static final int SCHEDULER_INITIAL_DELAY = 0;

private final ConcurrentMap<String, ScheduledExecutorService> samplers =
new ConcurrentHashMap<>();
private final ConcurrentMap<String, ThreadSampler> samplers = new ConcurrentHashMap<>();
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
private final StagingArea stagingArea;
private final Supplier<SpanTracker> spanTracker;
Expand All @@ -54,52 +53,68 @@ class ScheduledExecutorStackTraceSampler implements StackTraceSampler {
public void start(SpanContext spanContext) {
samplers.computeIfAbsent(
spanContext.getTraceId(),
traceId -> {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(
new StackTraceGatherer(
samplingPeriod, spanContext.getTraceId(), Thread.currentThread()),
SCHEDULER_INITIAL_DELAY,
samplingPeriod.toMillis(),
TimeUnit.MILLISECONDS);
return scheduler;
});
traceId -> new ThreadSampler(traceId, Thread.currentThread(), samplingPeriod));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change in here?

Copy link
Copy Markdown
Contributor Author

@tduncan tduncan Apr 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The creation and scheduling of StackTraceGatherer instances has moved into the construction of the new ThreadSampler which now assumes responsibility of managing thread profiling mechanism, including stopping the scheduled sampling and taking the final on demand sample when trace profiling is stopped.

}

@Override
public void stop(SpanContext spanContext) {
ScheduledExecutorService scheduler = samplers.remove(spanContext.getTraceId());
if (scheduler != null) {
scheduler.shutdown();
String traceId = spanContext.getTraceId();
ThreadSampler sampler = samplers.remove(traceId);
if (sampler != null) {
sampler.stop();
}

stagingArea.empty(spanContext.getTraceId());
}

class StackTraceGatherer implements Runnable {
private final Duration samplingPeriod;
private class ThreadSampler {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address in new PR I just opened: #2283

private final StackTraceGatherer gatherer;

private ThreadSampler(String traceId, Thread thread, Duration period) {
gatherer = new StackTraceGatherer(traceId, thread, System.nanoTime());
scheduler.scheduleAtFixedRate(
gatherer, SCHEDULER_INITIAL_DELAY, period.toMillis(), TimeUnit.MILLISECONDS);
}

void stop() {
scheduler.shutdown();
gatherer.run();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it why you'd want to get a sample on end. That sample doesn't have any application code running, what use does it have?

Copy link
Copy Markdown
Contributor Author

@tduncan tduncan Apr 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It helps report more accurate estimated timing information. Prior to this change all the trace profiles would have timing data that was some multiple of 20 because every sample was submitted with a 20ms duration, and would almost always be more than 20ms longer than the reported span duration (#2277 also improves the default accuracy but will still be a multiple of 10ms).

Computing each sample period and taking a final sample greatly reduces the difference between the reported span duration and the reported profiling duration. They are still not the same, however. In fact, with this change the profiling duration consistently undercounts by a few milliseconds when compared to the entry span duration.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that using the timing data this way makes sense at all for the snapshot profiler? A stack sample proves that a method was executed at a given time. It does not reveal anything about how long the method was executing. I get the feeling that you are deriving arbitrary meaning here for the period. As far as I remember, profilers usually show percentages, like "this method was observed in 52% of samples."
I guess the SCHEDULER_INITIAL_DELAY is set to 0 for similar reasons.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same strategy used by classic Java profilers. Unless methods are instrumented directly, method timing is extrapolated from the samples that are received.

The assumption today is that a particular callstack spent $INTERVAL_TIME running. Say the interval is 10ms.

  • Span that runs 5ms will show method was executing for 10ms (1 sample on start, which says it took 10ms)
  • Span that runs for 11ms will show method was executing for 20ms (2 samples, 10ms each)
    Customers easily see this inaccuracy and wonder why method execution time is not accurate.

For our purposes, because we are not taking a continuous sample, we cannot assume the first sample took 10ms — we know sampling starts when the method starts. Assuming the method had already run for 5-10ms before we took a sample would be incorrect.

This extrapolation does lead to somewhat arbitrary inaccuracies for example these stack traces

Time: Stack
T0: java.lang.Thread.run() (synchronous)
T1: java.lang.Thread.run(), foo.bar()
T2: java.lang.Thread.run(), foo.bar()

Was the duration T1-T0 spent in run() or bar()? By necessity we have to extrapolate and guess; it is really a coin flip, and we could go either way. What we can say for sure is that bar() ran for at least 10ms, with some additional 10ms likely spent in run(), in bar(), or in a method call that ran too fast to be tracked.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same strategy used for classic java profilers. Unless methods are being instrumented directly then method timing is extrapolated the time based on the samples that it receives.

The difference is that a classic profiler usually has lots of samples. Here, even if you somehow combined samples from multiple requests to the same endpoint, it still wouldn't help, because all the samples are biased: they are taken at 10ms intervals.

This extrapolation does lead to somewhat arbitrary inaccuracies

If you already know that this is not going to work correctly then perhaps you should consider alternatives. Whatever we choose to implement now is what we are going to be stuck with for the foreseeable future. If you believe that taking an extra sample at the start and end of the trace is the best option then you will need to get this into the GDI spec, along with whatever magic you need to do to compute the period.
Imo it would be nice to see you moving from having a separate scheduler per profiled trace into something more standard like having a single scheduler that drives the sampling for all profiled traces.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Arbitrary inaccuracy" is too harsh; the main point is that the strategy is consistent. We always attribute the time since the last sample to the method that we observe. We are looking at smaller time periods here, but it is still up to the user to understand that the reported time is approximate.

}
}

private class StackTraceGatherer implements Runnable {
private final String traceId;
private final Thread thread;
private long timestampNanos;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timestampNanos is accessed from multiple threads, so it should be made volatile


StackTraceGatherer(Duration samplingPeriod, String traceId, Thread thread) {
this.samplingPeriod = samplingPeriod;
StackTraceGatherer(String traceId, Thread thread, long timestampNanos) {
this.traceId = traceId;
this.thread = thread;
this.timestampNanos = timestampNanos;
}

@Override
public void run() {
long currentSampleTimestamp = System.nanoTime();
try {
Instant now = Instant.now();
ThreadInfo threadInfo = threadMXBean.getThreadInfo(thread.getId(), Integer.MAX_VALUE);
SpanContext spanContext = retrieveActiveSpan(thread);
Duration samplingPeriod = samplingPeriod(timestampNanos, currentSampleTimestamp);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have heard that the period isn't really used for snapshot profiling events, you sure it is worth it to compute exact values? Secondly since this code is called from the scheduled executor and also manually on stop you have a bit of race possibility here. Wouldn't be surprised when with debugger you could create a situation where the computed period is negative.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously the sampling period was ignored (see changes in AsyncStackTraceExporter). Now the sampling period included in the profiling log message will be the one computed by the StackTraceSampler. This data is eventually stored in the profiling backend and used to calculated the estimated total time each stack frame was running.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 @laurit — this timestampNanos needs to be thread-safe

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo the timestampnanos could just be the tip of the iceberg here. ThreadSampler#shutdown calls

scheduler.shutdown();
gatherer.run();

ExecutorService.shutdown javadoc says

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.

I didn't try this out but hypothetically assuming that the executor was already executing gatherer.run when shutdown was called gatherer.run may run in parallel from the executor and from the explicit gatherer.run() call. Similarly stagingArea.empty(spanContext.getTraceId()) called from ScheduledExecutorStackTraceSampler#stop could run in parallel with gatherer.run from the executor with some unlucky timing. I'd guess that when gatherer.run is run in parallel the previous timestamp computations could be off, also AccumulatingStagingArea internally uses ArrayList that probably could be corrupted with concurrent mutations.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 — AccumulatingStagingArea needs to be thread-safe. Internal use of ArrayList won't work.

Copy link
Copy Markdown
Contributor Author

@tduncan tduncan Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@laurit, @lo-jason Thread-safety of AccumulatingStagingArea addressed in #2288.

String spanId = retrieveActiveSpan(thread).getSpanId();
StackTrace stackTrace =
StackTrace.from(now, samplingPeriod, threadInfo, traceId, spanContext.getSpanId());
StackTrace.from(Instant.now(), samplingPeriod, threadInfo, traceId, spanId);
stagingArea.stage(traceId, stackTrace);
} catch (Exception e) {
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId()));
} finally {
timestampNanos = currentSampleTimestamp;
}
}

private Duration samplingPeriod(long fromNanos, long toNanos) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method provides too much -- I'd just inline it.

return Duration.ofNanos(toNanos - fromNanos);
}

private SpanContext retrieveActiveSpan(Thread thread) {
return spanTracker.get().getActiveSpan(thread).orElse(SpanContext.getInvalid());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;

@AutoService(AgentListener.class)
public class StackTraceExporterActivator implements AgentListener {
Expand All @@ -46,10 +45,8 @@ public void afterAgent(AutoConfiguredOpenTelemetrySdk autoConfiguredOpenTelemetr
ConfigProperties properties = AutoConfigureUtil.getConfig(autoConfiguredOpenTelemetrySdk);
if (snapshotProfilingEnabled(properties)) {
int maxDepth = Configuration.getSnapshotProfilerStackDepth(properties);
Duration samplingPeriod = Configuration.getSnapshotProfilerSamplingInterval(properties);
Logger logger = buildLogger(autoConfiguredOpenTelemetrySdk, properties);
AsyncStackTraceExporter exporter =
new AsyncStackTraceExporter(logger, samplingPeriod, maxDepth);
AsyncStackTraceExporter exporter = new AsyncStackTraceExporter(logger, maxDepth);
StackTraceExporterProvider.INSTANCE.configure(exporter);
}
}
Expand Down
Loading