@@ -25,7 +25,7 @@ const (
2525 errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
2626)
2727
28- var DeliveryEventCount atomic. Int64
28+ var DeliveryEventCount int64
2929
3030// KafkaProducer Produce data to kafka synchronously
3131type KafkaProducer interface {
@@ -107,7 +107,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
107107 order := m .Opaque .(int )
108108 errors [order ] = m .TopicPartition .Error
109109 }
110- DeliveryEventCount . Add ( 1 )
110+ atomic . AddInt64 ( & DeliveryEventCount , 1 )
111111 }
112112
113113 if allNil (errors ) {
@@ -165,16 +165,18 @@ func (pr *Kafka) ReportDeliveryEventCount() {
165165 defer ticker .Stop ()
166166
167167 for range ticker .C {
168- fmt .Println ("get the current value:" , DeliveryEventCount .Load ())
168+ // read
169+ eventCount := atomic .LoadInt64 (& DeliveryEventCount )
170+ fmt .Println ("get the current value:" , eventCount )
169171 //build kafka message
170172 msg := & proto.TotalEventCountMessage {
171173 EventTimestamp : timestamppb .Now (),
172- EventCount : int32 (DeliveryEventCount . Load () ),
174+ EventCount : int32 (eventCount ),
173175 }
174176 //produce to kafka
175177 pr .ProduceTotalEventMessage ("clickstream-total-event" , msg )
176178 //reset the counter
177- DeliveryEventCount . Store ( 0 )
179+ atomic . StoreInt64 ( & DeliveryEventCount , 0 )
178180 }
179181}
180182
0 commit comments