@@ -137,7 +137,7 @@ func (pr *Kafka) ReportStats() {
137137 }
138138
139139 default :
140- fmt . Printf ("Ignored %v \n " , e )
140+ logger . Infof ("Ignored %v \n " , e )
141141 }
142142 }
143143}
@@ -154,38 +154,12 @@ func (pr *Kafka) reportBatchMetrics(stats map[string]interface{}) {
154154 if ! ok {
155155 continue
156156 }
157-
158- // Sum messages across all partitions
159- msgCnt := 0.0
160- if partitions , ok := topicStats ["partitions" ].(map [string ]interface {}); ok {
161- for partition , p := range partitions {
162- if partition == "-1" { // ignore invalid partition
163- continue
164- }
165- partStats , ok := p .(map [string ]interface {})
166- if ! ok {
167- continue
168- }
169- msgCnt += getFloat (partStats , "msgs" )
170- }
171- }
172-
173- // Batch metrics
174- batchCnt := 0.0
175- if bc , ok := topicStats ["batchcnt" ].(map [string ]interface {}); ok {
176- batchCnt = getFloat (bc , "cnt" )
177- }
178-
179157 batchSizeAvg := 0.0
180158 if bs , ok := topicStats ["batchsize" ].(map [string ]interface {}); ok {
181159 batchSizeAvg = getFloat (bs , "avg" )
182160 }
183-
184161 // Emit metrics
185- metrics .Gauge ("kafka_producer_batch_count_total" , int (batchCnt ), fmt .Sprintf ("topic=%s" , topicName ))
186- metrics .Gauge ("kafka_producer_message_count_total" , int (msgCnt ), fmt .Sprintf ("topic=%s" , topicName ))
187162 metrics .Gauge ("kafka_producer_batch_size_avg_bytes" , batchSizeAvg , fmt .Sprintf ("topic=%s" , topicName ))
188-
189163 }
190164}
191165
0 commit comments