Skip to content

Commit 5317c76

Browse files
committed
cleaning up
1 parent d385e5b commit 5317c76

File tree

8 files changed

+20
-257
lines changed

8 files changed

+20
-257
lines changed

pkg/gofr/datasource/redis/config_test.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
"gofr.dev/pkg/gofr/logging"
1414
)
1515

16-
// testGetRedisConfig is a helper function that creates mock logger and config,
17-
// then returns the Redis config. This reduces code duplication in tests.
1816
func testGetRedisConfig(t *testing.T, configMap map[string]string) *Config {
1917
t.Helper()
2018

@@ -51,7 +49,6 @@ func TestGetRedisConfig_InvalidPortAndDB(t *testing.T) {
5149
}
5250

5351
func TestGetRedisConfig_TLS(t *testing.T) {
54-
// Create temporary cert files
5552
certFile, err := os.CreateTemp(t.TempDir(), "cert-*.pem")
5653
require.NoError(t, err)
5754

@@ -70,12 +67,10 @@ func TestGetRedisConfig_TLS(t *testing.T) {
7067
defer os.Remove(caFile.Name())
7168
defer caFile.Close()
7269

73-
// Write dummy content (not valid PEM, but enough to trigger file read)
7470
_, _ = certFile.WriteString("-----BEGIN CERTIFICATE-----\nMIID\n-----END CERTIFICATE-----")
7571
_, _ = keyFile.WriteString("-----BEGIN PRIVATE KEY-----\nMIIE\n-----END PRIVATE KEY-----")
7672
_, _ = caFile.WriteString("-----BEGIN CERTIFICATE-----\nMIID\n-----END CERTIFICATE-----")
7773

78-
// This will log errors because dummy content is not valid PEM, but it tests the path
7974
conf := testGetRedisConfig(t, map[string]string{
8075
"REDIS_HOST": "localhost",
8176
"REDIS_TLS_ENABLED": "true",
@@ -98,7 +93,6 @@ func TestGetRedisConfig_TLS_InvalidFiles(t *testing.T) {
9893
})
9994

10095
assert.NotNil(t, conf.TLS)
101-
// Should be empty as files failed to load
10296
assert.Empty(t, conf.TLS.Certificates)
10397
assert.Nil(t, conf.TLS.RootCAs)
10498
}
@@ -153,7 +147,6 @@ func TestGetRedisConfig_PubSubStreams_InvalidValues(t *testing.T) {
153147
assert.Equal(t, "streams", conf.PubSubMode)
154148
require.NotNil(t, conf.PubSubStreamsConfig)
155149

156-
// Should use defaults
157150
assert.Equal(t, int64(0), conf.PubSubStreamsConfig.MaxLen)
158151
assert.Equal(t, 5*time.Second, conf.PubSubStreamsConfig.Block)
159152
}

pkg/gofr/datasource/redis/health.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ const (
1919
func (r *Redis) HealthCheck() datasource.Health {
2020
h := datasource.Health{
2121
Details: make(map[string]any),
22-
Status: datasource.StatusDown,
2322
}
2423

2524
h.Details["host"] = r.config.HostName + ":" + strconv.Itoa(r.config.Port)

pkg/gofr/datasource/redis/health_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ func TestPubSub_HealthDown(t *testing.T) {
2121
client, mock := setupMockTest(t, nil)
2222
defer client.Close()
2323

24-
// Test Health Down (Ping fails)
2524
mock.ExpectPing().SetErr(errMockPing)
2625

2726
h := client.PubSub.Health()
@@ -35,7 +34,6 @@ func TestPubSub_HealthUp(t *testing.T) {
3534
client, mock := setupMockTest(t, nil)
3635
defer client.Close()
3736

38-
// Test Health Up (Ping succeeds)
3937
mock.ExpectPing().SetVal("PONG")
4038

4139
h := client.PubSub.Health()
@@ -68,7 +66,6 @@ func TestPubSub_HealthDefaultMode(t *testing.T) {
6866
client, mock := setupMockTest(t, map[string]string{
6967
"REDIS_HOST": "localhost",
7068
"REDIS_PORT": "6379",
71-
// REDIS_PUBSUB_MODE not set, should default to streams
7269
})
7370
defer client.Close()
7471

pkg/gofr/datasource/redis/messages_test.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func TestPubSubMessage_Commit(t *testing.T) {
2727
ctx := context.Background()
2828
topic := "pubsub-commit-topic"
2929

30-
// Start subscription first to ensure it's ready
3130
msgChan := make(chan *pubsub.Message)
3231
errChan := make(chan error)
3332

@@ -41,22 +40,18 @@ func TestPubSubMessage_Commit(t *testing.T) {
4140
msgChan <- msg
4241
}()
4342

44-
// Wait a bit for subscription to be ready
4543
time.Sleep(100 * time.Millisecond)
4644

47-
// Then publish the message
4845
err := client.PubSub.Publish(ctx, topic, []byte("test"))
4946
require.NoError(t, err)
5047

51-
// Wait for message with timeout
5248
select {
5349
case err := <-errChan:
5450
require.NoError(t, err)
5551
case msg := <-msgChan:
5652
require.NotNil(t, msg)
5753
require.NotNil(t, msg.Committer)
5854

59-
// Commit should not panic
6055
assert.NotPanics(t, func() {
6156
msg.Committer.Commit()
6257
})
@@ -163,7 +158,6 @@ func TestStreamMessage_Commit_Success(t *testing.T) {
163158

164159
mock.ExpectXAck(stream, group, id).SetVal(1)
165160

166-
// Test stream message commit through actual stream subscription
167161
client, s := setupTest(t, map[string]string{
168162
"REDIS_PUBSUB_MODE": "streams",
169163
"REDIS_STREAMS_CONSUMER_GROUP": group,
@@ -173,21 +167,18 @@ func TestStreamMessage_Commit_Success(t *testing.T) {
173167

174168
ctx := context.Background()
175169

176-
// Create topic and publish
177170
err := client.PubSub.CreateTopic(ctx, stream)
178171
require.NoError(t, err)
179172

180173
go func() {
181174
_ = client.PubSub.Publish(ctx, stream, []byte("test"))
182175
}()
183176

184-
// Subscribe to get message with committer
185177
msg, err := client.PubSub.Subscribe(ctx, stream)
186178
require.NoError(t, err)
187179
require.NotNil(t, msg)
188180
require.NotNil(t, msg.Committer)
189181

190-
// Commit should not panic
191182
assert.NotPanics(t, func() {
192183
msg.Committer.Commit()
193184
})
@@ -196,7 +187,6 @@ func TestStreamMessage_Commit_Success(t *testing.T) {
196187
func TestStreamMessage_Commit_Error(t *testing.T) {
197188
t.Parallel()
198189

199-
// Test stream message commit error handling
200190
client, s := setupTest(t, map[string]string{
201191
"REDIS_PUBSUB_MODE": "streams",
202192
"REDIS_STREAMS_CONSUMER_GROUP": "test-group",
@@ -207,24 +197,20 @@ func TestStreamMessage_Commit_Error(t *testing.T) {
207197
ctx := context.Background()
208198
stream := "test-stream-error"
209199

210-
// Create topic and publish
211200
err := client.PubSub.CreateTopic(ctx, stream)
212201
require.NoError(t, err)
213202

214203
go func() {
215204
_ = client.PubSub.Publish(ctx, stream, []byte("test"))
216205
}()
217206

218-
// Subscribe to get message
219207
msg, err := client.PubSub.Subscribe(ctx, stream)
220208
require.NoError(t, err)
221209
require.NotNil(t, msg)
222210
require.NotNil(t, msg.Committer)
223211

224-
// Close Redis to simulate error
225212
s.Close()
226213

227-
// Commit should handle error gracefully
228214
assert.NotPanics(t, func() {
229215
msg.Committer.Commit()
230216
})
@@ -233,7 +219,6 @@ func TestStreamMessage_Commit_Error(t *testing.T) {
233219
func TestStreamMessage_Commit_Timeout(t *testing.T) {
234220
t.Parallel()
235221

236-
// Test stream message commit with timeout
237222
client, s := setupTest(t, map[string]string{
238223
"REDIS_PUBSUB_MODE": "streams",
239224
"REDIS_STREAMS_CONSUMER_GROUP": "test-group",
@@ -244,21 +229,18 @@ func TestStreamMessage_Commit_Timeout(t *testing.T) {
244229
ctx := context.Background()
245230
stream := "test-stream-timeout"
246231

247-
// Create topic and publish
248232
err := client.PubSub.CreateTopic(ctx, stream)
249233
require.NoError(t, err)
250234

251235
go func() {
252236
_ = client.PubSub.Publish(ctx, stream, []byte("test"))
253237
}()
254238

255-
// Subscribe to get message
256239
msg, err := client.PubSub.Subscribe(ctx, stream)
257240
require.NoError(t, err)
258241
require.NotNil(t, msg)
259242
require.NotNil(t, msg.Committer)
260243

261-
// Commit should handle timeout gracefully (uses defaultRetryTimeout internally)
262244
assert.NotPanics(t, func() {
263245
msg.Committer.Commit()
264246
})

pkg/gofr/datasource/redis/pubsub.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,10 +280,6 @@ func (ps *PubSub) ensureConsumerGroup(ctx context.Context, topic, group string)
280280

281281
// checkGroupExists checks if a consumer group exists for the given stream.
282282
func (ps *PubSub) checkGroupExists(ctx context.Context, topic, group string) bool {
283-
if !ps.isConnected() {
284-
return false
285-
}
286-
287283
groups, err := ps.client.XInfoGroups(ctx, topic).Result()
288284
if err != nil {
289285
// If XInfoGroups failed (e.g., stream doesn't exist), we'll create it with MKSTREAM
@@ -460,7 +456,6 @@ func (ps *PubSub) createStreamTopic(ctx context.Context, name string) error {
460456

461457
group := ps.config.PubSubStreamsConfig.ConsumerGroup
462458

463-
// checkGroupExists already calls isConnected()
464459
groupExists := ps.checkGroupExists(ctx, name, group)
465460
if groupExists {
466461
return nil
@@ -734,14 +729,14 @@ func (*PubSub) collectMessages(ctx context.Context, ch <-chan *redis.Message, li
734729
collected := 0
735730

736731
for collected < limit {
737-
// Check context first
732+
// Check context first before attempting to receive from channel
738733
select {
739734
case <-ctx.Done():
740735
return result
741736
default:
742737
}
743738

744-
// Then try to receive message
739+
// Now try to receive from channel, but also check context
745740
select {
746741
case <-ctx.Done():
747742
return result

0 commit comments

Comments
 (0)