Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
23e6c9c
Add StagingArea implementation that periodically empties itself.
tduncan Apr 8, 2025
4dcbfeb
Use the PeriodicallyExportingStagingArea when customizing the OpenTel…
tduncan Apr 8, 2025
65922be
Add configuration option for the frequency in which to empty the Peri…
tduncan Apr 8, 2025
f291c53
Export a copy of collected stacktraces in PeriodicallyExportingStagin…
tduncan Apr 8, 2025
2f631f8
Return StackTraceExporter into accepting a List.
tduncan Apr 8, 2025
5f13f30
Modify the StagingArea interface to remove trace ID parameters.
tduncan Apr 8, 2025
7a3eb30
Apply spotless code formatting.
tduncan Apr 8, 2025
f7db0f1
Create a 'view' of the existing stacktraces prior to emptying the sta…
tduncan Apr 21, 2025
1ca2291
Schedule daemon threads.
tduncan Apr 21, 2025
1dd6131
Remove unnecessary test code.
tduncan Apr 25, 2025
55a58cd
Retrieve staging area export period from properties.
tduncan Apr 25, 2025
fb83db2
Apply spotless code formatting.
tduncan Apr 25, 2025
0909970
Remove debug print statements in test.
tduncan Apr 25, 2025
4d03248
Immediately shutdown the scheduler in PeriodicallyExportingStagingAre…
tduncan Apr 28, 2025
66dfc45
Check whether there are stack traces to export before making a copy.
tduncan Apr 28, 2025
7e61fb1
Use a Set to hold staged stack traces.
tduncan Apr 28, 2025
7b48c90
Add capacity considerations to PeriodicallyExportingStagingArea.
tduncan Apr 28, 2025
28da0f8
Add property for configuring the staging area capacity, default value…
tduncan Apr 28, 2025
ce6fde4
Rename constants and configuration function name.
tduncan Apr 28, 2025
1327e7c
Prevent multiple staged StackTraces that cause the capacity to be exc…
tduncan Apr 29, 2025
f798040
Add test requiring that stack traces be continually exported over time.
tduncan Apr 29, 2025
50b0b17
Export final StackTraces and shutdown exportWorker.
tduncan Apr 29, 2025
a45ff9b
Convert the background worker to a Thread object that internally manag…
tduncan Apr 30, 2025
6a0fa09
Apply spotless code formatting.
tduncan Apr 30, 2025
b7f8dea
Simplify exporting snapshot stacks
laurit May 1, 2025
ea9881c
Revert adding equals and hashCode methods in StackTrace.
tduncan May 1, 2025
b9ff741
spotless
laurit May 1, 2025
b020a09
Simplify time comparison by using built-in long comparator.
tduncan May 1, 2025
5d70bf9
Consider whether queue is empty when deciding whether to keep running…
tduncan May 1, 2025
1d293ab
Reorder methods.
tduncan May 1, 2025
c565d7f
Use more conventional while loop.
tduncan May 1, 2025
37998fc
Rename 'queue' to 'exports'.
tduncan May 1, 2025
1c9dbda
Set thread name.
tduncan May 1, 2025
4fc150b
Interrupt thread instead of throwing exception.
tduncan May 1, 2025
3d630bb
Remove unnecessary method.
tduncan May 1, 2025
96acf30
Rename queue to stackTraces.
tduncan May 1, 2025
279cccb
Reorder methods.
tduncan May 1, 2025
421978b
Minor refactors.
tduncan May 2, 2025
960516e
Apply spotless code formatting.
tduncan May 2, 2025
07ea81f
Merge branch 'batch-export-snapshot-profiling-stacktraces' into snaps…
tduncan May 2, 2025
b7378d3
Merge pull request #2 from laurit/snapshot-export
tduncan May 2, 2025
1844b1f
Move initial export time calculation into constructor.
tduncan May 2, 2025
9e7d1a4
Remove unnecessary if statement.
tduncan May 2, 2025
feddbd5
Move max queue size calculation into Worker constructor.
tduncan May 2, 2025
fd0bb8e
spotless
laurit May 5, 2025
1b2819a
spotless
laurit May 5, 2025
e1141de
fix build on java17
laurit May 5, 2025
c27db01
Rename config property to 'splunk.snapshot.profiler.export.interval'.
tduncan May 5, 2025
677203c
Add CountDownLatch to wait for full shutdown of the staging area expo…
tduncan May 5, 2025
39a5a90
Apply spotless code formatting.
tduncan May 6, 2025
2bb2c86
We don't need a CountDownLatch...
tduncan May 6, 2025
c0cb4a8
spotless
laurit May 6, 2025
bb20b4c
Merge branch 'main' into batch-export-snapshot-profiling-stacktraces
tduncan May 6, 2025
3ecf6f9
Reinstate shutdown method call removed by mistake.
tduncan May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public class Configuration implements AutoConfigurationCustomizerProvider {
"splunk.snapshot.profiler.sampling.interval";
private static final Duration DEFAULT_SNAPSHOT_PROFILER_SAMPLING_INTERVAL = Duration.ofMillis(10);

private static final String CONFIG_KEY_SNAPSHOT_PROFILER_EXPORT_INTERVAL =
"splunk.snapshot.profiler.export.interval";
private static final Duration DEFAULT_SNAPSHOT_PROFILER_EXPORT_INTERVAL = Duration.ofSeconds(5);

private static final String CONFIG_KEY_SNAPSHOT_PROFILER_STAGING_CAPACITY =
"splunk.snapshot.profiler.staging.capacity";
Copy link
Copy Markdown
Collaborator

@laurit laurit May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sdk uses otel.bsp.max.export.batch.size for similar purpose

private static final int DEFAULT_SNAPSHOT_PROFILER_STAGING_CAPACITY = 2000;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a reason for the chosen defaults. Happy to change them.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 2000 is reasonable for start.


@Override
public void customize(AutoConfigurationCustomizer autoConfiguration) {
autoConfiguration.addPropertiesSupplier(this::defaultProperties);
Expand Down Expand Up @@ -228,4 +236,14 @@ public static Duration getSnapshotProfilerSamplingInterval(ConfigProperties prop
CONFIG_KEY_SNAPSHOT_PROFILER_SAMPLING_INTERVAL,
DEFAULT_SNAPSHOT_PROFILER_SAMPLING_INTERVAL);
}

/**
 * Looks up the snapshot profiler export interval.
 *
 * @param properties configuration to read {@code splunk.snapshot.profiler.export.interval} from
 * @return the configured interval, or the 5 second default when the property is not set
 */
public static Duration getSnapshotProfilerExportInterval(ConfigProperties properties) {
  Duration exportInterval =
      properties.getDuration(
          CONFIG_KEY_SNAPSHOT_PROFILER_EXPORT_INTERVAL, DEFAULT_SNAPSHOT_PROFILER_EXPORT_INTERVAL);
  return exportInterval;
}

/**
 * Looks up the snapshot profiler staging capacity.
 *
 * @param properties configuration to read {@code splunk.snapshot.profiler.staging.capacity} from
 * @return the configured capacity, or the default of 2000 when the property is not set
 */
public static int getSnapshotProfilerStagingCapacity(ConfigProperties properties) {
  int stagingCapacity =
      properties.getInt(
          CONFIG_KEY_SNAPSHOT_PROFILER_STAGING_CAPACITY,
          DEFAULT_SNAPSHOT_PROFILER_STAGING_CAPACITY);
  return stagingCapacity;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright Splunk Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.splunk.opentelemetry.profiler.snapshot;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link StagingArea} that queues staged stack traces for a background daemon thread, which hands
 * them to the {@link StackTraceExporter} in batches. A batch is exported when the export delay
 * elapses, when the batch reaches the configured capacity, or when the staging area is closed.
 *
 * <p>{@link #stage(StackTrace)} never blocks: when the internal queue is full the stack trace is
 * dropped. Stack traces staged after {@link #close()} are ignored.
 */
class PeriodicallyExportingStagingArea implements StagingArea {
  private static final Logger logger =
      Logger.getLogger(PeriodicallyExportingStagingArea.class.getName());

  private static final String WORKER_THREAD_NAME =
      PeriodicallyExportingStagingArea.class.getSimpleName() + "_WorkerThread";

  private volatile boolean closed = false;

  private final Worker worker;

  /**
   * Creates the staging area and immediately starts its worker thread.
   *
   * @param exporter supplies the exporter that receives each batch; consulted on every export
   * @param delay maximum time between two consecutive exports
   * @param capacity maximum number of stack traces per exported batch; the internal queue holds up
   *     to four times this many stack traces
   */
  PeriodicallyExportingStagingArea(
      Supplier<StackTraceExporter> exporter, Duration delay, int capacity) {
    worker = new Worker(exporter, delay, capacity);
    worker.setName(WORKER_THREAD_NAME);
    worker.setDaemon(true);
    worker.start();
  }

  /** Queues the stack trace for a later export unless this staging area has been closed. */
  @Override
  public void stage(StackTrace stackTrace) {
    if (closed) {
      return;
    }
    worker.add(stackTrace);
  }

  /** Does nothing; exports are triggered by the worker thread, not by callers. */
  @Override
  public void empty() {}

  /**
   * Stops accepting new stack traces, asks the worker to pass whatever is still queued to the
   * exporter, and waits for the worker thread to exit.
   */
  @Override
  public void close() {
    this.closed = true;

    // Wait for the worker thread to exit. Note that this does not guarantee that the pending items
    // are exported as we don't attempt to wait for the actual export to complete.
    worker.shutdown();
    try {
      worker.join();
    } catch (InterruptedException exception) {
      Thread.currentThread().interrupt();
    }
  }

  /** Background thread that drains the queue and passes batches to the exporter. */
  private static class Worker extends Thread {
    // when shutting down we queue a fake stack trace to ensure that shutdown process starts
    // immediately
    private static final Object SHUTDOWN_MARKER = new Object();

    private final BlockingQueue<Object> queue;
    private final Supplier<StackTraceExporter> exporter;
    private final Duration delay;
    private final int maxExportBatchSize;

    private volatile boolean shutdown = false;
    // System.nanoTime() based deadline; only read and written by this worker thread after start
    private long nextExportTime;

    private Worker(Supplier<StackTraceExporter> exporter, Duration delay, int maxExportBatchSize) {
      this.exporter = exporter;
      this.delay = delay;
      this.maxExportBatchSize = maxExportBatchSize;
      // set the queue size to 4x the batch size, in sdk batch processors both of these are
      // configurable but by default queue size is also 4*batch size
      this.queue = new ArrayBlockingQueue<>(maxExportBatchSize * 4);

      updateNextExportTime();
    }

    /** Enqueues the stack trace without blocking. */
    void add(StackTrace stackTrace) {
      // If queue is full drop the stack trace, not much we can do.
      queue.offer(stackTrace);
    }

    @Override
    public void run() {
      List<StackTrace> stackTracesToExport = new ArrayList<>();
      try {
        // run until shutdown is called and all queued stack traces are passed to the exporter
        while (!shutdown || !queue.isEmpty() || !stackTracesToExport.isEmpty()) {
          // a non-positive timeout makes poll return immediately
          Object stackTrace = queue.poll(nextExportTime - System.nanoTime(), TimeUnit.NANOSECONDS);
          if (stackTrace != null && stackTrace != SHUTDOWN_MARKER) {
            stackTracesToExport.add((StackTrace) stackTrace);
          }
          // trigger export when either next export time is reached, we have max batch size, or we
          // are shutting down and have read all the queued stacks
          // nanoTime values must be compared by difference to stay correct on numeric overflow
          if (System.nanoTime() - nextExportTime >= 0
              || stackTracesToExport.size() >= maxExportBatchSize
              || (shutdown && queue.isEmpty())) {
            try {
              exporter.get().export(stackTracesToExport);
            } catch (RuntimeException exception) {
              // A failing exporter must not kill the worker thread; otherwise staging would
              // silently stop exporting for the rest of the JVM's life. The batch is dropped.
              logger.log(Level.WARNING, "Failed to export staged stack traces", exception);
            }
            // always start a fresh batch, also after a failure, so the loop can terminate
            stackTracesToExport = new ArrayList<>();
            updateNextExportTime();
          }
        }
      } catch (InterruptedException exception) {
        Thread.currentThread().interrupt();
      }
    }

    private void updateNextExportTime() {
      nextExportTime = System.nanoTime() + delay.toNanos();
    }

    /** Signals the worker to export what is left in the queue and then exit. */
    private void shutdown() {
      shutdown = true;
      // we don't care if the queue is full and offer fails, we only wish to ensure that there is
      // something in the queue so that shutdown could start immediately
      queue.offer(SHUTDOWN_MARKER);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void stop(SpanContext spanContext) {
if (spanContext.equals(sampler.getSpanContext())) {
sampler.shutdown();
waitForShutdown(sampler);
stagingArea.get().empty(spanContext.getTraceId());
return null;
}
return sampler;
Expand Down Expand Up @@ -153,7 +152,7 @@ public void run() {
String spanId = retrieveActiveSpan(thread).getSpanId();
StackTrace stackTrace =
StackTrace.from(Instant.now(), samplingPeriod, threadInfo, traceId, spanId);
stagingArea.get().stage(traceId, stackTrace);
stagingArea.get().stage(stackTrace);
} catch (Exception e) {
logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId()));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ public SnapshotProfilingSdkCustomizer() {
private static Function<ConfigProperties, StackTraceSampler> stackTraceSamplerProvider() {
return properties -> {
Duration samplingPeriod = Configuration.getSnapshotProfilerSamplingInterval(properties);
ConfigurableSupplier<StagingArea> supplier = StagingArea.SUPPLIER;
supplier.configure(new AccumulatingStagingArea(StackTraceExporter.SUPPLIER));
return new ScheduledExecutorStackTraceSampler(supplier, SpanTracker.SUPPLIER, samplingPeriod);
StagingArea.SUPPLIER.configure(createStagingArea(properties));
return new ScheduledExecutorStackTraceSampler(
StagingArea.SUPPLIER, SpanTracker.SUPPLIER, samplingPeriod);
};
}

/**
 * Builds the staging area used by the snapshot profiler, taking the export interval and the
 * staging capacity from the supplied configuration.
 */
private static StagingArea createStagingArea(ConfigProperties properties) {
  Duration exportInterval = Configuration.getSnapshotProfilerExportInterval(properties);
  int stagingCapacity = Configuration.getSnapshotProfilerStagingCapacity(properties);
  StagingArea stagingArea =
      new PeriodicallyExportingStagingArea(
          StackTraceExporter.SUPPLIER, exportInterval, stagingCapacity);
  return stagingArea;
}

@VisibleForTesting
SnapshotProfilingSdkCustomizer(
TraceRegistry registry, StackTraceSampler sampler, SpanTrackingActivator activator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ interface StagingArea extends Closeable {
StagingArea NOOP =
new StagingArea() {
@Override
public void stage(String traceId, StackTrace stackTrace) {}
public void stage(StackTrace stackTrace) {}

@Override
public void empty(String traceId) {}
public void empty() {}
};
ConfigurableSupplier<StagingArea> SUPPLIER = new ConfigurableSupplier<>(NOOP);

void stage(String traceId, StackTrace stackTrace);
void stage(StackTrace stackTrace);

void empty(String traceId);
void empty();

default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,37 @@ void getDefaultSnapshotProfilerSamplingInterval() {
assertEquals(
Duration.ofMillis(10), Configuration.getSnapshotProfilerSamplingInterval(properties));
}

// A bare numeric value for the export interval property is expected to be read as milliseconds.
@ParameterizedTest
@ValueSource(ints = {128, 512, 2056})
void getConfiguredSnapshotProfilerEmptyStagingInterval(int milliseconds) {
  Map<String, String> configured =
      Map.of("splunk.snapshot.profiler.export.interval", String.valueOf(milliseconds));
  var properties = DefaultConfigProperties.create(configured);

  Duration expected = Duration.ofMillis(milliseconds);
  Duration actual = Configuration.getSnapshotProfilerExportInterval(properties);

  assertEquals(expected, actual);
}

// With no properties configured the export interval falls back to 5 seconds.
@Test
void getDefaultSnapshotProfilerEmptyStagingInterval() {
  var properties = DefaultConfigProperties.create(Collections.emptyMap());

  Duration expected = Duration.ofSeconds(5);
  Duration actual = Configuration.getSnapshotProfilerExportInterval(properties);

  assertEquals(expected, actual);
}

// An explicitly configured staging capacity is returned unchanged.
@ParameterizedTest
@ValueSource(ints = {100, 1000, 10_000})
void getConfiguredSnapshotProfilerStagingCapacity(int value) {
  Map<String, String> configured =
      Map.of("splunk.snapshot.profiler.staging.capacity", String.valueOf(value));
  var properties = DefaultConfigProperties.create(configured);

  int actual = Configuration.getSnapshotProfilerStagingCapacity(properties);

  assertEquals(value, actual);
}

// With no properties configured the staging capacity falls back to 2000.
@Test
void getDefaultSnapshotProfilerStagingCapacity() {
  var properties = DefaultConfigProperties.create(Collections.emptyMap());

  int actual = Configuration.getSnapshotProfilerStagingCapacity(properties);

  assertEquals(2000, actual);
}
}
Loading