Skip to content

Commit b28b279

Browse files
nickngsrNick Reesrueian
committed
feat: add ability to get AZ from an INFO command (#105)
* feat: add ability to get AZ from an INFO command Signed-off-by: nickngsr <nick@logicallyus.com> * docs: update Availability Affinity Routing to include hint about AZFromInfo Signed-off-by: nickngsr <nick@logicallyus.com> * fix: use raw string on infoAZ regex Signed-off-by: nickngsr <nick@logicallyus.com> * Update valkey.go Signed-off-by: Rueian <rueiancsie@gmail.com> --------- Signed-off-by: nickngsr <nick@logicallyus.com> Signed-off-by: Rueian <rueiancsie@gmail.com> Co-authored-by: Nick Rees <nick@logicallyus.com> Co-authored-by: Rueian <rueiancsie@gmail.com> Signed-off-by: Rueian <rueiancsie@gmail.com>
1 parent fa8f4fc commit b28b279

File tree

4 files changed

+209
-10
lines changed

4 files changed

+209
-10
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,9 @@ client, err := rueidis.NewClient(rueidis.ClientOption{
557557
})
558558
```
559559

560+
For deployments that only provide the availability zone via the INFO command, set the `AZFromInfo`
561+
option as well as `EnableReplicaAZInfo`.
562+
560563
## Arbitrary Command
561564

562565
If you want to construct commands that are absent from the command builder, you can use `client.B().Arbitrary()`:

pipe.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import (
2323
const LibName = "rueidis"
2424
const LibVer = "1.0.70"
2525

26-
var noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")
26+
var (
27+
noHello = regexp.MustCompile("unknown command .?(HELLO|hello).?")
28+
infoAZ = regexp.MustCompile(`availability_zone:([^\r\n]+)`)
29+
)
2730

2831
// See https://github.com/redis/rueidis/pull/691
2932
func isUnsubReply(msg *RedisMessage) bool {
@@ -168,15 +171,22 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
168171
helloCmd = append(helloCmd, "SETNAME", option.ClientName)
169172
}
170173

171-
init := make([][]string, 0, 5)
172-
if option.ClientTrackingOptions == nil {
173-
init = append(init, helloCmd, []string{"CLIENT", "TRACKING", "ON", "OPTIN"})
174-
} else {
175-
init = append(init, helloCmd, append([]string{"CLIENT", "TRACKING", "ON"}, option.ClientTrackingOptions...))
174+
init := make([][]string, 0, 6)
175+
176+
init = append(init, helloCmd)
177+
178+
if option.EnableReplicaAZInfo && option.AZFromInfo {
179+
init = append(init, []string{"INFO", "SERVER"})
176180
}
177-
if option.DisableCache {
178-
init = init[:1]
181+
182+
if !option.DisableCache {
183+
if option.ClientTrackingOptions == nil {
184+
init = append(init, []string{"CLIENT", "TRACKING", "ON", "OPTIN"})
185+
} else {
186+
init = append(init, append([]string{"CLIENT", "TRACKING", "ON"}, option.ClientTrackingOptions...))
187+
}
179188
}
189+
180190
if option.SelectDB != 0 {
181191
init = append(init, []string{"SELECT", strconv.Itoa(option.SelectDB)})
182192
}
@@ -224,6 +234,14 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
224234
for i, r := range resp.s[:count] {
225235
if i == 0 {
226236
p.info, err = r.AsMap()
237+
} else if i == 1 && option.EnableReplicaAZInfo && option.AZFromInfo {
238+
var infoStr string
239+
infoStr, err = r.ToString()
240+
if err == nil {
241+
if sm := infoAZ.FindStringSubmatch(infoStr); len(sm) > 1 {
242+
p.info["availability_zone"] = strmsg('+', sm[1])
243+
}
244+
}
227245
} else {
228246
err = r.Error()
229247
}
@@ -271,6 +289,9 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
271289
}
272290
helloIndex := len(init)
273291
init = append(init, []string{"HELLO", "2"})
292+
if option.EnableReplicaAZInfo && option.AZFromInfo {
293+
init = append(init, []string{"INFO", "SERVER"})
294+
}
274295
if option.ClientName != "" {
275296
init = append(init, []string{"CLIENT", "SETNAME", option.ClientName})
276297
}
@@ -324,6 +345,14 @@ func _newPipe(ctx context.Context, connFn func(context.Context) (net.Conn, error
324345
}
325346
if i == helloIndex {
326347
p.info, err = r.AsMap()
348+
} else if option.EnableReplicaAZInfo && option.AZFromInfo && i == helloIndex+1 {
349+
var infoStr string
350+
infoStr, err = r.ToString()
351+
if err == nil {
352+
if sm := infoAZ.FindStringSubmatch(infoStr); len(sm) > 1 {
353+
p.info["availability_zone"] = strmsg('+', sm[1])
354+
}
355+
}
327356
}
328357
}
329358
}
@@ -652,7 +681,7 @@ func (p *pipe) _backgroundRead() (err error) {
652681

653682
func (p *pipe) backgroundPing() {
654683
var prev, recv int32
655-
var mu sync.Mutex
684+
var mu sync.Mutex
656685

657686
mu.Lock()
658687
defer mu.Unlock()

pipe_test.go

Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ func (r *redisExpect) ReplyBlobString(replies ...string) *redisExpect {
7676
return r
7777
}
7878

79+
func (r *redisExpect) ReplyVerbatimString(replies ...string) *redisExpect {
80+
for _, reply := range replies {
81+
if r.err == nil {
82+
r.Reply(strmsg('=', reply))
83+
}
84+
}
85+
return r
86+
}
87+
7988
func (r *redisExpect) ReplyError(replies ...string) *redisExpect {
8089
for _, reply := range replies {
8190
if r.err == nil {
@@ -110,7 +119,7 @@ func (r *redisMock) Close() {
110119
func write(o io.Writer, m RedisMessage) (err error) {
111120
_, err = o.Write([]byte{m.typ})
112121
switch m.typ {
113-
case '$':
122+
case '$', '=':
114123
_, _ = o.Write(append([]byte(strconv.Itoa(len(m.string()))), '\r', '\n'))
115124
_, err = o.Write(append([]byte(m.string()), '\r', '\n'))
116125
case '+', '-', '_':
@@ -251,6 +260,81 @@ func TestNewPipe(t *testing.T) {
251260
n1.Close()
252261
n2.Close()
253262
})
263+
t.Run("Auth without Username AZ from INFO", func(t *testing.T) {
264+
n1, n2 := net.Pipe()
265+
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}
266+
go func() {
267+
mock.Expect("HELLO", "3", "AUTH", "default", "pa", "SETNAME", "cn").
268+
Reply(slicemsg(
269+
'%',
270+
[]RedisMessage{
271+
strmsg('+', "proto"),
272+
{typ: ':', intlen: 3},
273+
},
274+
))
275+
mock.Expect("INFO", "SERVER").
276+
ReplyVerbatimString(`txt:# Server
277+
redis_version:7.2.4
278+
server_name:valkey
279+
valkey_version:7.3.0
280+
redis_git_sha1:0
281+
redis_git_dirty:0
282+
redis_build_id:0
283+
redis_mode:cluster
284+
os:Amazon MemoryDB
285+
arch_bits:64
286+
monotonic_clock:ARM CNTVCT @ 121 ticks/us
287+
multiplexing_api:epoll
288+
atomicvar_api:c11-builtin
289+
gcc_version:0.0.0
290+
process_id:1
291+
run_id:63c3785f34bcf381b8652586ad4e57779ff59ede
292+
tcp_port:6379
293+
server_time_usec:1768651545574803
294+
uptime_in_seconds:1451582
295+
uptime_in_days:16
296+
hz:10
297+
configured_hz:10
298+
lru_clock:7043865
299+
executable:-
300+
config_file:-
301+
availability_zone:us-west-1a
302+
`)
303+
mock.Expect("CLIENT", "TRACKING", "ON", "OPTIN").
304+
ReplyString("OK")
305+
mock.Expect("SELECT", "1").
306+
ReplyString("OK")
307+
mock.Expect("CLIENT", "NO-TOUCH", "ON").
308+
ReplyString("OK")
309+
mock.Expect("CLIENT", "NO-EVICT", "ON").
310+
ReplyString("OK")
311+
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", "libname").
312+
ReplyString("OK")
313+
mock.Expect("CLIENT", "SETINFO", "LIB-VER", "1").
314+
ReplyString("OK")
315+
}()
316+
p, err := newPipe(context.Background(), func(ctx context.Context) (net.Conn, error) { return n1, nil }, &ClientOption{
317+
SelectDB: 1,
318+
Password: "pa",
319+
ClientName: "cn",
320+
ClientNoEvict: true,
321+
ClientSetInfo: []string{"libname", "1"},
322+
ClientNoTouch: true,
323+
EnableReplicaAZInfo: true,
324+
AZFromInfo: true,
325+
})
326+
if err != nil {
327+
t.Fatalf("pipe setup failed: %v", err)
328+
}
329+
go func() { mock.Expect("PING").ReplyString("OK") }()
330+
if p.AZ() != "us-west-1a" {
331+
t.Fatalf("unexpected az: %v", p.AZ())
332+
}
333+
p.Close()
334+
mock.Close()
335+
n1.Close()
336+
n2.Close()
337+
})
254338
t.Run("AlwaysRESP2", func(t *testing.T) {
255339
n1, n2 := net.Pipe()
256340
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}
@@ -302,6 +386,85 @@ func TestNewPipe(t *testing.T) {
302386
n1.Close()
303387
n2.Close()
304388
})
389+
t.Run("AlwaysRESP2 AZ from INFO", func(t *testing.T) {
390+
n1, n2 := net.Pipe()
391+
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}
392+
go func() {
393+
mock.Expect("AUTH", "pa").
394+
ReplyString("OK")
395+
mock.Expect("HELLO", "2").
396+
Reply(slicemsg(
397+
'*',
398+
[]RedisMessage{
399+
strmsg('+', "proto"),
400+
{typ: ':', intlen: 2},
401+
},
402+
))
403+
mock.Expect("INFO", "SERVER").
404+
ReplyBlobString(`# Server
405+
redis_version:7.2.4
406+
server_name:valkey
407+
valkey_version:7.3.0
408+
redis_git_sha1:0
409+
redis_git_dirty:0
410+
redis_build_id:0
411+
redis_mode:cluster
412+
os:Amazon MemoryDB
413+
arch_bits:64
414+
monotonic_clock:ARM CNTVCT @ 121 ticks/us
415+
multiplexing_api:epoll
416+
atomicvar_api:c11-builtin
417+
gcc_version:0.0.0
418+
process_id:1
419+
run_id:63c3785f34bcf381b8652586ad4e57779ff59ede
420+
tcp_port:6379
421+
server_time_usec:1768651545574803
422+
uptime_in_seconds:1451582
423+
uptime_in_days:16
424+
hz:10
425+
configured_hz:10
426+
lru_clock:7043865
427+
executable:-
428+
config_file:-
429+
availability_zone:us-west-1a
430+
`)
431+
mock.Expect("CLIENT", "SETNAME", "cn").
432+
ReplyString("OK")
433+
mock.Expect("SELECT", "1").
434+
ReplyString("OK")
435+
mock.Expect("CLIENT", "NO-TOUCH", "ON").
436+
ReplyString("OK")
437+
mock.Expect("CLIENT", "NO-EVICT", "ON").
438+
ReplyString("OK")
439+
mock.Expect("CLIENT", "SETINFO", "LIB-NAME", "libname").
440+
ReplyString("OK")
441+
mock.Expect("CLIENT", "SETINFO", "LIB-VER", "1").
442+
ReplyString("OK")
443+
}()
444+
p, err := newPipe(context.Background(), func(ctx context.Context) (net.Conn, error) { return n1, nil }, &ClientOption{
445+
SelectDB: 1,
446+
Password: "pa",
447+
ClientName: "cn",
448+
ClientNoEvict: true,
449+
ClientSetInfo: []string{"libname", "1"},
450+
ClientNoTouch: true,
451+
AlwaysRESP2: true,
452+
DisableCache: true,
453+
EnableReplicaAZInfo: true,
454+
AZFromInfo: true,
455+
})
456+
if err != nil {
457+
t.Fatalf("pipe setup failed: %v", err)
458+
}
459+
go func() { mock.Expect("PING").ReplyString("OK") }()
460+
if p.AZ() != "us-west-1a" {
461+
t.Fatalf("unexpected az: %v", p.AZ())
462+
}
463+
p.Close()
464+
mock.Close()
465+
n1.Close()
466+
n2.Close()
467+
})
305468
t.Run("Auth with Username", func(t *testing.T) {
306469
n1, n2 := net.Pipe()
307470
mock := &redisMock{buf: bufio.NewReader(n2), conn: n2, t: t}

rueidis.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ type ClientOption struct {
281281
// EnableReplicaAZInfo enables the client to load the replica node's availability zone.
282282
// If true, the client will set the `AZ` field in `ReplicaInfo`.
283283
EnableReplicaAZInfo bool
284+
285+
// AZFromInfo forces the `availability_zone` field to be taken from an INFO command instead of HELLO.
286+
// Primarily used for AWS MemoryDB.
287+
AZFromInfo bool
284288
}
285289

286290
// SentinelOption contains MasterSet,

0 commit comments

Comments
 (0)