Skip to content

Commit 5e649eb

Browse files
fix : error handling on message too large
1 parent b21b499 commit 5e649eb

2 files changed

Lines changed: 5 additions & 5 deletions

File tree

publisher/kafka.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,19 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
6363
err := pr.kp.Produce(message, deliveryChannel)
6464
if err != nil {
6565
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
66-
logger.Errorf("produce to kafka failed : %v and string value : %s", err, err.Error())
6766
switch err.Error() {
6867
case "Local: Unknown topic":
6968
errors[order] = fmt.Errorf("%v %s", err, topic)
7069
metrics.Increment("kafka_unknown_topic_failure_total",
7170
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
7271

73-
case "Local: Message size too large":
72+
case "Broker: Message size too large":
7473
errors[order] = fmt.Errorf("%v %s", err, topic)
7574
metrics.Increment("kafka_message_too_large_total",
7675
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
7776
default:
7877
errors[order] = err
78+
logger.Errorf("produce to kafka failed due to: %v", err)
7979
}
8080
continue
8181
}

publisher/kafka_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,13 @@ func TestKafka_ProduceBulk(suite *testing.T) {
9797
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
9898
})
9999

100-
t.Run("Should return topic name when message size large is found", func(t *testing.T) {
100+
t.Run("Should return topic name when message size is too large", func(t *testing.T) {
101101
client := &mockClient{}
102-
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Message size too large")).Once()
102+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Broker: Message size too large")).Once()
103103
kp := NewKafkaFromClient(client, 10, "%s")
104104

105105
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
106-
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Message size too large "+topic)
106+
assert.EqualError(t, err.(BulkError).Errors[0], "Broker: Message size too large "+topic)
107107
})
108108
})
109109

0 commit comments

Comments
 (0)