Skip to content

Commit d9780ef

Browse files
fix: error handling on reporting kafka broker stats (#18)
1 parent 06d2879 commit d9780ef

1 file changed

Lines changed: 10 additions & 3 deletions

File tree

publisher/kafka.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,16 @@ func (pr *Kafka) ReportStats() {
113113
switch e := v.(type) {
114114
case *kafka.Stats:
115115
var stats map[string]interface{}
116-
json.Unmarshal([]byte(e.String()), &stats)
117-
118-
brokers := stats["brokers"].(map[string]interface{})
116+
if err := json.Unmarshal([]byte(e.String()), &stats); err != nil {
117+
logger.Errorf("failed to unmarshal kafka stats: %v", err)
118+
continue
119+
}
120+
brokersRawJson, ok := stats["brokers"]
121+
if !ok || brokersRawJson == nil {
122+
logger.Errorf("kafka broker stats missing or null brokers field")
123+
continue
124+
}
125+
brokers := brokersRawJson.(map[string]interface{})
119126
metrics.Gauge("kafka_tx_messages_total", stats["txmsgs"], "")
120127
metrics.Gauge("kafka_tx_messages_bytes_total", stats["txmsg_bytes"], "")
121128
for _, broker := range brokers {

0 commit comments

Comments
 (0)