Skip to content

Commit 24e3656

Browse files
committed
Adds unit tests for kafka subscriber consumer
Signed-off-by: joshvanl <[email protected]>
1 parent cf19d0f commit 24e3656

File tree

4 files changed

+622
-19
lines changed

4 files changed

+622
-19
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package mocks
15+
16+
import (
17+
"context"
18+
19+
"github.com/IBM/sarama"
20+
)
21+
22+
type FakeConsumerGroup struct {
23+
consumerFn func(context.Context, []string, sarama.ConsumerGroupHandler) error
24+
errorsFn func() <-chan error
25+
closeFn func() error
26+
pauseFn func(map[string][]int32)
27+
resumeFn func(map[string][]int32)
28+
pauseAllFn func()
29+
resumeAllFn func()
30+
}
31+
32+
func NewConsumerGroup() *FakeConsumerGroup {
33+
return &FakeConsumerGroup{
34+
consumerFn: func(context.Context, []string, sarama.ConsumerGroupHandler) error {
35+
return nil
36+
},
37+
errorsFn: func() <-chan error {
38+
return nil
39+
},
40+
closeFn: func() error {
41+
return nil
42+
},
43+
pauseFn: func(map[string][]int32) {
44+
},
45+
resumeFn: func(map[string][]int32) {
46+
},
47+
pauseAllFn: func() {
48+
},
49+
resumeAllFn: func() {
50+
},
51+
}
52+
}
53+
54+
func (f *FakeConsumerGroup) WithConsumeFn(fn func(context.Context, []string, sarama.ConsumerGroupHandler) error) *FakeConsumerGroup {
55+
f.consumerFn = fn
56+
return f
57+
}
58+
59+
func (f *FakeConsumerGroup) WithErrorsFn(fn func() <-chan error) *FakeConsumerGroup {
60+
f.errorsFn = fn
61+
return f
62+
}
63+
64+
func (f *FakeConsumerGroup) WithCloseFn(fn func() error) *FakeConsumerGroup {
65+
f.closeFn = fn
66+
return f
67+
}
68+
69+
func (f *FakeConsumerGroup) WithPauseFn(fn func(map[string][]int32)) *FakeConsumerGroup {
70+
f.pauseFn = fn
71+
return f
72+
}
73+
74+
func (f *FakeConsumerGroup) WithResumeFn(fn func(map[string][]int32)) *FakeConsumerGroup {
75+
f.resumeFn = fn
76+
return f
77+
}
78+
79+
func (f *FakeConsumerGroup) WithPauseAllFn(fn func()) *FakeConsumerGroup {
80+
f.pauseAllFn = fn
81+
return f
82+
}
83+
84+
func (f *FakeConsumerGroup) WithResumeAllFn(fn func()) *FakeConsumerGroup {
85+
f.resumeAllFn = fn
86+
return f
87+
}
88+
89+
func (f *FakeConsumerGroup) Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error {
90+
return f.consumerFn(ctx, topics, handler)
91+
}
92+
93+
func (f *FakeConsumerGroup) Errors() <-chan error {
94+
return f.errorsFn()
95+
}
96+
97+
func (f *FakeConsumerGroup) Close() error {
98+
return f.closeFn()
99+
}
100+
101+
func (f *FakeConsumerGroup) Pause(partitions map[string][]int32) {
102+
f.pauseFn(partitions)
103+
}
104+
105+
func (f *FakeConsumerGroup) Resume(partitions map[string][]int32) {
106+
f.resumeFn(partitions)
107+
}
108+
109+
func (f *FakeConsumerGroup) PauseAll() {
110+
f.pauseAllFn()
111+
}
112+
113+
func (f *FakeConsumerGroup) ResumeAll() {
114+
f.resumeAllFn()
115+
}

common/component/kafka/mocks/mock_ISchemaRegistryClient.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/component/kafka/subscriber.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func (k *Kafka) Subscribe(ctx context.Context, handlerConfig SubscriptionHandler
5757
func (k *Kafka) reloadConsumerGroup() {
5858
if k.consumerCancel != nil {
5959
k.consumerCancel()
60+
k.consumerCancel = nil
6061
k.consumerWG.Wait()
6162
}
6263

@@ -76,25 +77,27 @@ func (k *Kafka) reloadConsumerGroup() {
7677
k.consumerWG.Add(1)
7778
go func() {
7879
defer k.consumerWG.Done()
80+
k.consume(ctx, topics, consumer)
81+
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
82+
}()
83+
}
7984

80-
for {
81-
err := k.cg.Consume(ctx, topics, consumer)
82-
if errors.Is(err, context.Canceled) {
83-
break
84-
}
85-
if err != nil {
86-
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
87-
}
88-
89-
select {
90-
case <-time.After(k.consumeRetryInterval):
91-
case <-k.closeCh:
92-
break
93-
case <-ctx.Done():
94-
break
95-
}
85+
func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer) {
86+
for {
87+
err := k.cg.Consume(ctx, topics, consumer)
88+
if errors.Is(err, context.Canceled) {
89+
return
90+
}
91+
if err != nil {
92+
k.logger.Errorf("Error consuming %v. Retrying...: %v", topics, err)
9693
}
9794

98-
k.logger.Debugf("Closing ConsumerGroup for topics: %v", topics)
99-
}()
95+
select {
96+
case <-k.closeCh:
97+
return
98+
case <-ctx.Done():
99+
return
100+
case <-time.After(k.consumeRetryInterval):
101+
}
102+
}
100103
}

0 commit comments

Comments
 (0)