From fc224b4d2ab5db95ee792866bee3861f1b0e2f33 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Mon, 25 Aug 2025 14:52:06 +0530 Subject: [PATCH] fix: error handling on reporting kafka broker stats --- publisher/kafka.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 9a21d1de..40b5c947 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -113,9 +113,16 @@ func (pr *Kafka) ReportStats() { switch e := v.(type) { case *kafka.Stats: var stats map[string]interface{} - json.Unmarshal([]byte(e.String()), &stats) - - brokers := stats["brokers"].(map[string]interface{}) + if err := json.Unmarshal([]byte(e.String()), &stats); err != nil { + logger.Errorf("failed to unmarshal kafka stats: %v", err) + continue + } + brokersRawJson, ok := stats["brokers"] + if !ok || brokersRawJson == nil { + logger.Errorf("kafka broker stats missing or null brokers field") + continue + } + brokers := brokersRawJson.(map[string]interface{}) metrics.Gauge("kafka_tx_messages_total", stats["txmsgs"], "") metrics.Gauge("kafka_tx_messages_bytes_total", stats["txmsg_bytes"], "") for _, broker := range brokers {