Skip to content

Commit c96f261

Browse files
fix: unit test failure
1 parent 71f8f28 commit c96f261

3 files changed

Lines changed: 17 additions & 18 deletions

File tree

app/server_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ func TestShutDownServer(t *testing.T) {
1818
ctx, cancel := context.WithCancel(context.Background())
1919
defer cancel()
2020
mockKafka := &mockKafkaClient{}
21-
22-
kp := publisher.NewKafkaFromClient(mockKafka, 50, "test", 1*time.Millisecond, "clickstream-test-log")
21+
kp := publisher.NewKafkaFromClient(mockKafka, 50, "test", "clickstream-test-log")
2322

2423
shutdownCh := make(chan bool, 1)
2524
bufferCh := make(chan collection.CollectRequest, 1)

publisher/kafka.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@ func NewKafka() (*Kafka, error) {
4343
}
4444

4545
func NewKafkaFromClient(client Client, flushInterval int, topicFormat string,
46-
clickstreamStats ClickstreamStats) *Kafka {
46+
clickstreamStatsTopicName string) *Kafka {
4747
return &Kafka{
48-
kp: client,
49-
flushInterval: flushInterval,
50-
topicFormat: topicFormat,
51-
clickstreamStats: clickstreamStats,
48+
kp: client,
49+
flushInterval: flushInterval,
50+
topicFormat: topicFormat,
51+
clickstreamStats: ClickstreamStats{
52+
clickstreamStatsTopicName,
53+
},
5254
}
5355
}
5456

publisher/kafka_test.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,15 @@ func TestMain(t *testing.M) {
2727
}
2828

2929
var (
30-
clickstreamStats = ClickstreamStats{
31-
topicName: "clickstream-test-log",
32-
}
30+
clickstreamStatsTopicName = "clickstream-test-log"
3331
)
3432

3533
func TestProducer_Close(suite *testing.T) {
3634
suite.Run("Should flush before closing the client", func(t *testing.T) {
3735
client := &mockClient{}
3836
client.On("Flush", 10).Return(0)
3937
client.On("Close").Return()
40-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
38+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
4139
kp.Close()
4240
client.AssertExpectations(t)
4341
})
@@ -61,7 +59,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
6159
}
6260
}()
6361
})
64-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
62+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
6563
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
6664
assert.NoError(t, err)
6765
})
@@ -88,7 +86,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
8886

8987
// For stats message -> match by topic and decode value
9088
client.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
91-
return msg.TopicPartition.Topic != nil && *msg.TopicPartition.Topic == clickstreamStats.topicName
89+
return msg.TopicPartition.Topic != nil && *msg.TopicPartition.Topic == clickstreamStatsTopicName
9290
}), mock.Anything).Return(nil).Run(func(args mock.Arguments) {
9391
// decode the protobuf payload into TotalEventCountMessage
9492
statsMsg := &pb.TotalEventCountMessage{}
@@ -97,7 +95,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
9795
assert.EqualValues(t, 2, statsMsg.EventCount) // we sent 2 events
9896
})
9997

100-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
98+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
10199

102100
err := kp.ProduceBulk(
103101
[]*pb.Event{
@@ -133,7 +131,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
133131
}).Once()
134132
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf("buffer full")).Once()
135133
client.On("Produce", mock.Anything, mock.Anything).Return(nil).Once()
136-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
134+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
137135
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, group1, make(chan kafka.Event, 2))
138136
assert.Len(t, err.(BulkError).Errors, 3)
139137
assert.Error(t, err.(BulkError).Errors[0])
@@ -144,15 +142,15 @@ func TestKafka_ProduceBulk(suite *testing.T) {
144142
t.Run("Should return topic name when unknown topic is returned", func(t *testing.T) {
145143
client := &mockClient{}
146144
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errUnknownTopic))
147-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
145+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
148146
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
149147
assert.EqualError(t, err.(BulkError).Errors[0], errUnknownTopic+" "+topic)
150148
})
151149

152150
t.Run("Should return topic name when message size is too large", func(t *testing.T) {
153151
client := &mockClient{}
154152
client.On("Produce", mock.Anything, mock.Anything).Return(fmt.Errorf(errLargeMessageSize)).Once()
155-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
153+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
156154
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
157155
assert.EqualError(t, err.(BulkError).Errors[0], errLargeMessageSize+" "+topic)
158156
})
@@ -175,7 +173,7 @@ func TestKafka_ProduceBulk(suite *testing.T) {
175173
}
176174
}()
177175
}).Once()
178-
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStats)
176+
kp := NewKafkaFromClient(client, 10, "%s", clickstreamStatsTopicName)
179177
err := kp.ProduceBulk([]*pb.Event{{EventBytes: []byte{}, Type: topic}, {EventBytes: []byte{}, Type: topic}}, "group1", make(chan kafka.Event, 2))
180178
assert.NotEmpty(t, err)
181179
assert.Len(t, err.(BulkError).Errors, 2)

0 commit comments

Comments
 (0)