Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f3e9126
lazy cluster topology reload
ndyakov Nov 24, 2025
6ed2349
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 1, 2025
e9da546
fix discrepancies between options structs
ndyakov Dec 2, 2025
644a1aa
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 2, 2025
fc05874
Update osscluster_lazy_reload_test.go
ndyakov Dec 2, 2025
f3bab0d
Update osscluster.go
ndyakov Dec 2, 2025
60c6edc
wip fault with mock proxy
ndyakov Dec 4, 2025
1902cbb
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 4, 2025
853c7f2
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 4, 2025
908d602
make lint happy
ndyakov Dec 4, 2025
aa1f970
fix linter issues
ndyakov Dec 5, 2025
e097b3e
faster tests with mocks
ndyakov Dec 5, 2025
ce7fb37
linter once again
ndyakov Dec 5, 2025
e390e2d
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 5, 2025
3923484
add complex node test
ndyakov Dec 5, 2025
7971f01
add ci e2e
ndyakov Dec 5, 2025
8e4ea91
use correct redis container
ndyakov Dec 6, 2025
7decf55
e2e fix
ndyakov Dec 6, 2025
9c0d34d
additional e2e tests
ndyakov Dec 11, 2025
27b0f87
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/ad…
ndyakov Dec 11, 2025
a99127f
fix data race
ndyakov Dec 11, 2025
8eef599
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/ad…
ndyakov Dec 16, 2025
115ad8a
fix random shard picker
ndyakov Dec 17, 2025
4ec6958
fix e2e tests
ndyakov Dec 17, 2025
6deefcf
fix for empty endpoint
ndyakov Dec 17, 2025
0ce0fdc
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/ad…
ndyakov Jan 12, 2026
341847e
fix case when semaphore is full, but still need to check idle
ndyakov Jan 12, 2026
b14e4b2
scenario tests
ndyakov Jan 22, 2026
01269e2
create database from config
ndyakov Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ func (p *ConnPool) checkMinIdleConns() {
for p.poolSize.Load() < p.cfg.PoolSize && p.idleConnsLen.Load() < p.cfg.MinIdleConns {
// Try to acquire a semaphore token
if !p.semaphore.TryAcquire() {
// Semaphore is full, can't create more connections
p.idleCheckInProgress.Store(false)
return
// Semaphore is full, can't create more connections right now
// Break out of inner loop to check if we need to retry
break
}

p.poolSize.Add(1)
Expand Down
166 changes: 165 additions & 1 deletion maintnotifications/e2e/fault_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,82 @@ const (
ActionFailover ActionType = "failover"
ActionMigrate ActionType = "migrate"
ActionBind ActionType = "bind"

// Slot migrate action (OSS Cluster API testing)
ActionSlotMigrate ActionType = "slot_migrate"
)

// SlotMigrateEffect names the topology outcome requested from a slot
// migration action.
type SlotMigrateEffect string

// Supported slot migration effects. The string values are what gets sent as
// the "effect" parameter.
const (
	// SlotMigrateEffectRemoveAdd moves all shards from the source node to an
	// empty node: one endpoint is removed and one endpoint is added.
	SlotMigrateEffectRemoveAdd SlotMigrateEffect = "remove-add"

	// SlotMigrateEffectRemove moves all shards from the source node to an
	// existing node: one endpoint is removed.
	SlotMigrateEffectRemove SlotMigrateEffect = "remove"

	// SlotMigrateEffectAdd moves a single shard to an empty node: one endpoint
	// is added.
	SlotMigrateEffectAdd SlotMigrateEffect = "add"

	// SlotMigrateEffectSlotShuffle moves a single shard between existing
	// nodes: slots move while the set of endpoints stays the same.
	SlotMigrateEffectSlotShuffle SlotMigrateEffect = "slot-shuffle"
)

// SlotMigrateVariant names the mechanism used to produce a slot migration
// effect.
type SlotMigrateVariant string

// Supported slot migration variants. The string values are what gets sent as
// the "variant" parameter.
const (
	// SlotMigrateVariantDefault is an alias for the migrate variant.
	SlotMigrateVariantDefault SlotMigrateVariant = "default"

	// SlotMigrateVariantMigrate moves shards with rladmin migrate.
	SlotMigrateVariantMigrate SlotMigrateVariant = "migrate"

	// SlotMigrateVariantMaintenanceMode puts the node into maintenance mode.
	// It is only supported for the remove-add and remove effects.
	SlotMigrateVariantMaintenanceMode SlotMigrateVariant = "maintenance_mode"

	// SlotMigrateVariantFailover triggers a failover that swaps the
	// master/replica roles. It requires replication to be enabled.
	SlotMigrateVariantFailover SlotMigrateVariant = "failover"
)

// SlotMigrateRequest describes a single slot migration to trigger.
//
// Field order and JSON tags are part of the serialized form and must not be
// rearranged.
type SlotMigrateRequest struct {
	// Effect is the requested topology outcome.
	Effect SlotMigrateEffect `json:"effect"`

	// BdbID is serialized as "bdb_id" (presumably the target database ID;
	// confirm against the fault injector API).
	BdbID string `json:"bdb_id"`

	// ClusterIndex is left out of the JSON form when zero.
	ClusterIndex int `json:"cluster_index,omitempty"`

	// Variant is left out of the JSON form when empty.
	Variant SlotMigrateVariant `json:"variant,omitempty"`

	// SourceNode is optional; nil means it was not specified.
	SourceNode *int `json:"source_node,omitempty"`

	// TargetNode is optional; nil means it was not specified.
	TargetNode *int `json:"target_node,omitempty"`
}

// SlotMigrateTrigger is one trigger configuration for slot migration.
type SlotMigrateTrigger struct {
	// Name is serialized as "name".
	Name string `json:"name"`

	// Description is a free-form description of the trigger.
	Description string `json:"description"`

	// Requirements lists the configuration requirements attached to this
	// trigger.
	Requirements []SlotMigrateTriggerRequirement `json:"requirements"`
}

// SlotMigrateTriggerRequirement holds the database configuration
// requirements for a slot migration trigger.
type SlotMigrateTriggerRequirement struct {
	// DBConfig is the untyped "dbconfig" object.
	DBConfig map[string]interface{} `json:"dbconfig"`

	// Cluster is the untyped "cluster" object.
	Cluster map[string]interface{} `json:"cluster"`

	// Description is a free-form description of the requirement.
	Description string `json:"description"`
}

// SlotMigrateTriggersResponse is the body returned by GET /slot-migrate.
type SlotMigrateTriggersResponse struct {
	// Effect is the effect the listed triggers apply to.
	Effect SlotMigrateEffect `json:"effect"`

	// Cluster is the untyped "cluster" object from the response.
	Cluster map[string]interface{} `json:"cluster"`

	// Triggers lists the triggers reported for the effect.
	Triggers []SlotMigrateTrigger `json:"triggers"`
}

// ActionStatus represents the status of an action
type ActionStatus string

Expand Down Expand Up @@ -209,7 +283,7 @@ func (c *FaultInjectorClient) TriggerClusterReshard(ctx context.Context, slots [
})
}

// TriggerSlotMigration triggers migration of specific slots
// TriggerSlotMigration triggers migration of specific slots (legacy API)
func (c *FaultInjectorClient) TriggerSlotMigration(ctx context.Context, startSlot, endSlot int, sourceNode, targetNode string) (*ActionResponse, error) {
return c.TriggerAction(ctx, ActionRequest{
Type: ActionSlotMigration,
Expand All @@ -222,6 +296,96 @@ func (c *FaultInjectorClient) TriggerSlotMigration(ctx context.Context, startSlo
})
}

// Slot Migrate Actions (OSS Cluster API Testing)
// These methods use the /slot-migrate endpoint for testing cluster topology changes

// GetSlotMigrateTriggers issues GET /slot-migrate for the given effect and
// cluster index and returns the decoded list of available triggers. It helps
// discover which database configurations each effect/variant needs.
//
// The returned pointer is never nil, even when err is non-nil; callers must
// check err before relying on its contents.
func (c *FaultInjectorClient) GetSlotMigrateTriggers(ctx context.Context, effect SlotMigrateEffect, clusterIndex int) (*SlotMigrateTriggersResponse, error) {
	triggers := new(SlotMigrateTriggersResponse)
	endpoint := fmt.Sprintf("/slot-migrate?effect=%s&cluster_index=%d", effect, clusterIndex)
	err := c.request(ctx, "GET", endpoint, nil, triggers)
	return triggers, err
}

// TriggerSlotMigrate issues POST /slot-migrate with the effect and variant
// described by req. It is the newer API for exercising OSS Cluster API client
// behavior while endpoints change.
//
// Effects:
//   - remove-add: one endpoint removed, one added (all shards go to an empty node)
//   - remove: one endpoint removed (all shards go to an existing node)
//   - add: one endpoint added (one shard goes to an empty node)
//   - slot-shuffle: slots move, endpoints unchanged (one shard moves between existing nodes)
//
// Variants:
//   - default/migrate: move shards with rladmin migrate
//   - maintenance_mode: put the node in maintenance mode (remove-add and remove only)
//   - failover: swap master/replica roles via failover (requires replication)
//
// All request fields travel as query parameters; no request body is sent.
// effect, bdb_id and cluster_index are always present, while variant,
// source_node and target_node are added only when set. The returned pointer
// is never nil, so callers must check err before using it.
func (c *FaultInjectorClient) TriggerSlotMigrate(ctx context.Context, req SlotMigrateRequest) (*ActionResponse, error) {
	// Collect the optional parameters first, in the order they are sent.
	optional := ""
	if v := req.Variant; v != "" {
		optional += fmt.Sprintf("&variant=%s", v)
	}
	if src := req.SourceNode; src != nil {
		optional += fmt.Sprintf("&source_node=%d", *src)
	}
	if dst := req.TargetNode; dst != nil {
		optional += fmt.Sprintf("&target_node=%d", *dst)
	}

	// The mandatory parameters lead the query string.
	endpoint := fmt.Sprintf("/slot-migrate?effect=%s&bdb_id=%s&cluster_index=%d",
		req.Effect, req.BdbID, req.ClusterIndex) + optional

	result := new(ActionResponse)
	err := c.request(ctx, "POST", endpoint, nil, result)
	return result, err
}

// TriggerSlotMigrateRemoveAdd triggers a slot migration with the remove-add
// effect: all shards move from the source node to an empty node, so one
// endpoint is removed and one endpoint is added.
//
// Only the effect, bdbID and variant are set; every other request field
// keeps its zero value.
func (c *FaultInjectorClient) TriggerSlotMigrateRemoveAdd(ctx context.Context, bdbID string, variant SlotMigrateVariant) (*ActionResponse, error) {
	var req SlotMigrateRequest
	req.Effect = SlotMigrateEffectRemoveAdd
	req.BdbID = bdbID
	req.Variant = variant
	return c.TriggerSlotMigrate(ctx, req)
}

// TriggerSlotMigrateRemove triggers a slot migration with the remove effect:
// all shards move from the source node to an existing node, so one endpoint
// is removed.
//
// Only the effect, bdbID and variant are set; every other request field
// keeps its zero value.
func (c *FaultInjectorClient) TriggerSlotMigrateRemove(ctx context.Context, bdbID string, variant SlotMigrateVariant) (*ActionResponse, error) {
	var req SlotMigrateRequest
	req.Effect = SlotMigrateEffectRemove
	req.BdbID = bdbID
	req.Variant = variant
	return c.TriggerSlotMigrate(ctx, req)
}

// TriggerSlotMigrateAdd triggers a slot migration with the add effect: one
// shard moves to an empty node, so one endpoint is added.
//
// Only the effect, bdbID and variant are set; every other request field
// keeps its zero value.
func (c *FaultInjectorClient) TriggerSlotMigrateAdd(ctx context.Context, bdbID string, variant SlotMigrateVariant) (*ActionResponse, error) {
	var req SlotMigrateRequest
	req.Effect = SlotMigrateEffectAdd
	req.BdbID = bdbID
	req.Variant = variant
	return c.TriggerSlotMigrate(ctx, req)
}

// TriggerSlotMigrateSlotShuffle triggers a slot migration with the
// slot-shuffle effect: one shard moves between existing nodes, so slots move
// while the endpoints stay unchanged.
//
// Only the effect, bdbID and variant are set; every other request field
// keeps its zero value.
func (c *FaultInjectorClient) TriggerSlotMigrateSlotShuffle(ctx context.Context, bdbID string, variant SlotMigrateVariant) (*ActionResponse, error) {
	var req SlotMigrateRequest
	req.Effect = SlotMigrateEffectSlotShuffle
	req.BdbID = bdbID
	req.Variant = variant
	return c.TriggerSlotMigrate(ctx, req)
}

// Node Management Actions

// RestartNode restarts a specific Redis node
Expand Down
36 changes: 27 additions & 9 deletions maintnotifications/e2e/logcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,34 @@ func (tlc *TestLogCollector) DoPrint() {
// MatchFunc is a slice of functions that check the logs for a specific condition
// use in WaitForLogMatchFunc
type MatchFunc struct {
completed atomic.Bool
F func(lstring string) bool
matches []string
found chan struct{} // channel to notify when match is found, will be closed
done func()
completed atomic.Bool
F func(lstring string) bool
matches []string
matchesMu sync.Mutex // protects matches slice
found chan struct{} // channel to notify when match is found, will be closed
done func()
}

func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...interface{}) {
tlc.mu.Lock()
defer tlc.mu.Unlock()
lstr := fmt.Sprintf(format, v...)
if len(tlc.matchFuncs) > 0 {

// Check if there are match functions to process
// Use matchFuncsMutex to safely read matchFuncs
tlc.matchFuncsMutex.Lock()
hasMatchFuncs := len(tlc.matchFuncs) > 0
// Create a copy of matchFuncs to avoid holding the lock while processing
matchFuncsCopy := make([]*MatchFunc, len(tlc.matchFuncs))
copy(matchFuncsCopy, tlc.matchFuncs)
tlc.matchFuncsMutex.Unlock()

if hasMatchFuncs {
go func(lstr string) {
for _, matchFunc := range tlc.matchFuncs {
for _, matchFunc := range matchFuncsCopy {
if matchFunc.F(lstr) {
matchFunc.matchesMu.Lock()
matchFunc.matches = append(matchFunc.matches, lstr)
matchFunc.matchesMu.Unlock()
matchFunc.done()
return
}
Expand All @@ -118,6 +130,7 @@ func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...inter
fmt.Println(lstr)
}
tlc.l = append(tlc.l, fmt.Sprintf(format, v...))
tlc.mu.Unlock()
}

func (tlc *TestLogCollector) WaitForLogContaining(searchString string, timeout time.Duration) bool {
Expand Down Expand Up @@ -170,7 +183,12 @@ func (tlc *TestLogCollector) WaitForLogMatchFunc(mf func(string) bool, timeout t

select {
case <-matchFunc.found:
return matchFunc.matches[0], true
matchFunc.matchesMu.Lock()
defer matchFunc.matchesMu.Unlock()
if len(matchFunc.matches) > 0 {
return matchFunc.matches[0], true
}
return "", false
case <-time.After(timeout):
return "", false
}
Expand Down
59 changes: 50 additions & 9 deletions maintnotifications/e2e/notification_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,21 @@ type NotificationInjector interface {
InjectSMIGRATED(ctx context.Context, seqID int64, hostPort string, slots ...string) error

// InjectMOVING injects a MOVING notification (for standalone)
InjectMOVING(ctx context.Context, seqID int64, slot int) error
// Format: ["MOVING", seqID, timeS, endpoint]
InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error

// InjectMIGRATING injects a MIGRATING notification (for standalone)
InjectMIGRATING(ctx context.Context, seqID int64, slot int) error

// InjectMIGRATED injects a MIGRATED notification (for standalone)
InjectMIGRATED(ctx context.Context, seqID int64, slot int) error

// InjectFAILING_OVER injects a FAILING_OVER notification
InjectFAILING_OVER(ctx context.Context, seqID int64) error

// InjectFAILED_OVER injects a FAILED_OVER notification
InjectFAILED_OVER(ctx context.Context, seqID int64) error

// Start starts the injector (if needed)
Start() error

Expand Down Expand Up @@ -475,8 +482,8 @@ func (p *ProxyNotificationInjector) InjectSMIGRATED(ctx context.Context, seqID i
return p.injectNotification(notification)
}

func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error {
notification := formatMovingNotification(seqID, slot)
// InjectMOVING formats a MOVING notification from seqID, timeS and endpoint
// and hands it to injectNotification. ctx is accepted to satisfy the injector
// interface and is not used here.
func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error {
	return p.injectNotification(formatMovingNotification(seqID, timeS, endpoint))
}

Expand All @@ -490,6 +497,16 @@ func (p *ProxyNotificationInjector) InjectMIGRATED(ctx context.Context, seqID in
return p.injectNotification(notification)
}

// InjectFAILING_OVER formats a FAILING_OVER notification carrying seqID and
// hands it to injectNotification. ctx is accepted to satisfy the injector
// interface and is not used here.
func (p *ProxyNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error {
	return p.injectNotification(formatFailingOverNotification(seqID))
}

// InjectFAILED_OVER formats a FAILED_OVER notification carrying seqID and
// hands it to injectNotification. ctx is accepted to satisfy the injector
// interface and is not used here.
func (p *ProxyNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error {
	return p.injectNotification(formatFailedOverNotification(seqID))
}

func (p *ProxyNotificationInjector) injectNotification(notification string) error {
url := p.apiBaseURL + "/send-to-all-clients?encoding=raw"
resp, err := p.httpClient.Post(url, "application/octet-stream", strings.NewReader(notification))
Expand Down Expand Up @@ -559,9 +576,14 @@ func formatSMigratedNotification(seqID int64, endpoints ...string) string {
return strings.Join(parts, "")
}

func formatMovingNotification(seqID int64, slot int) string {
slotStr := fmt.Sprintf("%d", slot)
return fmt.Sprintf(">3\r\n$6\r\nMOVING\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr)
// formatMovingNotification renders a MOVING push message of the form
// ["MOVING", seqID, timeS, endpoint].
//
// seqID and timeS are written as integers. endpoint is written as a bulk
// string whose length prefix is its byte length. An empty endpoint is left
// out entirely, giving a three-element message instead of four.
func formatMovingNotification(seqID int64, timeS int64, endpoint string) string {
	const (
		// 3 elements: MOVING, seqID, timeS
		withoutEndpoint = ">3\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n"
		// 4 elements: MOVING, seqID, timeS, endpoint
		withEndpoint = ">4\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n$%d\r\n%s\r\n"
	)

	if endpoint != "" {
		return fmt.Sprintf(withEndpoint, seqID, timeS, len(endpoint), endpoint)
	}
	return fmt.Sprintf(withoutEndpoint, seqID, timeS)
}

func formatMigratingNotification(seqID int64, slot int) string {
Expand All @@ -574,6 +596,16 @@ func formatMigratedNotification(seqID int64, slot int) string {
return fmt.Sprintf(">3\r\n$8\r\nMIGRATED\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr)
}

// formatFailingOverNotification renders a two-element FAILING_OVER push
// message of the form ["FAILING_OVER", seqID], with seqID written as an
// integer.
func formatFailingOverNotification(seqID int64) string {
	// The $12 prefix is the byte length of "FAILING_OVER".
	const layout = ">2\r\n$12\r\nFAILING_OVER\r\n:%d\r\n"
	return fmt.Sprintf(layout, seqID)
}

// formatFailedOverNotification renders a two-element FAILED_OVER push message
// of the form ["FAILED_OVER", seqID], with seqID written as an integer.
func formatFailedOverNotification(seqID int64) string {
	// The $11 prefix is the byte length of "FAILED_OVER".
	const layout = ">2\r\n$11\r\nFAILED_OVER\r\n:%d\r\n"
	return fmt.Sprintf(layout, seqID)
}


// FaultInjectorNotificationInjector implements NotificationInjector using the real fault injector
type FaultInjectorNotificationInjector struct {
Expand Down Expand Up @@ -664,9 +696,9 @@ func (f *FaultInjectorNotificationInjector) InjectSMIGRATED(ctx context.Context,
return fmt.Errorf("SMIGRATED cannot be directly injected with real fault injector - it's generated when migration completes")
}

func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error {
// MOVING notifications are generated during slot migration
return fmt.Errorf("MOVING cannot be directly injected with real fault injector - it's generated during migration")
// InjectMOVING always returns an error: with the real fault injector a MOVING
// notification cannot be injected directly, because it is produced as a side
// effect of a bind action. All arguments are ignored.
func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error {
	const reason = "MOVING cannot be directly injected with real fault injector - it's generated during bind action"
	return fmt.Errorf(reason)
}

func (f *FaultInjectorNotificationInjector) InjectMIGRATING(ctx context.Context, seqID int64, slot int) error {
Expand All @@ -685,4 +717,13 @@ func (f *FaultInjectorNotificationInjector) InjectMIGRATED(ctx context.Context,
return fmt.Errorf("MIGRATED cannot be directly injected with real fault injector - it's generated when migration completes")
}

// InjectFAILING_OVER always returns an error: with the real fault injector a
// FAILING_OVER notification cannot be injected directly, because it is
// produced automatically when a failover starts. All arguments are ignored.
func (f *FaultInjectorNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error {
	const reason = "FAILING_OVER cannot be directly injected with real fault injector - it's generated when failover starts"
	return fmt.Errorf(reason)
}

// InjectFAILED_OVER always returns an error: with the real fault injector a
// FAILED_OVER notification cannot be injected directly, because it is
// produced automatically when a failover completes. All arguments are ignored.
func (f *FaultInjectorNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error {
	const reason = "FAILED_OVER cannot be directly injected with real fault injector - it's generated when failover completes"
	return fmt.Errorf(reason)
}

Loading
Loading