Skip to content

Commit f65f4dc

Browse files
committed
feat: add availability zone awareness for standalone client
Signed-off-by: Rueian <[email protected]>
1 parent 8fd75f0 commit f65f4dc

File tree

2 files changed

+17
-23
lines changed

2 files changed

+17
-23
lines changed

standalone.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ func (s *standalone) B() Builder {
7474
return s.primary.Load().B()
7575
}
7676

77-
func (s *standalone) pick() *singleClient {
77+
func (s *standalone) pick(slot uint16) *singleClient {
7878
if s.nodeSelector != nil {
79-
rIndex := s.nodeSelector(0, s.nodes)
79+
rIndex := s.nodeSelector(slot, s.nodes)
8080
if rIndex < 0 || rIndex >= len(s.nodes) {
8181
rIndex = 0
8282
}
@@ -134,7 +134,7 @@ func (s *standalone) Do(ctx context.Context, cmd Completed) (resp RedisResult) {
134134

135135
retry:
136136
if s.toReplicas != nil && s.toReplicas(cmd) {
137-
resp = s.pick().Do(ctx, cmd)
137+
resp = s.pick(cmd.Slot()).Do(ctx, cmd)
138138
} else {
139139
resp = s.primary.Load().Do(ctx, cmd)
140140
}
@@ -164,15 +164,12 @@ func (s *standalone) DoMulti(ctx context.Context, multi ...Completed) (resp []Re
164164
}
165165

166166
retry:
167-
toReplica := true
168-
for _, cmd := range multi {
169-
if s.toReplicas == nil || !s.toReplicas(cmd) {
170-
toReplica = false
171-
break
172-
}
167+
toReplica := s.toReplicas != nil
168+
for i := 0; i < len(multi) && toReplica; i++ {
169+
toReplica = s.toReplicas(multi[i])
173170
}
174-
if toReplica {
175-
resp = s.pick().DoMulti(ctx, multi...)
171+
if toReplica && len(multi) > 0 {
172+
resp = s.pick(multi[0].Slot()).DoMulti(ctx, multi...)
176173
} else {
177174
resp = s.primary.Load().DoMulti(ctx, multi...)
178175
}
@@ -199,7 +196,7 @@ retry:
199196

200197
func (s *standalone) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) error {
201198
if s.toReplicas != nil && s.toReplicas(subscribe) {
202-
return s.pick().Receive(ctx, subscribe, fn)
199+
return s.pick(subscribe.Slot()).Receive(ctx, subscribe, fn)
203200
}
204201
return s.primary.Load().Receive(ctx, subscribe, fn)
205202
}
@@ -269,7 +266,7 @@ retry:
269266
func (s *standalone) DoStream(ctx context.Context, cmd Completed) RedisResultStream {
270267
var stream RedisResultStream
271268
if s.toReplicas != nil && s.toReplicas(cmd) {
272-
stream = s.pick().DoStream(ctx, cmd)
269+
stream = s.pick(cmd.Slot()).DoStream(ctx, cmd)
273270
} else {
274271
stream = s.primary.Load().DoStream(ctx, cmd)
275272
}
@@ -278,15 +275,12 @@ func (s *standalone) DoStream(ctx context.Context, cmd Completed) RedisResultStr
278275

279276
func (s *standalone) DoMultiStream(ctx context.Context, multi ...Completed) MultiRedisResultStream {
280277
var stream MultiRedisResultStream
281-
toReplica := true
282-
for _, cmd := range multi {
283-
if s.toReplicas == nil || !s.toReplicas(cmd) {
284-
toReplica = false
285-
break
286-
}
278+
toReplica := s.toReplicas != nil
279+
for i := 0; i < len(multi) && toReplica; i++ {
280+
toReplica = s.toReplicas(multi[i])
287281
}
288-
if toReplica {
289-
stream = s.pick().DoMultiStream(ctx, multi...)
282+
if toReplica && len(multi) > 0 {
283+
stream = s.pick(multi[0].Slot()).DoMultiStream(ctx, multi...)
290284
} else {
291285
stream = s.primary.Load().DoMultiStream(ctx, multi...)
292286
}

standalone_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ func TestStandalonePickReplica(t *testing.T) {
604604
defer s.Close()
605605

606606
// Test that pick() returns the single replica
607-
client := s.pick()
607+
client := s.pick(0)
608608
if client != s.replicas[0] {
609609
t.Errorf("expected replica client, got different client")
610610
}
@@ -683,7 +683,7 @@ func TestStandalonePickMultipleReplicas(t *testing.T) {
683683

684684
// Test that pick() returns a valid replica for multiple replicas
685685
for i := 0; i < 10; i++ {
686-
client := s.pick()
686+
client := s.pick(0)
687687
if client != s.replicas[0] && client != s.replicas[1] {
688688
t.Errorf("expected one of the replica clients, got different client")
689689
}

0 commit comments

Comments
 (0)