Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ func (c *singleClient) DoMultiStream(ctx context.Context, multi ...Completed) Mu
return s
}

// DoWithReader streams the raw RESP reply for cmd to fn. Expired pooled
// connections are transparently retried, and retryable failures are retried
// according to the client's retry handler. On success (or redis Nil) the
// completed command is recycled back to the builder pool.
func (c *singleClient) DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) (err error) {
	for attempts := 1; ; {
		err = c.conn.DoWithReader(ctx, cmd, fn)
		if err == nil {
			break
		}
		if err == errConnExpired {
			// The pooled connection had expired before use; grab a fresh one.
			continue
		}
		if c.retry && cmd.IsRetryable() && c.isRetryable(err, ctx) &&
			c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, err) {
			attempts++
			continue
		}
		break
	}
	if err == nil || err == Nil {
		cmds.PutCompleted(cmd)
	}
	return err
}

func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps []RedisResult) {
if len(multi) == 0 {
return nil
Expand Down Expand Up @@ -321,6 +342,29 @@ retry:
return resp
}

// DoWithReader streams the raw RESP reply for cmd to fn on this dedicated
// connection. Retryable failures are retried per the retry handler; on
// success (or redis Nil) the completed command is recycled.
//
// NOTE(review): unlike the pooled single-client path, there is no
// errConnExpired retry here — presumably dedicated wires are not subject to
// pool expiry; confirm against the wire implementation.
func (c *dedicatedSingleClient) DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) (err error) {
	attempts := 1
retry:
	// Re-check on every attempt that the dedicated client is still usable
	// (e.g. not already released).
	if err := c.check(); err != nil {
		return err
	}
	// For dedicated clients, pass nil as pool since wire shouldn't be recycled
	err = c.wire.DoWithReader(ctx, nil, cmd, fn)
	// isRetryable is safe to call with a nil err; it only triggers on
	// genuinely retryable failures.
	if c.retry && cmd.IsRetryable() && isRetryable(err, c.wire, ctx) {
		shouldRetry := c.retryHandler.WaitOrSkipRetry(
			ctx, attempts, cmd, err,
		)
		if shouldRetry {
			attempts++
			goto retry
		}
	}
	// Recycle the command builder only on success or redis Nil, matching the
	// other Do* paths.
	if err == nil || err == Nil {
		cmds.PutCompleted(cmd)
	}
	return err
}

func (c *dedicatedSingleClient) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) (err error) {
attempts := 1
retry:
Expand Down
91 changes: 91 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,70 @@ func (c *clusterClient) DoStream(ctx context.Context, cmd Completed) RedisResult
return ret
}

// DoWithReader streams the raw RESP reply for cmd to fn, delegating redirect
// and retry handling to doWithReader. On success (or redis Nil) the completed
// command is recycled back to the builder pool.
func (c *clusterClient) DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) (err error) {
	err = c.doWithReader(ctx, cmd, fn)
	if err == nil || err == Nil {
		cmds.PutCompleted(cmd)
	}
	return err
}

// doWithReader picks a node for cmd's slot, streams the raw RESP reply to fn,
// and follows cluster redirections (MOVED/ASK) and retryable failures.
// MOVED and ASK share the same MaxMovedRedirections budget.
func (c *clusterClient) doWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) (err error) {
	attempts := 1
	redirects := 0
retry:
	// Pick a connection for the command's slot, possibly a replica for
	// read-only commands.
	cc, pickErr := c.pick(ctx, cmd.Slot(), c.toReplica(cmd))
	if pickErr != nil {
		return pickErr
	}
	err = cc.DoWithReader(ctx, cmd, fn)
	if err == errConnExpired {
		// Expired pooled connection; re-pick and try again.
		goto retry
	}
process:
	// shouldRefreshRetry also schedules a topology refresh as a side effect
	// on cluster errors, so it must be evaluated on every outcome.
	switch addr, mode := c.shouldRefreshRetry(err, ctx); mode {
	case RedirectMove:
		redirects++
		if c.opt.ClusterOption.MaxMovedRedirections > 0 && redirects > c.opt.ClusterOption.MaxMovedRedirections {
			return err
		}
		ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
	recover1:
		err = ncc.DoWithReader(ctx, cmd, fn)
		if err == errConnExpired {
			goto recover1
		}
		cc = ncc
		// Re-examine the new reply: it may itself be another redirection.
		goto process
	case RedirectAsk:
		redirects++
		if c.opt.ClusterOption.MaxMovedRedirections > 0 && redirects > c.opt.ClusterOption.MaxMovedRedirections {
			return err
		}
		ncc := c.redirectOrNew(addr, cc, cmd.Slot(), mode)
	recover2:
		// Send ASKING command first, then the actual command
		// NOTE(review): ASKING is per-connection, but ncc.Do and
		// ncc.DoWithReader may use different underlying wires (the latter
		// acquires from a pool) — verify both land on the same connection,
		// otherwise the ASK redirect may not take effect.
		if askResp := ncc.Do(ctx, cmds.AskingCmd); askResp.NonRedisError() == nil {
			err = ncc.DoWithReader(ctx, cmd, fn)
			if err == errConnExpired {
				goto recover2
			}
		} else {
			err = askResp.NonRedisError()
		}
		cc = ncc
		goto process
	case RedirectRetry:
		if c.retry && cmd.IsRetryable() {
			shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, err)
			if shouldRetry {
				attempts++
				goto retry
			}
		}
	}
	return err
}

func (c *clusterClient) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisResultStream {
if len(multi) == 0 {
return RedisResultStream{e: io.EOF}
Expand Down Expand Up @@ -1521,6 +1585,33 @@ retry:
return resp
}

// DoWithReader streams the raw RESP reply for cmd to fn on this dedicated
// cluster connection. Only RedirectRetry outcomes are retried (dedicated
// clients do not follow MOVED/ASK); on success (or redis Nil) the completed
// command is recycled back to the builder pool.
func (c *dedicatedClusterClient) DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) (err error) {
	for attempts := 1; ; {
		w, acquireErr := c.acquire(ctx, cmd.Slot())
		if acquireErr != nil {
			err = acquireErr
			break
		}
		// Dedicated wires are owned by this client, so no pool is passed for
		// recycling.
		err = w.DoWithReader(ctx, nil, cmd, fn)
		// shouldRefreshRetry also schedules topology refreshes as a side
		// effect, so it is evaluated on every outcome.
		if _, mode := c.client.shouldRefreshRetry(err, ctx); mode == RedirectRetry &&
			c.retry && cmd.IsRetryable() && w.Error() == nil &&
			c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, err) {
			attempts++
			continue
		}
		break
	}
	if err == nil || err == Nil {
		cmds.PutCompleted(cmd)
	}
	return err
}

func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed) (resp []RedisResult) {
if len(multi) == 0 {
return nil
Expand Down
76 changes: 76 additions & 0 deletions lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,82 @@ type Lua struct {
retryable bool
}

// ExecWithReader executes the script and provides direct access to the raw RESP response
// through a callback function for zero-allocation parsing.
// It will first try with EVALSHA/EVALSHA_RO and then EVAL/EVAL_RO if a NOSCRIPT error occurs.
// If Lua is initialized with disabled SHA1, it will use EVAL/EVAL_RO without the EVALSHA/EVALSHA_RO attempt.
// If Lua is initialized with SHA-1 loading, it will call SCRIPT LOAD once to obtain the SHA-1 from Redis.
// Cross-slot keys are prohibited if the Client is a cluster client.
//
// The callback is ONLY invoked for successful script execution.
// All Redis errors (including NOSCRIPT) are handled automatically by rueidis.
// The callback must fully consume the response or return an error (see ReaderFunc documentation).
func (s *Lua) ExecWithReader(ctx context.Context, c Client, keys, args []string, fn ReaderFunc) error {
	var scriptSha1 string

	// Determine which SHA-1 to use (same logic as Exec).
	if s.loadSha1 {
		s.sha1Mu.RLock()
		scriptSha1 = s.sha1
		s.sha1Mu.RUnlock()

		if scriptSha1 == "" {
			// Load the script once; sha1Call deduplicates concurrent loaders.
			err := s.sha1Call.Do(ctx, func() error {
				result := c.Do(ctx, c.B().ScriptLoad().Script(s.script).Build().ToRetryable())
				if shaStr, err := result.ToString(); err == nil {
					s.sha1Mu.Lock()
					s.sha1 = shaStr
					s.sha1Mu.Unlock()
					return nil
				}
				return result.Error()
			})
			if err != nil {
				return err
			}
			s.sha1Mu.RLock()
			scriptSha1 = s.sha1
			s.sha1Mu.RUnlock()
		}
	} else {
		scriptSha1 = s.sha1
	}

	// Try EVALSHA/EVALSHA_RO first when a SHA-1 is available.
	if !s.noSha1 && scriptSha1 != "" {
		var err error
		if s.readonly {
			err = c.DoWithReader(ctx, c.B().EvalshaRo().Sha1(scriptSha1).Numkeys(int64(len(keys))).Key(keys...).Arg(args...).Build(), fn)
		} else {
			err = c.DoWithReader(ctx, s.mayRetryable(c.B().Evalsha().Sha1(scriptSha1).Numkeys(int64(len(keys))).Key(keys...).Arg(args...).Build()), fn)
		}
		// Only a NOSCRIPT Redis error falls through to EVAL below. Success
		// (nil), network/callback errors, and other Redis errors are
		// returned to the caller as-is.
		if redisErr, isErr := IsRedisErr(err); !isErr || !redisErr.IsNoScript() {
			return err
		}
	}

	// EVAL/EVAL_RO is the unconditional fallback: it runs when SHA-1 use is
	// disabled or unavailable, or when EVALSHA reported NOSCRIPT. Making it
	// unconditional closes a hole where the function could return nil
	// without ever executing the script (e.g. scriptSha1 empty while noSha1
	// is false).
	if s.readonly {
		return c.DoWithReader(ctx, c.B().EvalRo().Script(s.script).Numkeys(int64(len(keys))).Key(keys...).Arg(args...).Build(), fn)
	}
	return c.DoWithReader(ctx, s.mayRetryable(c.B().Eval().Script(s.script).Numkeys(int64(len(keys))).Key(keys...).Arg(args...).Build()), fn)
}

// Exec the script to the given Client.
// It will first try with the EVALSHA/EVALSHA_RO and then EVAL/EVAL_RO if the first try failed.
// If Lua is initialized with disabled SHA1, it will use EVAL/EVAL_RO without the EVALSHA/EVALSHA_RO attempt.
Expand Down
32 changes: 32 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type conn interface {
Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error
DoStream(ctx context.Context, cmd Completed) RedisResultStream
DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisResultStream
DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) error
Info() map[string]RedisMessage
Version() int
AZ() string
Expand All @@ -53,6 +54,13 @@ type muxwire struct {
mu sync.Mutex
}

const (
	// usePoolForDoWithReader selects the transport strategy for
	// mux.DoWithReader: true routes the call through the connection pool
	// (an exclusive wire for the duration of the synchronous read), false
	// would route it through the experimental pipeline path (which
	// currently still falls back to the pool — see pipelineWithReader).
	usePoolForDoWithReader = true
)

type mux struct {
init wire
dead wire
Expand Down Expand Up @@ -231,6 +239,30 @@ func (m *mux) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisR
return wire.DoMultiStream(ctx, m.spool, multi...)
}

// DoWithReader dispatches cmd and hands the raw RESP reply to fn, choosing
// between the pool-based path and the experimental pipeline path according
// to usePoolForDoWithReader.
func (m *mux) DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) error {
	if !usePoolForDoWithReader {
		// Experimental pipeline-based path.
		return m.pipelineWithReader(ctx, cmd, fn)
	}
	// Pool-based path: acquire an exclusive wire for the synchronous read;
	// passing m.spool lets the wire return itself to the pool when done.
	w := m.spool.Acquire(ctx)
	return w.DoWithReader(ctx, m.spool, cmd, fn)
}

// pipelineWithReader is the placeholder for a true pipeline-mode
// DoWithReader; it currently delegates to the pool-based path.
func (m *mux) pipelineWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) error {
	// TODO: implement real pipeline support for DoWithReader. That requires:
	//   1. adding a ReaderFunc field to the queue entry,
	//   2. teaching _backgroundRead to detect DoWithReader commands,
	//   3. invoking fn(reader, typ) inline in the background reader instead
	//      of parsing the reply, and
	//   4. signaling completion/error back to the caller via a future.
	// Until then, fall back to pool mode.
	w := m.spool.Acquire(ctx)
	return w.DoWithReader(ctx, m.spool, cmd, fn)
}

func (m *mux) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
if m.usePool && !cmd.IsPipe() {
resp = m.blocking(m.spool, ctx, cmd)
Expand Down
Loading
Loading