Skip to content

Commit d5d52b7

Browse files
committed
Merge branch 'include-adapter' into 'main'
Include adapter as component See merge request ExplorViz/code/span-service!26
2 parents 873a12f + 3d7bfc7 commit d5d52b7

25 files changed

+1763
-131
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ local.properties
8181
.idea/**/usage.statistics.xml
8282
.idea/**/dictionaries
8383
.idea/**/shelf
84+
*.hprof
8485

8586
# AWS User-specific
8687
.idea/**/aws.xml

build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ dependencies {
3333
// Needed for scheduled jobs
3434
implementation 'io.quarkus:quarkus-scheduler'
3535

36+
implementation "io.quarkus:quarkus-grpc"
37+
38+
implementation "io.opentelemetry.proto:opentelemetry-proto:0.19.0-alpha"
39+
3640
// Quarkus Cassandra Platform + DataStax Cassandra Driver
3741
implementation enforcedPlatform("${quarkusPlatformGroupId}:quarkus-cassandra-bom:${quarkusPlatformVersion}")
3842
implementation 'com.datastax.oss.quarkus:cassandra-quarkus-client'
@@ -59,6 +63,7 @@ dependencies {
5963
testImplementation 'com.datastax.oss.quarkus:cassandra-quarkus-test-framework'
6064
integrationTestImplementation 'com.datastax.oss.quarkus:cassandra-quarkus-test-framework'
6165
testImplementation "org.testcontainers:kafka:1.17.5"
66+
testImplementation group: "org.apache.kafka", name: "kafka-streams-test-utils", version: "3.3.1"
6267
integrationTestImplementation "org.testcontainers:kafka:1.17.5"
6368

6469
// Integration Testing
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package net.explorviz.span.adapter.conversion
2+
3+
import com.google.protobuf.InvalidProtocolBufferException
4+
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
5+
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest
6+
import io.opentelemetry.proto.trace.v1.Span
7+
import io.quarkus.scheduler.Scheduled
8+
import jakarta.enterprise.context.ApplicationScoped
9+
import jakarta.enterprise.inject.Produces
10+
import jakarta.inject.Inject
11+
import java.util.concurrent.atomic.AtomicInteger
12+
import net.explorviz.span.adapter.service.converter.SpanConverterImpl
13+
import net.explorviz.avro.EventType
14+
import net.explorviz.avro.TokenEvent
15+
import net.explorviz.span.adapter.service.validation.SpanValidator
16+
import net.explorviz.span.persistence.PersistenceSpan
17+
import net.explorviz.span.persistence.PersistenceSpanProcessor
18+
import net.explorviz.span.persistence.SpanConverter
19+
import org.apache.kafka.common.serialization.Serdes
20+
import org.apache.kafka.common.utils.Bytes
21+
import org.apache.kafka.streams.KeyValue
22+
import org.apache.kafka.streams.StreamsBuilder
23+
import org.apache.kafka.streams.Topology
24+
import org.apache.kafka.streams.kstream.Consumed
25+
import org.apache.kafka.streams.kstream.KStream
26+
import org.apache.kafka.streams.kstream.Materialized
27+
import org.apache.kafka.streams.kstream.Produced
28+
import org.apache.kafka.streams.state.KeyValueStore
29+
import org.eclipse.microprofile.config.inject.ConfigProperty
30+
import org.slf4j.Logger
31+
import org.slf4j.LoggerFactory
32+
33+
34+
/** Builds a KafkaStream topology instance with all its transformers. Entry point of the stream analysis. */
35+
@ApplicationScoped
36+
class TopologyProducer {
37+
38+
companion object {
39+
private val LOGGER: Logger = LoggerFactory.getLogger(TopologyProducer::class.java)
40+
}
41+
42+
private val lastReceivedSpans = AtomicInteger(0)
43+
private val lastInvalidSpans = AtomicInteger(0)
44+
45+
@ConfigProperty(name = "explorviz.kafka-streams.topics.in") lateinit var inTopic: String
46+
47+
@ConfigProperty(name = "explorviz.kafka-streams.topics.out.spans") lateinit var spansOutTopic: String
48+
49+
@ConfigProperty(name = "explorviz.kafka-streams.topics.in.tokens") lateinit var tokensInTopic: String
50+
51+
@ConfigProperty(name = "explorviz.kafka-streams.topics.out.tokens-table") lateinit var tokensOutTopic: String
52+
53+
@Inject lateinit var validator: SpanValidator
54+
55+
@Inject lateinit var spanAvroSerde: SpecificAvroSerde<net.explorviz.avro.Span>
56+
57+
@Inject lateinit var tokenEventAvroSerde: SpecificAvroSerde<TokenEvent>
58+
59+
@Inject lateinit var spanConverter: SpanConverterImpl
60+
61+
@Inject
62+
lateinit var persistenceSpanConverter: SpanConverter
63+
64+
65+
@Inject
66+
lateinit var persistenceProcessor: PersistenceSpanProcessor
67+
68+
69+
@Produces
70+
fun buildTopology(): Topology {
71+
val builder = StreamsBuilder()
72+
73+
// BEGIN Conversion Stream
74+
val spanByteStream: KStream<ByteArray, ByteArray> =
75+
builder.stream(inTopic, Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
76+
77+
val spanStream: KStream<ByteArray, Span> =
78+
spanByteStream.flatMapValues { data ->
79+
try {
80+
val spanList = mutableListOf<Span>()
81+
ExportTraceServiceRequest.parseFrom(data).resourceSpansList.forEach { resourceSpans ->
82+
resourceSpans.scopeSpansList.forEach { scopeSpans -> spanList.addAll(scopeSpans.spansList) }
83+
}
84+
lastReceivedSpans.addAndGet(spanList.size)
85+
spanList
86+
} catch (e: InvalidProtocolBufferException) {
87+
LOGGER.trace("Invalid protocol buffer: ${e.message}")
88+
emptyList()
89+
}
90+
}
91+
92+
// Validate Spans
93+
val validSpanStream: KStream<ByteArray, Span> =
94+
spanStream.flatMapValues { value ->
95+
if (!validator.isValid(value)) {
96+
lastInvalidSpans.incrementAndGet()
97+
emptyList()
98+
} else {
99+
listOf(value)
100+
}
101+
}
102+
103+
// Convert to Span Structure
104+
val explorvizSpanStream: KStream<String, net.explorviz.avro.Span> =
105+
validSpanStream.map { _, value ->
106+
val span = spanConverter.fromOpenTelemetrySpan(value)
107+
KeyValue(span.landscapeToken, span)
108+
}
109+
110+
// Map to our more space-efficient PersistenceSpan format
111+
// ToDo: Combine with previous step
112+
val persistenceStream: KStream<String, PersistenceSpan> = explorvizSpanStream.mapValues(
113+
this.persistenceSpanConverter,
114+
)
115+
116+
persistenceStream.foreach { _ : String?, span: PersistenceSpan? ->
117+
persistenceProcessor.accept(
118+
span,
119+
)
120+
}
121+
// END Conversion Stream
122+
123+
// BEGIN Token Stream
124+
builder
125+
.stream(tokensInTopic, Consumed.with(Serdes.String(), tokenEventAvroSerde))
126+
.filter { key, value ->
127+
LOGGER.trace("Received token event for token value $key with event $value")
128+
value == null || value.type == EventType.CREATED
129+
}
130+
.to(tokensOutTopic, Produced.with(Serdes.String(), tokenEventAvroSerde))
131+
132+
builder.globalTable(
133+
tokensOutTopic,
134+
Materialized.`as`<String, TokenEvent, KeyValueStore<Bytes, ByteArray>>("token-events-global-store")
135+
.withKeySerde(Serdes.String())
136+
.withValueSerde(tokenEventAvroSerde),
137+
)
138+
// END Token Stream
139+
140+
return builder.build()
141+
}
142+
143+
@Scheduled(every = "{explorviz.log.span.interval}")
144+
fun logStatus() {
145+
val spans = lastReceivedSpans.getAndSet(0)
146+
val invalidSpans = lastInvalidSpans.getAndSet(0)
147+
LOGGER.debug("Received $spans spans: ${spans - invalidSpans} valid, $invalidSpans invalid")
148+
}
149+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package net.explorviz.span.adapter.injection
2+
3+
import io.quarkus.arc.DefaultBean
4+
import jakarta.enterprise.context.Dependent
5+
import jakarta.enterprise.inject.Produces
6+
import jakarta.inject.Inject
7+
import net.explorviz.avro.TokenEvent
8+
import org.apache.kafka.streams.KafkaStreams
9+
import org.apache.kafka.streams.StoreQueryParameters
10+
import org.apache.kafka.streams.errors.InvalidStateStoreException
11+
import org.apache.kafka.streams.state.QueryableStoreTypes
12+
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
13+
import org.slf4j.Logger
14+
import org.slf4j.LoggerFactory
15+
16+
/** Returns a {@link SchemaRegistryClient} that is used by this application with a maximum number of 10 schemas. */
17+
@Dependent
18+
class ReadOnlyKeyValueStoreProducer {
19+
20+
companion object {
21+
private val LOGGER: Logger = LoggerFactory.getLogger(ReadOnlyKeyValueStoreProducer::class.java)
22+
}
23+
24+
@Inject lateinit var streams: KafkaStreams
25+
26+
@Produces
27+
@DefaultBean
28+
fun getTokenEventStore(): ReadOnlyKeyValueStore<String, TokenEvent> {
29+
while (true) {
30+
try {
31+
return streams.store(
32+
StoreQueryParameters.fromNameAndType(
33+
"token-events-global-store",
34+
QueryableStoreTypes.keyValueStore(),
35+
),
36+
)
37+
} catch (e: InvalidStateStoreException) {
38+
LOGGER.debug("State store not yet initialized: ${e.message}")
39+
LOGGER.debug("Will try again ...")
40+
}
41+
}
42+
}
43+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package net.explorviz.span.adapter.service
2+
3+
import jakarta.enterprise.context.ApplicationScoped
4+
import jakarta.inject.Inject
5+
import net.explorviz.avro.TokenEvent
6+
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
7+
8+
/** Service to access available landscape tokens, backed by GlobalKTable and state store access. */
9+
@ApplicationScoped
10+
class TokenService @Inject constructor(private val keyValueStore: ReadOnlyKeyValueStore<String, TokenEvent>) {
11+
12+
/**
13+
* Checks whether a given token exists.
14+
*
15+
* @param tokenValue the token to check
16+
* @return `true` if the given token value is in the list of valid tokens
17+
*/
18+
fun validLandscapeTokenValue(tokenValue: String): Boolean {
19+
val potentialEvent = keyValueStore[tokenValue]
20+
return potentialEvent?.token?.value == tokenValue
21+
}
22+
23+
/**
24+
* Checks whether a given landscape token and secret exist and belong to each other.
25+
*
26+
* @param tokenValue the token to check
27+
* @param tokenSecret the secret to check
28+
* @return `true` if the given token value is valid and the secret belongs to this token.
29+
*/
30+
fun validLandscapeTokenValueAndSecret(tokenValue: String, tokenSecret: String): Boolean {
31+
val potentialEvent = keyValueStore[tokenValue]
32+
return potentialEvent?.token?.value == tokenValue && potentialEvent.token.secret == tokenSecret
33+
}
34+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package net.explorviz.span.adapter.service.converter
2+
3+
import io.opentelemetry.proto.common.v1.AnyValue
4+
import io.opentelemetry.proto.trace.v1.Span
5+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_APP_INSTANCE_ID
6+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_APP_LANG
7+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_APP_NAME
8+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_CLASS_FQN
9+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_FQN
10+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_GIT_COMMIT_CHECKSUM
11+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_HOST_IP
12+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_HOST_NAME
13+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_LANDSCAPE_SECRET
14+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_LANDSCAPE_TOKEN
15+
import net.explorviz.span.adapter.service.converter.DefaultAttributeValues.DEFAULT_PACKAGE_NAME
16+
import org.apache.commons.lang3.StringUtils
17+
18+
/** Reads the attributes of a [Span]. */
19+
open class AttributesReader(private val span: Span) {
20+
21+
companion object {
22+
const val LANDSCAPE_TOKEN = "explorviz.token.id"
23+
const val TOKEN_SECRET = "explorviz.token.secret"
24+
const val GIT_COMMIT_CHECKSUM = "git_commit_checksum"
25+
const val HOST_NAME = "host"
26+
const val HOST_IP = "host_address"
27+
const val APPLICATION_NAME = "service.name"
28+
const val APPLICATION_INSTANCE_ID = "service.instance.id"
29+
const val APPLICATION_LANGUAGE = "telemetry.sdk.language"
30+
const val CODE_FUNCTION = "code.function"
31+
const val CODE_NAMESPACE = "code.namespace"
32+
const val METHOD_FQN = "java.fqn"
33+
const val K8S_POD_NAME = "k8s.pod.name"
34+
const val K8S_NAMESPACE_NAME = "k8s.namespace.name"
35+
const val K8S_NODE_NAME = "k8s.node.name"
36+
const val K8S_DEPLOYMENT_NAME = "k8s.deployment.name"
37+
}
38+
39+
private val attributes: Map<String, AnyValue> = span.attributesList.associate { it.key to it.value }
40+
41+
open fun getAsString(key: String): String? {
42+
return attributes[key]?.stringValue
43+
}
44+
45+
val landscapeToken: String
46+
get() = getAsString(LANDSCAPE_TOKEN) ?: DEFAULT_LANDSCAPE_TOKEN
47+
48+
val secret: String
49+
get() = getAsString(TOKEN_SECRET) ?: DEFAULT_LANDSCAPE_SECRET
50+
51+
val hostName: String
52+
get() = getAsString(HOST_NAME) ?: DEFAULT_HOST_NAME
53+
54+
val hostIpAddress: String
55+
get() = getAsString(HOST_IP) ?: DEFAULT_HOST_IP
56+
57+
val gitCommitChecksum: String
58+
get() = getAsString(GIT_COMMIT_CHECKSUM) ?: DEFAULT_GIT_COMMIT_CHECKSUM
59+
60+
val applicationName: String
61+
get() = getAsString(APPLICATION_NAME) ?: DEFAULT_APP_NAME
62+
63+
val applicationInstanceId: String
64+
get() = getAsString(APPLICATION_INSTANCE_ID) ?: DEFAULT_APP_INSTANCE_ID
65+
66+
val applicationLanguage: String
67+
get() = getAsString(APPLICATION_LANGUAGE) ?: DEFAULT_APP_LANG
68+
69+
val methodFqn: String
70+
get() {
71+
val codeNamespace = getAsString(CODE_NAMESPACE)
72+
val codeFunction = getAsString(CODE_FUNCTION)
73+
val methodFqn = getAsString(METHOD_FQN)
74+
75+
return codeNamespace?.let { namespace -> codeFunction?.let { function -> "$namespace.$function" } }
76+
?: methodFqn
77+
?: generateMethodFqnFromSpanName()
78+
}
79+
80+
open fun generateMethodFqnFromSpanName(): String {
81+
val spanName = span.name
82+
if (spanName.isNullOrEmpty()) return DEFAULT_FQN
83+
84+
val hierarchyDepth = StringUtils.countMatches(spanName, ".")
85+
86+
return when {
87+
hierarchyDepth == 0 -> "$DEFAULT_CLASS_FQN.$spanName"
88+
hierarchyDepth == 1 -> "$DEFAULT_PACKAGE_NAME.$spanName"
89+
else -> spanName // Assume span name contains fully qualified name
90+
}
91+
}
92+
93+
val k8sPodName: String
94+
get() = getAsString(K8S_POD_NAME) ?: ""
95+
96+
val k8sNamespace: String
97+
get() = getAsString(K8S_NAMESPACE_NAME) ?: ""
98+
99+
val k8sNodeName: String
100+
get() = getAsString(K8S_NODE_NAME) ?: ""
101+
102+
val k8sDeploymentName: String
103+
get() = getAsString(K8S_DEPLOYMENT_NAME) ?: ""
104+
105+
fun appendToSpan(builder: net.explorviz.avro.Span.Builder) {
106+
builder.apply {
107+
landscapeToken = this@AttributesReader.landscapeToken
108+
gitCommitChecksum = this@AttributesReader.gitCommitChecksum
109+
hostname = this@AttributesReader.hostName
110+
hostIpAddress = this@AttributesReader.hostIpAddress
111+
appInstanceId = this@AttributesReader.applicationInstanceId
112+
appName = this@AttributesReader.applicationName
113+
appLanguage = this@AttributesReader.applicationLanguage
114+
fullyQualifiedOperationName = this@AttributesReader.methodFqn
115+
k8sPodName = this@AttributesReader.k8sPodName
116+
k8sNamespace = this@AttributesReader.k8sNamespace
117+
k8sNodeName = this@AttributesReader.k8sNodeName
118+
k8sDeploymentName = this@AttributesReader.k8sDeploymentName
119+
}
120+
}
121+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package net.explorviz.span.adapter.service.converter
2+
3+
/** Contains constants for missing attribute values of spans. */
4+
object DefaultAttributeValues {
5+
const val DEFAULT_LANDSCAPE_TOKEN = "mytokenvalue"
6+
const val DEFAULT_LANDSCAPE_SECRET = "mytokensecret"
7+
const val DEFAULT_GIT_COMMIT_CHECKSUM = "UNKNOWN" // NOPMD
8+
const val DEFAULT_HOST_IP = "0.0.0.0" // NOPMD
9+
const val DEFAULT_HOST_NAME = "UNKNOWN-HOST"
10+
const val DEFAULT_APP_NAME = "UNKNOWN-APPLICATION"
11+
const val DEFAULT_APP_INSTANCE_ID = "0"
12+
const val DEFAULT_APP_LANG = "UNKNOWN"
13+
const val DEFAULT_PACKAGE_NAME = "unknownpkg"
14+
const val DEFAULT_CLASS_NAME = "UnknownClass"
15+
16+
// FQN must adhere to the format <pkg.Class.method>, i.e., include at least two '.'
17+
const val DEFAULT_CLASS_FQN = "$DEFAULT_PACKAGE_NAME.$DEFAULT_CLASS_NAME"
18+
const val DEFAULT_METHOD = "unknownMethod"
19+
const val DEFAULT_FQN = "$DEFAULT_CLASS_FQN.$DEFAULT_METHOD"
20+
}

0 commit comments

Comments
 (0)