From d299f3539a17cba5222020fff88115a4cbb4e840 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 14:04:24 +0530 Subject: [PATCH 1/6] fix: handle message too large error on kafka client and emit metric --- publisher/kafka.go | 13 ++++++++++--- publisher/kafka_test.go | 9 +++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 14754eef..403d54fb 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -63,10 +63,17 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann err := pr.kp.Produce(message, deliveryChannel) if err != nil { metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) - if err.Error() == "Local: Unknown topic" { + switch err.Error() { + case "Local: Unknown topic": errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) - } else { + metrics.Increment("kafka_unknown_topic_failure_total", + fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) + + case "Local: Message size too large": + errors[order] = fmt.Errorf("%v %s", err, topic) + metrics.Increment("kafka_message_too_large_total", + fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) + default: errors[order] = err } continue diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 9a0a4651..164e6e2f 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -96,6 +96,15 @@ func TestKafka_ProduceBulk(suite *testing.T) { err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic) }) + + t.Run("Should return topic name when message size large is found", func(t *testing.T) { + 
client := &mockClient{} + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Message size too large")).Once() + kp := NewKafkaFromClient(client, 10, "%s") + + err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) + assert.EqualError(t, err.(BulkError).Errors[0], "Local: Message size too large "+topic) + }) }) suite.Run("MessageFailedToProduce", func(t *testing.T) { From b21b499b5845d053e6486759b35580f73ace84b5 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 15:26:55 +0530 Subject: [PATCH 2/6] debug: add logger statement --- publisher/kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/publisher/kafka.go b/publisher/kafka.go index 403d54fb..02259882 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -63,6 +63,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann err := pr.kp.Produce(message, deliveryChannel) if err != nil { metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) + logger.Errorf("produce to kafka failed : %v and string value : %s", err, err.Error()) switch err.Error() { case "Local: Unknown topic": errors[order] = fmt.Errorf("%v %s", err, topic) From 5e649eb9fe53b244cb3f139b305f96cc84300cb1 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 16:38:06 +0530 Subject: [PATCH 3/6] fix : error handling on message too large --- publisher/kafka.go | 4 ++-- publisher/kafka_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 02259882..f37f68ca 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -63,19 +63,19 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann err := pr.kp.Produce(message, deliveryChannel) if err != nil { metrics.Increment("kafka_messages_delivered_total", 
fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) - logger.Errorf("produce to kafka failed : %v and string value : %s", err, err.Error()) switch err.Error() { case "Local: Unknown topic": errors[order] = fmt.Errorf("%v %s", err, topic) metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) - case "Local: Message size too large": + case "Broker: Message size too large": errors[order] = fmt.Errorf("%v %s", err, topic) metrics.Increment("kafka_message_too_large_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) default: errors[order] = err + logger.Errorf("produce to kafka failed due to: %v", err) } continue } diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 164e6e2f..5596a894 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -97,13 +97,13 @@ func TestKafka_ProduceBulk(suite *testing.T) { assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic) }) - t.Run("Should return topic name when message size large is found", func(t *testing.T) { + t.Run("Should return topic name when message size is too large", func(t *testing.T) { client := &mockClient{} - client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: Message size too large")).Once() + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Broker: Message size too large")).Once() kp := NewKafkaFromClient(client, 10, "%s") err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) - assert.EqualError(t, err.(BulkError).Errors[0], "Local: Message size too large "+topic) + assert.EqualError(t, err.(BulkError).Errors[0], "Broker: Message size too large "+topic) }) }) From 335b6ed41d1adee70781425b15bd7b3c85c2637f Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 16:51:09 +0530 Subject: [PATCH 4/6] refactor: 
error message --- publisher/kafka.go | 9 +++++++-- publisher/kafka_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index f37f68ca..892aede2 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -15,6 +15,11 @@ import ( "github.com/goto/raccoon/metrics" ) +const ( + errUnknownTopic = "Local: Unknown topic" // error message returned when producing to a topic that is not present in the Kafka cluster + errLargeMessageSize = "Broker: Message size too large" // error message returned when producing a message larger than the message.max.bytes config +) + // KafkaProducer Produce data to kafka synchronously type KafkaProducer interface { // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed. @@ -64,12 +69,12 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann if err != nil { metrics.Increment("kafka_messages_delivered_total", fmt.Sprintf("success=false,conn_group=%s,event_type=%s", connGroup, event.Type)) switch err.Error() { - case "Local: Unknown topic": + case errUnknownTopic: errors[order] = fmt.Errorf("%v %s", err, topic) metrics.Increment("kafka_unknown_topic_failure_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) - case "Broker: Message size too large": + case errLargeMessageSize: errors[order] = fmt.Errorf("%v %s", err, topic) metrics.Increment("kafka_message_too_large_total", fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) diff --git a/publisher/kafka_test.go b/publisher/kafka_test.go index 5596a894..6af8b33d 100644 --- a/publisher/kafka_test.go +++ b/publisher/kafka_test.go @@ -90,20 +90,20 @@ func TestKafka_ProduceBulk(suite *testing.T) { t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) { client := &mockClient{} - client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Local: 
Unknown topic")).Once() + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic)).Once() kp := NewKafkaFromClient(client, 10, "%s") err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) - assert.EqualError(t, err.(BulkError).Errors[0], "Local: Unknown topic "+topic) + assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic) }) t.Run("Should return topic name when message size is too large", func(t *testing.T) { client := &mockClient{} - client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("Broker: Message size too large")).Once() + client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once() kp := NewKafkaFromClient(client, 10, "%s") err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2)) - assert.EqualError(t, err.(BulkError).Errors[0], "Broker: Message size too large "+topic) + assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic) }) }) From 3ebb76d5ce34316e3bb02bd8b891bff93108b8eb Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 21:03:21 +0530 Subject: [PATCH 5/6] refactor: metric name and tags while producing to kafka --- publisher/kafka.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 892aede2..701f3ce8 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -71,16 +71,18 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann switch err.Error() { case errUnknownTopic: errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_unknown_topic_failure_total", - fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) + metrics.Increment("kafka_error", + fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", + "unknown_topic", topic, event.Type, connGroup)) 
case errLargeMessageSize: errors[order] = fmt.Errorf("%v %s", err, topic) - metrics.Increment("kafka_message_too_large_total", - fmt.Sprintf("topic=%s,event_type=%s,conn_group=%s", topic, event.Type, connGroup)) + metrics.Increment("kafka_error", + fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", + "unknown_topic", topic, event.Type, connGroup)) default: errors[order] = err - logger.Errorf("produce to kafka failed due to: %v", err) + logger.Errorf("produce to kafka failed due to: %v on topic : %s", err, topic) } continue } From bc0392e24f32e784369d4aa37359409197eb3796 Mon Sep 17 00:00:00 2001 From: Siddhanta Rath Date: Thu, 21 Aug 2025 21:06:40 +0530 Subject: [PATCH 6/6] fix: metric tag name for message too large --- publisher/kafka.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/publisher/kafka.go b/publisher/kafka.go index 701f3ce8..9af1cf67 100644 --- a/publisher/kafka.go +++ b/publisher/kafka.go @@ -79,7 +79,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann errors[order] = fmt.Errorf("%v %s", err, topic) metrics.Increment("kafka_error", fmt.Sprintf("type=%s,topic=%s,event_type=%s,conn_group=%s", - "unknown_topic", topic, event.Type, connGroup)) + "message_too_large", topic, event.Type, connGroup)) default: errors[order] = err logger.Errorf("produce to kafka failed due to: %v on topic : %s", err, topic)