diff --git a/extension/storage/redisstorageextension/config.go b/extension/storage/redisstorageextension/config.go index bb291ac41d968..1f30a3b36c65a 100644 --- a/extension/storage/redisstorageextension/config.go +++ b/extension/storage/redisstorageextension/config.go @@ -12,10 +12,19 @@ import ( // Config defines configuration for the Redis storage extension. type Config struct { - Endpoint string `mapstructure:"endpoint"` - Password configopaque.String `mapstructure:"password"` - DB int `mapstructure:"db"` - Expiration time.Duration `mapstructure:"expiration"` - Prefix string `mapstructure:"prefix"` - TLS configtls.ClientConfig `mapstructure:"tls,omitempty"` + Endpoint string `mapstructure:"endpoint"` + Password configopaque.String `mapstructure:"password"` + DB int `mapstructure:"db"` + Expiration time.Duration `mapstructure:"expiration"` + Prefix string `mapstructure:"prefix"` + TLS configtls.ClientConfig `mapstructure:"tls,omitempty"` + MaxRetries int `mapstructure:"max_retries"` + RetryDelay time.Duration `mapstructure:"retry_delay"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PingTimeout time.Duration `mapstructure:"ping_timeout"` } diff --git a/extension/storage/redisstorageextension/config_test.go b/extension/storage/redisstorageextension/config_test.go index ff0b0d379671d..5788ca37d0346 100644 --- a/extension/storage/redisstorageextension/config_test.go +++ b/extension/storage/redisstorageextension/config_test.go @@ -35,16 +35,16 @@ func TestLoadConfig(t *testing.T) { }, { id: component.NewIDWithName(metadata.Type, "all_settings"), - expected: &Config{ - Endpoint: "localhost:1234", - Password: "passwd", - DB: 1, - Expiration: 3 * time.Hour, - Prefix: "test_", - TLS: configtls.ClientConfig{ - Insecure: true, - }, - }, + expected: func() component.Config { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:1234" + cfg.Password = "passwd" + cfg.DB = 1 + cfg.Expiration = 3 * time.Hour + cfg.Prefix = "test_" + cfg.TLS = configtls.ClientConfig{Insecure: true} + return cfg + }(), }, } for _, tt := range tests { diff --git a/extension/storage/redisstorageextension/extension.go b/extension/storage/redisstorageextension/extension.go index fd4c7df5b259e..1ac0be93e78c2 100644 --- a/extension/storage/redisstorageextension/extension.go +++ b/extension/storage/redisstorageextension/extension.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "net" "time" "github.com/redis/go-redis/v9" @@ -25,6 +26,8 @@ type redisStorage struct { // Ensure this storage extension implements the appropriate interface var _ storage.Extension = (*redisStorage)(nil) +var newRedisClient = redis.NewClient + func newRedisStorage(logger *zap.Logger, config *Config) (extension.Extension, error) { return &redisStorage{ cfg: config, @@ -38,14 +41,49 @@ func (rs *redisStorage) Start(ctx context.Context, _ component.Host) error { if err != nil { return err } - c := redis.NewClient(&redis.Options{ - Addr: rs.cfg.Endpoint, - Password: string(rs.cfg.Password), - DB: rs.cfg.DB, - TLSConfig: tlsConfig, - }) - rs.client = c - return nil + + var lastErr error + + for attempt := 0; attempt < rs.cfg.MaxRetries; attempt++ { + dialer := &net.Dialer{ + Timeout: rs.cfg.DialTimeout, + } + + c := newRedisClient(&redis.Options{ + Addr: rs.cfg.Endpoint, + Password: string(rs.cfg.Password), + DB: rs.cfg.DB, + TLSConfig: tlsConfig, + Dialer: dialer.DialContext, + DialTimeout: rs.cfg.DialTimeout, + ReadTimeout: rs.cfg.ReadTimeout, + WriteTimeout: rs.cfg.WriteTimeout, + PoolTimeout: rs.cfg.PoolTimeout, + MaxRetries: 0, + MinRetryBackoff: rs.cfg.MinRetryBackoff, + MaxRetryBackoff: rs.cfg.MaxRetryBackoff, + }) + + pingCtx, cancel := context.WithTimeout(ctx, rs.cfg.PingTimeout) + err := c.Ping(pingCtx).Err() + cancel() + + if err == nil { + rs.client = c + rs.logger.Info("Successfully connected to Redis", zap.String("endpoint", rs.cfg.Endpoint)) + return nil + } + + c.Close() + lastErr = err + + if attempt < rs.cfg.MaxRetries-1 { + rs.logger.Info("Redis connection attempt failed, retrying...", zap.String("endpoint", rs.cfg.Endpoint), zap.Int("attempt", attempt+1), zap.Error(err)) + time.Sleep(rs.cfg.RetryDelay) + } + } + + return fmt.Errorf("failed to connect to Redis at %s after %d attempts: %w", rs.cfg.Endpoint, rs.cfg.MaxRetries, lastErr) } // Shutdown will close any open databases diff --git a/extension/storage/redisstorageextension/extension_test.go b/extension/storage/redisstorageextension/extension_test.go index e5c64c666e1ba..89c7c095a91ce 100644 --- a/extension/storage/redisstorageextension/extension_test.go +++ b/extension/storage/redisstorageextension/extension_test.go @@ -4,15 +4,19 @@ package redisstorageextension import ( + "errors" "sync" "testing" + "time" "github.com/go-redis/redismock/v9" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/extension/extensiontest" "go.opentelemetry.io/collector/extension/xextension/storage" + "go.uber.org/zap" ) func TestExtensionIntegrity(t *testing.T) { @@ -187,6 +191,7 @@ func TestTwoClientsWithDifferentNames(t *testing.T) { func TestRedisKey(t *testing.T) { t.Run("batch operations", func(t *testing.T) { mockedClient, mock := redismock.NewClientMock() + t.Cleanup(func() { require.NoError(t, mockedClient.Close()) }) ctx := t.Context() client := redisClient{ client: mockedClient, @@ -208,6 +213,7 @@ func TestRedisKey(t *testing.T) { t.Run("single operations", func(t *testing.T) { mockedClient, mock := redismock.NewClientMock() + t.Cleanup(func() { require.NoError(t, mockedClient.Close()) }) ctx := t.Context() client := redisClient{ client: mockedClient, @@ -315,3 +321,119 @@ func newTestExtension(t *testing.T) storage.Extension { func newTestEntity(name string) component.ID { return component.MustNewIDWithName("nop", name) } + +func TestStartRetriesThenFails(t *testing.T) { + defer func(prev func(opt *redis.Options) *redis.Client) { newRedisClient = prev }(newRedisClient) + + failErrs := []error{errors.New("ping1"), errors.New("ping2"), errors.New("ping3")} + clients := make([]*redis.Client, len(failErrs)) + mocks := make([]redismock.ClientMock, len(failErrs)) + for i := range failErrs { + c, m := redismock.NewClientMock() + m.ExpectPing().SetErr(failErrs[i]) + clients[i] = c + mocks[i] = m + } + t.Cleanup(func() { + for _, c := range clients { + if err := c.Close(); err != nil && !errors.Is(err, redis.ErrClosed) { + require.NoError(t, err) + } + } + }) + + var idx int + newRedisClient = func(opt *redis.Options) *redis.Client { + require.Less(t, idx, len(clients)) + c := clients[idx] + idx++ + return c + } + + cfg := &Config{ + Endpoint: "localhost:6379", + MaxRetries: len(failErrs), + RetryDelay: time.Millisecond, + DialTimeout: time.Millisecond, + ReadTimeout: time.Millisecond, + WriteTimeout: time.Millisecond, + PoolTimeout: time.Millisecond, + MinRetryBackoff: time.Millisecond, + MaxRetryBackoff: 2 * time.Millisecond, + PingTimeout: time.Millisecond, + } + + rs := &redisStorage{cfg: cfg, logger: zap.NewNop()} + err := rs.Start(t.Context(), componenttest.NewNopHost()) + require.Error(t, err) + require.ErrorIs(t, err, failErrs[len(failErrs)-1]) + + for _, m := range mocks { + require.NoError(t, m.ExpectationsWereMet()) + } +} + +func TestStartSucceedsAfterRetry(t *testing.T) { + defer func(prev func(opt *redis.Options) *redis.Client) { newRedisClient = prev }(newRedisClient) + + firstClient, firstMock := redismock.NewClientMock() + firstMock.ExpectPing().SetErr(errors.New("ping fail")) + + secondClient, secondMock := redismock.NewClientMock() + secondMock.ExpectPing().SetVal("PONG") + t.Cleanup(func() { + if err := firstClient.Close(); err != nil && !errors.Is(err, redis.ErrClosed) { + require.NoError(t, err) + } + if err := secondClient.Close(); err != nil && !errors.Is(err, redis.ErrClosed) { + require.NoError(t, err) + } + }) + + clients := []*redis.Client{firstClient, secondClient} + mocks := []redismock.ClientMock{firstMock, secondMock} + + var idx int + newRedisClient = func(opt *redis.Options) *redis.Client { + require.Less(t, idx, len(clients)) + c := clients[idx] + idx++ + return c + } + + cfg := &Config{ + Endpoint: "localhost:6379", + MaxRetries: 2, + RetryDelay: time.Millisecond, + DialTimeout: time.Millisecond, + ReadTimeout: time.Millisecond, + WriteTimeout: time.Millisecond, + PoolTimeout: time.Millisecond, + MinRetryBackoff: time.Millisecond, + MaxRetryBackoff: 2 * time.Millisecond, + PingTimeout: time.Millisecond, + } + + rs := &redisStorage{cfg: cfg, logger: zap.NewNop()} + err := rs.Start(t.Context(), componenttest.NewNopHost()) + require.NoError(t, err) + require.Equal(t, secondClient, rs.client) + require.NoError(t, rs.Shutdown(t.Context())) + + for _, m := range mocks { + require.NoError(t, m.ExpectationsWereMet()) + } +} + +func TestDefaultConfigHasTimeouts(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.Greater(t, cfg.MaxRetries, 0) + require.NotZero(t, cfg.RetryDelay) + require.NotZero(t, cfg.DialTimeout) + require.NotZero(t, cfg.ReadTimeout) + require.NotZero(t, cfg.WriteTimeout) + require.NotZero(t, cfg.PoolTimeout) + require.NotZero(t, cfg.MinRetryBackoff) + require.NotZero(t, cfg.MaxRetryBackoff) + require.NotZero(t, cfg.PingTimeout) +} diff --git a/extension/storage/redisstorageextension/factory.go b/extension/storage/redisstorageextension/factory.go index a47b7b73c0543..406b635952a7b 100644 --- a/extension/storage/redisstorageextension/factory.go +++ b/extension/storage/redisstorageextension/factory.go @@ -5,6 +5,7 @@ package redisstorageextension // import "github.com/open-telemetry/opentelemetry import ( "context" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtls" @@ -28,6 +29,15 @@ func createDefaultConfig() component.Config { TLS: configtls.ClientConfig{ Insecure: false, }, + MaxRetries: 10, + RetryDelay: 2 * time.Second, + DialTimeout: 30 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + PoolTimeout: 30 * time.Second, + MinRetryBackoff: 100 * time.Millisecond, + MaxRetryBackoff: 2 * time.Second, + PingTimeout: 30 * time.Second, } } diff --git a/extension/storage/redisstorageextension/factory_test.go b/extension/storage/redisstorageextension/factory_test.go index d76a05569667d..f3b94c758d775 100644 --- a/extension/storage/redisstorageextension/factory_test.go +++ b/extension/storage/redisstorageextension/factory_test.go @@ -4,8 +4,11 @@ package redisstorageextension import ( + "errors" "testing" + "github.com/go-redis/redismock/v9" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" @@ -22,18 +25,32 @@ func TestFactory(t *testing.T) { { name: "Default", config: func() *Config { - return &Config{ - Endpoint: "localhost:6379", - TLS: configtls.ClientConfig{ - Insecure: true, - }, + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:6379" + cfg.TLS = configtls.ClientConfig{ + Insecure: true, } + return cfg }(), }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + prev := newRedisClient + t.Cleanup(func() { newRedisClient = prev }) + + mockedClient, mock := redismock.NewClientMock() + mock.ExpectPing().SetVal("PONG") + t.Cleanup(func() { + if err := mockedClient.Close(); err != nil && !errors.Is(err, redis.ErrClosed) { + require.NoError(t, err) + } + }) + newRedisClient = func(opt *redis.Options) *redis.Client { + return mockedClient + } + e, err := f.Create( t.Context(), extensiontest.NewNopSettings(f.Type()), @@ -44,6 +61,7 @@ func TestFactory(t *testing.T) { ctx := t.Context() require.NoError(t, e.Start(ctx, componenttest.NewNopHost())) require.NoError(t, e.Shutdown(ctx)) + require.NoError(t, mock.ExpectationsWereMet()) }) } } diff --git a/extension/storage/redisstorageextension/generated_package_test.go b/extension/storage/redisstorageextension/generated_package_test.go index 5378f0ceecbed..a441c1ad6a2f8 100644 --- a/extension/storage/redisstorageextension/generated_package_test.go +++ b/extension/storage/redisstorageextension/generated_package_test.go @@ -9,5 +9,7 @@ import ( ) func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) + goleak.VerifyTestMain(m, + goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/maintnotifications.(*CircuitBreakerManager).cleanupLoop"), + ) }