Skip to content
This repository was archived by the owner on Jun 23, 2023. It is now read-only.

Commit 68226f7

Browse files
authored
Merge pull request #33 from heetch/sendmethod
Send method
2 parents fc470ba + 97ec29e commit 68226f7

File tree

5 files changed

+170
-65
lines changed

5 files changed

+170
-65
lines changed

codec/encoder.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,33 @@
11
package codec
22

3-
import (
4-
"github.com/Shopify/sarama"
5-
)
3+
// Encoder is a simple interface for any type that can be encoded as an array of bytes
4+
// in order to be sent as the key or value of a Kafka message. It matches the sarama.Encoder interface.
5+
type Encoder interface {
6+
Encode() ([]byte, error)
7+
Length() int
8+
}
69

7-
// StringEncoder creates a sarama.Encoder that encodes using the String codec.
8-
func StringEncoder(v string) sarama.Encoder {
10+
// StringEncoder creates a Encoder that encodes using the String codec.
11+
func StringEncoder(v string) Encoder {
912
return &encoder{codec: String(), v: v}
1013
}
1114

12-
// Int64Encoder creates a sarama.Encoder that encodes using the Int64 codec.
13-
func Int64Encoder(v int) sarama.Encoder {
15+
// Int64Encoder creates a Encoder that encodes using the Int64 codec.
16+
func Int64Encoder(v int) Encoder {
1417
return &encoder{codec: Int64(), v: int64(v)}
1518
}
1619

17-
// Float64Encoder creates a sarama.Encoder that encodes using the Float64 Codec.
18-
func Float64Encoder(v float64) sarama.Encoder {
20+
// Float64Encoder creates a Encoder that encodes using the Float64 Codec.
21+
func Float64Encoder(v float64) Encoder {
1922
return &encoder{codec: Float64(), v: v}
2023
}
2124

22-
// JSONEncoder creates a sarama.Encoder that encodes using the JSON Codec.
23-
func JSONEncoder(v interface{}) sarama.Encoder {
25+
// JSONEncoder creates a Encoder that encodes using the JSON Codec.
26+
func JSONEncoder(v interface{}) Encoder {
2427
return &encoder{codec: JSON(), v: v}
2528
}
2629

27-
// encoder implements the sarama.Encoder interface.
30+
// encoder implements the Encoder interface.
2831
type encoder struct {
2932
codec Codec
3033
v interface{}

producer/doc_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,18 @@ func ExampleNewFrom() {
5959
panic(err)
6060
}
6161
}
62+
63+
func ExampleProducer_Send() {
64+
config := producer.NewConfig("some-id", producer.MessageConverterV1())
65+
66+
p, err := producer.New(config, endpoints...)
67+
if err != nil {
68+
panic(err)
69+
}
70+
defer p.Close()
71+
72+
_, err = p.Send(context.Background(), "some topic", "some body", producer.StrKey("some key"))
73+
if err != nil {
74+
panic(err)
75+
}
76+
}

producer/message.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package producer
2+
3+
import (
4+
"time"
5+
6+
"github.com/Shopify/sarama"
7+
"github.com/heetch/felice/codec"
8+
uuid "github.com/satori/go.uuid"
9+
)
10+
11+
// Message represents a message to be sent via Kafka.
12+
// Before sending it, the producer will transform this structure into a
13+
// sarama.ProducerMessage using the registered Converter.
14+
type Message struct {
15+
// The Kafka topic this Message applies to.
16+
Topic string
17+
18+
// If specified, messages with the same key will be sent to the same Kafka partition.
19+
Key sarama.Encoder
20+
21+
// Body of the Kafka message.
22+
Body interface{}
23+
24+
// The time at which this Message was produced.
25+
ProducedAt time.Time
26+
27+
// Partition where this publication was stored.
28+
Partition int32
29+
30+
// Offset where this publication was stored.
31+
Offset int64
32+
33+
// Headers of the message.
34+
Headers map[string]string
35+
36+
// Unique ID of the message. Defaults to an uuid.
37+
ID string
38+
}
39+
40+
// prepare makes sure the message contains a unique ID and
41+
// the Headers map memory is allocated.
42+
func (m *Message) prepare() {
43+
if m.ID == "" {
44+
m.ID = uuid.Must(uuid.NewV4()).String()
45+
}
46+
47+
if m.Headers == nil {
48+
m.Headers = make(map[string]string)
49+
}
50+
}
51+
52+
// NewMessage creates a configured message with a generated unique ID.
53+
func NewMessage(topic string, body interface{}) *Message {
54+
return &Message{
55+
Topic: topic,
56+
Body: body,
57+
Headers: make(map[string]string),
58+
ID: uuid.Must(uuid.NewV4()).String(),
59+
}
60+
}
61+
62+
// Option is a function type that receives a pointer to a Message and
63+
// modifies it in place. Options are intended to customize a message
64+
// before sending it. You can do this either by passing them as
65+
// parameters to the New function, or by calling them directly against
66+
// a Message.
67+
type Option func(*Message)
68+
69+
// Header is an Option that adds a custom header to the message. You
70+
// may pass as many Header options to New as you wish. If multiple
71+
// Header's are defined for the same key, the value of the last one
72+
// past to New will be the value that appears on the Message.
73+
func Header(k, v string) Option {
74+
return func(m *Message) {
75+
m.Headers[k] = v
76+
}
77+
}
78+
79+
// Key is an Option that specifies a key for the message. You should
80+
// only pass this once to the New function, but if you pass it multiple
81+
// times, the value set by the final one you pass will be what is set
82+
// on the Message when it is returned by New.
83+
func Key(key codec.Encoder) Option {
84+
return func(m *Message) {
85+
m.Key = key
86+
}
87+
}
88+
89+
// StrKey is an Option that specifies a key for the message as a string.
90+
func StrKey(key string) Option {
91+
return func(m *Message) {
92+
m.Key = codec.StringEncoder(key)
93+
}
94+
}
95+
96+
// Int64Key is an Option that specifies a key for the message as an integer.
97+
func Int64Key(key int) Option {
98+
return func(m *Message) {
99+
m.Key = codec.Int64Encoder(key)
100+
}
101+
}
102+
103+
// Float64Key is an Option that specifies a key for the message as a float.
104+
func Float64Key(key float64) Option {
105+
return func(m *Message) {
106+
m.Key = codec.Float64Encoder(key)
107+
}
108+
}

producer/producer.go

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@ package producer
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/Shopify/sarama"
87
"github.com/heetch/felice/codec"
98
"github.com/pkg/errors"
10-
uuid "github.com/satori/go.uuid"
119
)
1210

1311
// Producer sends messages to Kafka.
@@ -55,6 +53,18 @@ func NewFrom(producer sarama.SyncProducer, config Config) (*Producer, error) {
5553
return &Producer{SyncProducer: producer, config: config}, nil
5654
}
5755

56+
// Send creates and sends a message to Kafka synchronously.
57+
// It returns the message.Message sent to the brokers.
58+
func (p *Producer) Send(ctx context.Context, topic string, body interface{}, opts ...Option) (*Message, error) {
59+
msg := NewMessage(topic, body)
60+
for _, opt := range opts {
61+
opt(msg)
62+
}
63+
64+
err := p.SendMessage(ctx, msg)
65+
return msg, err
66+
}
67+
5868
// SendMessage sends the given message to Kafka synchronously.
5969
func (p *Producer) SendMessage(ctx context.Context, msg *Message) error {
6070
select {
@@ -80,57 +90,6 @@ func (p *Producer) SendMessage(ctx context.Context, msg *Message) error {
8090
return errors.Wrap(err, "producer: failed to send message")
8191
}
8292

83-
// Message represents a message to be sent via Kafka.
84-
// Before sending it, the producer will transform this structure into a
85-
// sarama.ProducerMessage using the registered Converter.
86-
type Message struct {
87-
// The Kafka topic this Message applies to.
88-
Topic string
89-
90-
// If specified, messages with the same key will be sent to the same Kafka partition.
91-
Key sarama.Encoder
92-
93-
// Body of the Kafka message.
94-
Body interface{}
95-
96-
// The time at which this Message was produced.
97-
ProducedAt time.Time
98-
99-
// Partition where this publication was stored.
100-
Partition int32
101-
102-
// Offset where this publication was stored.
103-
Offset int64
104-
105-
// Headers of the message.
106-
Headers map[string]string
107-
108-
// Unique ID of the message. Defaults to an uuid.
109-
ID string
110-
}
111-
112-
// prepare makes sure the message contains a unique ID and
113-
// the Headers map memory is allocated.
114-
func (m *Message) prepare() {
115-
if m.ID == "" {
116-
m.ID = uuid.Must(uuid.NewV4()).String()
117-
}
118-
119-
if m.Headers == nil {
120-
m.Headers = make(map[string]string)
121-
}
122-
}
123-
124-
// NewMessage creates a configured message with a generated unique ID.
125-
func NewMessage(topic string, body interface{}) *Message {
126-
return &Message{
127-
Topic: topic,
128-
Body: body,
129-
Headers: make(map[string]string),
130-
ID: uuid.Must(uuid.NewV4()).String(),
131-
}
132-
}
133-
13493
// A MessageConverter transforms a Message into a sarama.ProducerMessage.
13594
// The role of the converter is to decouple the conventions defined by users from
13695
// the producer.

producer/producer_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,23 @@ func TestSendMessage(t *testing.T) {
3838
require.EqualError(t, err, "producer: failed to send message: cannot produce message")
3939
}
4040
}
41+
42+
func TestSend(t *testing.T) {
43+
msp := mocks.NewSyncProducer(t, nil)
44+
cfg := producer.NewConfig("id", producer.MessageConverterV1())
45+
p, err := producer.NewFrom(msp, cfg)
46+
require.NoError(t, err)
47+
48+
msp.ExpectSendMessageAndSucceed()
49+
msg, err := p.Send(context.Background(), "topic", "message", producer.Int64Key(10), producer.Header("k", "v"))
50+
require.NoError(t, err)
51+
key, err := msg.Key.Encode()
52+
require.NoError(t, err)
53+
54+
require.EqualValues(t, "10", key)
55+
require.Equal(t, "v", msg.Headers["k"])
56+
57+
msp.ExpectSendMessageAndFail(fmt.Errorf("cannot produce message"))
58+
_, err = p.Send(context.Background(), "topic", "message")
59+
require.EqualError(t, err, "producer: failed to send message: cannot produce message")
60+
}

0 commit comments

Comments
 (0)