Skip to content

Commit 335b6ed

Browse files
refactor: error message
1 parent 5e649eb commit 335b6ed

2 files changed

Lines changed: 11 additions & 6 deletions

File tree

publisher/kafka.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"github.com/goto/raccoon/metrics"
1616
)
1717

18+
const (
19+
errUnknownTopic = "Local: Unknown topic" //error msg while producing a message to a topic which is not present in the kafka cluster
20+
errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
21+
)
22+
1823
// KafkaProducer Produce data to kafka synchronously
1924
type KafkaProducer interface {
2025
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
@@ -64,12 +69,12 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
6469
if err != nil {
6570
metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type))
6671
switch err.Error() {
67-
case "Local: Unknown topic":
72+
case errUnknownTopic:
6873
errors[order] = fmt.Errorf("%v %s", err, topic)
6974
metrics.Increment("kafka_unknown_topic_failure_total",
7075
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))
7176

72-
case "Broker: Message size too large":
77+
case errLargeMessageSize:
7378
errors[order] = fmt.Errorf("%v %s", err, topic)
7479
metrics.Increment("kafka_message_too_large_total",
7580
fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup))

publisher/kafka_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,20 +90,20 @@ func TestKafka_ProduceBulk(suite *testing.T) {
9090

9191
t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) {
9292
client := &mockClient{}
93-
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once()
93+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic)).Once()
9494
kp := NewKafkaFromClient(client, 10, "%s")
9595

9696
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
97-
assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic)
97+
assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic)
9898
})
9999

100100
t.Run("Should return topic name when message size is too large", func(t *testing.T) {
101101
client := &mockClient{}
102-
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Broker: Message size too large")).Once()
102+
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once()
103103
kp := NewKafkaFromClient(client, 10, "%s")
104104

105105
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
106-
assert.EqualError(t, err.(BulkError).Errors[0], "Broker: Message size too large "+topic)
106+
assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic)
107107
})
108108
})
109109

0 commit comments

Comments
 (0)