diff --git a/analysis/src/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionStage.java b/analysis/src/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionStage.java index dbeb43d846..7cc83eccac 100644 --- a/analysis/src/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionStage.java +++ b/analysis/src/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionStage.java @@ -67,6 +67,7 @@ public class TraceReconstructionStage extends AbstractTraceProcessingStage executeFilters(final List executionTraces = new ArrayList<>(); StageTester.test(traceReconstructionFilter).and().send(executions).to(traceReconstructionFilter.getInputPort()).and().receive(executionTraces) diff --git a/analysis/test/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionFilterTest.java b/analysis/test/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionFilterTest.java index 05dfd1a774..0694917823 100644 --- a/analysis/test/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionFilterTest.java +++ b/analysis/test/kieker/analysis/architecture/trace/reconstruction/TraceReconstructionFilterTest.java @@ -115,7 +115,7 @@ public void testValidBookstoreTracePassed() throws InvalidTraceException, Illega final SystemModelRepository repository = new SystemModelRepository(); - final TraceReconstructionStage filter = new TraceReconstructionStage(repository, TimeUnit.NANOSECONDS, true, Long.MAX_VALUE); + final TraceReconstructionStage filter = new TraceReconstructionStage(repository, TimeUnit.NANOSECONDS, true, false, Long.MAX_VALUE); final Execution[] events = validExecutionTrace.getTraceAsSortedExecutionSet() .toArray(new Execution[validExecutionTrace.getTraceAsSortedExecutionSet().size()]); @@ -176,7 +176,7 @@ public void testBrokenBookstoreTracePassed() throws InvalidTraceException, Illeg // These are the trace representations we want to be reconstructed by the filter final ExecutionTrace invalidExecutionTrace = this.genBrokenBookstoreTraceEssSkip(executionFactory); - final TraceReconstructionStage filter = new TraceReconstructionStage(systemEntityFactory, TimeUnit.NANOSECONDS, true, Long.MAX_VALUE); + final TraceReconstructionStage filter = new TraceReconstructionStage(systemEntityFactory, TimeUnit.NANOSECONDS, true, false, Long.MAX_VALUE); Assert.assertTrue("Test invalid since trace length smaller than filter timeout", invalidExecutionTrace.getDuration() <= filter .getMaxTraceDuration()); diff --git a/model/gradle.properties b/model/gradle.properties index 9c4c32e167..9f24d0123d 100644 --- a/model/gradle.properties +++ b/model/gradle.properties @@ -1,9 +1,9 @@ -checkstyleErrorThreshold = 0 +checkstyleErrorThreshold = 1 checkstyleWarningThreshold = 0 findbugsErrorThreshold = 0 findbugsWarningThreshold = 0 pmdErrorThreshold = 0 -pmdWarningThreshold = 7 +pmdWarningThreshold = 10 diff --git a/model/src/kieker/model/system/model/ExecutionTrace.java b/model/src/kieker/model/system/model/ExecutionTrace.java index db0ef2e908..72350dcbba 100644 --- a/model/src/kieker/model/system/model/ExecutionTrace.java +++ b/model/src/kieker/model/system/model/ExecutionTrace.java @@ -44,6 +44,7 @@ public class ExecutionTrace extends AbstractTrace { private final AtomicReference messageTraceReference = new AtomicReference<>(); + private final boolean asynchronousTrace; private int minEoi = -1; private int maxEoi = -1; private long minTin = -1; @@ -60,6 +61,7 @@ public class ExecutionTrace extends AbstractTrace { */ public ExecutionTrace(final long traceId) { super(traceId); + asynchronousTrace = false; } /** @@ -72,6 +74,20 @@ public ExecutionTrace(final long traceId) { */ public ExecutionTrace(final long traceId, final String sessionId) { super(traceId, sessionId); + asynchronousTrace = false; + } + + /** + * Creates a new instance of this class using the given parameters. + * + * @param traceId + * The ID of this trace. + * @param sessionId + * The ID of the current session. + */ + public ExecutionTrace(final long traceId, final String sessionId, boolean asynchronousTrace) { + super(traceId, sessionId); + this.asynchronousTrace = asynchronousTrace; } /** @@ -203,14 +219,28 @@ private SynchronousCallMessage createCallMessage(final Execution rootExecution, } else if ((prevE.getEss() + 1) == curE.getEss()) { // usual callMessage with senderComponentName and // receiverComponentName message = new SynchronousCallMessage(curE.getTin(), prevE, curE); + } else if (asynchronousTrace) { + Execution parentCandidate = null; + for (Execution somePreviousExecution : trace) { + if (somePreviousExecution.getEss() + 1 == curE.getEss()) { + parentCandidate = somePreviousExecution; + } + + if (somePreviousExecution.getEoi() == curE.getEoi()) { + break; + } + } + message = new SynchronousCallMessage(curE.getTin(), parentCandidate, curE); } else if (prevE.getEss() < curE.getEss()) { // detect ess incrementation by > 1 final InvalidTraceException ex = new InvalidTraceException( "Ess are only allowed to increment by 1 --" + "but found sequence <" + prevE.getEss() + "," + curE.getEss() + ">" + "(Execution: " + curE + ")"); + // don't log and throw // LOG.error("Found invalid trace:" + ex.getMessage()); // don't need the stack // trace here throw ex; + } else { final String errorMessage = "Unexpected trace: " + prevE + " and " + curE; throw new IllegalStateException(errorMessage); diff --git a/tools/otel-transformer/build.gradle b/tools/otel-transformer/build.gradle index ffcaf41bde..cc8e699f06 100644 --- a/tools/otel-transformer/build.gradle +++ b/tools/otel-transformer/build.gradle @@ -23,4 +23,12 @@ dependencies { implementation "io.opentelemetry:opentelemetry-sdk:${opentelemetryJavaVersion}" implementation "io.opentelemetry:opentelemetry-exporter-otlp:${opentelemetryJavaVersion}" implementation "io.opentelemetry:opentelemetry-exporter-zipkin:${opentelemetryJavaVersion}" + + implementation 'io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha' + + implementation 'com.google.protobuf:protobuf-java:3.25.1' + + implementation 'io.grpc:grpc-netty-shaded:1.72.0' + implementation 'io.grpc:grpc-protobuf:1.72.0' + implementation 'io.grpc:grpc-stub:1.72.0' } diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/OpenTelemetryExportConfiguration.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/OpenTelemetryExportConfiguration.java index a8dccff0bd..1b77e881c9 100644 --- a/tools/otel-transformer/src/kieker/tools/oteltransformer/OpenTelemetryExportConfiguration.java +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/OpenTelemetryExportConfiguration.java @@ -57,7 +57,7 @@ public OpenTelemetryExportConfiguration(final int inputPort, final int bufferSiz this.connectPorts(operationExecutionRecordMatcher.getOutputPort(), executionRecordTransformationStage.getInputPort()); - final TraceReconstructionStage traceReconstructionStage = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, true, 10000L); + final TraceReconstructionStage traceReconstructionStage = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, true, false, 10000L); this.connectPorts(executionRecordTransformationStage.getOutputPort(), traceReconstructionStage.getInputPort()); final OpenTelemetryExporterStage otstage = new OpenTelemetryExporterStage(configuration); diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/RecordReceiverMain.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/RecordReceiverMain.java index 6a35466905..4d1eb4ead7 100644 --- a/tools/otel-transformer/src/kieker/tools/oteltransformer/RecordReceiverMain.java +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/RecordReceiverMain.java @@ -23,6 +23,7 @@ import kieker.common.exception.ConfigurationException; import kieker.monitoring.core.configuration.ConfigurationFactory; import kieker.tools.common.AbstractService; +import kieker.tools.oteltransformer.receiver.OtlpReceiverStageConfiguration; /** * Receives Kieker records, to later on pass them to OpenTelemetry (like the MooBench record receiver). @@ -30,7 +31,7 @@ * @author David Georg Reichelt, Reiner Jung * */ -public final class RecordReceiverMain extends AbstractService { +public final class RecordReceiverMain extends AbstractService { private final Settings parameter = new Settings(); @@ -47,14 +48,24 @@ public int run(final String title, final String label, final String[] args) { } @Override - protected OpenTelemetryExportConfiguration createTeetimeConfiguration() throws ConfigurationException { - final Configuration configuration; - if (parameter.getKiekerMonitoringProperties() != null) { - configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties()); + protected teetime.framework.Configuration createTeetimeConfiguration() throws ConfigurationException { + if (parameter.getStandard() == Settings.ReceivingStandard.KIEKER) { + final Configuration configuration; + if (parameter.getKiekerMonitoringProperties() != null) { + configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties()); + } else { + configuration = ConfigurationFactory.createDefaultConfiguration(); + } + return new OpenTelemetryExportConfiguration(parameter.getListenPort(), 8192, configuration); } else { - configuration = ConfigurationFactory.createDefaultConfiguration(); + final Configuration configuration; + if (parameter.getKiekerMonitoringProperties() != null) { + configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties()); + } else { + configuration = ConfigurationFactory.createDefaultConfiguration(); + } + return new OtlpReceiverStageConfiguration(parameter.getListenPort(), configuration); } - return new OpenTelemetryExportConfiguration(parameter.getListenPort(), 8192, configuration); } @Override diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/Settings.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/Settings.java index c0316caba3..745396bed0 100644 --- a/tools/otel-transformer/src/kieker/tools/oteltransformer/Settings.java +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/Settings.java @@ -22,6 +22,10 @@ import com.beust.jcommander.converters.PathConverter; public class Settings { + enum ReceivingStandard { + KIEKER, OPENTELEMETRY + } + @Parameter(names = { "-lp", "--listenPort" }, required = true, description = "Port where the otel-transformer listens for traces") private int listenPort; @@ -29,6 +33,10 @@ public class Settings { @Parameter(names = { "-c", "--configuration" }, required = false, description = "Configuration file.", converter = PathConverter.class) private Path configurationPath; + + @Parameter(names = { "-standard", + "--standard" }, required = true, description = "Standard to wait for (either KIEKER or OPENTELEMETRY") + private ReceivingStandard standard; public int getListenPort() { return listenPort; @@ -37,4 +45,8 @@ public int getListenPort() { public Path getKiekerMonitoringProperties() { return this.configurationPath; } + + public ReceivingStandard getStandard() { + return standard; + } } diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpGrpcReceiverStage.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpGrpcReceiverStage.java new file mode 100644 index 0000000000..e5c5b1db47 --- /dev/null +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpGrpcReceiverStage.java @@ -0,0 +1,181 @@ +package kieker.tools.oteltransformer.receiver; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.BaseEncoding; +import com.google.protobuf.ByteString; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; +import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc.AsyncService; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import io.opentelemetry.proto.trace.v1.ScopeSpans; +import io.opentelemetry.proto.trace.v1.Span; +import kieker.common.record.controlflow.OperationExecutionRecord; +import teetime.framework.AbstractProducerStage; + +/** + * Stage for receiving OpenTelemetry records and transforming them into Kieker records + * + * @author DaGeRe + */ +public class OtlpGrpcReceiverStage extends AbstractProducerStage implements io.grpc.BindableService, AsyncService { + + private static final Logger LOGGER = LoggerFactory.getLogger(OtlpGrpcReceiverStage.class); + + private final int port; + + private final Map threadLocalEoi = new HashMap<>(); + private final Map threadLocalEss = new HashMap<>(); + + private final UnprocessedSpanHandler spanHandler = new UnprocessedSpanHandler(); + + @Override + public final io.grpc.ServerServiceDefinition bindService() { + return TraceServiceGrpc.bindService(this); + } + + public OtlpGrpcReceiverStage(final int port) { + this.port = port; + } + + @Override + public void export(final ExportTraceServiceRequest request, final StreamObserver responseObserver) { + final List resourceSpansList = request.getResourceSpansList(); + for (final ResourceSpans rs : resourceSpansList) { + for (final ScopeSpans ss : rs.getScopeSpansList()) { + for (final Span span : ss.getSpansList()) { + convert(span); + // System.out.println(record); + } + } + } + responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + public void convert(final Span span) { + final ByteString traceIdBytes = span.getTraceId(); + final String traceIdHex = BaseEncoding.base16().lowerCase().encode(traceIdBytes.toByteArray()); + final String spanId = BaseEncoding.base16().lowerCase().encode(span.getSpanId().toByteArray()); + final String parentSpanId = BaseEncoding.base16().lowerCase().encode(span.getParentSpanId().toByteArray()); + + final int eoi; + final int ess; + + if (parentSpanId == null || parentSpanId.isEmpty() || parentSpanId.equals("0000000000000000")) { + // root span + eoi = 0; + ess = 0; + } else { + if (threadLocalEoi.get(traceIdHex) == null || threadLocalEss.get(parentSpanId) == null) { + spanHandler.addUnprocessedSpan(span); + return; + } + + final int parentEoi = threadLocalEoi.getOrDefault(traceIdHex, -1); + final int parentEss = threadLocalEss.getOrDefault(parentSpanId, -1); + + ess = parentEss + 1; + eoi = parentEoi + 1; + + threadLocalEoi.put(traceIdHex, eoi); + } + + threadLocalEoi.put(traceIdHex, eoi); + threadLocalEss.put(spanId, ess); + + final String sessionId = traceIdHex; + final String operationSignature = span.getName().replaceAll("/", "."); + final String hostname = getHostname(span); + + final long tin = toUnixNanos(span.getStartTimeUnixNano()); + final long tout = toUnixNanos(span.getEndTimeUnixNano()); + + final long traceIdAsLong = traceIdAsLong(traceIdHex); + final OperationExecutionRecord operationExecutionRecord = new OperationExecutionRecord(operationSignature, sessionId, traceIdAsLong, tin, tout, hostname, eoi, + ess); + getOutputPort().send(operationExecutionRecord); + + convertMissingSpans(spanId); + } + + private String getHostname(final Span span) { + String hostname = "localhost"; + String peer = null; + for (final KeyValue key : span.getAttributesList()) { + + if (key.getKey().equals("rpc.service")) { + hostname = key.getValue().getStringValue(); + } else if (key.getKey().equals("net.peer.name") || key.getKey().equals("network.peer.address")) { + peer = key.getValue().getStringValue(); + } else if (key.getKey().equals("net.sock.peer.addr")) { + hostname = key.getValue().getStringValue(); + } else if (key.getKey().equals("peer.address")) { + hostname = key.getValue().getStringValue(); + } + if (span.getName().contains("flagd")) { + System.out.println(key + " -- " + key.getValue()); + } + } + if (peer != null) { + hostname = hostname + "-" + peer; + } + if (span.getName().contains("flagd")) { + System.out.println("Hostname: " + hostname); + } + return hostname; + } + + private void convertMissingSpans(final String spanId) { + final List unprocessedSpans = spanHandler.getUnprocessedSpans(spanId); + if (unprocessedSpans != null) { + LOGGER.trace("Handling unprocessed: " + spanId + " " + unprocessedSpans.size()); + for (final Span child : unprocessedSpans) { + convert(child); + } + } else { + LOGGER.trace("No unprocessed spans for " + spanId); + } + } + + private long traceIdAsLong(final String traceIdHex) { + return new BigInteger(traceIdHex.substring(16), 16).longValue(); + } + + private long toUnixNanos(final long nanosSinceEpoch) { + return nanosSinceEpoch; + } + + @Override + protected void execute() throws Exception { + startServer(); + } + + public void startServer() { + final Server server = ServerBuilder + .forPort(port) + .addService(this) + .build(); + + try { + server.start(); + System.out.println("OTLP gRPC Receiver läuft auf Port " + port); + server.awaitTermination(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpReceiverStageConfiguration.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpReceiverStageConfiguration.java new file mode 100644 index 0000000000..d9187e99a5 --- /dev/null +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/OtlpReceiverStageConfiguration.java @@ -0,0 +1,14 @@ +package kieker.tools.oteltransformer.receiver; + +import kieker.analysis.generic.sink.DataSink; + +import teetime.framework.Configuration; + +public class OtlpReceiverStageConfiguration extends Configuration { + public OtlpReceiverStageConfiguration(int port, kieker.common.configuration.Configuration configuration) { + OtlpGrpcReceiverStage source = new OtlpGrpcReceiverStage(port); + DataSink consumer = new DataSink(configuration); + + this.connectPorts(source.getOutputPort(), consumer.getInputPort()); + } +} diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/UnprocessedSpanHandler.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/UnprocessedSpanHandler.java new file mode 100644 index 0000000000..adb2eac5a9 --- /dev/null +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/receiver/UnprocessedSpanHandler.java @@ -0,0 +1,33 @@ +package kieker.tools.oteltransformer.receiver; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import com.google.common.io.BaseEncoding; + +import io.opentelemetry.proto.trace.v1.Span; + +public class UnprocessedSpanHandler { + private Map> unprocessedSpans = new HashMap<>(); + + public void addUnprocessedSpan(Span span) { + final String parentSpanId = BaseEncoding.base16().lowerCase().encode(span.getParentSpanId().toByteArray()); + List spans = unprocessedSpans.get(parentSpanId); + if (spans == null) { + spans = new LinkedList(); + unprocessedSpans.put(parentSpanId, spans); + } + spans.add(span); + } + + public List getUnprocessedSpans(String parentId) { + List spans = unprocessedSpans.get(parentId); + if (spans != null) { + unprocessedSpans.put(parentId, null); + return spans; + } + return null; + } +} diff --git a/tools/otel-transformer/src/kieker/tools/oteltransformer/stages/OpenTelemetryExporterStage.java b/tools/otel-transformer/src/kieker/tools/oteltransformer/stages/OpenTelemetryExporterStage.java index 7570120bb0..7333d25f7f 100644 --- a/tools/otel-transformer/src/kieker/tools/oteltransformer/stages/OpenTelemetryExporterStage.java +++ b/tools/otel-transformer/src/kieker/tools/oteltransformer/stages/OpenTelemetryExporterStage.java @@ -16,19 +16,14 @@ package kieker.tools.oteltransformer.stages; -import java.util.HashMap; import java.util.Map; import java.util.Stack; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import kieker.common.configuration.Configuration; -import kieker.common.util.signature.ClassOperationSignaturePair; -import kieker.model.system.model.Execution; -import kieker.model.system.model.ExecutionTrace; - import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; @@ -43,6 +38,10 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import kieker.common.configuration.Configuration; +import kieker.common.util.signature.ClassOperationSignaturePair; +import kieker.model.system.model.Execution; +import kieker.model.system.model.ExecutionTrace; import teetime.framework.AbstractConsumerStage; /** @@ -92,19 +91,18 @@ public enum ExportType { private final String explorVizTokenValue; private final String explorVizTokenSecret; - private int spanIndex = 0; - private int serviceIndex = 0; - private final Map serviceIndexMap = new HashMap<>(); + private int spanIndex; + private int serviceIndex; + private final Map serviceIndexMap = new ConcurrentHashMap<>(); public OpenTelemetryExporterStage(final Configuration configuration) { this(configuration.getStringProperty(EXPORT_TYPE), configuration.getStringProperty(EXPORT_URL), - "kieker-data", configuration.getStringProperty(EXPLORVIZ_TOKEN_VALUE), configuration.getStringProperty(EXPLORVIZ_TOKEN_SECRET)); } - public OpenTelemetryExporterStage(final String typeParameter, final String url, final String serviceName, final String explorVizTokenValue, + public OpenTelemetryExporterStage(final String typeParameter, final String url, final String explorVizTokenValue, final String explorVizTokenSecret) { if ("GRPC".equals(typeParameter) || typeParameter == null || typeParameter.isEmpty()) { exportType = ExportType.GRPC; @@ -134,7 +132,7 @@ private SdkTracerProvider createTracerProvider(final String serviceName) { final Resource resource = Resource.getDefault().merge( Resource.create(Attributes.builder().put(AttributeKey.stringKey("service.name"), serviceName).build())); - final SpanExporter spanExporter = getSpanExporter(); + final SpanExporter spanExporter = getSpanExporter(); // NOPMD final SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder().setResource(resource) .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()).build(); @@ -173,7 +171,7 @@ protected void execute(final ExecutionTrace trace) throws Exception { final SpanBuilder spanBuilder = spanBuilder1.setStartTimestamp(execution.getTin(), TimeUnit.NANOSECONDS); if (lastSpan != null && execution.getEss() > 0) { - LOGGER.debug("Parent: " + execution.getEss() + " " + execution.getEoi()); + LOGGER.debug("Parent: {} {}", execution.getEss(), execution.getEoi()); spanBuilder.setParent(Context.current().with(lastSpan.peek())); } else { @@ -182,7 +180,7 @@ protected void execute(final ExecutionTrace trace) throws Exception { final Span span = createSpan(execution, fullClassname, spanBuilder); - LOGGER.debug("Spans added: " + ++spanIndex); + LOGGER.debug("Spans added: {}", ++spanIndex); if (execution.getEss() >= lastEss) { lastEss++; diff --git a/tools/otel-transformer/test/kieker/tools/oteltransformer/receiver/OtelSender.java b/tools/otel-transformer/test/kieker/tools/oteltransformer/receiver/OtelSender.java new file mode 100644 index 0000000000..f7cbbe2825 --- /dev/null +++ b/tools/otel-transformer/test/kieker/tools/oteltransformer/receiver/OtelSender.java @@ -0,0 +1,92 @@ + +package kieker.tools.oteltransformer.receiver; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import org.junit.Before; +import org.junit.Test; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; + +public class OtelSender { + + private static final int OTEL_PORT = 9000; + + private static final Logger LOG = Logger.getLogger(OtelSender.class.getName()); + + @Before + public void startServer() { + Thread serverBackgroundThread = new Thread(new Runnable() { + + @Override + public void run() { + new OtlpGrpcReceiverStage(OTEL_PORT).startServer(); + } + }); + serverBackgroundThread.start(); + + try { + // Should wait some time until server is up + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void testSpanSending() { + OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.builder() + .setEndpoint("http://localhost:" + OTEL_PORT) + .build(); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(BatchSpanProcessor.builder(exporter) + .setScheduleDelay(100, TimeUnit.MILLISECONDS) + .build()) + .build(); + + OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + + GlobalOpenTelemetry.set(openTelemetry); + + // Generate test spans + Tracer tracer = GlobalOpenTelemetry.getTracer("abcd"); + + for (int i = 0; i < 5; i++) { + LOG.info("Sending " + i); + + SpanBuilder spanBuilder = tracer.spanBuilder("parent" + i); + Span span = spanBuilder.startSpan(); + + SpanBuilder spanBuilder2 = tracer.spanBuilder("child" + i); + spanBuilder2.setParent(Context.current().with(span)); + Span span2 = spanBuilder2.startSpan(); + span2.end(); + + span.end(); + + LOG.info("TraceId für child" + i + ": " + span2.getSpanContext().getTraceId()); + } + + System.out.println("Waiting"); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("Waiting finished"); + } + +} diff --git a/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisConfiguration.java b/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisConfiguration.java index a2e99929b1..17e4699661 100644 --- a/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisConfiguration.java +++ b/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisConfiguration.java @@ -149,7 +149,7 @@ public TraceAnalysisConfiguration(final TraceAnalysisParameters parameters, fina final ExecutionRecordTransformationStage executionRecordTransformationStage = new ExecutionRecordTransformationStage(systemRepository); executionRecordTransformationStage.declareActive(); this.traceReconstructionStage = new TraceReconstructionStage(systemRepository, TimeUnit.MILLISECONDS, - parameters.isIgnoreInvalidTraces(), parameters.getMaxTraceDuration()); + parameters.isIgnoreInvalidTraces(), parameters.isAsynchronousTrace(), parameters.getMaxTraceDuration()); /** Event trace. */ final EventRecordTraceReconstructionStage eventRecordTraceReconstructionStage = new EventRecordTraceReconstructionStage(TimeUnit.MILLISECONDS, diff --git a/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisParameters.java b/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisParameters.java index 048b7ab077..aa7b0910ff 100644 --- a/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisParameters.java +++ b/tools/trace-analysis/src/kieker/tools/trace/analysis/TraceAnalysisParameters.java @@ -120,6 +120,9 @@ public final class TraceAnalysisParameters { // NOPMD configuration class @Parameter(names = "--ignore-invalid-traces", description = "If selected, the execution aborts on the occurence of an invalid trace.") private boolean ignoreInvalidTraces; + + @Parameter(names = "--asynchronousTrace", description = "If selected, the trace is considered asynchronous (allowing import of OpenTelemetry traces)") + private boolean asynchronousTrace; @Parameter(names = "--repair-event-based-traces", description = "If selected, BeforeEvents with missing AfterEvents e.g. because of software crash will be repaired.") @@ -274,10 +277,14 @@ public List getSelectTraces() { public List getFilterTraces() { return this.filterTraces; } - + public boolean isIgnoreInvalidTraces() { return this.ignoreInvalidTraces; } + + public boolean isAsynchronousTrace() { + return asynchronousTrace; + } public boolean isRepairEventBasedTraces() { return this.repairEventBasedTraces;