Skip to content

Commit 6dee7d0

Browse files
committed
fixed revie changes
1 parent c5cae20 commit 6dee7d0

File tree

8 files changed

+92
-27
lines changed

8 files changed

+92
-27
lines changed

docs/advanced-guide/using-publisher-subscriber/page.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ The following configs apply specifically to Redis Pub/Sub behavior. For base Red
434434
---
435435

436436
- `REDIS_STREAMS_BLOCK_TIMEOUT`
437-
- Block duration for stream reads using Redis `XREADGROUP`. This controls how long the consumer blocks waiting for new messages before the Redis call times out and retries. **Benefits of configuring this:** (1) **Resource efficiency** - Without blocking, consumers would constantly poll Redis, wasting CPU cycles and network bandwidth. Blocking allows Redis to push messages immediately when available. (2) **Latency vs CPU trade-off** - Lower values (e.g., `1s-2s`) provide faster message detection but increase CPU from frequent timeouts. Higher values (e.g., `10s-30s`) reduce CPU usage and network round-trips but may delay processing. (3) **Cost optimization** - In cloud/serverless environments, reducing CPU usage directly reduces costs. (4) **Battery efficiency** - Important for mobile/edge deployments where power consumption matters. Choose based on your latency requirements: real-time systems may use `1s-2s`, while batch processing can use `10s-30s`.
437+
- Block duration for stream reads using Redis `XREADGROUP`. Controls how long the consumer blocks waiting for new messages before timing out. Lower values (1s-2s) provide faster detection but increase CPU usage. Higher values (10s-30s) reduce CPU usage, ideal for batch processing.
438438
- `5s`
439439
- `2s` (low latency) or `30s` (low CPU)
440440

@@ -498,7 +498,9 @@ docker run -d \
498498
> **Important**: If you are using **GoFr migrations** with **Redis** and also using **Redis Pub/Sub in Streams mode**, do not use the same Redis logical DB for both.
499499
> GoFr stores Redis migration state in a Redis **HASH** named `gofr_migrations`, while Redis Streams mode uses a Redis **STREAM** key for topics (including the PubSub migration topic `gofr_migrations`).
500500
> If both clients share the same DB, migrations can fail with `WRONGTYPE` errors.
501-
> Set `REDIS_PUBSUB_DB` to a different DB index (for example, `REDIS_DB=0` and `REDIS_PUBSUB_DB=1`).
501+
> By default, `REDIS_DB` is `0` and `REDIS_PUBSUB_DB` is `15` (highest default Redis database), so they are already separated. If you change `REDIS_DB` from the default, ensure `REDIS_PUBSUB_DB` is set to a different DB index (for example, `REDIS_DB=0` and `REDIS_PUBSUB_DB=1`).
502+
503+
> **Note on `REDIS_STREAMS_BLOCK_TIMEOUT`**: This configuration controls how long the consumer blocks waiting for new messages using Redis `XREADGROUP` before the call times out and retries. **Benefits of configuring this:** (1) **Resource efficiency** - Without blocking, consumers would constantly poll Redis, wasting CPU cycles and network bandwidth. Blocking allows Redis to push messages immediately when available. (2) **Latency vs CPU trade-off** - Lower values (e.g., `1s-2s`) provide faster message detection but increase CPU from frequent timeouts. Higher values (e.g., `10s-30s`) reduce CPU usage and network round-trips but may delay processing. (3) **Cost optimization** - In cloud/serverless environments, reducing CPU usage directly reduces costs. (4) **Battery efficiency** - Important for mobile/edge deployments where power consumption matters. Choose based on your latency requirements: real-time systems may use `1s-2s`, while batch processing can use `10s-30s`.
502504
503505
### Azure Event Hubs
504506
GoFr supports Event Hubs starting gofr version v1.22.0.

docs/references/configs/page.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ This document lists all the configuration options supported by the GoFr framewor
422422
---
423423

424424
- REDIS_STREAMS_BLOCK_TIMEOUT
425-
- Blocking duration for reading new messages. Enables efficient message consumption by allowing Redis to push messages immediately when available, avoiding constant polling. Lower values (1s-2s) provide faster detection but increase CPU usage. Higher values (10s-30s) reduce CPU usage, ideal for batch processing.
425+
- Blocking duration for reading new messages using Redis `XREADGROUP`. Lower values (1s-2s) provide faster detection but increase CPU usage. Higher values (10s-30s) reduce CPU usage, ideal for batch processing.
426426
- 5s
427427

428428
---
@@ -434,7 +434,9 @@ This document lists all the configuration options supported by the GoFr framewor
434434
{% /table %}
435435

436436
> If `REDIS_PUBSUB_MODE` is set to anything other than `streams` or `pubsub`, it falls back to `streams`.
437-
> If you are using GoFr migrations with Redis and Redis PubSub Streams mode together, set `REDIS_PUBSUB_DB` to a different DB than `REDIS_DB` to avoid `WRONGTYPE` errors on the `gofr_migrations` key.
437+
> If you are using GoFr migrations with Redis and Redis PubSub Streams mode together, set `REDIS_PUBSUB_DB` to a different DB than `REDIS_DB` to avoid `WRONGTYPE` errors on the `gofr_migrations` key. By default, `REDIS_DB` is `0` and `REDIS_PUBSUB_DB` is `15` (highest default Redis database), so they are already separated. If you change `REDIS_DB` from the default, ensure `REDIS_PUBSUB_DB` is set to a different DB index.
438+
439+
> **Note on `REDIS_STREAMS_BLOCK_TIMEOUT`**: This configuration controls how long the consumer blocks waiting for new messages using Redis `XREADGROUP` before the call times out and retries. **Benefits of configuring this:** (1) **Resource efficiency** - Without blocking, consumers would constantly poll Redis, wasting CPU cycles and network bandwidth. Blocking allows Redis to push messages immediately when available. (2) **Latency vs CPU trade-off** - Lower values (e.g., `1s-2s`) provide faster message detection but increase CPU from frequent timeouts. Higher values (e.g., `10s-30s`) reduce CPU usage and network round-trips but may delay processing. (3) **Cost optimization** - In cloud/serverless environments, reducing CPU usage directly reduces costs. (4) **Battery efficiency** - Important for mobile/edge deployments where power consumption matters. Choose based on your latency requirements: real-time systems may use `1s-2s`, while batch processing can use `10s-30s`.
438440
439441
### Pub/Sub
440442

pkg/gofr/container/container.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ import (
3838
"gofr.dev/pkg/gofr/websocket"
3939
)
4040

41+
const (
42+
redisPubSubModeStreams = "streams"
43+
redisPubSubModePubSub = "pubsub"
44+
)
45+
4146
// Container is a collection of all common application level concerns. Things like Logger, Connection Pool for Redis
4247
// etc. which is shared across is placed here.
4348
type Container struct {
@@ -322,6 +327,13 @@ func (c *Container) createKafkaPubSub(conf config.Config) {
322327
}
323328

324329
partition, _ := strconv.Atoi(conf.GetOrDefault("PARTITION_SIZE", "0"))
330+
// PUBSUB_OFFSET determines the starting position for message consumption in Kafka.
331+
// This allows control over whether to read historical messages or only new ones:
332+
// - Default value -1: Start from the latest offset (only consume new messages after consumer starts)
333+
// - Value 0: Start from the earliest offset (read all historical messages from the beginning)
334+
// - Positive value: Start from a specific offset position (useful for resuming from a known point)
335+
// This is particularly important for scenarios like message replay, recovery from failures,
336+
// or when you only want to process messages that arrive after the consumer is initialized.
325337
offSet, _ := strconv.Atoi(conf.GetOrDefault("PUBSUB_OFFSET", "-1"))
326338
batchSize, _ := strconv.Atoi(conf.GetOrDefault("KAFKA_BATCH_SIZE", strconv.Itoa(kafka.DefaultBatchSize)))
327339
batchBytes, _ := strconv.Atoi(conf.GetOrDefault("KAFKA_BATCH_BYTES", strconv.Itoa(kafka.DefaultBatchBytes)))
@@ -366,11 +378,6 @@ func (c *Container) createRedisPubSub(conf config.Config) {
366378
c.PubSub = redis.NewPubSub(conf, c.Logger, c.metricsManager)
367379
}
368380

369-
const (
370-
redisPubSubModeStreams = "streams"
371-
redisPubSubModePubSub = "pubsub"
372-
)
373-
374381
func (c *Container) warnIfRedisPubSubSharesRedisDB(conf config.Config) {
375382
// Warn (do not fail): if Redis PubSub (streams mode) shares the same Redis logical DB as the primary Redis datasource,
376383
// GoFr migrations can later fail due to `gofr_migrations` key-type collision (HASH vs STREAM).

pkg/gofr/datasource/redis/health.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (ps *PubSub) Health() datasource.Health {
5757
}
5858

5959
addr := fmt.Sprintf("%s:%d", ps.config.HostName, ps.config.Port)
60-
res.Details["addr"] = addr
60+
res.Details["host"] = addr
6161

6262
mode := ps.config.PubSubMode
6363
if mode == "" {

pkg/gofr/datasource/redis/health_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestPubSub_HealthDetails(t *testing.T) {
5656
h := client.PubSub.Health()
5757
assert.Equal(t, datasource.StatusUp, h.Status)
5858
assert.Equal(t, "REDIS", h.Details["backend"])
59-
assert.Equal(t, "localhost:6380", h.Details["addr"])
59+
assert.Equal(t, "localhost:6380", h.Details["host"])
6060
assert.Equal(t, "pubsub", h.Details["mode"])
6161

6262
assert.NoError(t, mock.ExpectationsWereMet())

pkg/gofr/datasource/redis/pubsub.go

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,25 @@ func (ps *PubSub) Publish(ctx context.Context, topic string, message []byte) err
4444

4545
// publishToChannel publishes a message to a Redis PubSub channel.
4646
func (ps *PubSub) publishToChannel(ctx context.Context, topic string, message []byte, span trace.Span) error {
47+
start := time.Now()
4748
err := ps.client.Publish(ctx, topic, message).Err()
49+
end := time.Since(start)
50+
4851
if err != nil {
4952
ps.logger.Errorf("failed to publish message to Redis channel '%s': %v", topic, err)
5053
return err
5154
}
5255

53-
traceID := span.SpanContext().TraceID().String()
5456
addr := fmt.Sprintf("%s:%d", ps.config.HostName, ps.config.Port)
55-
ps.logger.Debugf("PUB %s %s %s", topic, traceID, addr)
57+
ps.logger.Debug(&pubsub.Log{
58+
Mode: "PUB",
59+
CorrelationID: span.SpanContext().TraceID().String(),
60+
MessageValue: string(message),
61+
Topic: topic,
62+
Host: addr,
63+
PubSubBackend: "REDIS",
64+
Time: end.Microseconds(),
65+
})
5666
ps.metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "topic", topic)
5767

5868
return nil
@@ -70,15 +80,25 @@ func (ps *PubSub) publishToStream(ctx context.Context, topic string, message []b
7080
args.Approx = true
7181
}
7282

83+
start := time.Now()
7384
_, err := ps.client.XAdd(ctx, args).Result()
85+
end := time.Since(start)
86+
7487
if err != nil {
7588
ps.logger.Errorf("failed to publish message to Redis stream '%s': %v", topic, err)
7689
return err
7790
}
7891

79-
traceID := span.SpanContext().TraceID().String()
8092
addr := fmt.Sprintf("%s:%d", ps.config.HostName, ps.config.Port)
81-
ps.logger.Debugf("PUB %s %s %s", topic, traceID, addr)
93+
ps.logger.Debug(&pubsub.Log{
94+
Mode: "PUB",
95+
CorrelationID: span.SpanContext().TraceID().String(),
96+
MessageValue: string(message),
97+
Topic: topic,
98+
Host: addr,
99+
PubSubBackend: "REDIS",
100+
Time: end.Microseconds(),
101+
})
82102
ps.metrics.IncrementCounter(ctx, "app_pubsub_publish_success_count", "topic", topic)
83103

84104
return nil
@@ -103,11 +123,28 @@ func (ps *PubSub) Subscribe(ctx context.Context, topic string) (*pubsub.Message,
103123
spanCtx, span := ps.tracer.Start(ctx, "redis-subscribe")
104124
defer span.End()
105125

106-
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_total_count", "topic", topic)
126+
// Determine mode and consumer group for metrics
127+
mode := ps.config.PubSubMode
128+
if mode == "" {
129+
mode = modeStreams
130+
}
131+
132+
var consumerGroup string
133+
if mode == modeStreams && ps.config.PubSubStreamsConfig != nil {
134+
consumerGroup = ps.config.PubSubStreamsConfig.ConsumerGroup
135+
}
136+
137+
// Increment subscribe total count with consumer_group label if using streams
138+
if consumerGroup != "" {
139+
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_total_count", "topic", topic, "consumer_group", consumerGroup)
140+
} else {
141+
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_total_count", "topic", topic)
142+
}
107143

144+
start := time.Now()
108145
msgChan := ps.ensureSubscription(ctx, topic)
109146

110-
msg := ps.waitForMessage(ctx, spanCtx, span, topic, msgChan)
147+
msg := ps.waitForMessage(ctx, spanCtx, span, topic, msgChan, start, consumerGroup)
111148

112149
return msg, nil
113150
}
@@ -175,15 +212,28 @@ func (ps *PubSub) ensureSubscription(_ context.Context, topic string) chan *pubs
175212

176213
// waitForMessage waits for a message from the channel.
177214
func (ps *PubSub) waitForMessage(ctx context.Context, spanCtx context.Context, span trace.Span,
178-
topic string, msgChan chan *pubsub.Message) *pubsub.Message {
215+
topic string, msgChan chan *pubsub.Message, start time.Time, consumerGroup string) *pubsub.Message {
179216
select {
180217
case msg := <-msgChan:
181-
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_success_count", "topic", topic)
218+
// Increment subscribe success count with consumer_group label if using streams
219+
if consumerGroup != "" {
220+
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_success_count", "topic", topic, "consumer_group", consumerGroup)
221+
} else {
222+
ps.metrics.IncrementCounter(spanCtx, "app_pubsub_subscribe_success_count", "topic", topic)
223+
}
182224

183225
if msg != nil {
184-
traceID := span.SpanContext().TraceID().String()
226+
end := time.Since(start)
185227
addr := fmt.Sprintf("%s:%d", ps.config.HostName, ps.config.Port)
186-
ps.logger.Debugf("SUB %s %s %s", topic, traceID, addr)
228+
ps.logger.Debug(&pubsub.Log{
229+
Mode: "SUB",
230+
CorrelationID: span.SpanContext().TraceID().String(),
231+
MessageValue: string(msg.Value),
232+
Topic: topic,
233+
Host: addr,
234+
PubSubBackend: "REDIS",
235+
Time: end.Microseconds(),
236+
})
187237
}
188238

189239
return msg

pkg/gofr/datasource/redis/redis.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,6 @@ func (r *Redis) Close() error {
188188
func NewPubSub(conf config.Config, logger datasource.Logger, metrics Metrics) pubsub.Client {
189189
redisConfig := getRedisConfig(conf, logger)
190190

191-
// Always parse PubSub config for NewPubSub since we're explicitly creating a PubSub client.
192-
// getRedisConfig() only parses PubSub config when PUBSUB_BACKEND=REDIS, but NewPubSub() can be
193-
// called directly (not via container), so we need to ensure config is parsed here.
194-
// If it was already parsed, parsePubSubConfig() will just set the same values (idempotent).
195-
parsePubSubConfig(conf, redisConfig)
196-
197191
// if Hostname is not provided, we won't try to connect to Redis
198192
if redisConfig.HostName == "" {
199193
return nil

pkg/gofr/datasource/redis/redis_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func TestRedis_QueryLogging(t *testing.T) {
4848
ctrl := gomock.NewController(t)
4949
defer ctrl.Finish()
5050

51+
// Mock Redis server setup
5152
s, err := miniredis.Run()
5253
require.NoError(t, err)
5354

@@ -73,6 +74,7 @@ func TestRedis_QueryLogging(t *testing.T) {
7374
assert.Equal(t, "OK", result)
7475
})
7576

77+
// Assertions
7678
assert.Contains(t, result, "ping")
7779
assert.Contains(t, result, "set key value ex 60")
7880
}
@@ -81,6 +83,7 @@ func TestRedis_PipelineQueryLogging(t *testing.T) {
8183
ctrl := gomock.NewController(t)
8284
defer ctrl.Finish()
8385

86+
// Mock Redis server setup
8487
s, err := miniredis.Run()
8588
require.NoError(t, err)
8689

@@ -90,6 +93,7 @@ func TestRedis_PipelineQueryLogging(t *testing.T) {
9093
mockMetric.EXPECT().RecordHistogram(gomock.Any(), "app_redis_stats", gomock.Any(),
9194
"hostname", gomock.Any(), "type", gomock.Any()).AnyTimes()
9295

96+
// Execute Redis pipeline
9397
result := testutil.StdoutOutputForFunc(func() {
9498
mockLogger := logging.NewMockLogger(logging.DEBUG)
9599
client := NewClient(config.NewMockConfig(map[string]string{
@@ -99,13 +103,16 @@ func TestRedis_PipelineQueryLogging(t *testing.T) {
99103

100104
require.NoError(t, err)
101105

106+
// Pipeline execution
102107
pipe := client.Pipeline()
103108
setCmd := pipe.Set(t.Context(), "key1", "value1", 1*time.Minute)
104109
getCmd := pipe.Get(t.Context(), "key1")
105110

111+
// Pipeline Exec should return a non-nil error
106112
_, err = pipe.Exec(t.Context())
107113
require.NoError(t, err)
108114

115+
// Retrieve results
109116
setResult, err := setCmd.Result()
110117
require.NoError(t, err)
111118
assert.Equal(t, "OK", setResult)
@@ -115,6 +122,7 @@ func TestRedis_PipelineQueryLogging(t *testing.T) {
115122
assert.Equal(t, "value1", getResult)
116123
})
117124

125+
// Assertions
118126
assert.Contains(t, result, "ping")
119127
assert.Contains(t, result, "set key1 value1 ex 60: OK")
120128
}
@@ -123,11 +131,13 @@ func TestRedis_Close(t *testing.T) {
123131
ctrl := gomock.NewController(t)
124132
defer ctrl.Finish()
125133

134+
// Mock Redis server setup
126135
s, err := miniredis.Run()
127136
require.NoError(t, err)
128137

129138
defer s.Close()
130139

140+
// Mock metrics setup
131141
mockMetric := NewMockMetrics(ctrl)
132142
mockMetric.EXPECT().RecordHistogram(gomock.Any(), "app_redis_stats", gomock.Any(), "hostname",
133143
gomock.Any(), "type", gomock.Any()).AnyTimes()

0 commit comments

Comments
 (0)