Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ to make sure manually cancellation is respected, especially for blocking request
All read-only commands are automatically retried on failures by default before their context deadlines exceeded.
You can disable this by setting `DisableRetry` or adjust the number of retries and durations between retries using `RetryDelay` function.

### Retryable Commands

Write commands can set Retryable to automatically retried on failures like read-only commands. Make sure you only use this feature with idempotent operations.

```golang
client.Do(ctx, client.B().Set().Key("key").Value("val").Build().ToRetryable())
client.DoMulti(ctx, client.B().Set().Key("key").Value("val").Build().ToRetryable())
```

## Pub/Sub

To receive messages from channels, `client.Receive()` should be used. It supports `SUBSCRIBE`, `PSUBSCRIBE`, and Redis 7.0's `SSUBSCRIBE`:
Expand Down
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ retry:
if err == errConnExpired {
goto retry
}
if c.retry && cmd.IsReadOnly() && c.isRetryable(err, ctx) {
if c.retry && cmd.IsRetryable() && c.isRetryable(err, ctx) {
if c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, err) {
attempts++
goto retry
Expand Down Expand Up @@ -120,7 +120,7 @@ retry:
goto recover
}
}
if c.retry && allReadOnly(multi) {
if c.retry && allRetryable(multi) {
for i, resp := range resps {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
Expand Down Expand Up @@ -275,7 +275,7 @@ retry:
return newErrResult(err)
}
resp = c.wire.Do(ctx, cmd)
if c.retry && cmd.IsReadOnly() && isRetryable(resp.Error(), c.wire, ctx) {
if c.retry && cmd.IsRetryable() && isRetryable(resp.Error(), c.wire, ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand All @@ -297,7 +297,7 @@ func (c *dedicatedSingleClient) DoMulti(ctx context.Context, multi ...Completed)
attempts := 1
retryable := c.retry
if retryable {
retryable = allReadOnly(multi)
retryable = allRetryable(multi)
}
retry:
if err := c.check(); err != nil {
Expand Down Expand Up @@ -392,9 +392,9 @@ func isRetryable(err error, w wire, ctx context.Context) bool {
return true
}

func allReadOnly(multi []Completed) bool {
func allRetryable(multi []Completed) bool {
for _, cmd := range multi {
if cmd.IsWrite() {
if !cmd.IsRetryable() {
return false
}
}
Expand Down
64 changes: 60 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate Do Write Retryable Retry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
if v, err := c.Do(context.Background(), c.B().Set().Key("Do").Value("V").Build().ToRetryable()).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("Delegate DoMulti ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
Expand Down Expand Up @@ -832,6 +843,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate DoMulti Write Retryable Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
if v, err := c.DoMulti(context.Background(), c.B().Set().Key("Do").Value("V").Build().ToRetryable())[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("Delegate DoCache Retry", func(t *testing.T) {
c, m := setup()
m.DoCacheFn = makeDoCacheFn(
Expand Down Expand Up @@ -1105,6 +1127,23 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Dedicate Delegate Do Write Retryable Retry", func(t *testing.T) {
c, m := setup()
m.DoFn = makeDoFn(
newErrResult(ErrClosing),
newResult(strmsg('+', "Do"), nil),
)
m.AcquireFn = func() wire { return &mockWire{DoFn: m.DoFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if v, err := cc.Do(context.Background(), c.B().Set().Key("Do").Value("Do").Build().ToRetryable()).ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})

t.Run("Dedicate Delegate DoMulti ReadOnly Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
Expand Down Expand Up @@ -1190,7 +1229,24 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate Receive Retry", func(t *testing.T) {
t.Run("Dedicate Delegate DoMulti Write Retryable Retry", func(t *testing.T) {
c, m := setup()
m.DoMultiFn = makeDoMultiFn(
[]RedisResult{newErrResult(ErrClosing)},
[]RedisResult{newResult(strmsg('+', "Do"), nil)},
)
m.AcquireFn = func() wire { return &mockWire{DoMultiFn: m.DoMultiFn} }
if ret := c.Dedicated(func(cc DedicatedClient) error {
if v, err := cc.DoMulti(context.Background(), c.B().Set().Key("Do").Value("Do").Build().ToRetryable())[0].ToString(); err != nil || v != "Do" {
t.Fatalf("unexpected response %v %v", v, err)
}
return errors.New("done")
}); ret.Error() != "done" {
t.Fatalf("Dedicated not executed")
}
})

t.Run("Dedicate Delegate Receive Retry", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
Expand All @@ -1204,7 +1260,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate Receive NoRetry - broken", func(t *testing.T) {
t.Run("Dedicate Delegate Receive NoRetry - broken", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
m.ErrorFn = func() error { return ErrClosing }
Expand All @@ -1216,7 +1272,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate Receive NoRetry - ctx done", func(t *testing.T) {
t.Run("Dedicate Delegate Receive NoRetry - ctx done", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
Expand All @@ -1229,7 +1285,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
}
})

t.Run("Delegate Receive NoRetry - not retryable", func(t *testing.T) {
t.Run("Dedicate Delegate Receive NoRetry - not retryable", func(t *testing.T) {
c, m := setup()
m.ReceiveFn = makeReceiveFn(ErrClosing, nil)
m.AcquireFn = func() wire { return &mockWire{ReceiveFn: m.ReceiveFn} }
Expand Down
8 changes: 4 additions & 4 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ process:
resultsp.Put(results)
goto process
case RedirectRetry:
if c.retry && cmd.IsReadOnly() {
if c.retry && cmd.IsRetryable() {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
if shouldRetry {
attempts++
Expand Down Expand Up @@ -741,7 +741,7 @@ func (c *clusterClient) doresultfn(
nc := cc
retryDelay := time.Duration(-1)
if mode == RedirectRetry {
if !c.retry || !cm.IsReadOnly() {
if !c.retry || !cm.IsRetryable() {
continue
}
retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
Expand Down Expand Up @@ -1477,7 +1477,7 @@ retry:
resp = w.Do(ctx, cmd)
switch _, mode := c.client.shouldRefreshRetry(resp.Error(), ctx); mode {
case RedirectRetry:
if c.retry && cmd.IsReadOnly() && w.Error() == nil {
if c.retry && cmd.IsRetryable() && w.Error() == nil {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
ctx, attempts, cmd, resp.Error(),
)
Expand All @@ -1504,7 +1504,7 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed
}
retryable := c.retry
if retryable {
retryable = allReadOnly(multi)
retryable = allRetryable(multi)
}
attempts := 1
retry:
Expand Down
28 changes: 20 additions & 8 deletions internal/cmds/cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package cmds
import "strings"

const (
optInTag = uint16(1 << 15)
blockTag = uint16(1 << 14)
readonly = uint16(1 << 13)
noRetTag = uint16(1<<12) | readonly | pipeTag // make noRetTag can also be retried and auto pipelining
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
unsubTag = uint16(1<<9) | noRetTag
pipeTag = uint16(1 << 8) // make blocking mode request can use auto pipelining
optInTag = uint16(1 << 15)
blockTag = uint16(1 << 14)
readonly = uint16(1<<13) | retryableTag
noRetTag = uint16(1<<12) | readonly | pipeTag // make noRetTag can also be retried and auto pipelining
mtGetTag = uint16(1<<11) | readonly // make mtGetTag can also be retried
scrRoTag = uint16(1<<10) | readonly // make scrRoTag can also be retried
unsubTag = uint16(1<<9) | noRetTag
pipeTag = uint16(1 << 8) // make blocking mode request can use auto pipelining
retryableTag = uint16(1 << 7) // make command retryable
// InitSlot indicates that the command be sent to any redis node in cluster
InitSlot = uint16(1 << 14)
// NoSlot indicates that the command has no key slot specified
Expand Down Expand Up @@ -123,6 +124,12 @@ func (c Completed) ToPipe() Completed {
return c
}

// ToRetryable return a new command with retryableTag
func (c Completed) ToRetryable() Completed {
c.cf |= retryableTag
return c
}

// IsEmpty checks if it is an empty command.
func (c *Completed) IsEmpty() bool {
return c.cs == nil || len(c.cs.s) == 0
Expand Down Expand Up @@ -163,6 +170,11 @@ func (c *Completed) IsPipe() bool {
return c.cf&pipeTag == pipeTag
}

// IsRetryable checks if it is set retryableTag
func (c *Completed) IsRetryable() bool {
return c.cf&retryableTag == retryableTag
}

// Commands returns the commands as []string.
// Note that the returned []string should not be modified
// and should not be read after passing into the Client interface, because it will be recycled.
Expand Down
4 changes: 2 additions & 2 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ retry:
if err == errConnExpired {
goto retry
}
if c.retry && cmd.IsReadOnly() && c.isRetryable(err, ctx) {
if c.retry && cmd.IsRetryable() && c.isRetryable(err, ctx) {
if c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, err) {
attempts++
goto retry
Expand Down Expand Up @@ -136,7 +136,7 @@ retry:
goto recover
}
}
if c.retry && allReadOnly(multi) {
if c.retry && allRetryable(multi) {
for i, resp := range resps.s {
if c.isRetryable(resp.Error(), ctx) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(
Expand Down
Loading