Skip to content

Commit c1d5dad

Browse files
committed
Add client capa
Signed-off-by: hwware <[email protected]>
1 parent cb91238 commit c1d5dad

File tree

8 files changed

+70
-23
lines changed

8 files changed

+70
-23
lines changed

src/networking.c

+2
Original file line numberDiff line numberDiff line change
@@ -4020,6 +4020,8 @@ void clientCommand(client *c) {
40204020
for (int i = 2; i < c->argc; i++) {
40214021
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
40224022
c->capa |= CLIENT_CAPA_REDIRECT;
4023+
} else if (!strcasecmp(c->argv[i]->ptr, "subv2")) {
4024+
c->capa |= CLIENT_CAPA_SUBV2;
40234025
}
40244026
}
40254027
addReply(c, shared.ok);

src/pubsub.c

+16-2
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,11 @@ void pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
285285
serverAssert(dictInsertAtPosition(type.clientPubSubChannels(c), channel, position));
286286
incrRefCount(channel);
287287
}
288+
289+
if (!(c->capa & CLIENT_CAPA_SUBV2)) {
290+
/* Notify the client */
291+
addReplyPubsubSubscribed(c, channel, type);
292+
}
288293
}
289294

290295
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
@@ -567,7 +572,12 @@ void subscribeCommand(client *c) {
567572
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
568573
return;
569574
}
570-
addPubSubChannel(c, pubSubType);
575+
576+
if (c->capa & CLIENT_CAPA_SUBV2) {
577+
addPubSubChannel(c, pubSubType);
578+
} else {
579+
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubType);
580+
}
571581

572582
markClientAsPubSub(c);
573583
}
@@ -735,7 +745,11 @@ void ssubscribeCommand(client *c) {
735745
return;
736746
}
737747

738-
addPubSubChannel(c, pubSubShardType);
748+
if (c->capa & CLIENT_CAPA_SUBV2) {
749+
addPubSubChannel(c, pubSubShardType);
750+
} else {
751+
for (int j = 1; j < c->argc; j++) pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
752+
}
739753
markClientAsPubSub(c);
740754
}
741755

src/server.h

+1
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
343343

344344
/* Client capabilities */
345345
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */
346+
#define CLIENT_CAPA_SUBV2 (1 << 1) /* Indicate that the client can handle pubsub v2 version */
346347

347348
/* Client block type (btype field in client structure)
348349
* if CLIENT_BLOCKED flag is set. */

src/valkey-cli.c

+16-1
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ static struct config {
272272
char *test_hint_file;
273273
int prefer_ipv4; /* Prefer IPv4 over IPv6 on DNS lookup. */
274274
int prefer_ipv6; /* Prefer IPv6 over IPv4 on DNS lookup. */
275+
int pubsub_version;
275276
} config;
276277

277278
/* User preferences. */
@@ -2345,6 +2346,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
23452346
config.output = OUTPUT_RAW;
23462347
}
23472348

2349+
if (!strcasecmp(command, "client") && argc >= 3 && !strcasecmp(argv[1], "capa")) {
2350+
for (int index = 2; index < argc; index++) {
2351+
if (!strcasecmp(argv[index], "subv2")) {
2352+
config.pubsub_version = 2;
2353+
break;
2354+
}
2355+
}
2356+
}
2357+
23482358
/* Setup argument length */
23492359
argvlen = zmalloc(argc * sizeof(size_t));
23502360
for (j = 0; j < argc; j++) argvlen[j] = sdslen(argv[j]);
@@ -2375,7 +2385,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
23752385
* an in-band message is received, but these commands are confirmed
23762386
* using push replies only. There is one push reply per channel if
23772387
* channels are specified, otherwise at least one. */
2378-
num_expected_pubsub_push = 1;
2388+
if (config.pubsub_version == 2) {
2389+
num_expected_pubsub_push = 1;
2390+
} else {
2391+
num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
2392+
}
23792393
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
23802394
redisSetPushCallback(context, NULL);
23812395
}
@@ -9535,6 +9549,7 @@ int main(int argc, char **argv) {
95359549
config.server_version = NULL;
95369550
config.prefer_ipv4 = 0;
95379551
config.prefer_ipv6 = 0;
9552+
config.pubsub_version = 1;
95389553
config.cluster_manager_command.name = NULL;
95399554
config.cluster_manager_command.argc = 0;
95409555
config.cluster_manager_command.argv = NULL;

tests/integration/valkey-cli.tcl

+8-2
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,13 @@ start_server {tags {"cli"}} {
204204

205205
# Subscribe to some channels.
206206
set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
207-
set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n"
208-
set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n"
207+
set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n"
208+
set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n"
209209
assert_equal $sub1$sub2$sub3$reading \
210210
[run_command $fd "subscribe ch1 ch2 ch3"]
211211

212+
# set sub2 "4) \"subscribe\"\n5) \"ch2\"\n6) (integer) 2\n"
213+
# set sub3 "7) \"subscribe\"\n8) \"ch3\"\n9) (integer) 3\n"
212214
# Receive pubsub message.
213215
r publish ch2 hello
214216
set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n"
@@ -241,6 +243,10 @@ start_server {tags {"cli"}} {
241243
[run_command $fd "subscribe ch1"]
242244
}
243245

246+
test_interactive_cli "Subscribed mode" {
247+
248+
}
249+
244250
test_interactive_nontty_cli "Subscribed mode" {
245251
# Raw output and no "Reading messages..." info message.
246252
# Use RESP3 in this test case.

tests/unit/cluster/pubsubshard.tcl

+5-3
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch
5959
set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]
6060
set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
6161

62-
assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}]
63-
assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}]
64-
assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}]
62+
# assert_equal {1} [ssubscribe $subscribeclient {"\{channel.0\}1"}]
63+
# assert_equal {2} [ssubscribe $subscribeclient {"\{channel.0\}2"}]
64+
# assert_equal {3} [ssubscribe $subscribeclient {"\{channel.0\}3"}]
65+
set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]]
66+
assert_equal [list 1 2 3] $sub_res
6567

6668
sunsubscribe $subscribeclient
6769

tests/unit/pubsub.tcl

+18-12
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ start_server {tags {"pubsub network"}} {
4545
set rd1 [valkey_deferring_client]
4646

4747
# subscribe to two channels
48-
assert_equal {1} [subscribe $rd1 {chan1}]
49-
assert_equal {2} [subscribe $rd1 {chan2}]
48+
#assert_equal {1} [subscribe $rd1 {chan1}]
49+
#assert_equal {2} [subscribe $rd1 {chan2}]
50+
assert_equal {1 2} [subscribe $rd1 {chan1 chan2}]
5051
assert_equal 1 [r publish chan1 hello]
5152
assert_equal 1 [r publish chan2 world]
5253
assert_equal {message chan1 hello} [$rd1 read]
@@ -84,9 +85,10 @@ start_server {tags {"pubsub network"}} {
8485

8586
test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
8687
set rd1 [valkey_deferring_client]
87-
assert_equal {1} [subscribe $rd1 {chan1}]
88-
assert_equal {2} [subscribe $rd1 {chan2}]
89-
assert_equal {3} [subscribe $rd1 {chan3}]
88+
#assert_equal {1} [subscribe $rd1 {chan1}]
89+
#assert_equal {2} [subscribe $rd1 {chan2}]
90+
#assert_equal {3} [subscribe $rd1 {chan3}]
91+
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
9092
unsubscribe $rd1
9193
# wait for the unsubscribe to take effect
9294
wait_for_condition 50 100 {
@@ -104,9 +106,10 @@ start_server {tags {"pubsub network"}} {
104106

105107
test "SUBSCRIBE to one channel more than once" {
106108
set rd1 [valkey_deferring_client]
107-
assert_equal {1} [subscribe $rd1 {chan1}]
108-
assert_equal {2} [subscribe $rd1 {chan2}]
109-
assert_equal {3} [subscribe $rd1 {chan3}]
109+
#assert_equal {1} [subscribe $rd1 {chan1}]
110+
#assert_equal {2} [subscribe $rd1 {chan2}]
111+
#assert_equal {3} [subscribe $rd1 {chan3}]
112+
assert_equal {1 1 1} [subscribe $rd1 {chan1 chan1 chan1}]
110113
assert_equal 1 [r publish chan1 hello]
111114
assert_equal {message chan1 hello} [$rd1 read]
112115

@@ -128,9 +131,9 @@ start_server {tags {"pubsub network"}} {
128131
set rd1 [valkey_deferring_client]
129132

130133
# subscribe to two patterns
131-
# assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}]
132-
assert_equal {1} [psubscribe $rd1 {foo.*}]
133-
assert_equal {2} [psubscribe $rd1 {bar.*}]
134+
assert_equal {1 2} [psubscribe $rd1 {foo.* bar.*}]
135+
#assert_equal {1} [psubscribe $rd1 {foo.*}]
136+
#assert_equal {2} [psubscribe $rd1 {bar.*}]
134137
assert_equal 1 [r publish foo.1 hello]
135138
assert_equal 1 [r publish bar.1 hello]
136139
assert_equal 0 [r publish foo1 hello]
@@ -488,7 +491,10 @@ start_server {tags {"pubsub network"}} {
488491
# Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command,
489492
# Only one response is returned
490493
# This update matches with Redis response: one command always returns one response
491-
assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz]
494+
#assert_equal "subscribe foo 1 subscribe bar 2 subscribe baz 3" [r subscribe foo bar baz]
495+
assert_equal "subscribe foo 1" [r subscribe foo bar baz]
496+
assert_equal "subscribe bar 2" [r read]
497+
assert_equal "subscribe baz 3" [r read]
492498

493499
r multi
494500
r ping abc

tests/unit/pubsubshard.tcl

+4-3
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ start_server {tags {"pubsubshard external:skip"}} {
6464

6565
test "SSUBSCRIBE to one channel more than once" {
6666
set rd1 [valkey_deferring_client]
67-
assert_equal {1} [ssubscribe $rd1 {chan1}]
68-
assert_equal {1} [ssubscribe $rd1 {chan1}]
69-
assert_equal {1} [ssubscribe $rd1 {chan1}]
67+
#assert_equal {1} [ssubscribe $rd1 {chan1}]
68+
#assert_equal {1} [ssubscribe $rd1 {chan1}]
69+
#assert_equal {1} [ssubscribe $rd1 {chan1}]
70+
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
7071
assert_equal 1 [r SPUBLISH chan1 hello]
7172
assert_equal {smessage chan1 hello} [$rd1 read]
7273

0 commit comments

Comments
 (0)