@@ -41,7 +41,7 @@ func main() {
4141 clusterCh := make (chan * kafka.StorageRequest , 200 )
4242
4343 // Create storage module
44- cache := storage .NewOffsetStorage (consumerOffsetsCh , clusterCh )
44+ cache := storage .NewMemoryStorage (consumerOffsetsCh , clusterCh )
4545 cache .Start ()
4646
4747 // Create cluster module
@@ -58,13 +58,14 @@ func main() {
5858
5959 // Start listening on /metrics endpoint
6060 http .Handle ("/metrics" , promhttp .Handler ())
61- http .Handle ("/healthcheck" , healthcheck (cluster ))
61+ http .Handle ("/healthcheck" , healthCheck (cluster ))
62+ http .Handle ("/readycheck" , readyCheck (cache ))
6263 listenAddress := net .JoinHostPort (opts .TelemetryHost , strconv .Itoa (opts .TelemetryPort ))
6364 log .Infof ("Listening on: '%s" , listenAddress )
6465 log .Fatal (http .ListenAndServe (listenAddress , nil ))
6566}
6667
67- func healthcheck (cluster * kafka.Cluster ) http.HandlerFunc {
68+ func healthCheck (cluster * kafka.Cluster ) http.HandlerFunc {
6869 return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
6970 if cluster .IsHealthy () {
7071 w .Write ([]byte ("Healthy" ))
@@ -73,3 +74,16 @@ func healthcheck(cluster *kafka.Cluster) http.HandlerFunc {
7374 }
7475 })
7576}
77+
78+ // readyCheck only returns 200 when it has initially consumed the __consumer_offsets topic
79+ // Utilizing this ready check you can ensure to slow down rolling updates until a pod is ready
80+ // to expose consumer group metrics which are up to date
81+ func readyCheck (storage * storage.MemoryStorage ) http.HandlerFunc {
82+ return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
83+ if storage .IsConsumed () {
84+ w .Write ([]byte ("Ready" ))
85+ } else {
86+ http .Error (w , "Offsets topic has not been consumed yet" , http .StatusServiceUnavailable )
87+ }
88+ })
89+ }
0 commit comments