From 63ecb0fcd2378f28d386f8545f38d91e985dc518 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Fri, 22 Aug 2025 12:15:48 +0530 Subject: [PATCH] fix: add all error tags on kafka error --- publisher/kafka.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 9af1cf67..9a21d1de 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -68,22 +68,21 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann err := pr.kp.Produce(message, deliveryChannel) if err != nil { metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) + var errorTag string switch err.Error() { case errUnknownTopic: errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_error", - fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", - "unknown_topic", topic, event.Type, connGroup)) - + errorTag = "unknown_topic" case errLargeMessageSize: errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_error", - fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", - "message_too_large", topic, event.Type, connGroup)) + errorTag = "message_too_large" default: errors[order] = err logger.Errorf("produce to kafka failed due to: %v on topic : %s", err, topic) + errorTag = "unknown" } + metrics.Increment("kafka_error", fmt.Sprintf("type=%s,event_type=%s,conn_group=%s", + errorTag, event.Type, connGroup)) continue } metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, event.Type)) @@ -97,6 +96,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann eventType := events[i].Type metrics.Decrement("kafka_messages_delivered_total", fmt.Sprintf("success=true,conn_group=%s,event_type=%s", connGroup, eventType)) metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, eventType)) + metrics.Increment("kafka_error", fmt.Sprintf("type=%s,event_type=%s,conn_group=%s", "delivery_failed", eventType, connGroup)) order := m.Opaque.(int) errors[order] = m.TopicPartition.Error }