Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.

Commit 50d1ccb

Browse files
authored
Merge pull request #54 from heetch/rog-002-sendmessages
producer: add SendMessages method
2 parents 86d4bbe + 5646c9c commit 50d1ccb

File tree

2 files changed

+135
-1
lines changed

2 files changed

+135
-1
lines changed

producer/producer.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package producer
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/Shopify/sarama"
78
"github.com/heetch/felice/codec"
@@ -65,14 +66,70 @@ func (p *Producer) Send(ctx context.Context, topic string, body interface{}, opt
6566
return msg, err
6667
}
6768

69+
// SendMessagesErrors is the error type returned if SendMessages
70+
// fails to send to Kafka.
71+
type SendMessagesErrors []*SendMessagesError
72+
73+
func (e SendMessagesErrors) Error() string {
74+
return fmt.Sprintf("kafka: failed to deliver %d messages", len(e))
75+
}
76+
77+
// SendMessagesError describes why one message
78+
// failed to be sent.
79+
type SendMessagesError struct {
80+
Msg *Message
81+
Err error
82+
}
83+
84+
// SendMessages sends all the given messages in order.
85+
// If it fails to send the messages to Kafka, it will return a
86+
// SendMessagesErrors error describing which messages failed.
87+
func (p *Producer) SendMessages(ctx context.Context, msgs []*Message) error {
88+
select {
89+
case <-ctx.Done():
90+
return ctx.Err()
91+
default:
92+
}
93+
if p.config.Converter == nil {
94+
return errors.New("producer: missing Converter in config")
95+
}
96+
pmsgs := make([]*sarama.ProducerMessage, len(msgs))
97+
for i, m := range msgs {
98+
m.prepare()
99+
pmsg, err := p.config.Converter.ToKafka(ctx, m)
100+
if err != nil {
101+
return err
102+
}
103+
pmsg.Metadata = m
104+
pmsgs[i] = pmsg
105+
}
106+
err := p.SyncProducer.SendMessages(pmsgs)
107+
// Some messages may still have been sent, so copy all the
108+
// send information across even when there are errors.
109+
for i, m := range msgs {
110+
p := pmsgs[i]
111+
m.Partition, m.Offset, m.ProducedAt = p.Partition, p.Offset, p.Timestamp
112+
}
113+
if producerErrs, ok := err.(sarama.ProducerErrors); ok {
114+
sendErrs := make(SendMessagesErrors, len(producerErrs))
115+
for i, e := range producerErrs {
116+
sendErrs[i] = &SendMessagesError{
117+
Msg: e.Msg.Metadata.(*Message),
118+
Err: e.Err,
119+
}
120+
}
121+
err = sendErrs
122+
}
123+
return err
124+
}
125+
68126
// SendMessage sends the given message to Kafka synchronously.
69127
func (p *Producer) SendMessage(ctx context.Context, msg *Message) error {
70128
select {
71129
case <-ctx.Done():
72130
return ctx.Err()
73131
default:
74132
}
75-
76133
if p.config.Converter == nil {
77134
return errors.New("producer: missing Converter in config")
78135
}

producer/producer_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"testing"
77

8+
"github.com/Shopify/sarama"
89
"github.com/Shopify/sarama/mocks"
910
"github.com/heetch/felice/producer"
1011
"github.com/stretchr/testify/require"
@@ -39,6 +40,73 @@ func TestSendMessage(t *testing.T) {
3940
}
4041
}
4142

43+
func TestSendMessages(t *testing.T) {
44+
msgs := []*producer.Message{
45+
producer.NewMessage("topic1", "message1"),
46+
&producer.Message{Topic: "topic2", Body: "message2"},
47+
&producer.Message{Topic: "topic2", Body: "message3"},
48+
}
49+
cfg := producer.NewConfig("id", producer.MessageConverterV1())
50+
msp := mocks.NewSyncProducer(t, &cfg.Config)
51+
p, err := producer.NewFrom(msp, cfg)
52+
require.NoError(t, err)
53+
54+
for i, msg := range msgs {
55+
i, msg := i, msg
56+
msp.ExpectSendMessageWithCheckerFunctionAndSucceed(func(val []byte) error {
57+
if got, want := string(val), fmt.Sprintf("%q", msg.Body); got != want {
58+
return fmt.Errorf("unexpected message %d; got %q want %q", i, got, want)
59+
}
60+
return nil
61+
})
62+
}
63+
err = p.SendMessages(context.Background(), msgs)
64+
require.NoError(t, err)
65+
66+
// Unfortunately the Sarama mock doesn't fill out of any of the Offset, Partition
67+
// or Timestamp fields when SendMessages is called, so we can't test
68+
// that functionality here.
69+
}
70+
71+
func TestSendMessagesError(t *testing.T) {
72+
msgs := []*producer.Message{
73+
producer.NewMessage("topic1", "message1"),
74+
&producer.Message{Topic: "topic2", Body: "message2"},
75+
&producer.Message{Topic: "topic2", Body: "message3"},
76+
}
77+
perr1 := fmt.Errorf("first error")
78+
perr2 := fmt.Errorf("second error")
79+
cfg := producer.NewConfig("id", producer.MessageConverterV1())
80+
sendMessages := func(smsgs []*sarama.ProducerMessage) error {
81+
return sarama.ProducerErrors{{
82+
Msg: smsgs[0],
83+
Err: perr1,
84+
}, {
85+
Msg: smsgs[2],
86+
Err: perr2,
87+
}}
88+
}
89+
p, err := producer.NewFrom(sendMessagesFunc{f: sendMessages}, cfg)
90+
require.NoError(t, err)
91+
err = p.SendMessages(context.Background(), msgs)
92+
sendErrs, ok := err.(producer.SendMessagesErrors)
93+
require.True(t, ok)
94+
require.Len(t, sendErrs, 2)
95+
96+
if sendErrs[0].Msg != msgs[0] {
97+
t.Errorf("unexpected message at 0: %#v", sendErrs[0].Msg)
98+
}
99+
if sendErrs[0].Err != perr1 {
100+
t.Errorf("unexpected error at 0: %#v", sendErrs[0].Err)
101+
}
102+
if sendErrs[1].Msg != msgs[2] {
103+
t.Errorf("unexpected message at 0: %#v", sendErrs[0].Msg)
104+
}
105+
if sendErrs[1].Err != perr2 {
106+
t.Errorf("unexpected error at 0: %#v", sendErrs[0].Err)
107+
}
108+
}
109+
42110
func TestSend(t *testing.T) {
43111
msp := mocks.NewSyncProducer(t, nil)
44112
cfg := producer.NewConfig("id", producer.MessageConverterV1())
@@ -58,3 +126,12 @@ func TestSend(t *testing.T) {
58126
_, err = p.Send(context.Background(), "topic", "message")
59127
require.EqualError(t, err, "producer: failed to send message: cannot produce message")
60128
}
129+
130+
type sendMessagesFunc struct {
131+
sarama.SyncProducer
132+
f func(msgs []*sarama.ProducerMessage) error
133+
}
134+
135+
func (f sendMessagesFunc) SendMessages(msgs []*sarama.ProducerMessage) error {
136+
return f.f(msgs)
137+
}

0 commit comments

Comments
 (0)