Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.

Commit dab99cc

Browse files
authored
Merge pull request #66 from heetch/explicit-discarded-behaviour
2 parents 66e6a41 + 4e2aa40 commit dab99cc

File tree

4 files changed

+164
-86
lines changed

4 files changed

+164
-86
lines changed

codec/encoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (e *encoder) Length() int {
4444
return len(e.cache)
4545
}
4646

47-
e.Encode()
47+
_, _ = e.Encode()
4848
return len(e.cache)
4949
}
5050

consumer/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package consumer
22

33
import (
4+
"context"
45
"errors"
56

67
"github.com/Shopify/sarama"
@@ -19,8 +20,10 @@ type Config struct {
1920
KafkaAddrs []string
2021

2122
// If non-nil, Discarded is called when a message handler
22-
// returns an error.
23-
Discarded func(m *sarama.ConsumerMessage, err error)
23+
// returns an error. It receives the ConsumerGroupSession context as first parameter.
24+
// It returns if the message should be marked as committed.
25+
// If Discarded is not set, then the message will be marked as committed.
26+
Discarded func(ctx context.Context, m *sarama.ConsumerMessage, err error) (mark bool)
2427
}
2528

2629
// NewConfig returns a configuration filled in with default values.

consumer/consumer.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,10 @@ func (c consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cla
156156
// and it's not possible to remove a topic from the handler list.
157157
h := c.consumer.handlers.get(claim.Topic())
158158
for msg := range claim.Messages() {
159+
mark := true
159160
if err := h.handleMessage(ctx, msg, claim); err != nil {
160-
if c.consumer.config.Discarded != nil {
161-
c.consumer.config.Discarded(msg, err)
161+
if c.consumer.config.Discarded != nil && ctx.Err() == nil {
162+
mark = c.consumer.config.Discarded(ctx, msg, err)
162163
}
163164
}
164165
if ctx.Err() != nil {
@@ -169,7 +170,9 @@ func (c consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cla
169170
}
170171
break
171172
}
172-
sess.MarkMessage(msg, "")
173+
if mark {
174+
sess.MarkMessage(msg, "")
175+
}
173176
}
174177
return nil
175178
}

consumer/consumer_test.go

Lines changed: 152 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package consumer_test
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
78
"reflect"
89
"testing"
@@ -19,14 +20,14 @@ import (
1920

2021
func TestSimpleHandler(t *testing.T) {
2122
c := qt.New(t)
22-
defer c.Done()
23+
2324
c.Parallel()
2425

2526
k := newTestKafka(c)
2627
topic := k.NewTopic("testtopic")
2728

2829
t0 := time.Now()
29-
k.Produce(&sarama.ProducerMessage{
30+
err := k.Produce(&sarama.ProducerMessage{
3031
Topic: topic,
3132
Key: sarama.StringEncoder("a"),
3233
Value: sarama.StringEncoder(`{"x":1}`),
@@ -39,10 +40,8 @@ func TestSimpleHandler(t *testing.T) {
3940
Value: []byte("value2"),
4041
}},
4142
})
42-
type handleReq struct {
43-
m *consumer.Message
44-
reply chan error
45-
}
43+
c.Assert(err, qt.IsNil)
44+
4645
cs, err := k.NewConsumer()
4746
c.Assert(err, qt.Equals, nil)
4847
defer func() {
@@ -95,7 +94,7 @@ func TestSimpleHandler(t *testing.T) {
9594

9695
func TestCloseBeforeServe(t *testing.T) {
9796
c := qt.New(t)
98-
defer c.Done()
97+
9998
c.Parallel()
10099
cfg := consumer.NewConfig("clientid", "0.1.2.3:1234")
101100
cs, err := consumer.New(cfg)
@@ -106,86 +105,150 @@ func TestCloseBeforeServe(t *testing.T) {
106105

107106
func TestDiscardedCalledOnHandlerError(t *testing.T) {
108107
c := qt.New(t)
109-
defer c.Done()
108+
110109
c.Parallel()
111110

112111
k := newTestKafka(c)
113-
topic := k.NewTopic("testtopic")
112+
c.Run("Mark as committed", func(c *qt.C) {
113+
topic := k.NewTopic("testtopic")
114114

115-
t0 := time.Now()
116-
k.Produce(&sarama.ProducerMessage{
117-
Topic: topic,
118-
Key: sarama.StringEncoder("a"),
119-
Value: sarama.StringEncoder(`1`),
120-
Timestamp: t0,
121-
})
122-
k.Produce(&sarama.ProducerMessage{
123-
Topic: topic,
124-
Key: sarama.StringEncoder("a"),
125-
Value: sarama.StringEncoder(`2`),
126-
Timestamp: t0,
115+
t0 := time.Now()
116+
err := k.Produce(&sarama.ProducerMessage{
117+
Topic: topic,
118+
Key: sarama.StringEncoder("a"),
119+
Value: sarama.StringEncoder(`1`),
120+
Timestamp: t0,
121+
})
122+
c.Assert(err, qt.IsNil)
123+
124+
err = k.Produce(&sarama.ProducerMessage{
125+
Topic: topic,
126+
Key: sarama.StringEncoder("a"),
127+
Value: sarama.StringEncoder(`2`),
128+
Timestamp: t0,
129+
})
130+
c.Assert(err, qt.IsNil)
131+
132+
discarded := make(chan discardedCall)
133+
cfg := consumer.NewConfig("testclient", k.kt.Addrs()...)
134+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
135+
cfg.Discarded = func(ctx context.Context, m *sarama.ConsumerMessage, err error) bool {
136+
discarded <- discardedCall{ctx: ctx, msg: m, err: err}
137+
return true
138+
}
139+
notDiscarded := make(chan string)
140+
141+
cs, err := consumer.New(cfg)
142+
c.Assert(err, qt.Equals, nil)
143+
cs.Handle(topic, consumer.MessageConverterV1(nil), handlerFunc(func(ctx context.Context, m *consumer.Message) error {
144+
val := string(m.Body.Bytes())
145+
if val == "1" {
146+
// When we return this error, the Discarded method should be called
147+
// and the message should be marked as consumed.
148+
return fmt.Errorf("some handler error")
149+
}
150+
select {
151+
case notDiscarded <- val:
152+
case <-ctx.Done():
153+
c.Errorf("error trying to send on notDiscarded: %v", ctx.Err())
154+
}
155+
return nil
156+
}))
157+
serveDone := make(chan error)
158+
go func() {
159+
serveDone <- cs.Serve(context.Background())
160+
}()
161+
defer func() {
162+
c.Check(cs.Close(), qt.Equals, nil)
163+
c.Check(<-serveDone, qt.Equals, nil)
164+
}()
165+
// The first message should be sent by the Discarded function.
166+
select {
167+
case call := <-discarded:
168+
c.Check(call.ctx, qt.Not(qt.IsNil))
169+
c.Check(string(call.msg.Value), qt.Equals, "1")
170+
c.Check(call.msg.Offset, qt.Equals, int64(0))
171+
c.Check(call.err, qt.ErrorMatches, "some handler error")
172+
case v := <-notDiscarded:
173+
c.Fatalf("unexpected non-error message %q (expecting call to Discarded)", v)
174+
case <-time.After(15 * time.Second):
175+
c.Fatalf("timed out waiting for first Discarded call")
176+
}
177+
// The second message should be handled normally.
178+
select {
179+
case call := <-discarded:
180+
c.Errorf("unexpected call to discarded %q (message received twice?)", call.msg.Value)
181+
case v := <-notDiscarded:
182+
c.Check(v, qt.Equals, "2")
183+
case <-time.After(15 * time.Second):
184+
c.Fatalf("timed out waiting for second HandleMessage call")
185+
}
127186
})
128-
type discardedCall struct {
129-
msg *sarama.ConsumerMessage
130-
err error
131-
}
132-
discarded := make(chan discardedCall)
133-
cfg := consumer.NewConfig("testclient", k.kt.Addrs()...)
134-
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
135-
cfg.Discarded = func(m *sarama.ConsumerMessage, err error) {
136-
discarded <- discardedCall{m, err}
137-
}
138-
notDiscarded := make(chan string)
139187

140-
cs, err := consumer.New(cfg)
141-
c.Assert(err, qt.Equals, nil)
142-
cs.Handle(topic, consumer.MessageConverterV1(nil), handlerFunc(func(ctx context.Context, m *consumer.Message) error {
143-
val := string(m.Body.Bytes())
144-
if val == "1" {
145-
// When we return this error, the Discarded method should be called
146-
// and the message should be marked as consumed.
147-
return fmt.Errorf("some handler error")
188+
c.Run("Mark as not committed", func(c *qt.C) {
189+
topic := k.NewTopic("testtopic1")
190+
err := k.Produce(&sarama.ProducerMessage{
191+
Topic: topic,
192+
Key: sarama.StringEncoder("msgkey"),
193+
Value: sarama.StringEncoder("value"),
194+
Timestamp: time.Now(),
195+
})
196+
c.Assert(err, qt.IsNil)
197+
198+
discarded := make(chan discardedCall)
199+
200+
cfg := consumer.NewConfig("testclient1", k.kt.Addrs()...)
201+
cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
202+
cfg.Discarded = func(ctx context.Context, m *sarama.ConsumerMessage, err error) bool {
203+
discarded <- discardedCall{ctx: ctx, msg: m, err: err}
204+
// Don't mark as committed
205+
return false
148206
}
207+
208+
cs, err := consumer.New(cfg)
209+
c.Assert(err, qt.IsNil)
210+
cs.Handle(topic, consumer.MessageConverterV1(nil), handlerFunc(func(ctx context.Context, m *consumer.Message) error {
211+
c.Check(string(m.Body.Bytes()), qt.Equals, "value")
212+
c.Check(string(m.Key.Bytes()), qt.Equals, "msgkey")
213+
return errors.New("here I am")
214+
}))
215+
216+
serveDone := make(chan error)
217+
go func() {
218+
serveDone <- cs.Serve(context.Background())
219+
}()
220+
149221
select {
150-
case notDiscarded <- val:
151-
case <-ctx.Done():
152-
c.Errorf("error trying to send on notDiscarded: %v", ctx.Err())
222+
case call := <-discarded:
223+
c.Check(call.ctx, qt.Not(qt.IsNil))
224+
c.Check(string(call.msg.Value), qt.Equals, "value")
225+
c.Check(call.msg.Offset, qt.Equals, int64(0))
226+
c.Check(call.err, qt.ErrorMatches, "here I am")
227+
case <-time.After(15 * time.Second):
228+
c.Fatalf("timed out waiting for first Discarded call")
153229
}
154-
return nil
155-
}))
156-
serveDone := make(chan error)
157-
go func() {
158-
serveDone <- cs.Serve(context.Background())
159-
}()
160-
defer func() {
161-
c.Check(cs.Close(), qt.Equals, nil)
162-
c.Check(<-serveDone, qt.Equals, nil)
163-
}()
164-
// The first message should be sent by the Discarded function.
165-
select {
166-
case call := <-discarded:
167-
c.Check(string(call.msg.Value), qt.Equals, "1")
168-
c.Check(call.msg.Offset, qt.Equals, int64(0))
169-
c.Check(call.err, qt.ErrorMatches, "some handler error")
170-
case v := <-notDiscarded:
171-
c.Fatalf("unexpected non-error message %q (expecting call to Discarded)", v)
172-
case <-time.After(15 * time.Second):
173-
c.Fatalf("timed out waiting for first Discarded call")
174-
}
175-
// The second message should be handled normally.
176-
select {
177-
case call := <-discarded:
178-
c.Errorf("unexpected call to discarded %q (message received twice?)", call.msg.Value)
179-
case v := <-notDiscarded:
180-
c.Check(v, qt.Equals, "2")
181-
case <-time.After(15 * time.Second):
182-
c.Fatalf("timed out waiting for second HandleMessage call")
183-
}
230+
231+
c.Assert(cs.Close(), qt.IsNil)
232+
c.Assert(<-serveDone, qt.IsNil)
233+
234+
// Check no offset were marked
235+
adminClient, err := sarama.NewClusterAdmin(k.kt.Addrs(), cfg.Config)
236+
c.Assert(err, qt.IsNil)
237+
238+
resp, err := adminClient.ListConsumerGroupOffsets("testclient1", map[string][]int32{
239+
topic: []int32{0},
240+
})
241+
c.Assert(err, qt.IsNil)
242+
243+
c.Assert(resp.GetBlock(topic, 0).Offset, qt.Equals, sarama.OffsetNewest,
244+
qt.Commentf("message should not be marked"))
245+
246+
})
184247
}
185248

186249
func TestServeReturnsOnClose(t *testing.T) {
187250
c := qt.New(t)
188-
defer c.Done()
251+
189252
c.Parallel()
190253

191254
k := newTestKafka(c)
@@ -218,23 +281,21 @@ func TestServeReturnsOnClose(t *testing.T) {
218281

219282
func TestHandlerCanceledOnClose(t *testing.T) {
220283
c := qt.New(t)
221-
defer c.Done()
284+
222285
c.Parallel()
223286

224287
k := newTestKafka(c)
225288
topic := k.NewTopic("testtopic")
226289

227290
t0 := time.Now()
228-
k.Produce(&sarama.ProducerMessage{
291+
err := k.Produce(&sarama.ProducerMessage{
229292
Topic: topic,
230293
Key: sarama.StringEncoder("a"),
231294
Value: sarama.StringEncoder(`{"x":1}`),
232295
Timestamp: t0,
233296
})
234-
type handleReq struct {
235-
m *consumer.Message
236-
reply chan error
237-
}
297+
c.Assert(err, qt.IsNil)
298+
238299
cs, err := k.NewConsumer()
239300
c.Assert(err, qt.Equals, nil)
240301
defer func() {
@@ -278,6 +339,17 @@ func TestHandlerCanceledOnClose(t *testing.T) {
278339
}
279340
}
280341

342+
type handleReq struct {
343+
m *consumer.Message
344+
reply chan error
345+
}
346+
347+
type discardedCall struct {
348+
ctx context.Context
349+
msg *sarama.ConsumerMessage
350+
err error
351+
}
352+
281353
// handlerFunc implements Handle by calling the underlying function.
282354
type handlerFunc func(context.Context, *consumer.Message) error
283355

0 commit comments

Comments
 (0)