diff --git a/publisher/kafka.go b/publisher/kafka.go index 14754eef..9af1cf67 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -15,6 +15,11 @@ import ( "github.com/goto/raccoon/metrics" ) +const ( + errUnknownTopic = "Local: Unknown topic" //error msg while producing a message to a topic which is not present in the kafka cluster + errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config +) + // KafkaProducer Produce data to kafka synchronously type KafkaProducer interface { // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed. @@ -63,11 +68,21 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann err := pr.kp.Produce(message, deliveryChannel) if err != nil { metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) - if err.Error() == "Local: Unknown topic" { + switch err.Error() { + case errUnknownTopic: + errors[order] = fmt.Errorf("%v %s", err, topic) + metrics.Increment("kafka_error", + fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", + "unknown_topic", topic, event.Type, connGroup)) + + case errLargeMessageSize: errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) - } else { + metrics.Increment("kafka_error", + fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", + "message_too_large", topic, event.Type, connGroup)) + default: errors[order] = err + logger.Errorf("produce to kafka failed due to: %v on topic : %s", err, topic) } continue } diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 9a0a4651..6af8b33d 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -90,11 +90,20 @@ func TestKafka_ProduceBulk(suite *testing.T) { t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) { client := &mockClient{} - client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Unknown topic")).Once() + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic)).Once() kp := NewKafkaFromClient(client, 10, "%s") err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) - assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic) + assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic) + }) + + t.Run("Should return topic name when message size is too large", func(t *testing.T) { + client := &mockClient{} + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once() + kp := NewKafkaFromClient(client, 10, "%s") + + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) + assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic) }) })