Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class TraceReconstructionStage extends AbstractTraceProcessingStage<Execu
private volatile long maxTout = -1;
private volatile boolean terminated;
private final boolean ignoreInvalidTraces; // false
private final boolean asynchronousTrace;
private final long maxTraceDurationMillis;

private boolean traceProcessingErrorOccured; // false
Expand Down Expand Up @@ -108,13 +109,14 @@ public int compare(final ExecutionTrace t1, final ExecutionTrace t2) {
* Long.MAX_VALUE
*/
public TraceReconstructionStage(final SystemModelRepository repository, final TimeUnit timeunit,
final boolean ignoreInvalidTraces, final Long maxTraceDuration) {
final boolean ignoreInvalidTraces, final boolean asynchronousTrace, final Long maxTraceDuration) {
super(repository);

this.timeunit = timeunit;
this.maxTraceDurationMillis = this.timeunit.convert(maxTraceDuration == null ? Long.MAX_VALUE : maxTraceDuration, // NOCS
timeunit);
this.ignoreInvalidTraces = ignoreInvalidTraces;
this.asynchronousTrace = asynchronousTrace;

if (this.maxTraceDurationMillis < 0) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -170,7 +172,7 @@ protected void execute(final Execution execution) throws Exception {
this.reportError(traceId);
}
} else { // create and add new trace
executionTrace = new ExecutionTrace(traceId, execution.getSessionId());
executionTrace = new ExecutionTrace(traceId, execution.getSessionId(), asynchronousTrace);
this.pendingTraces.put(traceId, executionTrace);
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private List<ExecutionTraceBasedSession> executeFilters(final List<OperationExec
StageTester.test(executionsRecordTransformationFilter).and().send(records).to(executionsRecordTransformationFilter.getInputPort()).and().receive(executions)
.from(executionsRecordTransformationFilter.getOutputPort()).start();

final TraceReconstructionStage traceReconstructionFilter = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, false, Long.MAX_VALUE);
final TraceReconstructionStage traceReconstructionFilter = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, false, false, Long.MAX_VALUE);

final List<ExecutionTrace> executionTraces = new ArrayList<>();
StageTester.test(traceReconstructionFilter).and().send(executions).to(traceReconstructionFilter.getInputPort()).and().receive(executionTraces)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testValidBookstoreTracePassed() throws InvalidTraceException, Illega

final SystemModelRepository repository = new SystemModelRepository();

final TraceReconstructionStage filter = new TraceReconstructionStage(repository, TimeUnit.NANOSECONDS, true, Long.MAX_VALUE);
final TraceReconstructionStage filter = new TraceReconstructionStage(repository, TimeUnit.NANOSECONDS, true, false, Long.MAX_VALUE);

final Execution[] events = validExecutionTrace.getTraceAsSortedExecutionSet()
.toArray(new Execution[validExecutionTrace.getTraceAsSortedExecutionSet().size()]);
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testBrokenBookstoreTracePassed() throws InvalidTraceException, Illeg
// These are the trace representations we want to be reconstructed by the filter
final ExecutionTrace invalidExecutionTrace = this.genBrokenBookstoreTraceEssSkip(executionFactory);

final TraceReconstructionStage filter = new TraceReconstructionStage(systemEntityFactory, TimeUnit.NANOSECONDS, true, Long.MAX_VALUE);
final TraceReconstructionStage filter = new TraceReconstructionStage(systemEntityFactory, TimeUnit.NANOSECONDS, true, false, Long.MAX_VALUE);
Assert.assertTrue("Test invalid since trace length smaller than filter timeout", invalidExecutionTrace.getDuration() <= filter
.getMaxTraceDuration());

Expand Down
4 changes: 2 additions & 2 deletions model/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
checkstyleErrorThreshold = 0
checkstyleErrorThreshold = 1
checkstyleWarningThreshold = 0

findbugsErrorThreshold = 0
findbugsWarningThreshold = 0

pmdErrorThreshold = 0
pmdWarningThreshold = 7
pmdWarningThreshold = 10

30 changes: 30 additions & 0 deletions model/src/kieker/model/system/model/ExecutionTrace.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
public class ExecutionTrace extends AbstractTrace {

private final AtomicReference<MessageTrace> messageTraceReference = new AtomicReference<>();
private final boolean asynchronousTrace;
private int minEoi = -1;
private int maxEoi = -1;
private long minTin = -1;
Expand All @@ -60,6 +61,7 @@ public class ExecutionTrace extends AbstractTrace {
*/
public ExecutionTrace(final long traceId) {
super(traceId);
asynchronousTrace = false;
}

/**
Expand All @@ -72,6 +74,20 @@ public ExecutionTrace(final long traceId) {
*/
public ExecutionTrace(final long traceId, final String sessionId) {
super(traceId, sessionId);
asynchronousTrace = false;
}

/**
* Creates a new instance of this class using the given parameters.
*
* @param traceId
* The ID of this trace.
* @param sessionId
* The ID of the current session.
*/
public ExecutionTrace(final long traceId, final String sessionId, boolean asynchronousTrace) {
super(traceId, sessionId);
this.asynchronousTrace = asynchronousTrace;
}

/**
Expand Down Expand Up @@ -203,14 +219,28 @@ private SynchronousCallMessage createCallMessage(final Execution rootExecution,
} else if ((prevE.getEss() + 1) == curE.getEss()) { // usual callMessage with senderComponentName and
// receiverComponentName
message = new SynchronousCallMessage(curE.getTin(), prevE, curE);
} else if (asynchronousTrace) {
Execution parentCandidate = null;
for (Execution somePreviousExecution : trace) {
if (somePreviousExecution.getEss() + 1 == curE.getEss()) {
parentCandidate = somePreviousExecution;
}

if (somePreviousExecution.getEoi() == curE.getEoi()) {
break;
}
}
message = new SynchronousCallMessage(curE.getTin(), parentCandidate, curE);
} else if (prevE.getEss() < curE.getEss()) { // detect ess incrementation by > 1
final InvalidTraceException ex = new InvalidTraceException(
"Ess are only allowed to increment by 1 --" + "but found sequence <" + prevE.getEss() + ","
+ curE.getEss() + ">" + "(Execution: " + curE + ")");

// don't log and throw
// LOG.error("Found invalid trace:" + ex.getMessage()); // don't need the stack
// trace here
throw ex;

} else {
final String errorMessage = "Unexpected trace: " + prevE + " and " + curE;
throw new IllegalStateException(errorMessage);
Expand Down
8 changes: 8 additions & 0 deletions tools/otel-transformer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ dependencies {
implementation "io.opentelemetry:opentelemetry-sdk:${opentelemetryJavaVersion}"
implementation "io.opentelemetry:opentelemetry-exporter-otlp:${opentelemetryJavaVersion}"
implementation "io.opentelemetry:opentelemetry-exporter-zipkin:${opentelemetryJavaVersion}"

implementation 'io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha'

implementation 'com.google.protobuf:protobuf-java:3.25.1'

implementation 'io.grpc:grpc-netty-shaded:1.72.0'
implementation 'io.grpc:grpc-protobuf:1.72.0'
implementation 'io.grpc:grpc-stub:1.72.0'
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public OpenTelemetryExportConfiguration(final int inputPort, final int bufferSiz

this.connectPorts(operationExecutionRecordMatcher.getOutputPort(), executionRecordTransformationStage.getInputPort());

final TraceReconstructionStage traceReconstructionStage = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, true, 10000L);
final TraceReconstructionStage traceReconstructionStage = new TraceReconstructionStage(repository, TimeUnit.MILLISECONDS, true, false, 10000L);
this.connectPorts(executionRecordTransformationStage.getOutputPort(), traceReconstructionStage.getInputPort());

final OpenTelemetryExporterStage otstage = new OpenTelemetryExporterStage(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import kieker.common.exception.ConfigurationException;
import kieker.monitoring.core.configuration.ConfigurationFactory;
import kieker.tools.common.AbstractService;
import kieker.tools.oteltransformer.receiver.OtlpReceiverStageConfiguration;

/**
* Receives Kieker records, to later on pass them to OpenTelemetry (like the MooBench record receiver).
*
* @author David Georg Reichelt, Reiner Jung
*
*/
public final class RecordReceiverMain extends AbstractService<OpenTelemetryExportConfiguration, Settings> {
public final class RecordReceiverMain extends AbstractService<teetime.framework.Configuration, Settings> {

private final Settings parameter = new Settings();

Expand All @@ -47,14 +48,24 @@ public int run(final String title, final String label, final String[] args) {
}

@Override
protected OpenTelemetryExportConfiguration createTeetimeConfiguration() throws ConfigurationException {
final Configuration configuration;
if (parameter.getKiekerMonitoringProperties() != null) {
configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties());
protected teetime.framework.Configuration createTeetimeConfiguration() throws ConfigurationException {
if (parameter.getStandard() == Settings.ReceivingStandard.KIEKER) {
final Configuration configuration;
if (parameter.getKiekerMonitoringProperties() != null) {
configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties());
} else {
configuration = ConfigurationFactory.createDefaultConfiguration();
}
return new OpenTelemetryExportConfiguration(parameter.getListenPort(), 8192, configuration);
} else {
configuration = ConfigurationFactory.createDefaultConfiguration();
final Configuration configuration;
if (parameter.getKiekerMonitoringProperties() != null) {
configuration = ConfigurationFactory.createConfigurationFromFile(parameter.getKiekerMonitoringProperties());
} else {
configuration = ConfigurationFactory.createDefaultConfiguration();
}
return new OtlpReceiverStageConfiguration(parameter.getListenPort(), configuration);
}
return new OpenTelemetryExportConfiguration(parameter.getListenPort(), 8192, configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,21 @@
import com.beust.jcommander.converters.PathConverter;

public class Settings {
enum ReceivingStandard {
KIEKER, OPENTELEMETRY
}

@Parameter(names = { "-lp",
"--listenPort" }, required = true, description = "Port where the otel-transformer listens for traces")
private int listenPort;

@Parameter(names = { "-c",
"--configuration" }, required = false, description = "Configuration file.", converter = PathConverter.class)
private Path configurationPath;

@Parameter(names = { "-standard",
"--standard" }, required = true, description = "Standard to wait for (either KIEKER or OPENTELEMETRY")
private ReceivingStandard standard;

public int getListenPort() {
return listenPort;
Expand All @@ -37,4 +45,8 @@ public int getListenPort() {
public Path getKiekerMonitoringProperties() {
return this.configurationPath;
}

public ReceivingStandard getStandard() {
return standard;
}
}
Loading
Loading