Skip to content

Commit 8e860ae

Browse files
yellyCopilot
andauthored
Use FT._LIST in CreateAndAliasIndex (#970)
Fixes #969. The current implementation fails tests when running against Redis 8 (presumably the version in which the error message from `FT.INFO` changed). Introduced a new implementation for `CreateAndAliasIndex` which does the following: 1. Query `FT._LIST`. 2. Finds all existing indexes which match the versioned index pattern (index + "_v" + version). 3. Creates a new index with the latest version + 1 (or 1 if no existing index). 4. Updates the alias to point to the new index (works whether or not the alias already exists). 5. Drops all the old indexes. The implementation is shared between Hash and JSON repositories to avoid duplication. The new implementation passes the tests against Redis 8 and redis-stack. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes the RediSearch index rotation logic (create/alias/drop), which could impact index availability or unexpectedly drop indexes if naming matches; behavior is exercised by updated tests across two Redis versions. > > **Overview** > Fixes `CreateAndAliasIndex` for both Hash and JSON repositories by replacing the `FT.INFO`/error-string-based flow with a shared implementation that uses `FT._LIST` to discover existing `*_vN` indexes, create the next version, `FT.ALIASUPDATE` the alias, and drop prior versions. > > Updates tests to run the alias/versioning behavior against both redis-stack (RediSearch 2.8.4) and Redis 8.6.0, and refactors test client setup to support multiple server targets. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit da04f17. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent bec8a26 commit 8e860ae

6 files changed

Lines changed: 164 additions & 183 deletions

File tree

om/hash.go

Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package om
22

33
import (
44
"context"
5-
"fmt"
65
"reflect"
76
"strconv"
8-
"strings"
97
"time"
108

119
"github.com/oklog/ulid/v2"
1210

1311
"github.com/redis/rueidis"
12+
"github.com/redis/rueidis/internal/cmds"
1413
)
1514

1615
// NewHashRepository creates a HashRepository.
@@ -162,74 +161,13 @@ func (r *HashRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
162161

163162
// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
164163
func (r *HashRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
165-
alias := r.idx
166-
167-
var currentIndex string
168-
aliasExists := false
169-
infoCmd := r.client.B().FtInfo().Index(alias).Build()
170-
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
171-
if err != nil {
172-
if strings.Contains(err.Error(), "Unknown index name") {
173-
// This is expected when the alias doesn't exist yet
174-
aliasExists = false
175-
} else {
176-
// This is an unexpected error (network, connection, etc.)
177-
return fmt.Errorf("failed to check if index exists: %w", err)
178-
}
179-
} else {
180-
aliasExists = true
181-
}
182-
183-
if aliasExists {
184-
message, ok := infoResp["index_name"]
185-
if !ok {
186-
return fmt.Errorf("index_name not found in FT.INFO response")
187-
}
188-
189-
currentIndex, err = message.ToString()
190-
if err != nil {
191-
return fmt.Errorf("failed to convert index_name to string: %w", err)
192-
}
193-
}
194-
195-
newIndex := alias + "_v1"
196-
if aliasExists && currentIndex != "" {
197-
// Find the last occurrence of "_v" followed by digits
198-
lastVersionIndex := strings.LastIndex(currentIndex, "_v")
199-
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
200-
versionStr := currentIndex[lastVersionIndex+2:]
201-
if version, err := strconv.Atoi(versionStr); err == nil {
202-
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
203-
}
204-
}
205-
}
206-
207-
// Create the new index
208-
cmd := r.client.B().FtCreate().Index(newIndex).OnHash().Prefix(1).Prefix(r.prefix + ":")
209-
if err := r.client.Do(ctx, cmdFn(cmd.Schema())).Error(); err != nil {
210-
return err
211-
}
212-
213-
// Update or add the alias
214-
var aliasErr error
215-
if aliasExists {
216-
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
217-
} else {
218-
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
219-
}
220-
221-
if aliasErr != nil {
222-
return fmt.Errorf("failed to update alias: %w", aliasErr)
223-
}
224-
225-
// Drop the old index if it exists and differs from the new one
226-
if aliasExists && currentIndex != "" && currentIndex != newIndex {
227-
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
228-
return fmt.Errorf("failed to drop old index: %w", err)
229-
}
230-
}
231-
232-
return nil
164+
return createAndAliasIndex(ctx, r.idx, r.client, func(idx string) cmds.FtCreatePrefixPrefix {
165+
return r.client.B().FtCreate().
166+
Index(idx).
167+
OnHash().
168+
Prefix(1).
169+
Prefix(r.prefix + ":")
170+
}, cmdFn)
233171
}
234172

235173
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop the index whose name is `hashidx:{prefix}`

om/hash_test.go

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -697,32 +697,46 @@ func TestNewHashRepositoryTTL(t *testing.T) {
697697
func TestCreateAndAliasIndex(t *testing.T) {
698698
ctx := context.Background()
699699

700-
client := setup(t)
701-
client.Do(ctx, client.B().Flushall().Build())
702-
defer client.Close()
700+
testCases := []struct {
701+
name string
702+
clientOpts []option
703+
}{
704+
{
705+
name: "RediSearch 2.8.4",
706+
},
707+
{
708+
name: "Redis 8.6.0",
709+
clientOpts: []option{withRedis86},
710+
},
711+
}
712+
for _, tc := range testCases {
713+
t.Run(fmt.Sprintf("CreateAndAliasIndex: %s", tc.name), func(t *testing.T) {
714+
client := setup(t, tc.clientOpts...)
715+
client.Do(ctx, client.B().Flushall().Build())
716+
defer client.Close()
703717

704-
repo := NewHashRepository("hashalias", HashTestStruct{}, client)
718+
repo := NewHashRepository("hashalias", HashTestStruct{}, client)
705719

706-
t.Run("CreateAndAliasIndex", func(t *testing.T) {
707-
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
708-
return schema.FieldName("Val").Text().Build()
709-
})
710-
if err != nil {
711-
t.Fatalf("failed to create and alias index: %v", err)
712-
}
720+
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
721+
return schema.FieldName("Val").Text().Build()
722+
})
723+
if err != nil {
724+
t.Fatalf("failed to create and alias index: %v", err)
725+
}
713726

714-
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
727+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
715728

716-
// Step 3: Create new index version and update alias
717-
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
718-
return schema.FieldName("Val").Text().Build()
719-
})
720-
if err != nil {
721-
t.Fatalf("failed to create and alias new index version: %v", err)
722-
}
729+
// Step 3: Create new index version and update alias
730+
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
731+
return schema.FieldName("Val").Text().Build()
732+
})
733+
if err != nil {
734+
t.Fatalf("failed to create and alias new index version: %v", err)
735+
}
723736

724-
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
725-
})
737+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
738+
})
739+
}
726740
}
727741

728742
// Helper to verify that alias points to the expected index name

om/indexes.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package om
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"regexp"
7+
"slices"
8+
"strconv"
9+
10+
"github.com/redis/rueidis"
11+
"github.com/redis/rueidis/internal/cmds"
12+
)
13+
14+
// createAndAliasIndex creates a new versioned index, aliases it to idx, and then drops all
15+
// existing versioned indexes (idx_vN) that were previously associated with that alias.
16+
func createAndAliasIndex(ctx context.Context, idx string, client rueidis.Client, createCmd func(idx string) cmds.FtCreatePrefixPrefix, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
17+
idxRE, err := regexp.Compile("^" + regexp.QuoteMeta(idx) + "_v(\\d+)$")
18+
if err != nil {
19+
return fmt.Errorf("compiling regular expression: %w", err)
20+
}
21+
22+
listCmd := client.B().FtList().Build()
23+
listResp, err := client.Do(ctx, listCmd).ToArray()
24+
if err != nil {
25+
return fmt.Errorf("listing indexes: %w", err)
26+
}
27+
28+
currVers := make([]int, 0)
29+
for _, message := range listResp {
30+
n, err := message.ToString()
31+
if err != nil {
32+
return fmt.Errorf("FT._LIST returned non-string response: %w", err)
33+
}
34+
match := idxRE.FindStringSubmatch(n)
35+
if len(match) < 2 {
36+
continue
37+
}
38+
ver, err := strconv.Atoi(match[1])
39+
if err != nil {
40+
return fmt.Errorf("converting version number for index %q: %w", n, err)
41+
}
42+
currVers = append(currVers, ver)
43+
}
44+
45+
newIndex := idx + "_v1"
46+
if len(currVers) > 0 {
47+
newIndex = fmt.Sprintf("%s_v%d", idx, slices.Max(currVers)+1)
48+
}
49+
50+
// Create the new index
51+
if err := client.Do(ctx, cmdFn(createCmd(newIndex).Schema())).Error(); err != nil {
52+
return fmt.Errorf("creating new index: %w", err)
53+
}
54+
55+
if err := client.Do(ctx, client.B().FtAliasupdate().Alias(idx).Index(newIndex).Build()).Error(); err != nil {
56+
return fmt.Errorf("updating alias: %w", err)
57+
}
58+
59+
for _, ver := range currVers {
60+
currIdx := fmt.Sprintf("%s_v%d", idx, ver)
61+
if err := client.Do(ctx, client.B().FtDropindex().Index(currIdx).Build()).Error(); err != nil {
62+
return fmt.Errorf("dropping old index %q: %w", currIdx, err)
63+
}
64+
}
65+
66+
return nil
67+
}

om/json.go

Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package om
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76
"reflect"
87
"strconv"
98
"strings"
109
"time"
1110

1211
"github.com/oklog/ulid/v2"
1312
"github.com/redis/rueidis"
13+
"github.com/redis/rueidis/internal/cmds"
1414
)
1515

1616
// NewJSONRepository creates a JSONRepository.
@@ -157,75 +157,13 @@ func (r *JSONRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
157157

158158
// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
159159
func (r *JSONRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
160-
alias := r.idx
161-
162-
var currentIndex string
163-
aliasExists := false
164-
infoCmd := r.client.B().FtInfo().Index(alias).Build()
165-
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
166-
if err != nil {
167-
if strings.Contains(err.Error(), "Unknown index name") {
168-
aliasExists = false
169-
} else {
170-
return fmt.Errorf("failed to check if index exists: %w", err)
171-
}
172-
} else {
173-
aliasExists = true
174-
}
175-
176-
if aliasExists {
177-
message, ok := infoResp["index_name"]
178-
if !ok {
179-
return fmt.Errorf("index_name not found in FT.INFO response")
180-
}
181-
currentIndex, err = message.ToString()
182-
if err != nil {
183-
return fmt.Errorf("failed to convert index_name to string: %w", err)
184-
}
185-
}
186-
187-
// Compute new index version name
188-
newIndex := alias + "_v1"
189-
if aliasExists && currentIndex != "" {
190-
lastVersionIndex := strings.LastIndex(currentIndex, "_v")
191-
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
192-
versionStr := currentIndex[lastVersionIndex+2:]
193-
if version, err := strconv.Atoi(versionStr); err == nil {
194-
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
195-
}
196-
}
197-
}
198-
199-
// Create the new index with schema
200-
createCmd := r.client.B().FtCreate().
201-
Index(newIndex).
202-
OnJson().
203-
Prefix(1).
204-
Prefix(r.prefix + ":")
205-
if err := r.client.Do(ctx, cmdFn(createCmd.Schema())).Error(); err != nil {
206-
return fmt.Errorf("failed to create index %s: %w", newIndex, err)
207-
}
208-
209-
// Set alias to point to new index
210-
var aliasErr error
211-
if aliasExists {
212-
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
213-
} else {
214-
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
215-
}
216-
217-
if aliasErr != nil {
218-
return fmt.Errorf("failed to update alias: %w", aliasErr)
219-
}
220-
221-
// Drop old index if it's different from the new one
222-
if aliasExists && currentIndex != "" && currentIndex != newIndex {
223-
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
224-
return fmt.Errorf("failed to drop old index: %w", err)
225-
}
226-
}
227-
228-
return nil
160+
return createAndAliasIndex(ctx, r.idx, r.client, func(idx string) cmds.FtCreatePrefixPrefix {
161+
return r.client.B().FtCreate().
162+
Index(idx).
163+
OnJson().
164+
Prefix(1).
165+
Prefix(r.prefix + ":")
166+
}, cmdFn)
229167
}
230168

231169
// DropIndex uses FT.DROPINDEX from the RediSearch module to drop the index whose name is `jsonidx:{prefix}`

om/json_test.go

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -692,31 +692,45 @@ func TestNewJSONTTLRepository(t *testing.T) {
692692
func TestCreateAndAliasIndex_JSON(t *testing.T) {
693693
ctx := context.Background()
694694

695-
client := setup(t)
696-
client.Do(ctx, client.B().Flushall().Build())
697-
defer client.Close()
695+
testCases := []struct {
696+
name string
697+
clientOpts []option
698+
}{
699+
{
700+
name: "RediSearch 2.8.4",
701+
},
702+
{
703+
name: "Redis 8.6.0",
704+
clientOpts: []option{withRedis86},
705+
},
706+
}
707+
for _, tc := range testCases {
708+
t.Run(fmt.Sprintf("CreateAndAliasIndex: %s", tc.name), func(t *testing.T) {
709+
client := setup(t, tc.clientOpts...)
710+
client.Do(ctx, client.B().Flushall().Build())
711+
defer client.Close()
698712

699-
repo := NewJSONRepository("jsonalias", JSONTestStruct{}, client)
713+
repo := NewJSONRepository("jsonalias", JSONTestStruct{}, client)
700714

701-
t.Run("CreateAndAliasIndex_JSON", func(t *testing.T) {
702-
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
703-
return schema.FieldName("$.val").As("val").Text().Build()
704-
})
705-
if err != nil {
706-
t.Fatalf("failed to create and alias JSON index: %v", err)
707-
}
715+
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
716+
return schema.FieldName("$.val").As("val").Text().Build()
717+
})
718+
if err != nil {
719+
t.Fatalf("failed to create and alias JSON index: %v", err)
720+
}
708721

709-
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
722+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v1")
710723

711-
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
712-
return schema.FieldName("$.val").As("val").Text().Build()
713-
})
714-
if err != nil {
715-
t.Fatalf("failed to create and alias new JSON index version: %v", err)
716-
}
724+
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
725+
return schema.FieldName("$.val").As("val").Text().Build()
726+
})
727+
if err != nil {
728+
t.Fatalf("failed to create and alias new JSON index version: %v", err)
729+
}
717730

718-
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
719-
})
731+
verifyAliasTarget(t, ctx, client, repo.IndexName(), repo.IndexName()+"_v2")
732+
})
733+
}
720734
}
721735

722736
type JSONTestVerlessTTLStruct struct {

0 commit comments

Comments
 (0)