Skip to content

Commit d38f19c

Browse files
author
Martin Schneppenheim
committed
Add healthcheck
1 parent 1a9521b commit d38f19c

File tree

3 files changed

+37
-13
lines changed

3 files changed

+37
-13
lines changed

collector/collector.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,3 +177,8 @@ func getVersionedConsumerGroup(consumerGroupName string) *versionedConsumerGroup
177177
}
178178
return &versionedConsumerGroup{BaseName: baseName, Name: consumerGroupName, Version: uint8(parsedVersion)}
179179
}
180+
181+
// IsHealthy returns a bool which indicates if the collector is in a healthy state or not
182+
func (e *Collector) IsHealthy() bool {
183+
return e.kafkaClient.IsHealthy()
184+
}

kafka/client.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ func NewKafkaClient(opts *options.Options) (*Client, error) {
9999

100100
// GetTopicNames returns an array of topic names
101101
func (c *Client) GetTopicNames() ([]string, error) {
102+
err := c.client.RefreshMetadata()
103+
if err != nil {
104+
return nil, fmt.Errorf("Cannot refresh metadata. %s", err)
105+
}
106+
102107
topicNames, err := c.consumer.Topics()
103108
if err != nil {
104109
return nil, fmt.Errorf("Cannot fetch topic names. %s", err)
@@ -107,6 +112,16 @@ func (c *Client) GetTopicNames() ([]string, error) {
107112
return topicNames, nil
108113
}
109114

115+
// IsHealthy returns true if communication with kafka brokers is fine
116+
func (c *Client) IsHealthy() bool {
117+
err := c.client.RefreshMetadata()
118+
if err != nil {
119+
return false
120+
}
121+
122+
return true
123+
}
124+
110125
// GetPartitionIDs returns an int32 array with all partitionIDs for a specific topic
111126
func (c *Client) GetPartitionIDs(topicName string) ([]int32, error) {
112127
partitionIDs, err := c.client.Partitions(topicName)

main.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,29 @@ func main() {
3333
log.SetLevel(level)
3434

3535
log.Info("Starting kafka minion version%v", opts.Version)
36-
startCollector(opts)
36+
kafkaCollector, err := collector.NewKafkaCollector(opts)
37+
if err != nil {
38+
log.Fatal("Could not create kafka exporter. ", err)
39+
}
40+
prometheus.MustRegister(kafkaCollector)
41+
log.Infof("Successfully started kafka exporter")
3742

3843
// Start listening on /metrics endpoint
3944
http.Handle("/metrics", promhttp.Handler())
45+
http.Handle("/healthcheck", healthcheck(kafkaCollector))
4046
listenAddress := fmt.Sprintf(":%d", opts.Port)
4147
log.Fatal(http.ListenAndServe(listenAddress, nil))
4248
log.Infof("Listening on: '%s", listenAddress)
43-
4449
}
4550

46-
func startCollector(opts *options.Options) {
47-
log.Infof("Starting kafka lag exporter v%v", opts.Version)
48-
49-
// Start kafka exporter
50-
log.Infof("Starting kafka exporter")
51-
kafkaCollector, err := collector.NewKafkaCollector(opts)
52-
if err != nil {
53-
log.Fatal("Could not create kafka exporter. ", err)
54-
}
55-
prometheus.MustRegister(kafkaCollector)
56-
log.Infof("Successfully started kafka exporter")
51+
func healthcheck(kafkaCollector *collector.Collector) http.HandlerFunc {
52+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
53+
log.Debug("Healthcheck has been called")
54+
isHealthy := kafkaCollector.IsHealthy()
55+
if isHealthy {
56+
w.Write([]byte("Status: Healthy"))
57+
} else {
58+
http.Error(w, "Healthcheck failed", http.StatusServiceUnavailable)
59+
}
60+
})
5761
}

0 commit comments

Comments
 (0)