Skip to content

Commit 71f8f28

Browse files
refactor: remove the global variable and atomic update
1 parent 2e64ff0 commit 71f8f28

7 files changed

Lines changed: 105 additions & 126 deletions

File tree

.env.sample

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100"
2626
PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
2727
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
2828
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
29-
PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000
30-
PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log"
29+
PUBLISHER_KAFKA_CLICKSTREAM_STATS_TOPIC_NAME="clickstream-total-event-log"
3130

3231
METRIC_STATSD_ADDRESS=":8125"
3332
METRIC_STATSD_FLUSH_PERIOD_MS=100

.env.test

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ PUBLISHER_KAFKA_CLIENT_RETRY_BACKOFF_MS="100"
2727
PUBLISHER_KAFKA_CLIENT_STATISTICS_INTERVAL_MS=5000
2828
PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES=100000
2929
PUBLISHER_KAFKA_FLUSH_INTERVAL_MS=1000
30-
PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS=60000
31-
PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME="clickstream-total-event-log"
30+
PUBLISHER_KAFKA_CLICKSTREAM_STATS_TOPIC_NAME="clickstream-total-event-log"
3231

3332
METRIC_STATSD_ADDRESS=":8125"
3433
METRIC_STATSD_FLUSH_PERIOD_MS=100

app/server.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ func StartServer(ctx context.Context, cancel context.CancelFunc, shutdown chan b
3737
workerPool.StartWorkers()
3838
go kPublisher.ReportStats()
3939
go reportProcMetrics()
40-
go kPublisher.ReportDeliveryEventCount()
4140
// create signal channel at startup
4241
signalChan := make(chan os.Signal, 1)
4342
signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

config/load_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,13 @@ func TestDynamicConfigLoad(t *testing.T) {
6363

6464
func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
6565
os.Setenv("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000")
66-
os.Setenv("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME", "clickstream-test-log")
66+
os.Setenv("PUBLISHER_KAFKA_CLICKSTREAM_STATS_TOPIC_NAME", "clickstream-test-log")
6767
os.Setenv("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", "60")
6868
os.Setenv("PUBLISHER_KAFKA_CLIENT_BOOTSTRAP_SERVERS", "kafka:9092")
6969
os.Setenv("PUBLISHER_KAFKA_CLIENT_ACKS", "1")
7070
os.Setenv("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "10000")
7171
os.Setenv("SOMETHING_PUBLISHER_KAFKA_CLIENT_SOMETHING", "anything")
7272
publisherKafkaConfigLoader()
73-
expectedDeliveryReportInterval := 60 * time.Millisecond
7473
kafkaConfig := PublisherKafka.ToKafkaConfigMap()
7574
bootstrapServer, _ := kafkaConfig.Get("bootstrap.servers", "")
7675
topic, _ := kafkaConfig.Get("topic", "")
@@ -79,8 +78,7 @@ func TestKafkaConfig_ToKafkaConfigMap(t *testing.T) {
7978
assert.Equal(t, "", topic)
8079
assert.NotEqual(t, something, "anything")
8180
assert.Equal(t, 4, len(*kafkaConfig))
82-
assert.Equal(t, expectedDeliveryReportInterval, PublisherKafka.DeliveryReportInterval)
83-
assert.Equal(t, "clickstream-test-log", PublisherKafka.DeliveryReportTopic)
81+
assert.Equal(t, "clickstream-test-log", PublisherKafka.ClickstreamStatsTopicName)
8482
}
8583

8684
func TestWorkerConfig(t *testing.T) {

config/publisher.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ var PublisherKafka publisherKafka
1515
var dynamicKafkaClientConfigPrefix = "PUBLISHER_KAFKA_CLIENT_"
1616

1717
type publisherKafka struct {
18-
FlushInterval int
19-
DeliveryReportInterval time.Duration
20-
DeliveryReportTopic string
18+
FlushInterval int
19+
DeliveryReportInterval time.Duration
20+
ClickstreamStatsTopicName string
2121
}
2222

2323
func (k publisherKafka) ToKafkaConfigMap() *confluent.ConfigMap {
@@ -46,13 +46,10 @@ func dynamicKafkaClientConfigLoad() []byte {
4646
func publisherKafkaConfigLoader() {
4747
viper.SetDefault("PUBLISHER_KAFKA_CLIENT_QUEUE_BUFFERING_MAX_MESSAGES", "100000")
4848
viper.SetDefault("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS", "1000")
49-
viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", "60000")
50-
viper.SetDefault("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME", "clickstream-total-event-log")
49+
viper.SetDefault("PUBLISHER_KAFKA_CLICKSTREAM_STATS_TOPIC_NAME", "clickstream-total-event-log")
5150
viper.MergeConfig(bytes.NewBuffer(dynamicKafkaClientConfigLoad()))
52-
5351
PublisherKafka = publisherKafka{
54-
FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"),
55-
DeliveryReportInterval: util.MustGetDuration("PUBLISHER_KAFKA_DELIVERY_REPORT_INTERVAL_MS", time.Millisecond),
56-
DeliveryReportTopic: util.MustGetString("PUBLISHER_KAFKA_DELIVERY_REPORT_TOPIC_NAME"),
52+
FlushInterval: util.MustGetInt("PUBLISHER_KAFKA_FLUSH_INTERVAL_MS"),
53+
ClickstreamStatsTopicName: util.MustGetString("PUBLISHER_KAFKA_CLICKSTREAM_STATS_TOPIC_NAME"),
5754
}
5855
}

publisher/kafka.go

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,8 @@ import (
55
"fmt"
66
"github.com/goto/raccoon/serialization"
77
"google.golang.org/protobuf/types/known/timestamppb"
8-
"strings"
9-
"sync/atomic"
10-
"time"
11-
128
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
9+
"strings"
1310
// Importing librd to make it work on vendor mode
1411
_ "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka/librdkafka"
1512

@@ -24,8 +21,6 @@ const (
2421
errLargeMessageSize = "Broker: Message size too large" //error msg while producing a message which is larger than message.max.bytes config
2522
)
2623

27-
var DeliveryEventCount int64
28-
2924
// KafkaProducer Produce data to kafka synchronously
3025
type KafkaProducer interface {
3126
// ProduceBulk message to kafka. Block until all messages are sent. Return array of error. Order is not guaranteed.
@@ -38,31 +33,34 @@ func NewKafka() (*Kafka, error) {
3833
return &Kafka{}, err
3934
}
4035
return &Kafka{
41-
kp: kp,
42-
flushInterval: config.PublisherKafka.FlushInterval,
43-
topicFormat: config.EventDistribution.PublisherPattern,
44-
deliveryReportInterval: config.PublisherKafka.DeliveryReportInterval,
45-
deliveryReportTopic: config.PublisherKafka.DeliveryReportTopic,
36+
kp: kp,
37+
flushInterval: config.PublisherKafka.FlushInterval,
38+
topicFormat: config.EventDistribution.PublisherPattern,
39+
clickstreamStats: ClickstreamStats{
40+
config.PublisherKafka.ClickstreamStatsTopicName,
41+
},
4642
}, nil
4743
}
4844

49-
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string, deliveryReportInterval time.Duration,
50-
topicName string) *Kafka {
45+
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string,
46+
clickstreamStats ClickstreamStats) *Kafka {
5147
return &Kafka{
52-
kp: client,
53-
flushInterval: flushInterval,
54-
topicFormat: topicFormat,
55-
deliveryReportInterval: deliveryReportInterval,
56-
deliveryReportTopic: topicName,
48+
kp: client,
49+
flushInterval: flushInterval,
50+
topicFormat: topicFormat,
51+
clickstreamStats: clickstreamStats,
5752
}
5853
}
5954

6055
type Kafka struct {
61-
kp Client
62-
flushInterval int
63-
topicFormat string
64-
deliveryReportInterval time.Duration
65-
deliveryReportTopic string
56+
kp Client
57+
flushInterval int
58+
topicFormat string
59+
clickstreamStats ClickstreamStats
60+
}
61+
62+
type ClickstreamStats struct {
63+
topicName string
6664
}
6765

6866
// ProduceBulk messages to kafka. Block until all messages are sent. Return array of error. Order of Errors is guaranteed.
@@ -102,7 +100,7 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
102100
totalProcessed++
103101
}
104102
// Wait for deliveryChannel as many as processed
105-
localSuccesses := int64(0)
103+
totalEventCount := int64(0)
106104
for i := 0; i < totalProcessed; i++ {
107105
d := <-deliveryChannel
108106
m := d.(*kafka.Message)
@@ -114,13 +112,17 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
114112
order := m.Opaque.(int)
115113
errors[order] = m.TopicPartition.Error
116114
} else {
117-
localSuccesses++
115+
totalEventCount++
118116
}
119117
}
120-
121-
// Single atomic update per batch
122-
if localSuccesses > 0 {
123-
atomic.AddInt64(&DeliveryEventCount, localSuccesses)
118+
// send the total successful delivered event count to kafka topic
119+
if totalEventCount > 0 {
120+
//build kafka message for total message count
121+
msg := &pb.TotalEventCountMessage{
122+
EventTimestamp: timestamppb.Now(),
123+
EventCount: int32(totalEventCount),
124+
}
125+
pr.produceClickstreamStats(pr.clickstreamStats.topicName, msg)
124126
}
125127

126128
if allNil(errors) {
@@ -129,7 +131,8 @@ func (pr *Kafka) ProduceBulk(events []*pb.Event, connGroup string, deliveryChann
129131
return BulkError{Errors: errors}
130132
}
131133

132-
func (pr *Kafka) produceTotalEventMessage(topicName string, event *pb.TotalEventCountMessage) error {
134+
// produceClickstreamStats : method to produce stats to a kafka topic
135+
func (pr *Kafka) produceClickstreamStats(topicName string, event *pb.TotalEventCountMessage) error {
133136
value, err := serialization.SerializeProto(event)
134137
if err != nil {
135138
return fmt.Errorf("failed to serialize proto: %w", err)
@@ -173,25 +176,6 @@ func (pr *Kafka) ReportStats() {
173176
}
174177
}
175178

176-
func (pr *Kafka) ReportDeliveryEventCount() {
177-
ticker := time.NewTicker(pr.deliveryReportInterval)
178-
defer ticker.Stop()
179-
180-
for range ticker.C {
181-
// read the value
182-
eventCount := atomic.LoadInt64(&DeliveryEventCount)
183-
//build kafka message
184-
msg := &pb.TotalEventCountMessage{
185-
EventTimestamp: timestamppb.Now(),
186-
EventCount: int32(eventCount),
187-
}
188-
//produce to kafka
189-
pr.produceTotalEventMessage(pr.deliveryReportTopic, msg)
190-
//reset the counter
191-
atomic.StoreInt64(&DeliveryEventCount, 0)
192-
}
193-
}
194-
195179
// Close wait for outstanding messages to be delivered within given flush interval timeout.
196180
func (pr *Kafka) Close() int {
197181
remaining := pr.kp.Flush(pr.flushInterval)

0 commit comments

Comments
 (0)