Skip to content

Commit 2e64ff0

Browse files
refactor: optimise the atomic increment call
1 parent 064641e commit 2e64ff0

1 file changed

Lines changed: 7 additions & 1 deletion

File tree

publisher/kafka.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
102102
totalProcessed++
103103
}
104104
// Wait for deliveryChannel as many as processed
105+
localSuccesses := int64(0)
105106
for i := 0; i < totalProcessed; i++ {
106107
d := <-deliveryChannel
107108
m := d.(*kafka.Message)
@@ -113,10 +114,15 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
113114
order := m.Opaque.(int)
114115
errors[order] = m.TopicPartition.Error
115116
} else {
116-
atomic.AddInt64(&DeliveryEventCount, 1) // if no error is received, increment the count
117+
localSuccesses++
117118
}
118119
}
119120

121+
// Single atomic update per batch
122+
if localSuccesses > 0 {
123+
atomic.AddInt64(&DeliveryEventCount, localSuccesses)
124+
}
125+
120126
if allNil(errors) {
121127
return nil
122128
}

0 commit comments

Comments
 (0)