This dependency provides functionality to read a list of Zipkin Spans as an OpenTracing representation of metrics. Messages carrying metrics are read from Kafka topics and can be serialized and de-serialized, abstracting away the work that is common to every project in which we need this feature.
Exposing metrics so they can be processed by external tools gives us insight into how our system behaves, the traffic it is handling, where it shows weaknesses and when it definitely breaks. There are plenty of such tools, such as Zipkin UI, Prometheus, etc.; they all provide a good set of views to monitor our service metrics, but we may want to build something tailored to our own solutions.
The purpose here is to abstract the implementation that gathers the correlated spans into a linked list, which gives us the next span across the multiple services involved in the request, as well as the client that issued the request to the next server, so we can gather durations and details of how the connection to a third party behaved.
At the moment the implementation focuses on the HTTP and RPC metrics, following the conventions in the next link.
More work and testing are needed before relying on this idea.
This work belongs to an observability feature used in conjunction with Spring Boot projects, which come with Micrometer as metrics provider; it lets us push metrics into a Kafka topic to be consumed by tools that monitor the traffic with the OpenTracing standard, such as Zipkin, Jaeger, OpenCensus, etc.
The OpenTracing conventions can be found at:
As it says, it is just a convention; we also need an implementation to get the information and deal with its content. In our Java world there is a well-known open-source project which works quite well with this convention and has evolved across its different versions.
and its GitHub page at:
From the Zipkin project we take the Span definition as well as the serializer, the deserializer and the span detector, for the case where the metrics are encoded in any of the known message types (JSON_V1, JSON_V2, THRIFT, PROTO3).
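As an illustration of that detection step (this snippet is not part of the dependency, just a sketch using the zipkin2 codec API), decoding a raw payload could look like:

import java.util.List;

import zipkin2.Span;
import zipkin2.codec.BytesDecoder;
import zipkin2.codec.SpanBytesDecoderDetector;

// Sketch only: let Zipkin detect the message type of the payload and decode the span list.
public final class SpanDecodingSketch {

    public static List<Span> decode(byte[] payload) {
        BytesDecoder<Span> decoder = SpanBytesDecoderDetector.decoderForListMessage(payload);
        return decoder.decodeList(payload);
    }
}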
Putting more tech on the table, and given the nature of the project, Kafka is the transport channel that brings us the messages, which somehow have to be handled first by the Zipkin deserializer.
The way Kafka turns a message into a format known by our application is through Serdes (SErializer/DESerializer), some of them provided by Kafka itself and some implemented by third parties, like the one we have implemented here.
One that stands out comes from the Spring project and reads those messages as JSON; if that is enough for you, I encourage you to use it rather than creating a new one.
In our particular case the Span does not meet the requirements to be converted into a JSON object, so we have implemented our own:
com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde
Given a message read from the topic with content-type application/opentracing, it deserializes the message into a NodeTrace, which is a group of Zipkin Spans together with the common traceId that all of them should share.
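The real implementation lives in this dependency; as a rough, hypothetical sketch of what the deserializing half of such a Serde could look like (the NodeTrace constructor used here is an assumption), consider:

import java.util.List;

import org.apache.kafka.common.serialization.Deserializer;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoderDetector;

// Sketch only: build a NodeTrace from the raw Kafka payload by letting Zipkin
// detect the message type and decode the list of spans.
public class OpentracingDeserializerSketch implements Deserializer<NodeTrace> {

    @Override
    public NodeTrace deserialize(String topic, byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        List<Span> spans = SpanBytesDecoderDetector.decoderForListMessage(data).decodeList(data);
        // Assumption: NodeTrace groups the shared traceId together with its spans.
        return new NodeTrace(spans.get(0).traceId(), spans);
    }
}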
If we were working with plain Kafka we would have to build the pipeline that works out the linked spans ourselves, but with Kafka Streams we are able to group information, transform it and, in essence, operate with the functionality provided by that library. So from now on we assume we are using Kafka Streams.
A request is resolved (or should be resolved) within a certain amount of time (hopefully short). Once we start receiving the metrics from the different tasks involved in the request, we want to group them into a list of spans with the same traceId so that, once all the events have been emitted, we have everything needed to reconstruct the path that was followed.
The work of producing the linked spans is done by an aggregator, which takes the spans and links them so that path can be followed:
com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator
The result will be a LinkedSpanList containing all the LinkedSpans, from which we can get the root or initial span (the one without a parentId). A LinkedSpan does not currently contain all the information held in the Zipkin Span, but only the essentials (this needs to be revisited: what is essential?), in order to deal with smaller objects in the new messages that go to Kafka topics.
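The exact linking logic belongs to NodeTraceAggregator, but conceptually it comes down to chaining spans through their parentId. A simplified, hypothetical version of that idea (names here are illustrative, not the real ones) could be:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import zipkin2.Span;

// Illustrative only: find the root span and index the children of each span,
// so the path of a request can be walked from the root downwards.
public final class SpanLinkingSketch {

    public static Span root(List<Span> spans) {
        return spans.stream()
                .filter(span -> span.parentId() == null)
                .findFirst()
                .orElse(null);
    }

    public static Map<String, List<Span>> childrenByParent(List<Span> spans) {
        return spans.stream()
                .filter(span -> span.parentId() != null)
                .collect(Collectors.groupingBy(Span::parentId));
    }
}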
This list of LinkedSpans can then be used as a java.util.stream.Stream, from which we can filter on the attributes of each object.
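For instance, reusing the accessors that appear in the consumer example further below (httpTraces(), getKind(), getId()), and assuming a LinkedSpanList variable named trace, a filter might look like:

// Sketch: keep only the server-side spans of the trace and join their ids.
String serverSpanIds = trace.httpTraces()
        .filter(span -> span.getKind().equals(Span.Kind.SERVER))
        .map(LinkedSpan::getId)
        .collect(Collectors.joining(","));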
This dependency is also a Spring Starter, which provides a MessageConverter for the OpenTracing messages. By importing this dependency together with the required spring-cloud-stream one, the StreamMessageConverter is created in the Spring context. If it is used in a Spring project with Spring Cloud:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
    <groupId>com.currofy.kafka.extensions</groupId>
    <artifactId>kafka-extension</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
Note: at the moment this dependency is not uploaded to any repository, so in order to use it you have to clone it on your host and build it so that it gets installed into your local repository.
We take for granted that you have a Kafka topic from which your application reads the metrics. In our case we name it app_metrics.
The reference guide to configure the kafka-streams-binder within a Spring Cloud application can be found at:
but we have some particular needs to route the messages to our converters, so our configuration properties would look something like this:
spring:
  cloud:
    stream:
      bindings:
        metrics_in:
          destination: app_metrics
          content-type: application/opentracing
          group: metrics
      kafka:
        streams:
          binder:
            brokers: localhost:9092
          bindings:
            metrics_in:
              consumer:
                application-id: metrics_in-1
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde
By reading the configuration YAML file we can see that the key in the topic (unused) is a string, and the value needs to be de-serialized with our implementation.
Bear in mind the content-type, which is one of the keys that makes the messages get routed to our Serde.
From Spring Cloud we can enable the bindings with @EnableBinding on a configuration class. An example might look like this:
public interface MetricBinding {

    String METRICS_IN = "metrics_in";

    @Input(METRICS_IN)
    KStream<String, NodeTrace> metrics_in();
}
with a configuration class such as:
@Configuration
@EnableBinding(MetricBinding.class)
public class BindingConfig {
}
This links the bindings in the interface with the topics given in our application.yaml file.
Once this configuration burden is done, it is time to build what really matters: your logic.
As an example of a method that reads, groups and converts those metrics, we might think of an implementation such as:
@Slf4j
@Service
class MetricConsumerService {

    @StreamListener
    public void metrics(@Input(MetricBinding.METRICS_IN) final KStream<String, NodeTrace> stream) {
        // Group the incoming NodeTraces by their traceId, re-using our Serde for the values.
        stream.groupBy((key, value) -> value.getTraceId(), Grouped.with(Serdes.String(), new OpentracingSerde()))
                // Collect the spans of a trace that arrive within a 20-second window.
                .windowedBy(TimeWindows.of(Duration.ofSeconds(20L)))
                // Link the spans of each trace into a LinkedSpanList.
                .aggregate(
                        LinkedSpanList::new,
                        new NodeTraceAggregator(),
                        Materialized.with(Serdes.String(), new LinkedSpanListSerde())
                )
                .toStream()
                // Example of consuming the result: log the ids of the server-side HTTP spans.
                .foreach((key, v) -> {
                    log.info("{}", v.httpTraces()
                            .filter(s -> s.getKind().equals(Span.Kind.SERVER))
                            .map(LinkedSpan::getId)
                            .collect(Collectors.joining(",")));
                });
    }
}
I have written a first approach to this dependency that works with Spring Cloud 2.x; as Spring Cloud 3.x is being released, I know some of this functionality is going to be deprecated, such as the StreamMessageConverter. Also, the example uses @StreamListener, which Pivotal discourages in favour of functional programming with the Supplier, Function and Consumer interfaces.
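As a hint of where that migration leads, a hedged sketch of the functional-style equivalent (the bean name and its wiring to the binding through the Spring Cloud Stream 3.x function configuration are assumptions; the Serdes stay the same) might look like:

import java.time.Duration;
import java.util.function.Consumer;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.currofy.kafka.extensions.group.aggregator.NodeTraceAggregator;
import com.currofy.kafka.extensions.opentracing.serialization.OpentracingSerde;
// plus the NodeTrace, LinkedSpanList and LinkedSpanListSerde imports from this dependency

@Configuration
public class MetricConsumerConfig {

    // Functional replacement for @StreamListener: Spring Cloud Stream 3.x binds this
    // Consumer bean to the topic instead of using an @EnableBinding interface.
    @Bean
    public Consumer<KStream<String, NodeTrace>> metrics() {
        return stream -> stream
                .groupBy((key, value) -> value.getTraceId(),
                        Grouped.with(Serdes.String(), new OpentracingSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(20L)))
                .aggregate(LinkedSpanList::new,
                        new NodeTraceAggregator(),
                        Materialized.with(Serdes.String(), new LinkedSpanListSerde()))
                .toStream()
                .foreach((key, value) -> { /* your logic here */ });
    }
}

In the functional model the binding would then be addressed as metrics-in-0 in application.yaml instead of metrics_in.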