Skip to content

Commit 00e0023

Browse files
authored
Merge pull request #19 from clubpay/feat/kafka-consumer
Refactor `bi-kafka` module structure and remove unused components.
2 parents efcdfc7 + 79877fd commit 00e0023

8 files changed

Lines changed: 705 additions & 53 deletions

File tree

kafka/consumer_test.go

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
package kafka
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
kafkago "github.com/segmentio/kafka-go"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestNewConsumer_nilConfig(t *testing.T) {
14+
c, err := NewConsumer(nil)
15+
assert.Nil(t, c)
16+
assert.Error(t, err)
17+
}
18+
19+
func TestValidateConsumerConfig(t *testing.T) {
20+
base := func() *ConsumerConfig {
21+
return &ConsumerConfig{
22+
BootstrapServers: []string{"localhost:9092"},
23+
GroupID: "g",
24+
Topic: "orders",
25+
}
26+
}
27+
28+
t.Run("missing brokers", func(t *testing.T) {
29+
cfg := base()
30+
cfg.BootstrapServers = nil
31+
cfg.Brokers = nil
32+
_, err := NewConsumer(cfg)
33+
assert.Error(t, err)
34+
})
35+
36+
t.Run("missing group id", func(t *testing.T) {
37+
cfg := base()
38+
cfg.GroupID = ""
39+
_, err := NewConsumer(cfg)
40+
assert.Error(t, err)
41+
})
42+
43+
t.Run("topic and group topics both set", func(t *testing.T) {
44+
cfg := base()
45+
cfg.GroupTopics = []string{"a"}
46+
_, err := NewConsumer(cfg)
47+
assert.Error(t, err)
48+
})
49+
50+
t.Run("neither topic nor group topics", func(t *testing.T) {
51+
cfg := base()
52+
cfg.Topic = ""
53+
_, err := NewConsumer(cfg)
54+
assert.Error(t, err)
55+
})
56+
57+
t.Run("invalid start offset", func(t *testing.T) {
58+
cfg := base()
59+
cfg.StartOffset = 123
60+
_, err := NewConsumer(cfg)
61+
assert.Error(t, err)
62+
})
63+
64+
t.Run("SASL username without password", func(t *testing.T) {
65+
cfg := base()
66+
cfg.Username = "u"
67+
_, err := NewConsumer(cfg)
68+
assert.Error(t, err)
69+
})
70+
71+
t.Run("target batch only one field set", func(t *testing.T) {
72+
cfg := base()
73+
cfg.TargetBatchMessages = 10
74+
_, err := NewConsumer(cfg)
75+
assert.Error(t, err)
76+
})
77+
78+
t.Run("min bytes greater than max", func(t *testing.T) {
79+
cfg := base()
80+
cfg.MinBytes = 100
81+
cfg.MaxBytes = 50
82+
_, err := NewConsumer(cfg)
83+
assert.Error(t, err)
84+
})
85+
86+
t.Run("min bytes greater than derived max", func(t *testing.T) {
87+
cfg := base()
88+
cfg.MinBytes = 5000
89+
cfg.TargetBatchMessages = 10
90+
cfg.MessageBytesUpperBound = 100
91+
_, err := NewConsumer(cfg)
92+
assert.Error(t, err)
93+
})
94+
}
95+
96+
func TestReaderConfigFromConsumer(t *testing.T) {
97+
cfg := &ConsumerConfig{
98+
BootstrapServers: []string{"127.0.0.1:9092"},
99+
GroupID: "grp",
100+
Topic: "t",
101+
CommitInterval: 2 * time.Second,
102+
MinBytes: 512,
103+
MaxBytes: 2e6,
104+
MaxWait: 3 * time.Second,
105+
ReadBatchTimeout: 4 * time.Second,
106+
QueueCapacity: 200,
107+
SessionTimeout: 25 * time.Second,
108+
RebalanceTimeout: 30 * time.Second,
109+
RetentionTime: time.Hour,
110+
HeartbeatInterval: 2 * time.Second,
111+
StartOffset: kafkago.FirstOffset,
112+
IsolationLevel: kafkago.ReadCommitted,
113+
ReadLagInterval: -1,
114+
ReadBackoffMin: 50 * time.Millisecond,
115+
ReadBackoffMax: 500 * time.Millisecond,
116+
MaxAttempts: 5,
117+
GroupBalancers: []kafkago.GroupBalancer{kafkago.RoundRobinGroupBalancer{}},
118+
}
119+
d := newDialer(dialerParams{ClientID: "cid", Timeout: time.Second})
120+
rc, err := readerConfigFromConsumer(cfg, d)
121+
require.NoError(t, err)
122+
123+
assert.Equal(t, cfg.brokers(), rc.Brokers)
124+
assert.Equal(t, cfg.GroupID, rc.GroupID)
125+
assert.Equal(t, cfg.Topic, rc.Topic)
126+
assert.Equal(t, cfg.CommitInterval, rc.CommitInterval)
127+
assert.Equal(t, 512, rc.MinBytes)
128+
assert.Equal(t, int(2e6), rc.MaxBytes)
129+
assert.Equal(t, 3*time.Second, rc.MaxWait)
130+
assert.Equal(t, 4*time.Second, rc.ReadBatchTimeout)
131+
assert.Equal(t, 200, rc.QueueCapacity)
132+
assert.Equal(t, 25*time.Second, rc.SessionTimeout)
133+
assert.Equal(t, 30*time.Second, rc.RebalanceTimeout)
134+
assert.Equal(t, time.Hour, rc.RetentionTime)
135+
assert.Equal(t, 2*time.Second, rc.HeartbeatInterval)
136+
assert.Equal(t, kafkago.FirstOffset, rc.StartOffset)
137+
assert.Equal(t, kafkago.ReadCommitted, rc.IsolationLevel)
138+
assert.Equal(t, time.Duration(-1), rc.ReadLagInterval)
139+
assert.Equal(t, 50*time.Millisecond, rc.ReadBackoffMin)
140+
assert.Equal(t, 500*time.Millisecond, rc.ReadBackoffMax)
141+
assert.Equal(t, 5, rc.MaxAttempts)
142+
require.Len(t, rc.GroupBalancers, 1)
143+
_, ok := rc.GroupBalancers[0].(kafkago.RoundRobinGroupBalancer)
144+
assert.True(t, ok)
145+
assert.Same(t, d, rc.Dialer)
146+
147+
require.NoError(t, rc.Validate())
148+
}
149+
150+
func TestReaderConfigFromConsumer_derivedMaxBytes(t *testing.T) {
151+
cfg := &ConsumerConfig{
152+
BootstrapServers: []string{"127.0.0.1:9092"},
153+
GroupID: "g",
154+
Topic: "t",
155+
TargetBatchMessages: 50,
156+
MessageBytesUpperBound: 2048,
157+
}
158+
d := newDialer(dialerParams{ClientID: "c", Timeout: time.Second})
159+
rc, err := readerConfigFromConsumer(cfg, d)
160+
require.NoError(t, err)
161+
assert.Equal(t, 50*2048, rc.MaxBytes)
162+
require.NoError(t, rc.Validate())
163+
}
164+
165+
func TestConsumer_CommitMessages_autoModeRejected(t *testing.T) {
166+
cfg := &ConsumerConfig{
167+
BootstrapServers: []string{"localhost:9092"},
168+
GroupID: "g",
169+
Topic: "t",
170+
CommitInterval: time.Second,
171+
}
172+
c, err := NewConsumer(cfg)
173+
require.NoError(t, err)
174+
t.Cleanup(func() { _ = c.Close() })
175+
176+
err = c.CommitMessages(context.Background())
177+
assert.Error(t, err)
178+
assert.Contains(t, err.Error(), "manual commit")
179+
}
180+
181+
func TestConsumer_Stats(t *testing.T) {
182+
cfg := &ConsumerConfig{
183+
BootstrapServers: []string{"localhost:9092"},
184+
GroupID: "g",
185+
Topic: "t",
186+
}
187+
c, err := NewConsumer(cfg)
188+
require.NoError(t, err)
189+
t.Cleanup(func() { _ = c.Close() })
190+
191+
st := c.Stats()
192+
_ = st.Lag
193+
assert.NotEmpty(t, st.ClientID)
194+
}
195+
196+
func TestConsume_doubleStart(t *testing.T) {
197+
cfg := &ConsumerConfig{
198+
BootstrapServers: []string{"localhost:9092"},
199+
GroupID: "g",
200+
Topic: "t",
201+
}
202+
c, err := NewConsumer(cfg)
203+
require.NoError(t, err)
204+
t.Cleanup(func() { _ = c.Close() })
205+
206+
ctx, cancel := context.WithCancel(context.Background())
207+
defer cancel()
208+
209+
ch, err := c.Consume(ctx, 4)
210+
require.NoError(t, err)
211+
_, err = c.Consume(ctx, 4)
212+
assert.Error(t, err)
213+
assert.Contains(t, err.Error(), "already active")
214+
215+
cancel()
216+
for range ch {
217+
}
218+
}
219+
220+
func TestConsume_ctxAlreadyCancelled(t *testing.T) {
221+
cfg := &ConsumerConfig{
222+
BootstrapServers: []string{"localhost:9092"},
223+
GroupID: "g",
224+
Topic: "t",
225+
}
226+
c, err := NewConsumer(cfg)
227+
require.NoError(t, err)
228+
t.Cleanup(func() { _ = c.Close() })
229+
230+
ctx, cancel := context.WithCancel(context.Background())
231+
cancel()
232+
233+
ch, err := c.Consume(ctx, 2)
234+
require.NoError(t, err)
235+
_, ok := <-ch
236+
assert.False(t, ok, "channel should close without messages when ctx is done")
237+
}
238+
239+
func TestFetch_whileConsumeActive(t *testing.T) {
240+
cfg := &ConsumerConfig{
241+
BootstrapServers: []string{"localhost:9092"},
242+
GroupID: "g",
243+
Topic: "t",
244+
}
245+
c, err := NewConsumer(cfg)
246+
require.NoError(t, err)
247+
t.Cleanup(func() { _ = c.Close() })
248+
249+
ctx, cancel := context.WithCancel(context.Background())
250+
defer cancel()
251+
252+
_, err = c.Consume(ctx, 2)
253+
require.NoError(t, err)
254+
255+
_, err = c.Fetch(context.Background())
256+
assert.Error(t, err)
257+
assert.Contains(t, err.Error(), "Consume is active")
258+
}
259+
260+
func TestDialer_SASLAndTLS(t *testing.T) {
261+
d := newDialer(dialerParams{
262+
ClientID: "c",
263+
Timeout: time.Second,
264+
Username: "u",
265+
Password: "p",
266+
EnableTLS: true,
267+
})
268+
require.NotNil(t, d.TLS)
269+
require.NotNil(t, d.SASLMechanism)
270+
}

kafka/dialer.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package kafka
2+
3+
import (
4+
"crypto/tls"
5+
"time"
6+
7+
kafkago "github.com/segmentio/kafka-go"
8+
"github.com/segmentio/kafka-go/sasl/plain"
9+
)
10+
11+
// dialerParams holds connection settings shared by consumer (and future producer wiring).
12+
type dialerParams struct {
13+
ClientID string
14+
Timeout time.Duration
15+
Username string
16+
Password string
17+
EnableTLS bool
18+
}
19+
20+
func newDialer(p dialerParams) *kafkago.Dialer {
21+
d := &kafkago.Dialer{
22+
ClientID: p.ClientID,
23+
Timeout: p.Timeout,
24+
}
25+
if p.EnableTLS {
26+
d.TLS = &tls.Config{
27+
MinVersion: tls.VersionTLS12,
28+
}
29+
}
30+
if p.Username != "" || p.Password != "" {
31+
d.SASLMechanism = plain.Mechanism{
32+
Username: p.Username,
33+
Password: p.Password,
34+
}
35+
}
36+
return d
37+
}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
1-
package bikafka
1+
package kafka
22

33
import (
44
"context"
55
"fmt"
66
"log"
77
"time"
88

9-
"github.com/segmentio/kafka-go"
9+
kafkago "github.com/segmentio/kafka-go"
1010
)
1111

12-
// Example demonstrates how to use the bi-kafka producer with traditional config
12+
// Example demonstrates how to use the producer (kafka-producer.go) with traditional config
1313
func Example() {
1414
// Create configuration
1515
config := &Config{
1616
BootstrapServers: []string{"localhost:9092"},
1717
// Optional: Set compression
18-
Compression: kafka.Snappy,
18+
Compression: kafkago.Snappy,
1919
// Optional: Set timeout
2020
Timeout: 30 * time.Second,
2121
// Optional: Set client ID
@@ -39,7 +39,7 @@ func Example() {
3939
Topic: "my-topic",
4040
Key: []byte("message-key"),
4141
Value: []byte("Hello, Kafka!"),
42-
Headers: []kafka.Header{
42+
Headers: []kafkago.Header{
4343
{
4444
Key: "source",
4545
Value: []byte("example-app"),
@@ -59,7 +59,7 @@ func Example() {
5959
fmt.Println("Message sent successfully!")
6060
}
6161

62-
// ExampleWithOptions demonstrates how to use the bi-kafka producer with options pattern
62+
// ExampleWithOptions demonstrates how to use the producer with options pattern
6363
func ExampleWithOptions() {
6464
// Create producer using options pattern
6565
producer, err := NewProducerWithOptions(
@@ -81,7 +81,7 @@ func ExampleWithOptions() {
8181
Topic: "my-topic",
8282
Key: []byte("message-key"),
8383
Value: []byte("Hello, Kafka with Options!"),
84-
Headers: []kafka.Header{
84+
Headers: []kafkago.Header{
8585
{
8686
Key: "source",
8787
Value: []byte("example-app"),

0 commit comments

Comments
 (0)