Skip to content

Commit 92477c0

Browse files
fix: remove message present in the ivalid partitions
1 parent fe38d94 commit 92477c0

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

publisher/kafka.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ func (pr *Kafka) reportBatchMetrics(stats map[string]interface{}) {
158158
// Sum messages across all partitions
159159
msgCnt := 0.0
160160
if partitions, ok := topicStats["partitions"].(map[string]interface{}); ok {
161-
for _, p := range partitions {
161+
for partition, p := range partitions {
162+
if partition == "-1" { // ignore invalid partition
163+
continue
164+
}
162165
partStats, ok := p.(map[string]interface{})
163166
if !ok {
164167
continue
@@ -185,8 +188,8 @@ func (pr *Kafka) reportBatchMetrics(stats map[string]interface{}) {
185188
}
186189

187190
// Emit metrics
188-
metrics.Count("kafka_producer_batch_count_total", int(batchCnt), fmt.Sprintf("topic=%s", topicName))
189-
metrics.Count("kafka_producer_message_count_total", int(msgCnt), fmt.Sprintf("topic=%s", topicName))
191+
metrics.Gauge("kafka_producer_batch_count_total", int(batchCnt), fmt.Sprintf("topic=%s", topicName))
192+
metrics.Gauge("kafka_producer_message_count_total", int(msgCnt), fmt.Sprintf("topic=%s", topicName))
190193
metrics.Gauge("kafka_producer_batch_size_avg_bytes", batchSizeAvg, fmt.Sprintf("topic=%s", topicName))
191194
metrics.Gauge("kafka_producer_messages_per_batch_avg", msgsPerBatch, fmt.Sprintf("topic=%s", topicName))
192195

0 commit comments

Comments
 (0)