diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/Configuration.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/Configuration.java index 8ce4f0fea..89a302ec0 100644 --- a/profiler/src/main/java/com/splunk/opentelemetry/profiler/Configuration.java +++ b/profiler/src/main/java/com/splunk/opentelemetry/profiler/Configuration.java @@ -81,6 +81,14 @@ public class Configuration implements AutoConfigurationCustomizerProvider { "splunk.snapshot.profiler.sampling.interval"; private static final Duration DEFAULT_SNAPSHOT_PROFILER_SAMPLING_INTERVAL = Duration.ofMillis(10); + private static final String CONFIG_KEY_SNAPSHOT_PROFILER_EXPORT_INTERVAL = + "splunk.snapshot.profiler.export.interval"; + private static final Duration DEFAULT_SNAPSHOT_PROFILER_EXPORT_INTERVAL = Duration.ofSeconds(5); + + private static final String CONFIG_KEY_SNAPSHOT_PROFILER_STAGING_CAPACITY = + "splunk.snapshot.profiler.staging.capacity"; + private static final int DEFAULT_SNAPSHOT_PROFILER_STAGING_CAPACITY = 2000; + @Override public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addPropertiesSupplier(this::defaultProperties); @@ -228,4 +236,14 @@ public static Duration getSnapshotProfilerSamplingInterval(ConfigProperties prop CONFIG_KEY_SNAPSHOT_PROFILER_SAMPLING_INTERVAL, DEFAULT_SNAPSHOT_PROFILER_SAMPLING_INTERVAL); } + + public static Duration getSnapshotProfilerExportInterval(ConfigProperties properties) { + return properties.getDuration( + CONFIG_KEY_SNAPSHOT_PROFILER_EXPORT_INTERVAL, DEFAULT_SNAPSHOT_PROFILER_EXPORT_INTERVAL); + } + + public static int getSnapshotProfilerStagingCapacity(ConfigProperties properties) { + return properties.getInt( + CONFIG_KEY_SNAPSHOT_PROFILER_STAGING_CAPACITY, DEFAULT_SNAPSHOT_PROFILER_STAGING_CAPACITY); + } } diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingArea.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingArea.java deleted file mode 100644 index 3d840fcf2..000000000 --- a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingArea.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Splunk Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.splunk.opentelemetry.profiler.snapshot; - -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Supplier; - -class AccumulatingStagingArea implements StagingArea { - private final ConcurrentMap> stackTraces = new ConcurrentHashMap<>(); - private final Supplier exporter; - private volatile boolean closed = false; - - AccumulatingStagingArea(Supplier exporter) { - this.exporter = exporter; - } - - @Override - public void stage(String traceId, StackTrace stackTrace) { - if (closed) { - return; - } - - stackTraces.compute( - traceId, - (id, stackTraces) -> { - if (stackTraces == null) { - stackTraces = new ConcurrentLinkedQueue<>(); - } - stackTraces.add(stackTrace); - return stackTraces; - }); - } - - @Override - public void empty(String traceId) { - Queue stackTraces = this.stackTraces.remove(traceId); - if (stackTraces != null) { - exporter.get().export(stackTraces); - } - } - - @Override - public void close() { - closed = true; - stackTraces.values().forEach(exporter.get()::export); - stackTraces.clear(); - } -} diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingArea.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingArea.java new file mode 100644 index 000000000..475fddcf6 --- /dev/null +++ b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingArea.java @@ -0,0 +1,133 @@ +/* + * Copyright Splunk Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.splunk.opentelemetry.profiler.snapshot; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +class PeriodicallyExportingStagingArea implements StagingArea { + private static final String WORKER_THREAD_NAME = + PeriodicallyExportingStagingArea.class.getSimpleName() + "_WorkerThread"; + + private volatile boolean closed = false; + + private final Worker worker; + + PeriodicallyExportingStagingArea( + Supplier exporter, Duration delay, int capacity) { + worker = new Worker(exporter, delay, capacity); + worker.setName(WORKER_THREAD_NAME); + worker.setDaemon(true); + worker.start(); + } + + @Override + public void stage(StackTrace stackTrace) { + if (closed) { + return; + } + worker.add(stackTrace); + } + + @Override + public void empty() {} + + @Override + public void close() { + this.closed = true; + + // Wait for the worker thread to exit. Note that this does not guarantee that the pending items + // are exported as we don't attempt to wait for the actual export to complete. + try { + worker.shutdown(); + worker.join(); + } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + } + } + + private static class Worker extends Thread { + // when shutting down we queue a fake stack trace to ensure that shutdown process starts + // immediately + private static final Object SHUTDOWN_MARKER = new Object(); + + private final BlockingQueue queue; + private final Supplier exporter; + private final Duration delay; + private final int maxExportBatchSize; + + private volatile boolean shutdown = false; + private long nextExportTime; + + private Worker(Supplier exporter, Duration delay, int maxExportBatchSize) { + this.exporter = exporter; + this.delay = delay; + this.maxExportBatchSize = maxExportBatchSize; + // set the queue size to 4x the batch size, in sdk batch processors both of these are + // configurable but by default queue size is also 4*batch size + this.queue = new ArrayBlockingQueue<>(maxExportBatchSize * 4); + + updateNextExportTime(); + } + + void add(StackTrace stackTrace) { + // If queue is full drop the stack trace, not much we can do. + queue.offer(stackTrace); + } + + @Override + public void run() { + List stackTracesToExport = new ArrayList<>(); + try { + // run until shutdown is called and all queued spans are passed to the exporter + while (!shutdown || !queue.isEmpty() || !stackTracesToExport.isEmpty()) { + Object stackTrace = queue.poll(nextExportTime - System.nanoTime(), TimeUnit.NANOSECONDS); + if (stackTrace != null && stackTrace != SHUTDOWN_MARKER) { + stackTracesToExport.add((StackTrace) stackTrace); + } + // trigger export when either next export time is reached, we have max batch size, or we + // are shutting down and have read all the queued stacks + if (System.nanoTime() >= nextExportTime + || stackTracesToExport.size() >= maxExportBatchSize + || (shutdown && queue.isEmpty())) { + exporter.get().export(stackTracesToExport); + stackTracesToExport = new ArrayList<>(); + updateNextExportTime(); + } + } + } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + } + } + + private void updateNextExportTime() { + nextExportTime = System.nanoTime() + delay.toNanos(); + } + + private void shutdown() throws InterruptedException { + shutdown = true; + // we don't care if the queue is full and offer fails, we only wish to ensure that there is + // something in the queue so that shutdown could start immediately + queue.offer(SHUTDOWN_MARKER); + } + } +} diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java index 57e34ea70..626ac4e6c 100644 --- a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java +++ b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSampler.java @@ -70,7 +70,6 @@ public void stop(SpanContext spanContext) { if (spanContext.equals(sampler.getSpanContext())) { sampler.shutdown(); waitForShutdown(sampler); - stagingArea.get().empty(spanContext.getTraceId()); return null; } return sampler; @@ -153,7 +152,7 @@ public void run() { String spanId = retrieveActiveSpan(thread).getSpanId(); StackTrace stackTrace = StackTrace.from(Instant.now(), samplingPeriod, threadInfo, traceId, spanId); - stagingArea.get().stage(traceId, stackTrace); + stagingArea.get().stage(stackTrace); } catch (Exception e) { logger.log(Level.SEVERE, e, samplerErrorMessage(traceId, thread.getId())); } finally { diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java index 7f7e43dab..bc7f51e2c 100644 --- a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java +++ b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizer.java @@ -47,12 +47,18 @@ public SnapshotProfilingSdkCustomizer() { private static Function stackTraceSamplerProvider() { return properties -> { Duration samplingPeriod = Configuration.getSnapshotProfilerSamplingInterval(properties); - ConfigurableSupplier supplier = StagingArea.SUPPLIER; - supplier.configure(new AccumulatingStagingArea(StackTraceExporter.SUPPLIER)); - return new ScheduledExecutorStackTraceSampler(supplier, SpanTracker.SUPPLIER, samplingPeriod); + StagingArea.SUPPLIER.configure(createStagingArea(properties)); + return new ScheduledExecutorStackTraceSampler( + StagingArea.SUPPLIER, SpanTracker.SUPPLIER, samplingPeriod); }; } + private static StagingArea createStagingArea(ConfigProperties properties) { + Duration interval = Configuration.getSnapshotProfilerExportInterval(properties); + int capacity = Configuration.getSnapshotProfilerStagingCapacity(properties); + return new PeriodicallyExportingStagingArea(StackTraceExporter.SUPPLIER, interval, capacity); + } + @VisibleForTesting SnapshotProfilingSdkCustomizer( TraceRegistry registry, StackTraceSampler sampler, SpanTrackingActivator activator) { diff --git a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/StagingArea.java b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/StagingArea.java index 55e208f22..0ad968362 100644 --- a/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/StagingArea.java +++ b/profiler/src/main/java/com/splunk/opentelemetry/profiler/snapshot/StagingArea.java @@ -26,16 +26,16 @@ interface StagingArea extends Closeable { StagingArea NOOP = new StagingArea() { @Override - public void stage(String traceId, StackTrace stackTrace) {} + public void stage(StackTrace stackTrace) {} @Override - public void empty(String traceId) {} + public void empty() {} }; ConfigurableSupplier SUPPLIER = new ConfigurableSupplier<>(NOOP); - void stage(String traceId, StackTrace stackTrace); + void stage(StackTrace stackTrace); - void empty(String traceId); + void empty(); default void close() {} } diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/ConfigurationTest.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/ConfigurationTest.java index 86128354f..dac5ac8fa 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/ConfigurationTest.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/ConfigurationTest.java @@ -204,4 +204,37 @@ void getDefaultSnapshotProfilerSamplingInterval() { assertEquals( Duration.ofMillis(10), Configuration.getSnapshotProfilerSamplingInterval(properties)); } + + @ParameterizedTest + @ValueSource(ints = {128, 512, 2056}) + void getConfiguredSnapshotProfilerEmptyStagingInterval(int milliseconds) { + var properties = + DefaultConfigProperties.create( + Map.of("splunk.snapshot.profiler.export.interval", String.valueOf(milliseconds))); + assertEquals( + Duration.ofMillis(milliseconds), + Configuration.getSnapshotProfilerExportInterval(properties)); + } + + @Test + void getDefaultSnapshotProfilerEmptyStagingInterval() { + var properties = DefaultConfigProperties.create(Collections.emptyMap()); + assertEquals( + Duration.ofSeconds(5), Configuration.getSnapshotProfilerExportInterval(properties)); + } + + @ParameterizedTest + @ValueSource(ints = {100, 1000, 10_000}) + void getConfiguredSnapshotProfilerStagingCapacity(int value) { + var properties = + DefaultConfigProperties.create( + Map.of("splunk.snapshot.profiler.staging.capacity", String.valueOf(value))); + assertEquals(value, Configuration.getSnapshotProfilerStagingCapacity(properties)); + } + + @Test + void getDefaultSnapshotProfilerStagingCapacity() { + var properties = DefaultConfigProperties.create(Collections.emptyMap()); + assertEquals(2000, Configuration.getSnapshotProfilerStagingCapacity(properties)); + } } diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingAreaTest.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingAreaTest.java deleted file mode 100644 index a24e4646e..000000000 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/AccumulatingStagingAreaTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright Splunk Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.splunk.opentelemetry.profiler.snapshot; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.opentelemetry.sdk.trace.IdGenerator; -import java.util.Collections; -import java.util.List; -import org.junit.jupiter.api.Test; - -class AccumulatingStagingAreaTest { - private final InMemoryStackTraceExporter exporter = new InMemoryStackTraceExporter(); - private final AccumulatingStagingArea stagingArea = new AccumulatingStagingArea(() -> exporter); - - @Test - void exportStackTracesToLogExporter() { - var traceId = IdGenerator.random().generateTraceId(); - var stackTrace = Snapshotting.stackTrace().build(); - - stagingArea.stage(traceId, stackTrace); - stagingArea.empty(traceId); - - assertEquals(List.of(stackTrace), exporter.stackTraces()); - } - - @Test - void onlyExportStackTracesWhenAtLeastOneHasBeenStaged() { - var traceId = IdGenerator.random().generateTraceId(); - stagingArea.empty(traceId); - assertEquals(Collections.emptyList(), exporter.stackTraces()); - } - - @Test - void exportMultipleStackTracesToLogExporter() { - var traceId = IdGenerator.random().generateTraceId(); - var stackTrace1 = Snapshotting.stackTrace().withId(1).withName("one").build(); - var stackTrace2 = Snapshotting.stackTrace().withId(1).withName("two").build(); - - stagingArea.stage(traceId, stackTrace1); - stagingArea.stage(traceId, stackTrace2); - stagingArea.empty(traceId); - - assertEquals(List.of(stackTrace1, stackTrace2), exporter.stackTraces()); - } - - @Test - void exportStackTracesForOnlySpecifiedThread() { - var traceId1 = IdGenerator.random().generateTraceId(); - var traceId2 = IdGenerator.random().generateTraceId(); - var stackTrace1 = Snapshotting.stackTrace().withId(1).withName("one").build(); - var stackTrace2 = Snapshotting.stackTrace().withId(1).withName("two").build(); - - stagingArea.stage(traceId1, stackTrace1); - stagingArea.stage(traceId2, stackTrace2); - stagingArea.empty(traceId1); - - assertEquals(List.of(stackTrace1), exporter.stackTraces()); - } - - @Test - void exportStackTracesForMultipleThreads() { - var traceId1 = IdGenerator.random().generateTraceId(); - var traceId2 = IdGenerator.random().generateTraceId(); - var stackTrace1 = Snapshotting.stackTrace().withId(1).withName("one").build(); - var stackTrace2 = Snapshotting.stackTrace().withId(1).withName("two").build(); - - stagingArea.stage(traceId1, stackTrace1); - stagingArea.stage(traceId2, stackTrace2); - stagingArea.empty(traceId1); - stagingArea.empty(traceId2); - - assertEquals(List.of(stackTrace1, stackTrace2), exporter.stackTraces()); - } - - @Test - void stackTracesAreNotExportedMultipleTimes() { - var traceId = IdGenerator.random().generateTraceId(); - var stackTrace = Snapshotting.stackTrace().build(); - - stagingArea.stage(traceId, stackTrace); - stagingArea.empty(traceId); - stagingArea.empty(traceId); - - assertEquals(List.of(stackTrace), exporter.stackTraces()); - } - - @Test - void exportStackTracesWhenClosed() { - var stackTrace = Snapshotting.stackTrace().build(); - - stagingArea.stage(stackTrace.getTraceId(), stackTrace); - stagingArea.close(); - - assertEquals(List.of(stackTrace), exporter.stackTraces()); - } - - @Test - void exportStackTracesForAllTracesWhenClosed() { - var stackTrace1 = Snapshotting.stackTrace().withId(1).withName("one").build(); - var stackTrace2 = Snapshotting.stackTrace().withId(2).withName("one").build(); - var stackTrace3 = Snapshotting.stackTrace().withId(3).withName("one").build(); - - stagingArea.stage(stackTrace1.getTraceId(), stackTrace1); - stagingArea.stage(stackTrace2.getTraceId(), stackTrace2); - stagingArea.stage(stackTrace3.getTraceId(), stackTrace3); - stagingArea.close(); - - assertThat(exporter.stackTraces()).contains(stackTrace1, stackTrace2, stackTrace3); - } - - @Test - void doNotAcceptNewStackTracesWhenClosed() { - var stackTrace = Snapshotting.stackTrace().build(); - - stagingArea.close(); - stagingArea.stage(stackTrace.getTraceId(), stackTrace); - stagingArea.empty(stackTrace.getTraceId()); - - assertEquals(Collections.emptyList(), exporter.stackTraces()); - } -} diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStackTraceExporter.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStackTraceExporter.java index 05004c623..a57cdad83 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStackTraceExporter.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStackTraceExporter.java @@ -16,17 +16,17 @@ package com.splunk.opentelemetry.profiler.snapshot; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * In memory implementation of the {@link StackTraceExporter} interface that allows for direct * access to the exported {@link StackTrace}s. Intended for testing use only. */ class InMemoryStackTraceExporter implements StackTraceExporter { - private final List stackTraces = new ArrayList<>(); + private final List stackTraces = new CopyOnWriteArrayList<>(); @Override public void export(Collection stackTraces) { diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStagingArea.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStagingArea.java index c56a33db5..59f485153 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStagingArea.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/InMemoryStagingArea.java @@ -16,85 +16,31 @@ package com.splunk.opentelemetry.profiler.snapshot; -import io.opentelemetry.api.trace.SpanContext; -import java.util.AbstractList; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; +import java.util.concurrent.CopyOnWriteArrayList; /** * In memory implementation of the {@link StagingArea} interface that allows for direct access to * the stockpiled {@link StackTrace}s. Intended for testing use only. */ class InMemoryStagingArea implements StagingArea { - private final ConcurrentMap stackTraces = new ConcurrentHashMap<>(); + private final List stackTraces = new CopyOnWriteArrayList<>(); @Override - public void stage(String traceId, StackTrace stackTrace) { - stackTraces.compute( - traceId, - (id, stackTraces) -> { - if (stackTraces == null) { - stackTraces = new StagedStackTraces(); - } - if (!stackTraces.emptied()) { - stackTraces.add(stackTrace); - } - return stackTraces; - }); + public void stage(StackTrace stackTrace) { + stackTraces.add(stackTrace); } - @Override - public void empty(String traceId) { - stackTraces.getOrDefault(traceId, new StagedStackTraces()).empty(); - } - - StagedStackTraces getStackTraces(SpanContext spanContext) { - return stackTraces.getOrDefault(spanContext.getTraceId(), new StagedStackTraces()); + public void empty() { + stackTraces.clear(); } - boolean hasStackTraces(SpanContext spanContext) { - return !stackTraces.get(spanContext.getTraceId()).isEmpty(); + boolean hasStackTraces() { + return !stackTraces.isEmpty(); } - public List allStackTraces() { - return stackTraces.values().stream() - .map(StagedStackTraces::stackTraces) - .flatMap(List::stream) - .collect(Collectors.toList()); - } - - static class StagedStackTraces extends AbstractList { - private final List stackTraces = new ArrayList<>(); - private boolean emptied = false; - - List stackTraces() { - return stackTraces; - } - - @Override - public int size() { - return stackTraces.size(); - } - - @Override - public boolean add(StackTrace stackTrace) { - return stackTraces.add(stackTrace); - } - - @Override - public StackTrace get(int index) { - return stackTraces.get(index); - } - - boolean emptied() { - return emptied; - } - - private void empty() { - emptied = true; - } + List allStackTraces() { + return Collections.unmodifiableList(stackTraces); } } diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingAreaTest.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingAreaTest.java new file mode 100644 index 000000000..d8b8e0859 --- /dev/null +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/PeriodicallyExportingStagingAreaTest.java @@ -0,0 +1,206 @@ +/* + * Copyright Splunk Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.splunk.opentelemetry.profiler.snapshot; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class PeriodicallyExportingStagingAreaTest { + private final InMemoryStackTraceExporter exporter = new InMemoryStackTraceExporter(); + private final Duration emptyDuration = Duration.ofMillis(50); + + @Test + void automaticallyExportStackTraces() { + var stackTrace = Snapshotting.stackTrace().build(); + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 10)) { + stagingArea.stage(stackTrace); + await().untilAsserted(() -> assertThat(exporter.stackTraces()).contains(stackTrace)); + } + } + + @Test + void continuallyExportingStackTracesPeriodically() { + var stackTrace1 = Snapshotting.stackTrace().build(); + var stackTrace2 = Snapshotting.stackTrace().build(); + var stackTrace3 = Snapshotting.stackTrace().build(); + + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 10)) { + stagingArea.stage(stackTrace1); + await().untilAsserted(() -> assertThat(exporter.stackTraces()).contains(stackTrace1)); + + stagingArea.stage(stackTrace2); + stagingArea.stage(stackTrace3); + await() + .untilAsserted( + () -> assertThat(exporter.stackTraces()).contains(stackTrace2, stackTrace3)); + } + } + + @Test + void exportMultipleStackTracesToLogExporter() { + var stackTrace1 = Snapshotting.stackTrace().build(); + var stackTrace2 = Snapshotting.stackTrace().build(); + + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 10)) { + stagingArea.stage(stackTrace1); + stagingArea.stage(stackTrace2); + + await() + .untilAsserted( + () -> assertThat(exporter.stackTraces()).contains(stackTrace1, stackTrace2)); + } + } + + @Test + void stackTracesAreNotExportedMultipleTimes() { + var stackTrace = Snapshotting.stackTrace().build(); + + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 10)) { + stagingArea.stage(stackTrace); + await().until(() -> !exporter.stackTraces().isEmpty()); + + assertEquals(List.of(stackTrace), exporter.stackTraces()); + } + } + + @Test + void stackTracesAreExportedImmediatelyUponShutdown() { + var stackTrace = Snapshotting.stackTrace().build(); + + var stagingArea = new PeriodicallyExportingStagingArea(() -> exporter, Duration.ofDays(1), 10); + stagingArea.stage(stackTrace); + stagingArea.close(); + + await().untilAsserted(() -> assertEquals(List.of(stackTrace), exporter.stackTraces())); + } + + @Test + void doNotAcceptStackTracesAfterShutdown() { + var stackTrace = Snapshotting.stackTrace().build(); + + var stagingArea = new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 10); + stagingArea.close(); + stagingArea.stage(stackTrace); + + assertEquals(Collections.emptyList(), exporter.stackTraces()); + } + + @Test + void exportStackTracesWhenCapacityReached() { + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, Duration.ofDays(1), 2)) { + stagingArea.stage(Snapshotting.stackTrace().build()); + stagingArea.stage(Snapshotting.stackTrace().build()); + + await().untilAsserted(() -> assertEquals(2, exporter.stackTraces().size())); + } + } + + @Test + void exportStackTracesOnNormalScheduleEvenAfterCapacityReached() { + try (var stagingArea = new PeriodicallyExportingStagingArea(() -> exporter, emptyDuration, 2)) { + stagingArea.stage(Snapshotting.stackTrace().build()); + stagingArea.stage(Snapshotting.stackTrace().build()); + stagingArea.stage(Snapshotting.stackTrace().build()); + + await().untilAsserted(() -> assertEquals(3, exporter.stackTraces().size())); + } + } + + @Test + void doNotExportStackTraceMultipleTimes() { + var stackTrace1 = Snapshotting.stackTrace().build(); + var stackTrace2 = Snapshotting.stackTrace().build(); + + var startLatch = new CountDownLatch(1); + + var executor = Executors.newFixedThreadPool(2); + try (var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, Duration.ofDays(1), 1)) { + executor.submit(stage(stagingArea, startLatch, stackTrace1)); + executor.submit(stage(stagingArea, startLatch, stackTrace2)); + startLatch.countDown(); + + await().until(() -> !exporter.stackTraces().isEmpty()); + assertThat(exporter.stackTraces()).containsOnlyOnce(stackTrace1, stackTrace2); + } finally { + executor.shutdown(); + } + } + + @Test + void multipleThreadsStagingStackTracesWhenCapacityReachedDoesNotCauseMultipleExports() { + var stackTrace1 = Snapshotting.stackTrace().build(); + var stackTrace2 = Snapshotting.stackTrace().build(); + var stackTrace3 = Snapshotting.stackTrace().build(); + var stackTrace4 = Snapshotting.stackTrace().build(); + + var startLatch = new CountDownLatch(1); + + var executor = Executors.newFixedThreadPool(2); + try (var exporter = new CallCountingStackTraceExporter(); + var stagingArea = + new PeriodicallyExportingStagingArea(() -> exporter, Duration.ofDays(1), 3)) { + executor.submit(stage(stagingArea, startLatch, stackTrace1, stackTrace2)); + executor.submit(stage(stagingArea, startLatch, stackTrace3, stackTrace4)); + startLatch.countDown(); + + await().until(() -> !exporter.stackTraces().isEmpty()); + assertEquals(1, exporter.timesCalled.get()); + } finally { + executor.shutdown(); + } + } + + private Runnable stage( + StagingArea stagingArea, CountDownLatch startLatch, StackTrace... stackTraces) { + return () -> { + try { + startLatch.await(); + Arrays.stream(stackTraces).forEach(stagingArea::stage); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + } + + private static class CallCountingStackTraceExporter extends InMemoryStackTraceExporter { + private final AtomicInteger timesCalled = new AtomicInteger(); + + @Override + public void export(Collection stackTraces) { + timesCalled.incrementAndGet(); + super.export(stackTraces); + } + } +} diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSamplerTest.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSamplerTest.java index db58a446f..d2fb1c5a4 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSamplerTest.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/ScheduledExecutorStackTraceSamplerTest.java @@ -47,7 +47,7 @@ void takeStackTraceSampleForGivenThread() { try { sampler.start(spanContext); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); } finally { sampler.stop(spanContext); } @@ -61,28 +61,12 @@ void continuallySampleThreadForStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.getStackTraces(spanContext).size() >= expectedSamples); + await().until(() -> staging.allStackTraces().size() >= expectedSamples); } finally { sampler.stop(spanContext); } } - @Test - void emptyStagingAreaAfterSamplingStops() { - var halfSecond = Duration.ofMillis(500); - var spanContext = Snapshotting.spanContext().build(); - int expectedSamples = (int) halfSecond.dividedBy(SAMPLING_PERIOD.multipliedBy(2)); - - try { - sampler.start(spanContext); - await().until(() -> staging.getStackTraces(spanContext).size() >= expectedSamples); - } finally { - sampler.stop(spanContext); - } - - assertThat(staging.getStackTraces(spanContext).emptied()).isTrue(); - } - @Test void onlyTakeStackTraceSamplesForOneThreadPerTrace() { var executor = Executors.newFixedThreadPool(2); @@ -162,7 +146,7 @@ void includeTimestampOnStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); var stackTrace = staging.allStackTraces().stream().findFirst().orElseThrow(); assertThat(stackTrace.getTimestamp()).isNotNull().isAfter(now); @@ -177,7 +161,7 @@ void includeSamplingPeriodOnFirstRecordedStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); var stackTrace = staging.allStackTraces().stream().findFirst().orElseThrow(); assertThat(stackTrace.getDuration()).isNotNull().isGreaterThan(Duration.ZERO); @@ -192,10 +176,9 @@ void calculateSamplingPeriodAfterFirstRecordedStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.getStackTraces(spanContext).size() > 1); + await().until(() -> staging.allStackTraces().size() > 1); - var stackTrace = - staging.getStackTraces(spanContext).stream().skip(1).findFirst().orElseThrow(); + var stackTrace = staging.allStackTraces().stream().skip(1).findFirst().orElseThrow(); assertThat(stackTrace.getDuration()) .isNotNull() .isCloseTo(SAMPLING_PERIOD, Duration.ofMillis(5)); @@ -214,7 +197,7 @@ void includeThreadDetailsOnStackTraces() throws Exception { var future = executor.submit(startSampling(spanContext, startLatch, stopLatch)); startLatch.countDown(); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); stopLatch.countDown(); var thread = future.get(); @@ -236,7 +219,7 @@ void includeTraceIdOnStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); var stackTrace = staging.allStackTraces().stream().findFirst().orElseThrow(); assertEquals(spanContext.getTraceId(), stackTrace.getTraceId()); @@ -252,7 +235,7 @@ void includeActiveSpanIdOnStackTraces() { try { sampler.start(spanContext); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); var stackTrace = staging.allStackTraces().stream().findFirst().orElseThrow(); assertEquals(spanContext.getSpanId(), stackTrace.getSpanId()); @@ -272,7 +255,7 @@ void takeFinalSampleWhenTraceSamplingIsStopped() { scheduler.submit(startSampling(spanContext, startLatch, stopLatch)); scheduler.schedule( () -> sampler.stop(spanContext), expectedDuration.toMillis(), TimeUnit.MILLISECONDS); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); stopLatch.countDown(); var stackTraces = staging.allStackTraces(); @@ -293,10 +276,10 @@ void finalSampleDurationIsLessSmallerThanSamplingPeriod() { scheduler.submit(startSampling(spanContext, startLatch, stopLatch)); scheduler.schedule( () -> sampler.stop(spanContext), expectedDuration.toMillis(), TimeUnit.MILLISECONDS); - await().until(() -> staging.hasStackTraces(spanContext)); + await().until(staging::hasStackTraces); stopLatch.countDown(); - var stackTraces = staging.getStackTraces(spanContext); + var stackTraces = staging.allStackTraces(); var lastStackTrace = stackTraces.get(stackTraces.size() - 1); assertThat(lastStackTrace.getDuration()).isLessThan(SAMPLING_PERIOD); } finally { @@ -341,15 +324,14 @@ void stopSamplingWhenClosed() throws Exception { await().until(() -> !staging.allStackTraces().isEmpty()); sampler.close(); - int previouslyStagedStackTraces = reportStackTracesStaged().call(); - staging.empty(spanContext.getTraceId()); + staging.empty(); var scheduler = Executors.newSingleThreadScheduledExecutor(); try { var future = scheduler.schedule( reportStackTracesStaged(), SAMPLING_PERIOD.toMillis() * 10, TimeUnit.MILLISECONDS); - assertEquals(previouslyStagedStackTraces, future.get()); + assertEquals(0, future.get()); } finally { scheduler.shutdownNow(); } @@ -369,15 +351,12 @@ void stopSamplingForEveryTraceWhenClosed() throws Exception { startSpanLatch.countDown(); await().until(() -> staging.allStackTraces().size() > 5); sampler.close(); - - int previouslyStagedStackTraces = reportStackTracesStaged().call(); - staging.empty(spanContext1.getTraceId()); - staging.empty(spanContext2.getTraceId()); + staging.empty(); var future = scheduler.schedule( reportStackTracesStaged(), SAMPLING_PERIOD.toMillis() * 10, TimeUnit.MILLISECONDS); - assertEquals(previouslyStagedStackTraces, future.get()); + assertEquals(0, future.get()); } finally { executor.shutdownNow(); scheduler.shutdownNow(); diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SdkShutdownHookTest.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SdkShutdownHookTest.java index 2050b5b04..552c85500 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SdkShutdownHookTest.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SdkShutdownHookTest.java @@ -75,10 +75,10 @@ public void start(SpanContext spanContext) {} public void stop(SpanContext spanContext) {} @Override - public void stage(String traceId, StackTrace stackTrace) {} + public void stage(StackTrace stackTrace) {} @Override - public void empty(String traceId) {} + public void empty() {} @Override public void export(Collection stackTraces) {} diff --git a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizerBuilder.java b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizerBuilder.java index 529d6190b..a9479a96a 100644 --- a/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizerBuilder.java +++ b/profiler/src/test/java/com/splunk/opentelemetry/profiler/snapshot/SnapshotProfilingSdkCustomizerBuilder.java @@ -33,11 +33,13 @@ SnapshotProfilingSdkCustomizer real() { } SnapshotProfilingSdkCustomizerBuilder withRealStackTraceSampler() { - var supplier = StagingArea.SUPPLIER; - supplier.configure(new AccumulatingStagingArea(StackTraceExporter.SUPPLIER)); + var stagingAreaSupplier = StagingArea.SUPPLIER; + stagingAreaSupplier.configure( + new PeriodicallyExportingStagingArea( + StackTraceExporter.SUPPLIER, Duration.ofMillis(200), 10)); return with( new ScheduledExecutorStackTraceSampler( - supplier, SpanTracker.SUPPLIER, Duration.ofMillis(20))); + stagingAreaSupplier, SpanTracker.SUPPLIER, Duration.ofMillis(20))); } SnapshotProfilingSdkCustomizerBuilder with(StackTraceSampler sampler) {