Skip to content

Commit 8212c61

Browse files
committed
Add internal metrics
1 parent 17ab001 commit 8212c61

File tree

4 files changed

+54
-0
lines changed

4 files changed

+54
-0
lines changed

kafka/consumer_group_metadata.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"fmt"
77
log "github.com/sirupsen/logrus"
8+
"strconv"
89
)
910

1011
type consumerGroupMetadata struct {
@@ -53,6 +54,7 @@ func newConsumerGroupMetadata(key *bytes.Buffer, value *bytes.Buffer, logger *lo
5354

5455
return nil, err
5556
}
57+
groupMetadata.WithLabelValues(strconv.Itoa(int(valueVersion))).Add(1)
5658

5759
// Decode value content
5860
var metadata *consumerGroupMetadata

kafka/consumer_partition_offset.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"fmt"
77
log "github.com/sirupsen/logrus"
8+
"strconv"
89
)
910

1011
// ConsumerPartitionOffset represents a consumer group commit which can be decoded from the consumer_offsets topic
@@ -69,6 +70,7 @@ func newConsumerPartitionOffset(key *bytes.Buffer, value *bytes.Buffer, logger *
6970

7071
return nil, fmt.Errorf("message value has no version")
7172
}
73+
offsetCommit.WithLabelValues(strconv.Itoa(int(valueVersion))).Add(1)
7274

7375
// Decode message value using the right decoding function for given version
7476
var decodedValue offsetValue

kafka/metrics.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package kafka
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
)
6+
7+
const internalMetricsName = "kafka_minion_internal"
8+
9+
var (
10+
offsetCommit = prometheus.NewCounterVec(prometheus.CounterOpts{
11+
Name: prometheus.BuildFQName(internalMetricsName, "offset_consumer", "offset_commits_read"),
12+
Help: "Number of read offset commits",
13+
}, []string{"version"})
14+
offsetCommitTombstone = prometheus.NewCounter(prometheus.CounterOpts{
15+
Name: prometheus.BuildFQName(internalMetricsName, "offset_consumer", "offset_commits_tombstones_read"),
16+
Help: "Number of read group offset commit tombstone messages",
17+
})
18+
19+
groupMetadata = prometheus.NewCounterVec(prometheus.CounterOpts{
20+
Name: prometheus.BuildFQName(internalMetricsName, "offset_consumer", "group_metadata_read"),
21+
Help: "Number of read group meta data messages",
22+
}, []string{"version"})
23+
groupMetadataTombstone = prometheus.NewCounter(prometheus.CounterOpts{
24+
Name: prometheus.BuildFQName(internalMetricsName, "offset_consumer", "group_metadata_tombstones_read"),
25+
Help: "Number of read group meta data tombstone messages",
26+
})
27+
28+
messagesInSuccess = prometheus.NewCounterVec(prometheus.CounterOpts{
29+
Name: prometheus.BuildFQName(internalMetricsName, "kafka", "messages_in_success"),
30+
Help: "Number of messages successfully consumed from a topic",
31+
}, []string{"topic"})
32+
messagesInFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
33+
Name: prometheus.BuildFQName(internalMetricsName, "kafka", "messages_in_failed"),
34+
Help: "Number of messages failed to consume from a topic",
35+
}, []string{"topic"})
36+
)
37+
38+
func init() {
39+
prometheus.MustRegister(offsetCommit)
40+
prometheus.MustRegister(offsetCommitTombstone)
41+
42+
prometheus.MustRegister(groupMetadata)
43+
prometheus.MustRegister(groupMetadataTombstone)
44+
45+
prometheus.MustRegister(messagesInSuccess)
46+
prometheus.MustRegister(messagesInFailed)
47+
}

kafka/offset_consumer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,10 @@ func (module *OffsetConsumer) partitionConsumer(consumer sarama.Consumer, partit
108108
for {
109109
select {
110110
case msg := <-pconsumer.Messages():
111+
messagesInSuccess.WithLabelValues(msg.Topic).Add(1)
111112
module.processMessage(msg)
112113
case err := <-pconsumer.Errors():
114+
messagesInFailed.WithLabelValues(err.Topic).Add(1)
113115
log.WithFields(log.Fields{
114116
"error": err.Error(),
115117
"topic": err.Topic,
@@ -162,6 +164,7 @@ func (module *OffsetConsumer) processOffsetCommit(key *bytes.Buffer, value *byte
162164
// A tombstone on the __consumer_offsets topic indicates that the consumer group either expired
163165
// due too configured group retention or that the consumed topic has been deleted
164166
if isTombstone {
167+
offsetCommitTombstone.Add(1)
165168
group, err := readString(key)
166169
if err != nil {
167170
logger.WithFields(log.Fields{

0 commit comments

Comments
 (0)