Skip to content

Commit 8eeaf70

Browse files
Merge pull request #193 from menloresearch/add-redis-universal-connections
Switch to use Redis Universal connections
2 parents 55814f9 + e0f707d commit 8eeaf70

2 files changed

Lines changed: 91 additions & 41 deletions

File tree

apps/jan-api-gateway/application/app/infrastructure/cache/redis_cache_service.go

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,46 @@ package cache
33
import (
44
"context"
55
"fmt"
6-
"strconv"
6+
"strings"
77
"time"
88

99
"github.com/go-redsync/redsync/v4"
10+
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
1011
"github.com/redis/go-redis/v9"
1112
"menlo.ai/jan-api-gateway/app/utils/logger"
1213
"menlo.ai/jan-api-gateway/config/environment_variables"
1314
)
1415

15-
// RedisCacheService provides caching functionality using Redis
1616
type RedisCacheService struct {
17-
client *redis.Client
17+
client redis.UniversalClient
1818
rs *redsync.Redsync
1919
}
2020

21-
// NewRedisCacheService creates a new Redis cache service
2221
func NewRedisCacheService() *RedisCacheService {
2322
redisURL := environment_variables.EnvironmentVariables.REDIS_URL
2423
if redisURL == "" {
2524
panic("REDIS_URL environment variable must be set")
2625
}
2726

28-
opts, err := redis.ParseURL(redisURL)
27+
opts, err := buildUniversalOptions(redisURL)
2928
if err != nil {
3029
panic(fmt.Sprintf("failed to parse Redis URL: %v", err))
3130
}
3231

33-
if environment_variables.EnvironmentVariables.REDIS_PASSWORD != "" {
34-
opts.Password = environment_variables.EnvironmentVariables.REDIS_PASSWORD
32+
if pwd := environment_variables.EnvironmentVariables.REDIS_PASSWORD; pwd != "" {
33+
opts.Password = pwd
3534
}
36-
if environment_variables.EnvironmentVariables.REDIS_DB != "" {
37-
if db, err := strconv.Atoi(environment_variables.EnvironmentVariables.REDIS_DB); err == nil {
38-
opts.DB = db
39-
} else {
40-
panic(fmt.Sprintf("invalid REDIS_DB value: %v", err))
41-
}
35+
36+
if dbVal := environment_variables.EnvironmentVariables.REDIS_DB; dbVal != 0 {
37+
opts.DB = dbVal
38+
}
39+
40+
if len(opts.Addrs) > 1 && opts.DB != 0 {
41+
logger.GetLogger().Warn("Ignoring non-zero REDIS_DB when using Redis Cluster configuration")
42+
opts.DB = 0
4243
}
4344

44-
client := redis.NewClient(opts)
45+
client := redis.NewUniversalClient(opts)
4546

4647
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
4748
defer cancel()
@@ -52,17 +53,83 @@ func NewRedisCacheService() *RedisCacheService {
5253

5354
logger.GetLogger().Info("Successfully connected to Redis")
5455

56+
rs := redsync.New(goredis.NewPool(client))
57+
5558
return &RedisCacheService{
5659
client: client,
60+
rs: rs,
5761
}
5862
}
5963

60-
// Set stores a string value in Redis with an expiration time
64+
func buildUniversalOptions(raw string) (*redis.UniversalOptions, error) {
65+
parts := strings.Split(raw, ",")
66+
opts := &redis.UniversalOptions{}
67+
68+
for _, part := range parts {
69+
part = strings.TrimSpace(part)
70+
if part == "" {
71+
continue
72+
}
73+
74+
if strings.Contains(part, "://") {
75+
parsed, err := redis.ParseURL(part)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
opts.Addrs = append(opts.Addrs, parsed.Addr)
81+
82+
if opts.Username == "" {
83+
opts.Username = parsed.Username
84+
}
85+
86+
if opts.Password == "" {
87+
opts.Password = parsed.Password
88+
}
89+
90+
if opts.DB == 0 {
91+
opts.DB = parsed.DB
92+
}
93+
94+
if opts.TLSConfig == nil {
95+
opts.TLSConfig = parsed.TLSConfig
96+
}
97+
98+
if opts.ReadTimeout == 0 {
99+
opts.ReadTimeout = parsed.ReadTimeout
100+
}
101+
102+
if opts.WriteTimeout == 0 {
103+
opts.WriteTimeout = parsed.WriteTimeout
104+
}
105+
106+
if opts.DialTimeout == 0 {
107+
opts.DialTimeout = parsed.DialTimeout
108+
}
109+
110+
if opts.PoolSize == 0 {
111+
opts.PoolSize = parsed.PoolSize
112+
}
113+
114+
if opts.MinIdleConns == 0 {
115+
opts.MinIdleConns = parsed.MinIdleConns
116+
}
117+
} else {
118+
opts.Addrs = append(opts.Addrs, part)
119+
}
120+
}
121+
122+
if len(opts.Addrs) == 0 {
123+
return nil, fmt.Errorf("no Redis addresses provided")
124+
}
125+
126+
return opts, nil
127+
}
128+
61129
func (r *RedisCacheService) Set(ctx context.Context, key string, value string, expiration time.Duration) error {
62130
return r.client.Set(ctx, key, value, expiration).Err()
63131
}
64132

65-
// Get retrieves a string value from Redis
66133
func (r *RedisCacheService) Get(ctx context.Context, key string) (string, error) {
67134
val, err := r.client.Get(ctx, key).Result()
68135
if err != nil {
@@ -75,40 +142,32 @@ func (r *RedisCacheService) Get(ctx context.Context, key string) (string, error)
75142
return val, nil
76143
}
77144

78-
// GetWithFallback retrieves a string value from Redis, or executes fallback function if not found
79145
func (r *RedisCacheService) GetWithFallback(ctx context.Context, key string, fallback func() (string, error), expiration time.Duration) (string, error) {
80-
// Try to get from cache first
81146
result, err := r.Get(ctx, key)
82147
if err == nil {
83-
return result, nil // Found in cache
148+
return result, nil
84149
}
85150

86-
// Cache miss, execute fallback
87151
result, err = fallback()
88152
if err != nil {
89153
return "", fmt.Errorf("fallback function failed: %w", err)
90154
}
91155

92-
// Store in cache for future requests
93156
if err := r.Set(ctx, key, result, expiration); err != nil {
94157
logger.GetLogger().Error(fmt.Sprintf("Failed to cache value: %v", err))
95-
// Don't return error, just log it
96158
}
97159

98160
return result, nil
99161
}
100162

101-
// Delete removes a key from Redis synchronously (blocking)
102163
func (r *RedisCacheService) Delete(ctx context.Context, key string) error {
103164
return r.client.Del(ctx, key).Err()
104165
}
105166

106-
// Unlink removes a key from Redis asynchronously (non-blocking)
107167
func (r *RedisCacheService) Unlink(ctx context.Context, key string) error {
108168
return r.client.Unlink(ctx, key).Err()
109169
}
110170

111-
// DeletePattern removes all keys matching a pattern
112171
func (r *RedisCacheService) DeletePattern(ctx context.Context, pattern string) error {
113172
var cursor uint64
114173
for {
@@ -133,7 +192,6 @@ func (r *RedisCacheService) DeletePattern(ctx context.Context, pattern string) e
133192
return nil
134193
}
135194

136-
// Exists checks if a key exists in Redis
137195
func (r *RedisCacheService) Exists(ctx context.Context, key string) (bool, error) {
138196
result, err := r.client.Exists(ctx, key).Result()
139197
if err != nil {
@@ -142,37 +200,29 @@ func (r *RedisCacheService) Exists(ctx context.Context, key string) (bool, error
142200
return result > 0, nil
143201
}
144202

145-
// Close closes the Redis connection
146203
func (r *RedisCacheService) Close() error {
147204
return r.client.Close()
148205
}
149206

150-
// HealthCheck verifies Redis connectivity
151207
func (r *RedisCacheService) HealthCheck(ctx context.Context) error {
152208
return r.client.Ping(ctx).Err()
153209
}
154210

155-
// NewMutex creates a new distributed mutex using go-redsync
156211
func (r *RedisCacheService) NewMutex(name string, options ...redsync.Option) *redsync.Mutex {
157212
return r.rs.NewMutex(name, options...)
158213
}
159214

160-
// WithLock executes a function with a distributed lock using go-redsync
161215
func WithLock(cache RedisCacheService, lockName string, fn func() error, ttl time.Duration) error {
162216
mutex := cache.NewMutex(lockName, redsync.WithExpiry(ttl))
163217

164-
// Acquire lock
165218
if err := mutex.Lock(); err != nil {
166219
return err
167220
}
168221

169-
// Ensure lock is released
170222
defer func() {
171223
if _, err := mutex.Unlock(); err != nil {
172-
// swallow unlock error
173224
}
174225
}()
175226

176-
// Execute the function
177227
return fn()
178228
}

apps/jan-api-gateway/application/config/environment_variables/env.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package environment_variables
22

33
import (
4-
"fmt"
54
"os"
65
"reflect"
76
"strconv"
87
"strings"
98
"time"
109

10+
"menlo.ai/jan-api-gateway/app/utils/logger"
1111
"menlo.ai/jan-api-gateway/config"
1212
)
1313

@@ -32,7 +32,7 @@ type EnvironmentVariable struct {
3232
// Redis configuration
3333
REDIS_URL string
3434
REDIS_PASSWORD string
35-
REDIS_DB string
35+
REDIS_DB int
3636
}
3737

3838
func (ev *EnvironmentVariable) LoadFromEnv() {
@@ -43,7 +43,7 @@ func (ev *EnvironmentVariable) LoadFromEnv() {
4343
envKey := field.Name
4444
envValue := os.Getenv(envKey)
4545
if envValue == "" {
46-
fmt.Printf("Missing SYSENV: %s\n", envKey)
46+
logger.GetLogger().Warnf("Missing SYSENV: %s", envKey)
4747
}
4848
if envValue != "" {
4949
switch v.Field(i).Kind() {
@@ -52,14 +52,14 @@ func (ev *EnvironmentVariable) LoadFromEnv() {
5252
case reflect.Int:
5353
intV, err := strconv.Atoi(envValue)
5454
if err != nil {
55-
fmt.Printf("Invalid int value for %s: %s\n", envKey, envValue)
55+
logger.GetLogger().Errorf("Invalid int value for %s: %s", envKey, envValue)
5656
} else {
5757
v.Field(i).SetInt(int64(intV))
5858
}
5959
case reflect.Bool:
6060
boolVal, err := strconv.ParseBool(envValue)
6161
if err != nil {
62-
fmt.Printf("Invalid boolean value for %s: %s\n", envKey, envValue)
62+
logger.GetLogger().Errorf("Invalid boolean value for %s: %s", envKey, envValue)
6363
} else {
6464
v.Field(i).SetBool(boolVal)
6565
}
@@ -70,10 +70,10 @@ func (ev *EnvironmentVariable) LoadFromEnv() {
7070
hosts := strings.Split(envValue, ",")
7171
v.Field(i).Set(reflect.ValueOf(hosts))
7272
} else {
73-
fmt.Printf("Unsupported slice type for %s\n", field.Name)
73+
logger.GetLogger().Errorf("Unsupported slice type for %s", field.Name)
7474
}
7575
default:
76-
fmt.Printf("Unsupported field type: %s\n", field.Name)
76+
logger.GetLogger().Errorf("Unsupported field type: %s", field.Name)
7777
}
7878
}
7979
}

0 commit comments

Comments
 (0)