Skip to content

Commit a4911aa

Browse files
chore: add more unit test case
1 parent 9c65897 commit a4911aa

6 files changed

Lines changed: 553 additions & 7 deletions

File tree

services/mqtt/client/pubsub.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,18 @@ import (
1313

1414
// MqttPubSubClient wraps a courier MQTT client with start/stop lifecycle management.
1515
type MqttPubSubClient struct {
16-
client *courier.Client
17-
consulResolver *consul.Resolver
16+
client client
17+
resolver resolver
18+
}
19+
20+
type client interface {
21+
Start() error
22+
Stop()
23+
IsConnected() bool
24+
}
25+
26+
type resolver interface {
27+
Start()
1828
}
1929

2030
// NewMqttPubSubClient initializes a new MQTT client with Consul-based service discovery and credentials.
@@ -28,11 +38,13 @@ func NewMqttPubSubClient(ctx context.Context, handler courier.MessageHandler, cl
2838
WaitTime: consulCfg.WaitTime,
2939
})
3040
if err != nil {
41+
logger.Infof("error while creating the consul new resolver %v", err)
3142
return nil, fmt.Errorf("failed to create consul resolver: %w", err)
3243
}
3344

3445
credFetcher, err := newCredentialFetcher()
3546
if err != nil {
47+
logger.Infof("error while creating credential %v", err)
3648
return nil, fmt.Errorf("failed to create credential fetcher: %w", err)
3749
}
3850

@@ -51,13 +63,13 @@ func NewMqttPubSubClient(ctx context.Context, handler courier.MessageHandler, cl
5163
courier.WithCustomDecoder(protoDecoder),
5264
}
5365

54-
client, err := courier.NewClient(clientOpts...)
66+
c, err := courier.NewClient(clientOpts...)
5567
if err != nil {
5668
return nil, fmt.Errorf("failed to initialize MQTT client: %w", err)
5769
}
5870

5971
logger.Infof("MQTT client initialized successfully for clientID=%s", clientID)
60-
return &MqttPubSubClient{client: client, consulResolver: rs}, nil
72+
return &MqttPubSubClient{client: c, resolver: rs}, nil
6173
}
6274

6375
// registerHandler registers the subscription handler when the client connects.
@@ -74,7 +86,7 @@ func registerHandler(ctx context.Context, handler courier.MessageHandler) func(c
7486

7587
// Start begins the MQTT client operation.
7688
func (m *MqttPubSubClient) Start() error {
77-
go m.consulResolver.Start()
89+
go m.resolver.Start()
7890
if err := m.client.Start(); err != nil {
7991
logger.Infof("MQTT client start failed due to %v", err)
8092
return fmt.Errorf("failed to start MQTT client: %w", err)
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/gojek/courier-go"
7+
"github.com/goto/raccoon/config"
8+
"sync"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/mock"
14+
)
15+
16+
// ---- Mock Definitions ----
17+
18+
type mockCourierClient struct {
19+
mock.Mock
20+
}
21+
22+
func (m *mockCourierClient) Start() error {
23+
args := m.Called()
24+
return args.Error(0)
25+
}
26+
func (m *mockCourierClient) Stop() {
27+
m.Called()
28+
}
29+
func (m *mockCourierClient) IsConnected() bool {
30+
args := m.Called()
31+
return args.Bool(0)
32+
}
33+
34+
type mockResolver struct {
35+
mock.Mock
36+
wg *sync.WaitGroup
37+
}
38+
39+
func (m *mockResolver) Start() {
40+
defer m.wg.Done()
41+
m.Called()
42+
}
43+
44+
// ---- Tests ----
45+
46+
func TestMqttPubSubClient_Start(t *testing.T) {
47+
48+
t.Run("Start success", func(t *testing.T) {
49+
50+
wg := &sync.WaitGroup{}
51+
wg.Add(1)
52+
mockResolver := &mockResolver{wg: wg}
53+
mockResolver.On("Start").Return().Once()
54+
55+
mockClient := new(mockCourierClient)
56+
mockClient.On("Start").Return(nil).Once()
57+
58+
m := &MqttPubSubClient{client: mockClient, resolver: mockResolver}
59+
err := m.Start()
60+
assert.NoError(t, err)
61+
62+
wg.Wait() // wait for goroutine completion
63+
mockResolver.AssertExpectations(t)
64+
mockClient.AssertExpectations(t)
65+
66+
})
67+
68+
t.Run("Start failure", func(t *testing.T) {
69+
70+
wg := &sync.WaitGroup{}
71+
wg.Add(1)
72+
mockResolver := &mockResolver{wg: wg}
73+
mockResolver.On("Start").Return().Once()
74+
75+
mockClient := new(mockCourierClient)
76+
mockClient.On("Start").Return(errors.New("connection failed")).Once()
77+
78+
m := &MqttPubSubClient{client: mockClient, resolver: mockResolver}
79+
err := m.Start()
80+
assert.ErrorContains(t, err, "failed to start MQTT client")
81+
82+
wg.Wait() // wait for goroutine completion
83+
mockResolver.AssertExpectations(t)
84+
mockClient.AssertExpectations(t)
85+
})
86+
87+
}
88+
89+
func TestMqttPubSubClient_Stop(t *testing.T) {
90+
91+
t.Run("Stop always succeeds", func(t *testing.T) {
92+
mockClient := new(mockCourierClient)
93+
mockResolver := new(mockResolver)
94+
95+
mockClient.On("Stop").Return().Once()
96+
97+
m := &MqttPubSubClient{
98+
client: mockClient,
99+
resolver: mockResolver,
100+
}
101+
102+
err := m.Stop()
103+
assert.NoError(t, err)
104+
mockClient.AssertExpectations(t)
105+
})
106+
}
107+
108+
func TestMqttPubSubClient_IsConnected(t *testing.T) {
109+
110+
t.Run("IsConnected reflects client state", func(t *testing.T) {
111+
mockClient := new(mockCourierClient)
112+
mockResolver := new(mockResolver)
113+
114+
mockClient.On("IsConnected").Return(true).Once()
115+
116+
m := &MqttPubSubClient{
117+
client: mockClient,
118+
resolver: mockResolver,
119+
}
120+
121+
assert.True(t, m.IsConnected())
122+
mockClient.AssertExpectations(t)
123+
})
124+
}
125+
126+
func TestNewMqttPubSubClient(t *testing.T) {
127+
128+
t.Run("new pubsub client should be created", func(t *testing.T) {
129+
mockPubSub := new(MockPubSub)
130+
config.ServerMQTT.ConsulConfig.KVKey = "/test/path"
131+
config.ServerMQTT.ConsulConfig.Address = "localhost:8085"
132+
config.ServerMQTT.ConsulConfig.HealthOnly = true
133+
config.ServerMQTT.ConsulConfig.WaitTime = 1 * time.Second
134+
config.ServerMQTT.AuthConfig.Username = "test"
135+
config.ServerMQTT.AuthConfig.Password = "pass"
136+
c, err := NewMqttPubSubClient(context.Background(), mockPubSub.Subscribe, "test-client-1")
137+
assert.Nil(t, err)
138+
assert.NotNil(t, c.client)
139+
assert.NotNil(t, c.resolver)
140+
})
141+
142+
}
143+
144+
type MockPubSub struct {
145+
mock.Mock
146+
}
147+
148+
func (m *MockPubSub) Subscribe(ctx context.Context, c courier.PubSub, message *courier.Message) {
149+
m.Called(ctx, c, message)
150+
return
151+
}

services/mqtt/consumer_test.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package mqtt
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/mock"
9+
)
10+
11+
func TestConsumer_Init(t *testing.T) {
12+
tests := []struct {
13+
name string
14+
startErr error
15+
expectErr bool
16+
}{
17+
{
18+
name: "successfully starts client",
19+
startErr: nil,
20+
expectErr: false,
21+
},
22+
{
23+
name: "fails to start client",
24+
startErr: errors.New("connection failed"),
25+
expectErr: true,
26+
},
27+
}
28+
29+
for _, tt := range tests {
30+
t.Run(tt.name, func(t *testing.T) {
31+
mockClient := new(mockPubSubClient)
32+
mockClient.On("Start").Return(tt.startErr).Once()
33+
34+
c := NewConsumer(mockClient)
35+
err := c.Init()
36+
37+
if tt.expectErr {
38+
assert.Error(t, err)
39+
assert.Contains(t, err.Error(), "failed to start consumer")
40+
} else {
41+
assert.NoError(t, err)
42+
}
43+
44+
mockClient.AssertExpectations(t)
45+
})
46+
}
47+
}
48+
49+
func TestConsumer_Shutdown(t *testing.T) {
50+
tests := []struct {
51+
name string
52+
stopErr error
53+
expectErr bool
54+
}{
55+
{
56+
name: "successfully stops client",
57+
stopErr: nil,
58+
expectErr: false,
59+
},
60+
{
61+
name: "fails to stop client",
62+
stopErr: errors.New("disconnect failed"),
63+
expectErr: true,
64+
},
65+
}
66+
67+
for _, tt := range tests {
68+
t.Run(tt.name, func(t *testing.T) {
69+
mockClient := new(mockPubSubClient)
70+
mockClient.On("Stop").Return(tt.stopErr).Once()
71+
72+
c := NewConsumer(mockClient)
73+
err := c.Shutdown()
74+
75+
if tt.expectErr {
76+
assert.Error(t, err)
77+
assert.Contains(t, err.Error(), "failed to stop consumer")
78+
} else {
79+
assert.NoError(t, err)
80+
}
81+
82+
mockClient.AssertExpectations(t)
83+
})
84+
}
85+
}
86+
87+
func TestConsumer_IsHealthy(t *testing.T) {
88+
tests := []struct {
89+
name string
90+
isConnected bool
91+
}{
92+
{"client connected", true},
93+
{"client disconnected", false},
94+
}
95+
96+
for _, tt := range tests {
97+
t.Run(tt.name, func(t *testing.T) {
98+
mockClient := new(mockPubSubClient)
99+
mockClient.On("IsConnected").Return(tt.isConnected).Once()
100+
101+
c := NewConsumer(mockClient)
102+
assert.Equal(t, tt.isConnected, c.IsHealthy())
103+
104+
mockClient.AssertExpectations(t)
105+
})
106+
}
107+
}
108+
109+
// --- Mock Implementation ---
110+
111+
type mockPubSubClient struct {
112+
mock.Mock
113+
}
114+
115+
func (m *mockPubSubClient) Start() error {
116+
args := m.Called()
117+
return args.Error(0)
118+
}
119+
120+
func (m *mockPubSubClient) Stop() error {
121+
args := m.Called()
122+
return args.Error(0)
123+
}
124+
125+
func (m *mockPubSubClient) IsConnected() bool {
126+
args := m.Called()
127+
return args.Bool(0)
128+
}

services/mqtt/handler.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,18 @@ func (h *Handler) MQTTHandler(ctx context.Context, c courier.PubSub, message *co
3939
log.Errorf("mqtt message decoding failed due to : %v", err)
4040
return
4141
}
42-
log.Infof("MQTT message content post deserialization %v", req)
42+
//to be removed post end-to-end test
43+
for _, event := range req.Events {
44+
log.Infof("MQTT message content post deserialization %v", event)
45+
}
46+
4347
//instrument the request number
4448
metrics.Increment(
4549
"batches_read_total",
4650
fmt.Sprintf("status=success,conn_group=%s", identifier.Group),
4751
)
4852
//instrument the request size
49-
reqBytes, err := serialization.SerializeProto(req)
53+
reqBytes, err := serialization.SerializeProto(&req)
5054
if err != nil {
5155
log.Errorf("mqtt message serialization failed : %v", err)
5256
} else {

0 commit comments

Comments
 (0)