@@ -13,20 +13,22 @@ import (
13
13
)
14
14
15
15
type handler struct {
16
- * testing.T
17
- cancel context.CancelFunc
16
+ messageCh chan * ConsumerMessage
18
17
}
19
18
20
19
func (h * handler ) Setup (s ConsumerGroupSession ) error { return nil }
21
20
func (h * handler ) Cleanup (s ConsumerGroupSession ) error { return nil }
22
21
func (h * handler ) ConsumeClaim (sess ConsumerGroupSession , claim ConsumerGroupClaim ) error {
23
- for msg := range claim .Messages () {
24
- sess .MarkMessage (msg , "" )
25
- h .Logf ("consumed msg %v" , msg )
26
- h .cancel ()
27
- break
22
+ for {
23
+ select {
24
+ case msg := <- claim .Messages ():
25
+ sess .MarkMessage (msg , "" )
26
+ h .messageCh <- msg
27
+ case <- sess .Context ().Done ():
28
+ h .messageCh <- & ConsumerMessage {Value : []byte ("session done" )}
29
+ return nil
30
+ }
28
31
}
29
- return nil
30
32
}
31
33
32
34
func TestNewConsumerGroupFromClient (t * testing.T ) {
@@ -82,32 +84,36 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
82
84
).SetError (ErrNoError ),
83
85
"FetchRequest" : NewMockSequence (
84
86
NewMockFetchResponse (t , 1 ).
85
- SetMessage ("my-topic" , 0 , 0 , StringEncoder ("foo" )).
87
+ SetMessage ("my-topic" , 0 , 0 , StringEncoder ("foo" )),
88
+ NewMockFetchResponse (t , 1 ).
86
89
SetMessage ("my-topic" , 0 , 1 , StringEncoder ("bar" )),
87
- NewMockFetchResponse (t , 1 ),
88
90
),
89
91
})
90
92
91
93
group , err := NewConsumerGroup ([]string {broker0 .Addr ()}, "my-group" , config )
92
94
if err != nil {
93
95
t .Fatal (err )
94
96
}
95
- defer func () { _ = group .Close () }()
96
97
97
- ctx , cancel := context .WithCancel (context .Background ())
98
- h := & handler {t , cancel }
99
-
100
- var wg sync.WaitGroup
101
- wg .Add (1 )
98
+ ctx := context .Background ()
99
+ h := & handler {make (chan * ConsumerMessage )}
100
+ defer close (h .messageCh )
102
101
103
102
go func () {
104
103
topics := []string {"my-topic" }
105
104
if err := group .Consume (ctx , topics , h ); err != nil {
106
105
t .Error (err )
107
106
}
108
- wg .Done ()
109
107
}()
110
- wg .Wait ()
108
+
109
+ assert .Equal (t , "foo" , string ((<- h .messageCh ).Value ))
110
+ assert .Equal (t , "bar" , string ((<- h .messageCh ).Value ))
111
+ go func () {
112
+ if err := group .Close (); err != nil {
113
+ t .Error (err )
114
+ }
115
+ }()
116
+ assert .Equal (t , "session done" , string ((<- h .messageCh ).Value ))
111
117
}
112
118
113
119
func TestConsume_RaceTest (t * testing.T ) {
@@ -219,8 +225,8 @@ func TestConsumerGroupSessionDoesNotRetryForever(t *testing.T) {
219
225
}
220
226
defer func () { _ = group .Close () }()
221
227
222
- ctx , cancel := context .WithCancel ( context . Background () )
223
- h := & handler {t , cancel }
228
+ ctx := context .Background ()
229
+ h := & handler {}
224
230
225
231
var wg sync.WaitGroup
226
232
wg .Add (1 )
0 commit comments