diff --git a/publisher/kafka.go b/publisher/kafka.go index 9a21d1de..40b5c947 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -113,9 +113,16 @@ func (pr *Kafka) ReportStats() { switch e := v.(type) { case *kafka.Stats: var stats map[string]interface{} - json.Unmarshal([]byte(e.String()), &stats) - - brokers := stats["brokers"].(map[string]interface{}) + if err := json.Unmarshal([]byte(e.String()), &stats); err != nil { + logger.Errorf("failed to unmarshal kafka stats: %v", err) + continue + } + brokersRawJson, ok := stats["brokers"] + if !ok || brokersRawJson == nil { + logger.Errorf("kafka broker stats missing or null brokers field") + continue + } + brokers := brokersRawJson.(map[string]interface{}) metrics.Gauge("kafka_tx_messages_total", stats["txmsgs"], "") metrics.Gauge("kafka_tx_messages_bytes_total", stats["txmsg_bytes"], "") for _, broker := range brokers {