Skip to content

Commit 1a9521b

Browse files
author
Martin Schneppenheim
committed
Initial commit
1 parent c1a87f5 commit 1a9521b

File tree

13 files changed

+943
-0
lines changed

13 files changed

+943
-0
lines changed

.vscode/launch.json

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
// Use IntelliSense to learn about possible attributes.
3+
// Hover to view descriptions of existing attributes.
4+
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5+
"version": "0.2.0",
6+
"configurations": [
7+
{
8+
"name": "Kafka Minion",
9+
"type": "go",
10+
"request": "launch",
11+
"mode": "auto",
12+
"program": "${workspaceRoot}/main.go",
13+
"env": {
14+
"KAFKA_BROKERS": "outside-kafka-0.service.int.rewe-big-data.com:32400,outside-kafka-1.service.int.rewe-big-data.com:32401,outside-kafka-2.service.int.rewe-big-data.com:32402,outside-kafka-3.service.int.rewe-big-data.com:32403",
15+
"VERSION": "DEV"
16+
},
17+
"args": []
18+
}
19+
]
20+
}

Dockerfile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# build image
2+
FROM golang:1.11-alpine as builder
3+
RUN apk update && apk add git ca-certificates
4+
5+
WORKDIR /app
6+
COPY . .
7+
8+
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GO111MODULE=on go build -a -installsuffix cgo -o /go/bin/kafka-lag-exporter
9+
10+
# executable image
11+
FROM scratch
12+
COPY --from=builder /go/bin/kafka-lag-exporter /go/bin/kafka-lag-exporter
13+
14+
ENV VERSION 0.0.1
15+
ENTRYPOINT ["/go/bin/kafka-lag-exporter"]

README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Kafka Minion (Alpha - Still in development)
2+
3+
Kafka minion is a prometheus exporter for Apache Kafka (v0.10.0+), created to expose consumer group lags on a per topic (rather than per partition) basis.
4+
5+
### Why did you create yet another kafka lag exporter?
6+
7+
It has been created because of two features which aren't provided by any of the other public kafka lag exporters:
8+
9+
- We are only interested in per consumergroup:topic lags. Some exporters export either only consumer group lags (of all topics alltogether) or they export per partition metrics
10+
11+
- In our environment developers occasionally replay data by creating a new consumer group. They do so by incrementing a trailing number (e. g. "sample-group-1" becomes "sample-group-2"). In order to setup a proper alerting based on increasing lags for all consumer groups in a cluster, we need to ignore those "outdated" consumer groups (in this case "sample-group-1" as it's not being used anymore). This exporter adds 3 labels on each exporter consumergroup:topic lag metric to make that possible: `consumer_group_base_name`, `consumer_group_version`, `is_latest_consumer_group`. The meaning of each label is explained in the section [Exposed Metrics](#exposed-metrics)
12+
13+
## Features
14+
15+
- [x] Supports Kafka 0.10.1.0 - 2.1.x (last updated 10th Feb 2019)
16+
- [x] Kafka SASL/SSL support
17+
- [x] Provides per consumergroup:topic lag metrics (removes a topic's metrics if a single partition metric in that topic couldn't be fetched)
18+
- [x] Created to use in Kubernetes clusters (has liveness/readiness check and helm chart)
19+
20+
## Setup
21+
22+
### Environment variables
23+
24+
| Variable name | Description | Default |
25+
| -------------- | ---------------------------------------------------------------------------------- | ----------------- |
26+
| PORT | HTTP Port to listen on for the prometheus exporter | 8080 |
27+
| LOG_LEVEL | Log granularity (debug, info, warn, error, fatal, panic) | info |
28+
| VERSION | Application version (env variable is set in Dockerfile) | (from Dockerfile) |
29+
| KAFKA_BROKERS | Array of broker addresses, delimited by comma (e. g. "kafka-1:9092, kafka-2:9092") | (No default) |
30+
| METRICS_PREFIX | A prefix for all exported prometheus metrics | kafka_minion |
31+
32+
## Exposed metrics
33+
34+
| Metric name | Description |
35+
| ------------------------------------- | ---------------------------------------------------------------------------------------------- |
36+
| kafka_minion_consumer_group_topic_lag | Sum of all partition lags for a consumerGroup:topic combination |
37+
| kafka_minion_topic_message_count | Estimated message count (calculated by the sum of all partitions' high offset - lowest offset) |
38+
| kafka_minion_topic_partition_count | Number of partitions for a topic |
39+
40+
`kafka_minion_consumer_group_topic_lag` has four labels:
41+
42+
- consumer_group: Consumer Group ID
43+
- consumer_group_base_name: The recognized name without "version"
44+
- consumer_group_version: The parsed consumer group version
45+
- is_latest_consumer_group: Indicates if this consumer group id has the highest version for the consumer group base name

collector/collector.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package collector
2+
3+
import (
4+
"github.com/google-cloud-tools/kafka-minion/kafka"
5+
"github.com/google-cloud-tools/kafka-minion/options"
6+
"github.com/prometheus/client_golang/prometheus"
7+
log "github.com/sirupsen/logrus"
8+
"strconv"
9+
"strings"
10+
)
11+
12+
var (
13+
topicPartitionCountDesc *prometheus.Desc
14+
topicMessageCountDesc *prometheus.Desc
15+
consumerGroupTopicLagDesc *prometheus.Desc
16+
latestConsumerGroupTopicLagDesc *prometheus.Desc
17+
)
18+
19+
// Collector collects and provides all Kafka metrics
20+
type Collector struct {
21+
kafkaClient *kafka.Client
22+
}
23+
24+
// NewKafkaCollector creates a new instance of our internal KafkaCollector
25+
func NewKafkaCollector(opts *options.Options) (*Collector, error) {
26+
kafkaClient, err := kafka.NewKafkaClient(opts)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
// Initialize all metric types
32+
topicPartitionCountDesc = prometheus.NewDesc(
33+
prometheus.BuildFQName(opts.MetricsPrefix, "topic", "partition_count"),
34+
"Number of partitions for this topic",
35+
[]string{"topic"}, prometheus.Labels{},
36+
)
37+
topicMessageCountDesc = prometheus.NewDesc(
38+
prometheus.BuildFQName(opts.MetricsPrefix, "topic", "message_count"),
39+
"Number of expected messages on a given topic (not reliable on compacted topics)",
40+
[]string{"topic"}, prometheus.Labels{},
41+
)
42+
consumerGroupTopicLagDesc = prometheus.NewDesc(
43+
prometheus.BuildFQName(opts.MetricsPrefix, "consumer_group", "topic_lag"),
44+
"Current approximate lag of a consumergroup for a topic",
45+
[]string{"consumer_group", "consumer_group_base_name", "topic", "consumer_group_version", "is_latest_consumer_group"}, prometheus.Labels{},
46+
)
47+
48+
kafkaCollector := &Collector{
49+
kafkaClient: kafkaClient,
50+
}
51+
52+
return kafkaCollector, nil
53+
}
54+
55+
// Describe sends a description of all to be exposed metric types to Prometheus
56+
func (e *Collector) Describe(ch chan<- *prometheus.Desc) {
57+
ch <- topicPartitionCountDesc
58+
ch <- topicMessageCountDesc
59+
ch <- consumerGroupTopicLagDesc
60+
}
61+
62+
// Collect is called by the Prometheus registry when collecting
63+
// metrics. The implementation sends each collected metric via the
64+
// provided channel and returns once the last metric has been sent.
65+
func (e *Collector) Collect(ch chan<- prometheus.Metric) {
66+
// 1. Get a fresh copy of all available topic names
67+
topicNames, err := e.kafkaClient.GetTopicNames()
68+
if err != nil {
69+
log.Error(err)
70+
}
71+
72+
// 2. Get partition ids for all topics and expose the partition count by topic metric
73+
partitionIDsByTopicName := e.kafkaClient.GetPartitionIDsBulk(topicNames)
74+
for topicName, partitionIDs := range partitionIDsByTopicName {
75+
ch <- prometheus.MustNewConstMetric(
76+
topicPartitionCountDesc,
77+
prometheus.GaugeValue,
78+
float64(len(partitionIDs)),
79+
topicName,
80+
)
81+
}
82+
83+
// Get partition details for all partitions in all topics
84+
topicsByName := e.kafkaClient.GetPartitionOffsets(partitionIDsByTopicName)
85+
for topicName, topic := range topicsByName {
86+
ch <- prometheus.MustNewConstMetric(
87+
topicMessageCountDesc,
88+
prometheus.GaugeValue,
89+
float64(topic.MessageCount),
90+
topicName,
91+
)
92+
}
93+
94+
log.Debugf("Collecting consumer group metrics")
95+
consumerGroupTopicLagsByGroupName := e.kafkaClient.ConsumerGroupTopicLags(topicsByName)
96+
latestConsumerGroupsByName := getLatestConsumerGroupsByName(consumerGroupTopicLagsByGroupName)
97+
98+
for _, topicLags := range consumerGroupTopicLagsByGroupName {
99+
for _, value := range topicLags {
100+
isLatest := "false"
101+
baseName := value.Name
102+
version := uint8(0)
103+
104+
// If this group is the latest consumer group also add it with a different metrics description
105+
if consumerGroup, ok := latestConsumerGroupsByName[value.Name]; ok {
106+
isLatest = "true"
107+
baseName = consumerGroup.BaseName
108+
version = consumerGroup.Version
109+
}
110+
111+
ch <- prometheus.MustNewConstMetric(
112+
consumerGroupTopicLagDesc,
113+
prometheus.GaugeValue,
114+
float64(value.TopicLag),
115+
value.Name,
116+
baseName,
117+
value.TopicName,
118+
strconv.Itoa(int(version)),
119+
isLatest,
120+
)
121+
}
122+
}
123+
}
124+
125+
// versionedConsumerGroup contains information about the consumer group's base name and version
126+
type versionedConsumerGroup struct {
127+
BaseName string
128+
Name string
129+
Version uint8
130+
}
131+
132+
// getLatestConsumerGroupNames returns the latest consumer group names in a map where the consumer group name is the key.
133+
func getLatestConsumerGroupsByName(groupsByTopicName map[string][]*kafka.ConsumerGroupTopicLag) map[string]*versionedConsumerGroup {
134+
latestConsumerGroupByBaseName := make(map[string]*versionedConsumerGroup)
135+
136+
for _, groups := range groupsByTopicName {
137+
for _, group := range groups {
138+
consumerGroup := getVersionedConsumerGroup(group.Name)
139+
baseName := consumerGroup.BaseName
140+
// Check if there already is a potentially latest consumer group for this base name
141+
if _, ok := latestConsumerGroupByBaseName[baseName]; ok {
142+
// Only overwrite base consumer group if this consumergroup version is higher
143+
if latestConsumerGroupByBaseName[baseName].Version < consumerGroup.Version {
144+
latestConsumerGroupByBaseName[baseName] = consumerGroup
145+
}
146+
} else {
147+
latestConsumerGroupByBaseName[baseName] = consumerGroup
148+
}
149+
}
150+
}
151+
152+
// we already got consumer groups by base name, but now we want them in a map grouped by their actual group name as key
153+
consumerGroupByName := make(map[string]*versionedConsumerGroup)
154+
for _, group := range latestConsumerGroupByBaseName {
155+
consumerGroupByName[group.Name] = group
156+
}
157+
158+
return consumerGroupByName
159+
}
160+
161+
// getVersionedConsumerGroup returns the "base name" of a consumer group and it's version
162+
func getVersionedConsumerGroup(consumerGroupName string) *versionedConsumerGroup {
163+
baseName := consumerGroupName
164+
parsedVersion := 0
165+
166+
lastDashIndex := strings.LastIndex(consumerGroupName, "-")
167+
if lastDashIndex > 0 {
168+
// Potentially this is our base name (if the group has no trailing number at all, this is wrong though)
169+
baseName = consumerGroupName[:lastDashIndex]
170+
potentialVersion := consumerGroupName[lastDashIndex+1 : len(consumerGroupName)]
171+
var err error
172+
parsedVersion, err = strconv.Atoi(potentialVersion)
173+
if err != nil {
174+
parsedVersion = 0
175+
baseName = consumerGroupName
176+
}
177+
}
178+
return &versionedConsumerGroup{BaseName: baseName, Name: consumerGroupName, Version: uint8(parsedVersion)}
179+
}

go.mod

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
module github.com/google-cloud-tools/kafka-minion
2+
3+
require (
4+
github.com/DataDog/zstd v1.3.5 // indirect
5+
github.com/Shopify/sarama v1.20.1
6+
github.com/eapache/go-resiliency v1.1.0 // indirect
7+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
8+
github.com/eapache/queue v1.1.0 // indirect
9+
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
10+
github.com/kelseyhightower/envconfig v1.3.0
11+
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
12+
github.com/prometheus/client_golang v0.9.2
13+
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect
14+
github.com/sirupsen/logrus v1.3.0
15+
)

go.sum

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
2+
github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
3+
github.com/Shopify/sarama v1.20.1 h1:Bb0h3I++r4eX333Y0uZV2vwUXepJbt6ig05TUU1qt9I=
4+
github.com/Shopify/sarama v1.20.1/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
5+
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
6+
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
7+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
9+
github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
10+
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
11+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
12+
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
13+
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
14+
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
15+
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
16+
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
17+
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
18+
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
19+
github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM=
20+
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
21+
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
22+
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
23+
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
24+
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
25+
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
26+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
27+
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
28+
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
29+
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
30+
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
31+
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
32+
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
33+
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
34+
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
35+
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
36+
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
37+
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
38+
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
39+
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
40+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
41+
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
42+
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
43+
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
44+
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
45+
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
46+
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

0 commit comments

Comments
 (0)