@@ -71,16 +71,18 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
7171 switch err .Error () {
7272 case errUnknownTopic :
7373 errors [order ] = fmt .Errorf ("%v %s" , err , topic )
74- metrics .Increment ("kafka_unknown_topic_failure_total" ,
75- fmt .Sprintf ("topic=%s,event_type=%s,conn_group=%s" , topic , event .Type , connGroup ))
74+ metrics .Increment ("kafka_error" ,
75+ fmt .Sprintf ("type=%s,topic=%s,event_type=%s,conn_group=%s" ,
76+ "unknown_topic" , topic , event .Type , connGroup ))
7677
7778 case errLargeMessageSize :
7879 errors [order ] = fmt .Errorf ("%v %s" , err , topic )
79- metrics .Increment ("kafka_message_too_large_total" ,
80- fmt .Sprintf ("topic=%s,event_type=%s,conn_group=%s" , topic , event .Type , connGroup ))
80+ metrics .Increment ("kafka_error" ,
81+ fmt .Sprintf ("type=%s,topic=%s,event_type=%s,conn_group=%s" ,
82+ "unknown_topic" , topic , event .Type , connGroup ))
8183 default :
8284 errors [order ] = err
83- logger .Errorf ("produce to kafka failed due to: %v" , err )
85+ logger .Errorf ("produce to kafka failed due to: %v on topic : %s " , err , topic )
8486 }
8587 continue
8688 }
0 commit comments