44 "encoding/json"
55 "fmt"
66 "strings"
7+ "time"
78
89 "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
910 // Importing librd to make it work on vendor mode
@@ -57,7 +58,10 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
5758 message := & kafka.Message {
5859 Value : event .EventBytes ,
5960 TopicPartition : kafka.TopicPartition {Topic : & topic , Partition : kafka .PartitionAny },
60- Opaque : order ,
61+ Opaque : struct {
62+ Order int
63+ Sent time.Time
64+ }{order , time .Now ()},
6165 }
6266
6367 err := pr .kp .Produce (message , deliveryChannel )
@@ -78,12 +82,22 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
7882 for i := 0 ; i < totalProcessed ; i ++ {
7983 d := <- deliveryChannel
8084 m := d .(* kafka.Message )
85+ // Extract timestamp from Opaque
86+ opaque := m .Opaque .(struct {
87+ Order int
88+ Sent time.Time
89+ })
90+ deliveryLatency := time .Since (opaque .Sent )
91+ order := opaque .Order
8192 if m .TopicPartition .Error != nil {
8293 eventType := events [i ].Type
8394 metrics .Decrement ("kafka_messages_delivered_total" , fmt .Sprintf ("success=true,conn_group=%s,event_type=%s" , connGroup , eventType ))
8495 metrics .Increment ("kafka_messages_delivered_total" , fmt .Sprintf ("success=false,conn_group=%s,event_type=%s" , connGroup , eventType ))
85- order := m .Opaque .(int )
8696 errors [order ] = m .TopicPartition .Error
97+ } else {
98+ // Record metric
99+ metrics .Timing ("kafka_delivery_latency_ms" , deliveryLatency .Milliseconds (),
100+ fmt .Sprintf ("conn_group=%s,event_type=%s" , connGroup , events [order ].Type ))
87101 }
88102 }
89103
0 commit comments