@@ -10,7 +10,7 @@ import (
1010 "github.com/redis/rueidis"
1111)
1212
13- func BenchmarkXRange1000Entries (b * testing.B ) {
13+ func BenchmarkXRange (b * testing.B ) {
1414 ctx := context .Background ()
1515 client , err := rueidis .NewClient (rueidis.ClientOption {InitAddress : []string {"127.0.0.1:6379" }})
1616 if err != nil {
@@ -24,8 +24,10 @@ func BenchmarkXRange1000Entries(b *testing.B) {
2424 // Clean up any existing stream
2525 client .Do (ctx , client .B ().Del ().Key (streamKey ).Build ())
2626
27+ totalEntries := 1000
28+
2729 // Add 1000 entries to the stream
28- for i := 0 ; i < 1000 ; i ++ {
30+ for i := 0 ; i < totalEntries ; i ++ {
2931 err := client .Do (ctx , client .B ().Xadd ().
3032 Key (streamKey ).
3133 Id ("*" ).
@@ -58,8 +60,8 @@ func BenchmarkXRange1000Entries(b *testing.B) {
5860 b .Fatal (err )
5961 }
6062
61- if len (entries ) != 1000 {
62- b .Fatalf ("expected 1000 entries, got %d" , len (entries ))
63+ if len (entries ) != totalEntries {
64+ b .Fatalf ("expected %d entries, got %d" , totalEntries , len (entries ))
6365 }
6466 }
6567 })
@@ -133,8 +135,8 @@ func BenchmarkXRange1000Entries(b *testing.B) {
133135 b .Fatal (err )
134136 }
135137
136- if len (results ) != 1000 {
137- b .Fatalf ("expected 1000 entries, got %d" , entryCount )
138+ if len (results ) != totalEntries {
139+ b .Fatalf ("expected %d entries, got %d" , totalEntries , entryCount )
138140 }
139141 }
140142 })
@@ -143,13 +145,16 @@ func BenchmarkXRange1000Entries(b *testing.B) {
143145 client .Do (ctx , client .B ().Del ().Key (streamKey ).Build ())
144146}
145147
148+ var testVal []byte
149+
146150func process (val []byte ) {
147151 if len (val ) == 0 {
148152 panic ("empty value" )
149153 }
154+ testVal = val
150155}
151156
152- func BenchmarkXRead1000Entries (b * testing.B ) {
157+ func BenchmarkXRead (b * testing.B ) {
153158 ctx := context .Background ()
154159 client , err := rueidis .NewClient (rueidis.ClientOption {InitAddress : []string {"127.0.0.1:6379" }})
155160 if err != nil {
@@ -163,8 +168,11 @@ func BenchmarkXRead1000Entries(b *testing.B) {
163168 // Clean up any existing stream
164169 client .Do (ctx , client .B ().Del ().Key (streamKey ).Build ())
165170
166- // Add 1000 entries to the stream
167- for i := 0 ; i < 1000 ; i ++ {
171+ totalEntries := 1000
172+ var batchSize int64 = 1000
173+
174+ // Add entries to the stream
175+ for i := 0 ; i < totalEntries ; i ++ {
168176 err := client .Do (ctx , client .B ().Xadd ().
169177 Key (streamKey ).
170178 Id ("*" ).
@@ -183,12 +191,12 @@ func BenchmarkXRead1000Entries(b *testing.B) {
183191
184192 for b .Loop () {
185193 lastID := "0-0"
186- totalEntries := 0
194+ consumedEntries := 0
187195
188- // Read in batches of 10 until we've read all 1000 entries
189- for totalEntries < 1000 {
196+ // Read in batches until we've read all entries
197+ for consumedEntries < totalEntries {
190198 result := client .Do (ctx , client .B ().Xread ().
191- Count (100 ).
199+ Count (batchSize ).
192200 Streams ().
193201 Key (streamKey ).
194202 Id (lastID ).
@@ -212,12 +220,12 @@ func BenchmarkXRead1000Entries(b *testing.B) {
212220 break
213221 }
214222
215- totalEntries += len (entries )
223+ consumedEntries += len (entries )
216224 lastID = entries [len (entries )- 1 ].ID
217225 }
218226
219- if totalEntries != 1000 {
220- b .Fatalf ("expected 1000 entries, got %d" , totalEntries )
227+ if consumedEntries != totalEntries {
228+ b .Fatalf ("expected %d entries, got %d" , totalEntries , consumedEntries )
221229 }
222230 }
223231 })
@@ -229,15 +237,15 @@ func BenchmarkXRead1000Entries(b *testing.B) {
229237
230238 for b .Loop () {
231239 lastID := "0-0"
232- totalEntries := 0
240+ consumedEntries := 0
233241
234- // Read in batches of 10 until we've read all 1000 entries
235- for totalEntries < 1000 {
242+ // Read in batches until we've read all entries
243+ for consumedEntries < totalEntries {
236244 var batchEntries int
237245 var newLastID string
238246
239247 err := client .DoWithReader (ctx , client .B ().Xread ().
240- Count (100 ).
248+ Count (batchSize ).
241249 Streams ().
242250 Key (streamKey ).
243251 Id (lastID ).
@@ -359,12 +367,12 @@ func BenchmarkXRead1000Entries(b *testing.B) {
359367 break
360368 }
361369
362- totalEntries += batchEntries
370+ consumedEntries += batchEntries
363371 lastID = newLastID
364372 }
365373
366- if totalEntries != 1000 {
367- b .Fatalf ("expected 1000 entries, got %d" , totalEntries )
374+ if consumedEntries != totalEntries {
375+ b .Fatalf ("expected %d entries, got %d" , totalEntries , consumedEntries )
368376 }
369377 }
370378 })
@@ -373,7 +381,7 @@ func BenchmarkXRead1000Entries(b *testing.B) {
373381 client .Do (ctx , client .B ().Del ().Key (streamKey ).Build ())
374382}
375383
376- func BenchmarkXReadStreaming (b * testing.B ) {
384+ func BenchmarkBlockingXRead (b * testing.B ) {
377385 ctx := context .Background ()
378386 client , err := rueidis .NewClient (rueidis.ClientOption {InitAddress : []string {"127.0.0.1:6379" }})
379387 if err != nil {
@@ -386,6 +394,9 @@ func BenchmarkXReadStreaming(b *testing.B) {
386394 // Clean up any existing stream
387395 client .Do (ctx , client .B ().Del ().Key (streamKey ).Build ())
388396
397+ totalEntries := 100
398+ var batchSize int64 = 1
399+
389400 // Benchmark 1: Using client.Do with AsXRead
390401 b .Run ("Do_AsXRead" , func (b * testing.B ) {
391402 b .ResetTimer ()
@@ -420,11 +431,12 @@ func BenchmarkXReadStreaming(b *testing.B) {
420431 }()
421432
422433 lastID := "$" // Start from new entries only
423- totalEntries := 0
434+ consumedEntries := 0
424435
425436 // Consumer: read entries as they arrive using blocking XREAD
426- for totalEntries < 100 {
437+ for consumedEntries < totalEntries {
427438 result := client .Do (ctx , client .B ().Xread ().
439+ Count (batchSize ).
428440 Block (1000 ).
429441 Streams ().
430442 Key (streamKey ).
@@ -446,15 +458,15 @@ func BenchmarkXReadStreaming(b *testing.B) {
446458
447459 entries := streams [streamKey ]
448460 if len (entries ) > 0 {
449- totalEntries += len (entries )
461+ consumedEntries += len (entries )
450462 lastID = entries [len (entries )- 1 ].ID
451463 }
452464 }
453465
454466 close (done )
455467
456- if totalEntries != 100 {
457- b .Fatalf ("expected 100 entries, got %d" , totalEntries )
468+ if consumedEntries != totalEntries {
469+ b .Fatalf ("expected %d entries, got %d" , totalEntries , consumedEntries )
458470 }
459471 }
460472 })
@@ -493,15 +505,16 @@ func BenchmarkXReadStreaming(b *testing.B) {
493505 }()
494506
495507 lastID := "$" // Start from new entries only
496- totalEntries := 0
508+ consumedEntries := 0
497509
498510 // Consumer: read entries as they arrive using blocking XREAD
499- for totalEntries < 100 {
511+ for consumedEntries < totalEntries {
500512 var batchEntries int
501513 var newLastID string
502514
503515 err := client .DoWithReader (ctx , client .B ().Xread ().
504- Block (1000 ). // Block for up to 1 second
516+ Count (batchSize ).
517+ Block (1000 ).
505518 Streams ().
506519 Key (streamKey ).
507520 Id (lastID ).
@@ -612,15 +625,15 @@ func BenchmarkXReadStreaming(b *testing.B) {
612625 }
613626
614627 if batchEntries > 0 {
615- totalEntries += batchEntries
628+ consumedEntries += batchEntries
616629 lastID = newLastID
617630 }
618631 }
619632
620633 close (done )
621634
622- if totalEntries != 100 {
623- b .Fatalf ("expected 100 entries, got %d" , totalEntries )
635+ if consumedEntries != totalEntries {
636+ b .Fatalf ("expected %d entries, got %d" , totalEntries , consumedEntries )
624637 }
625638 }
626639 })
0 commit comments