@@ -3,7 +3,12 @@ package publisher
33import (
44 "encoding/json"
55 "fmt"
6+ "github.com/goto/raccoon/proto"
7+ "github.com/goto/raccoon/serialization"
8+ "google.golang.org/protobuf/types/known/timestamppb"
69 "strings"
10+ "sync/atomic"
11+ "time"
712
813 "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
914 // Importing librd to make it work on vendor mode
@@ -20,6 +25,8 @@ const (
2025 errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
2126)
2227
28+ var DeliveryEventCount atomic.Int64
29+
2330// KafkaProducer Produce data to kafka synchronously
2431type KafkaProducer interface {
2532 // ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
@@ -100,6 +107,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
100107 order := m .Opaque .(int )
101108 errors [order ] = m .TopicPartition .Error
102109 }
110+ DeliveryEventCount .Add (1 )
103111 }
104112
105113 if allNil (errors ) {
@@ -108,6 +116,17 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
108116 return BulkError {Errors : errors }
109117}
110118
119+ func (pr * Kafka ) ProduceTotalEventMessage (topicName string , event * proto.TotalEventCountMessage ) error {
120+ value , err := serialization .SerializeProto (event )
121+ if err != nil {
122+ return fmt .Errorf ("failed to serialize proto: %w" , err )
123+ }
124+ return pr .kp .Produce (& kafka.Message {
125+ TopicPartition : kafka.TopicPartition {Topic : & topicName , Partition : kafka .PartitionAny },
126+ Value : value ,
127+ }, nil )
128+ }
129+
111130func (pr * Kafka ) ReportStats () {
112131 for v := range pr .kp .Events () {
113132 switch e := v .(type ) {
@@ -141,6 +160,24 @@ func (pr *Kafka) ReportStats() {
141160 }
142161}
143162
163+ func (pr * Kafka ) ReportDeliveryEventCount () {
164+ ticker := time .NewTicker (1 * time .Minute )
165+ defer ticker .Stop ()
166+
167+ for range ticker .C {
168+ fmt .Println ("get the current value:" , DeliveryEventCount .Load ())
169+ //build kafka message
170+ msg := & proto.TotalEventCountMessage {
171+ EventTimestamp : timestamppb .Now (),
172+ EventCount : int32 (DeliveryEventCount .Load ()),
173+ }
174+ //produce to kafka
175+ pr .ProduceTotalEventMessage ("clickstream-total-event" , msg )
176+ //reset the counter
177+ DeliveryEventCount .Store (0 )
178+ }
179+ }
180+
144181// Close wait for outstanding messages to be delivered within given flush interval timeout.
145182func (pr * Kafka ) Close () int {
146183 remaining := pr .kp .Flush (pr .flushInterval )
0 commit comments