
Conversation

@FZambia (Contributor) commented Jan 12, 2026

Hello @rueian, hope you are doing well!

Currently, rueidis does not allow parsing large Redis replies with many messages in nested arrays (such as XRANGE results) without extra allocations in readB.

Here is a profile from one of my benchmarks.

[Screenshot: memory profile (2026-01-12)]

From what I can see, these allocations are often unavoidable, because they happen before we get control over the RedisResult in Do.

I noticed DoStream, but it seems very limited in supported types, and the purpose of it seems different.

So eventually I tried to go in a slightly different direction: introduce DoWithReader.

type ReaderFunc func(reader *bufio.Reader) error

// DoWithReader sends a command to redis and provides direct access to the raw RESP response
// through a callback function for zero-allocation parsing.
//
// Unlike DoStream, DoWithReader:
// - Works with ALL Redis response types (arrays, maps, sets, nested structures)
// - Automatically handles cluster redirects (MOVED/ASK)
// - Automatically handles retries (TRYAGAIN, LOADING, connection errors)
//
// The callback is ONLY invoked for successful (non-error) responses.
// All error handling (redirects, retries) is done automatically by rueidis.
//
// The reader is only valid during callback execution and must not be stored.
// The cmd parameter is recycled after DoWithReader returns.
//
// IMPORTANT - Error Handling in Callback:
// The callback MUST either:
//  1. Fully consume the response and return nil, OR
//  2. Return an error (in which case the connection will be closed)
//
// If the callback returns an error, the underlying connection is closed to prevent
// data corruption. Therefore, all errors from io operations (io.ReadFull, r.Discard, etc.)
// MUST be checked and returned properly.
DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) error
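
For illustration, a minimal usage sketch of the proposed API (the getRaw helper and the key name are hypothetical; the usual imports - bufio, context, fmt, io, strconv, strings - are assumed). It parses a single bulk-string reply by hand and shows that the callback must also consume the trailing CRLF to leave the connection clean:

func getRaw(ctx context.Context, client rueidis.Client, key string) ([]byte, error) {
	var value []byte
	err := client.DoWithReader(ctx, client.B().Get().Key(key).Build(),
		func(r *bufio.Reader) error {
			header, err := r.ReadString('\n') // e.g. "$5\r\n"
			if err != nil {
				return err
			}
			if header[0] != '$' {
				return fmt.Errorf("unexpected reply type %q", header[0])
			}
			// Nil replies are handled at the rueidis level in this PR,
			// so the callback only sees a non-negative length here.
			n, err := strconv.Atoi(strings.TrimSuffix(header[1:], "\r\n"))
			if err != nil {
				return err
			}
			value = make([]byte, n+2) // payload plus trailing CRLF
			if _, err := io.ReadFull(r, value); err != nil {
				return err
			}
			value = value[:n]
			return nil
		})
	return value, err
}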

A benchmark that demonstrates the problem and how DoWithReader helps:

package resp // the benchmark lives in the prototype resp package, per the output below

import (
	"bufio"
	"context"
	"fmt"
	"strconv"
	"testing"

	"github.com/redis/rueidis"
)

func BenchmarkXRange1000Entries(b *testing.B) {
	ctx := context.Background()
	client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}})
	if err != nil {
		b.Fatal(err)
	}
	defer client.Close()

	// Setup: Create a stream with 1000 entries
	streamKey := "benchmark:xrange:stream"

	// Clean up any existing stream
	client.Do(ctx, client.B().Del().Key(streamKey).Build())

	// Add 1000 entries to the stream
	for i := 0; i < 1000; i++ {
		err := client.Do(ctx, client.B().Xadd().
			Key(streamKey).
			Id("*").
			FieldValue().
			FieldValue("field1", "value"+strconv.Itoa(i)).
			Build()).Error()
		if err != nil {
			b.Fatal(err)
		}
	}

	// Benchmark 1: Using client.Do with AsXRangeSlices
	b.Run("Do_AsXRangeSlices", func(b *testing.B) {
		b.ResetTimer()
		b.ReportAllocs()

		for b.Loop() {
			result := client.Do(ctx, client.B().Xrange().
				Key(streamKey).
				Start("-").
				End("+").
				Build())

			if result.Error() != nil {
				b.Fatal(result.Error())
			}

			entries, err := result.AsXRangeSlices()
			if err != nil {
				b.Fatal(err)
			}

			if len(entries) != 1000 {
				b.Fatalf("expected 1000 entries, got %d", len(entries))
			}
		}
	})

	// Benchmark 2: Using client.DoWithReader with resp package
	b.Run("DoWithReader_RespParse", func(b *testing.B) {
		b.ResetTimer()
		b.ReportAllocs()

		for b.Loop() {
			var entryCount int

			var results [][]byte
			err := client.DoWithReader(ctx, client.B().Xrange().
				Key(streamKey).
				Start("-").
				End("+").
				Build(), func(reader *bufio.Reader) error {

				respReader := NewReader(reader)

				// Expect array of entries
				count, err := respReader.ExpectArray()
				if err != nil {
					return err
				}

				entryCount = int(count)
				results = make([][]byte, entryCount)

				// Parse each entry
				for j := int64(0); j < count; j++ {
					// Each entry is a 2-element array [id, fields]
					if err := respReader.ExpectArrayWithLen(2); err != nil {
						return err
					}

					// Read and discard ID
					if _, err := respReader.ReadStringBytes(); err != nil {
						return err
					}

					// Read field-value pairs
					fieldCount, err := respReader.ExpectArray()
					if err != nil {
						return err
					}

				if fieldCount != 2 {
					return fmt.Errorf("expected 2 fields, got %d", fieldCount)
				}

					_, err = respReader.ReadStringBytes() // Key.
					if err != nil {
						return err
					}
					buf, err := respReader.ReadStringBytes() // Value.
					if err != nil {
						return err
					}
					// copy buf.
					safeBuf := make([]byte, len(buf))
					copy(safeBuf, buf)
					results[j] = safeBuf
				}

				return nil
			})

			if err != nil {
				b.Fatal(err)
			}

			if len(results) != 1000 {
				b.Fatalf("expected 1000 entries, got %d", entryCount)
			}
		}
	})

	// Cleanup
	client.Do(ctx, client.B().Del().Key(streamKey).Build())
}

Results:

❯ go test -run xxx -bench BenchmarkXRange1000Entries -benchmem -memprofile mem
goos: darwin
goarch: arm64
pkg: github.com/redis/rueidis/resp
cpu: Apple M4
BenchmarkXRange1000Entries/Do_AsXRangeSlices-10         	    3450	    329988 ns/op	  306000 B/op	    6002 allocs/op
BenchmarkXRange1000Entries/DoWithReader_RespParse-10    	    4616	    258745 ns/op	   32867 B/op	    1004 allocs/op

To simplify parsing, the idea is to introduce a new package in rueidis called resp, with a resp.Reader helper. The example above already uses a prototype of that package; it's a wrapper for reading RESP values.
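
For a sense of its shape, here is a rough sketch of what such a Reader could look like, inferred only from the methods used in the benchmark above (the actual prototype in the PR may differ). ReadStringBytes returns a slice aliasing the bufio buffer, which is why the benchmark copies values it keeps; also note that Peek fails for payloads larger than the buffer, so a real implementation would need a fallback path there.

package resp

import (
	"bufio"
	"fmt"
	"strconv"
)

// Reader is a sketch, not the actual PR prototype.
type Reader struct {
	br *bufio.Reader
}

func NewReader(br *bufio.Reader) *Reader { return &Reader{br: br} }

// readLine consumes one CRLF-terminated header line and returns it
// without the CRLF. Header lines are short, so they fit the buffer.
func (r *Reader) readLine() ([]byte, error) {
	line, err := r.br.ReadSlice('\n')
	if err != nil {
		return nil, err
	}
	return line[:len(line)-2], nil
}

// ExpectArray consumes an array header ('*') and returns the element count.
func (r *Reader) ExpectArray() (int64, error) {
	line, err := r.readLine()
	if err != nil {
		return 0, err
	}
	if len(line) == 0 || line[0] != '*' {
		return 0, fmt.Errorf("resp: expected array header, got %q", line)
	}
	// The string conversion allocates a few bytes; a real implementation
	// would parse the integer from the byte slice directly.
	return strconv.ParseInt(string(line[1:]), 10, 64)
}

// ExpectArrayWithLen consumes an array header and verifies its length.
func (r *Reader) ExpectArrayWithLen(n int) error {
	got, err := r.ExpectArray()
	if err != nil {
		return err
	}
	if got != int64(n) {
		return fmt.Errorf("resp: expected array of %d elements, got %d", n, got)
	}
	return nil
}

// ReadStringBytes consumes a bulk string ('$') and returns its payload.
// The slice aliases the internal buffer and is only valid until the next
// read, so callers must copy it to retain it.
func (r *Reader) ReadStringBytes() ([]byte, error) {
	line, err := r.readLine()
	if err != nil {
		return nil, err
	}
	if len(line) == 0 || line[0] != '$' {
		return nil, fmt.Errorf("resp: expected bulk string header, got %q", line)
	}
	n, err := strconv.Atoi(string(line[1:]))
	if err != nil {
		return nil, err
	}
	buf, err := r.br.Peek(n + 2) // payload plus CRLF; fails if larger than the buffer
	if err != nil {
		return nil, err
	}
	if _, err := r.br.Discard(n + 2); err != nil {
		return nil, err
	}
	return buf[:n], nil
}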

Most of the code here was generated by an LLM, so maybe I missed something important that ruins the idea.

Some design ideas which I had in mind:

  • It's low-level and dangerous and must be marked accordingly - the response must be fully read by the user to avoid corrupting connection state. But generally, proper tests in the app should help here.
  • The idea was to parse errors at the rueidis level - so that it works natively with Redis Cluster.
  • It uses the pool for now; for my use case that's totally fine, because requests that return a lot of data (not that much actually, like 100-1000 rows in an array) are better sent via the pool instead of the pipeline, I suppose. Still, maybe there is a way to extend this to pipeline mode, but it seems non-trivial.
  • I am not sure what to do with streaming responses and blocking commands for now; I did not look into it. Having the possibility to XREAD without allocs would also be good (or maybe it's already possible?).
  • Right now the PR handles the Nil response at the rueidis level - most probably it should be handled by the user (or documented).
  • No tests for now.

There are more open questions, which I'll postpone for now while waiting for your general feedback on the above. Maybe there is a simpler way to achieve the same thing?

@FZambia (Contributor, Author) commented Jan 12, 2026

Hmm, I think I was so pleased with the results that I missed that strings were returned unsafely in my bench. It seems the results are invalid here. The benefit exists, but it's not that huge in all cases. Let me re-evaluate and reopen later with a proper bench if the benefit is still obvious.

FZambia closed this Jan 12, 2026
@FZambia (Contributor, Author) commented Jan 12, 2026

Yep, the original benchmark I used in the description had a problem - a missing data copy. But the bench that reads from a Redis stream seems correct - data is properly copied, and the benefit is substantial - so I updated the PR description to use the proper bench.

FZambia reopened this Jan 12, 2026
FZambia changed the title from "DoWithReader for zero-alloc Redis response parsing" to "DoWithReader for reduced allocations during Redis response parsing" Jan 12, 2026
@FZambia (Contributor, Author) commented Jan 12, 2026

Added the BenchmarkXRange1000Entries benchmark in place of my custom scripts and updated the description again.

UPD: added more benchmarks for XREAD and blocking XREAD:

BenchmarkXRange/Do_AsXRangeSlices-10         	    3213	    359484 ns/op	  306001 B/op	    6002 allocs/op
BenchmarkXRange/DoWithReader-10              	    4567	    255455 ns/op	   32878 B/op	    1004 allocs/op
BenchmarkXRead/Do_AsXRead-10                 	    3128	    373835 ns/op	  594188 B/op	    7007 allocs/op
BenchmarkXRead/DoWithReader-10               	    4659	    252882 ns/op	    8300 B/op	    1005 allocs/op
BenchmarkBlockingXRead/Do_AsXRead-10         	       9	 126898982 ns/op	  584266 B/op	    3001 allocs/op
BenchmarkBlockingXRead/DoWithReader-10       	       8	 130826203 ns/op	  142813 B/op	     815 allocs/op

@rueian (Collaborator) commented Jan 13, 2026

Hi @FZambia,

Seems like you only want to pick certain parts out of a complex RESP response. Given that the DoWithReader implementation is similar to DoStream, I think we should extend DoStream instead.

I noticed DoStream, but it seems very limited in supported types, and the purpose of it seems different.

The purpose of RedisResultStream.WriteTo is writing out a simple RESP response as bytes. I think we can introduce something like RedisResultStream.IterateStrings for complex types and somehow give users choices to skip certain allocations.

For example:

var results []string
for val := range c.DoStream(ctx, cmd).IterateStrings(func(typ byte, length, depth, index int) bool {
    if typ == '*' {
        return true // go into the array
    } else if typ == '$' && index%2 == 1 {
        return true // read only values from key value pairs
    }
    return false // discard anything else
}) {
    results = append(results, val)
}

The idea was to parse errors at the rueidis level - so that it works natively with Redis Cluster.

I feel like this can still be done in DoStream. It is just not implemented yet.

It's low-level and dangerous and must be marked accordingly - the response must be fully read by the user to avoid corrupting connection state. But generally, proper tests in the app should help here.

Yes, the NewReader approach is dangerous. It hands the connection over to users directly, which basically means the connection can't be reused and can't be pipelined. I would prefer we only provide users with choices to skip certain allocations instead of letting users control the parsing flow entirely.

@FZambia (Contributor, Author) commented Jan 13, 2026

Hi, thanks for the response!

Do you mean that it would work like picking specific parts of the response and collecting a slice of strings? Generally - yes, the goal is to avoid allocs during processing; it's just that the values are not always on one level, and sometimes I need more complex parsing of the byte stream than just saving a raw value - sometimes parsing prefixes, sometimes extracting an int64 from bytes directly. From this perspective, my feeling is that while resp.Reader is heavier, it may be the best tool to handle all possible cases and avoid limitations.

In the benchmarks I appended here, I demonstrated extracting a single value from each entry, but there are more complex cases where Lua returns richer data and the end goal is not just picking a value from the top-level array, but getting some other values from the top-level array and then collecting entries from an array nested inside it (the result of XRANGE).

It seems it would still be more allocs than needed - because with the streaming approach it's possible to avoid one extra copy for each stream entry, since we can convert it to a struct during iteration. While with iteration, there will most probably be an extra copy to safely provide the string val.

BTW, I missed the fact that DoStream may be extended by adding more methods to RedisResultStream. If it's possible to extend it instead of adding a separate DoWithReader - that is definitely better.

Let me try to provide another benchmark with Lua to demonstrate a more real-life use case, to evaluate it against the IterateStrings-like idea.

@rueian (Collaborator) commented Jan 13, 2026

From this perspective, my feeling is that while resp.Reader is heavier, it may be the best tool to handle all possible cases and avoid limitations.

Agree, the Reader is actually like how rueidis parses messages internally. It is for sure without limitations. I think it is fine to provide the Reader under DoStream via RedisResultStream, given that they both can't be pipelined. We just need to be careful about whether the connection can be reused or not after being read by the user.

While with iteration, there will most probably be an extra copy to safely provide the string val.

I think we can also provide the result of buf.Peek to the IterateStrings.

var results []string
for range c.DoStream(ctx, cmd).IterateStrings(func(typ byte, intval, depth, index int, peek []byte) bool {
    if typ == '*' {
        return true // go into the array
    } else if typ == '$' && index%2 == 1 {
        if intval == len(peek) {
            results = append(results, string(peek))
        }
    }
    return false // discard anything else
}) {
    // nothing
}

However, there is always a risk that the peek may fall short, when the value exceeds what is currently buffered.

@FZambia (Contributor, Author) commented Jan 13, 2026

Added one more bench that is close to what I have in real life - BenchmarkLuaScript:

BenchmarkLuaScript/Do_Standard-10         	    7766	    152032 ns/op	   26896 B/op	     605 allocs/op
BenchmarkLuaScript/DoWithReader-10        	    8155	    139422 ns/op	    4466 B/op	     106 allocs/op

Maybe it can help you evaluate what fits better here? For now, it's hard for me to imagine how to use IterateStrings properly and efficiently, while the Reader is just streaming reading (with complexities, of course, but it feels natural).

Agree, the Reader is actually like how rueidis parses messages internally. It is for sure without limitations. I think it is fine to provide the Reader under DoStream via RedisResultStream, given that they both can't be pipelined. We just need to be careful about whether the connection can be reused or not after being read by the user.

Maybe it's possible to somehow make the API more explicit to reduce the chance of mistakes; some ideas:

  1. Avoid exposing *bufio.Reader in the callback - only *resp.Reader. Generally, it may help to avoid completely arbitrary reads from the buffer, and its methods seem to cover the main use cases (see the sketch after this list).
  2. I am not sure exactly how, but maybe it's possible to detect that the reply has not been fully read? 🤔 At least keeping an error internally and not relying on the user to return one. This is the main idea here, and the price of the massive allocation reduction.
  3. But even without these points, clear documentation of the corner cases around data copies, plus warnings that the user must be very careful, should be enough I think - and the connection is already closed in case of any error.
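
A sketch of idea 1, reusing the resp package proposed above (the signature change is the whole idea; everything else stays as in this PR):

// The callback receives the wrapper instead of the raw buffered reader,
// so only structured RESP reads are possible.
type ReaderFunc func(r *resp.Reader) error

DoWithReader(ctx context.Context, cmd Completed, fn ReaderFunc) error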

The feeling is that it's hard to ignore what's achievable allocation-wise after looking at these numbers.

@rueian (Collaborator) commented Jan 14, 2026

Maybe it can help you evaluate what fits better here? For now, it's hard for me to imagine how to use IterateStrings properly and efficiently, while the Reader is just streaming reading (with complexities, of course, but it feels natural).

The iterate function may look like this for your example:

func (typ byte, intval, depth, index int, peek []byte) bool {
	switch depth {
	case 0:
		return typ == '*'
	case 1:
		switch {
		case index == 0 && typ == ':':
			offset = uint64(intval)
		case index == 0 && typ == '$' && intval == len(peek):
			offset, _ = strconv.ParseUint(string(peek), 10, 64)
		case index == 1 && typ == '$' && intval == len(peek):
			epoch = string(peek)
		case index == 2 && typ == '*':
			pubs = make([]BenchPub, 0, intval)
			return true
		}
	case 2:
		return typ == '*'
	case 3:
		switch {
		case index == 0 && typ == '$' && intval == len(peek):
			hyphenIdx := bytes.IndexByte(peek, '-')
			pubOffset, _ = strconv.ParseUint(unsafe.String(unsafe.SliceData(peek), hyphenIdx), 10, 64)
		case index == 1 && typ == '*':
			return true
		}
	case 4:
		switch {
		case index == 0 && typ == '$' && len(peek) == 1 && peek[0] == 'd':
			isdata = true
		case index == 1 && typ == '$' && intval == len(peek) && isdata:
			pubs = append(pubs, BenchPub{Offset: pubOffset, Data: string(peek)})
		}
	}
	return false
}

Yes, this may feel a bit unnatural and may require some state management.

Avoid exposing *bufio.Reader in the callback - only *resp.Reader. Generally, it may help to avoid completely arbitrary reads from the buffer, and its methods seem to cover the main use cases.

Yes, we should not expose *bufio.Reader, and even with *resp.Reader, we should make sure the reader can't be used after reading the response that belongs to it.
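
One hypothetical way to enforce that (a sketch, not rueidis API): guard every method of the wrapper with a validity flag that the client flips right after the callback returns, so a stored reader fails loudly instead of corrupting the next reply.

type guardedReader struct {
	br    *bufio.Reader
	valid bool
}

var errReaderExpired = errors.New("resp: reader used after its response was consumed")

// ReadByte delegates to the buffered reader only while the response is live.
func (g *guardedReader) ReadByte() (byte, error) {
	if !g.valid {
		return 0, errReaderExpired
	}
	return g.br.ReadByte()
}

// expire is called by the client once the user callback returns.
func (g *guardedReader) expire() { g.valid = false }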

I am not sure exactly how, but maybe it's possible to detect that the reply has not been fully read? 🤔 At least keeping an error internally and not relying on the user to return one. This is the main idea here, and the price of the massive allocation reduction.

We probably need to keep a stack of how many messages are left at each depth. We can't return the connection until we are back at depth 0 with no messages left.
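
A minimal sketch of that bookkeeping (hypothetical, not rueidis code): push the element count when an aggregate header is parsed, decrement the top on every message, and only hand the connection back once the stack is empty.

// respTracker tracks how many messages remain unread at each nesting depth.
type respTracker struct {
	remaining []int64 // remaining[i] = messages still owed at depth i+1
}

// onMessage must be called once per parsed message header. count is the
// element count for aggregate types and is ignored for scalars.
func (t *respTracker) onMessage(typ byte, count int64) {
	// This message fills one slot of the enclosing aggregate; pop any
	// aggregates that are now complete.
	if len(t.remaining) > 0 {
		t.remaining[len(t.remaining)-1]--
		for len(t.remaining) > 0 && t.remaining[len(t.remaining)-1] == 0 {
			t.remaining = t.remaining[:len(t.remaining)-1]
		}
	}
	switch typ {
	case '*', '~', '>': // array, set, push: count child messages
		if count > 0 {
			t.remaining = append(t.remaining, count)
		}
	case '%', '|': // map, attribute: count entries, i.e. key+value messages
		if count > 0 {
			t.remaining = append(t.remaining, count*2)
		}
	}
}

// done reports whether the whole reply has been consumed (meaningful only
// after the first onMessage call), i.e. the connection is safe to reuse.
func (t *respTracker) done() bool { return len(t.remaining) == 0 }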
