Skip to content

Commit d299f35

Browse files
fix: handle message too large error on kafka client and emit metric
1 parent 32a0859 commit d299f35

2 files changed

Lines changed: 19 additions & 3 deletions

File tree

publisher/kafka.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,17 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
6363
err := pr.kp.Produce(message, deliveryChannel)
6464
if err != nil {
6565
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
66-
if err.Error() == "Local: Unknown topic" {
66+
switch err.Error() {
67+
case "Local: Unknown topic":
6768
errors[order] = fmt.Errorf("%v %s", err, topic)
68-
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
69-
} else {
69+
metrics.Increment("kafka_unknown_topic_failure_total",
70+
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
71+
72+
case "Local: Message size too large":
73+
errors[order] = fmt.Errorf("%v %s", err, topic)
74+
metrics.Increment("kafka_message_too_large_total",
75+
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
76+
default:
7077
errors[order] = err
7178
}
7279
continue

publisher/kafka_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,15 @@ func TestKafka_ProduceBulk(suite *testing.T) {
9696
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
9797
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
9898
})
99+
100+
t.Run("Should return topic name when message size large is found", func(t *testing.T) {
101+
client := &mockClient{}
102+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Message size too large")).Once()
103+
kp := NewKafkaFromClient(client, 10, "%s")
104+
105+
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
106+
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Message size too large "+topic)
107+
})
99108
})
100109

101110
suite.Run("MessageFailedToProduce", func(t *testing.T) {

0 commit comments

Comments
 (0)