Skip to content

Commit 2123040

Browse files
derekperkinsburaksezer
authored andcommitted
feat: add PipelineConcurrency option
1 parent 3599b76 commit 2123040

File tree

3 files changed

+28
-8
lines changed

3 files changed

+28
-8
lines changed

client.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,17 @@ type DMap interface {
236236
// results in case of big pipelines and small read/write timeouts.
237237
// Redis client has retransmission logic in case of timeouts, pipeline
238238
// can be retransmitted and commands can be executed more than once.
239-
Pipeline() (*DMapPipeline, error)
239+
Pipeline(opts ...PipelineOption) (*DMapPipeline, error)
240+
}
241+
242+
// PipelineOption is a function for defining options to control behavior of the Pipeline command.
243+
type PipelineOption func(pipeline *DMapPipeline)
244+
245+
// PipelineConcurrency is a PipelineOption controlling the number of concurrent goroutines.
246+
func PipelineConcurrency(concurrency int) PipelineOption {
247+
return func(dp *DMapPipeline) {
248+
dp.concurrency = concurrency
249+
}
240250
}
241251

242252
type statsConfig struct {

embedded_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type EmbeddedDMap struct {
7272
// results in case of big pipelines and small read/write timeouts.
7373
// Redis client has retransmission logic in case of timeouts, pipeline
7474
// can be retransmitted and commands can be executed more than once.
75-
func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
75+
func (dm *EmbeddedDMap) Pipeline(opts ...PipelineOption) (*DMapPipeline, error) {
7676
cc, err := NewClusterClient([]string{dm.client.db.rt.This().String()})
7777
if err != nil {
7878
return nil, err
@@ -81,7 +81,7 @@ func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
8181
if err != nil {
8282
return nil, err
8383
}
84-
return cdm.Pipeline()
84+
return cdm.Pipeline(opts...)
8585
}
8686

8787
// RefreshMetadata fetches a list of available members and the latest routing

pipeline.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919
"context"
2020
"errors"
21+
"runtime"
2122
"strconv"
2223
"sync"
2324
"time"
@@ -57,6 +58,8 @@ type DMapPipeline struct {
5758
result map[uint64][]redis.Cmder
5859
ctx context.Context
5960
cancel context.CancelFunc
61+
62+
concurrency int // defaults to runtime.NumCPU()
6063
}
6164

6265
func (dp *DMapPipeline) addCommand(key string, cmd redis.Cmder) (uint64, int) {
@@ -418,8 +421,7 @@ func (dp *DMapPipeline) Exec(ctx context.Context) error {
418421
defer dp.cancel()
419422

420423
var errGr errgroup.Group
421-
numCpu := 1
422-
sem := semaphore.NewWeighted(int64(numCpu))
424+
sem := semaphore.NewWeighted(int64(dp.concurrency))
423425
for i := uint64(0); i < dp.dm.clusterClient.partitionCount; i++ {
424426
err := sem.Acquire(ctx, 1)
425427
if err != nil {
@@ -494,15 +496,23 @@ func (dp *DMapPipeline) Close() {
494496
// results in case of big pipelines and small read/write timeouts.
495497
// Redis client has retransmission logic in case of timeouts, pipeline
496498
// can be retransmitted and commands can be executed more than once.
497-
func (dm *ClusterDMap) Pipeline() (*DMapPipeline, error) {
499+
func (dm *ClusterDMap) Pipeline(opts ...PipelineOption) (*DMapPipeline, error) {
498500
ctx, cancel := context.WithCancel(context.Background())
499-
return &DMapPipeline{
501+
dp := &DMapPipeline{
500502
dm: dm,
501503
commands: make(map[uint64][]redis.Cmder),
502504
result: make(map[uint64][]redis.Cmder),
503505
ctx: ctx,
504506
cancel: cancel,
505-
}, nil
507+
508+
concurrency: runtime.NumCPU(),
509+
}
510+
511+
for _, opt := range opts {
512+
opt(dp)
513+
}
514+
515+
return dp, nil
506516
}
507517

508518
// This stores a slice of commands for each partition. There is a possibility that a single

0 commit comments

Comments
 (0)