Skip to content

Commit 5b005c0

Browse files
fix: handle message too large error on kafka client and emit metric (#15)
* fix: error handling on message too large * fix: metric and tag name while producing messages to Kafka
1 parent adca050 commit 5b005c0

2 files changed

Lines changed: 29 additions & 5 deletions

File tree

publisher/kafka.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"github.com/goto/raccoon/metrics"
1616
)
1717

18+
const (
19+
errUnknownTopic = "Local: Unknown topic" //error msg while producing a message to a topic which is not present in the kafka cluster
20+
errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
21+
)
22+
1823
// KafkaProducer Produce data to kafka synchronously
1924
type KafkaProducer interface {
2025
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
@@ -63,11 +68,21 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
6368
err := pr.kp.Produce(message, deliveryChannel)
6469
if err != nil {
6570
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
66-
if err.Error() == "Local: Unknown topic" {
71+
switch err.Error() {
72+
case errUnknownTopic:
73+
errors[order] = fmt.Errorf("%v %s", err, topic)
74+
metrics.Increment("kafka_error",
75+
fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s",
76+
"unknown_topic", topic, event.Type, connGroup))
77+
78+
case errLargeMessageSize:
6779
errors[order] = fmt.Errorf("%v %s", err, topic)
68-
metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
69-
} else {
80+
metrics.Increment("kafka_error",
81+
fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s",
82+
"message_too_large", topic, event.Type, connGroup))
83+
default:
7084
errors[order] = err
85+
logger.Errorf("produce to kafka failed due to: %v on topic : %s", err, topic)
7186
}
7287
continue
7388
}

publisher/kafka_test.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,20 @@ func TestKafka_ProduceBulk(suite *testing.T) {
9090

9191
t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) {
9292
client := &mockClient{}
93-
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
93+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic)).Once()
9494
kp := NewKafkaFromClient(client, 10, "%s")
9595

9696
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
97-
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
97+
assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic)
98+
})
99+
100+
t.Run("Should return topic name when message size is too large", func(t *testing.T) {
101+
client := &mockClient{}
102+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once()
103+
kp := NewKafkaFromClient(client, 10, "%s")
104+
105+
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
106+
assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic)
98107
})
99108
})
100109

0 commit comments

Comments
 (0)