Skip to content

Commit 3af8b90

Browse files
committed
Add one helper function for subscribe and ssubscribe command
Signed-off-by: hwware <[email protected]>
1 parent dc6f6c4 commit 3af8b90

File tree

1 file changed

+22
-34
lines changed

1 file changed

+22
-34
lines changed

src/pubsub.c

+22-34
Original file line numberDiff line numberDiff line change
@@ -533,21 +533,8 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
533533
* Pubsub commands implementation
534534
*----------------------------------------------------------------------------*/
535535

536-
/* SUBSCRIBE channel [channel ...] */
537-
void subscribeCommand(client *c) {
536+
void addPubSubChannel(client *c, pubsubtype type) {
538537
int j;
539-
if (c->flag.deny_blocking && !c->flag.multi) {
540-
/**
541-
* A client that has CLIENT_DENY_BLOCKING flag on
542-
* expect a reply per command and so can not execute subscribe.
543-
*
544-
* Notice that we have a special treatment for multi because of
545-
* backward compatibility
546-
*/
547-
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
548-
return;
549-
}
550-
551538
struct ClientFlags old_flags = c->flag;
552539
c->flag.pushing = 1;
553540
int number = (c->argc - 1) * 3;
@@ -558,12 +545,29 @@ void subscribeCommand(client *c) {
558545
addReplyPushLen(c, number);
559546

560547
for (j = 1; j < c->argc; j++) {
561-
pubsubSubscribeChannel(c, c->argv[j], pubSubType);
562-
addReply(c, *pubSubType.subscribeMsg);
548+
pubsubSubscribeChannel(c, c->argv[j], type);
549+
addReply(c, *type.subscribeMsg);
563550
addReplyBulk(c, c->argv[j]);
564-
addReplyLongLong(c, pubSubType.subscriptionCount(c));
551+
addReplyLongLong(c, type.subscriptionCount(c));
565552
}
566553
if (!old_flags.pushing) c->flag.pushing = 0;
554+
}
555+
556+
557+
/* SUBSCRIBE channel [channel ...] */
558+
void subscribeCommand(client *c) {
559+
if (c->flag.deny_blocking && !c->flag.multi) {
560+
/**
561+
* A client that has CLIENT_DENY_BLOCKING flag on
562+
* expect a reply per command and so can not execute subscribe.
563+
*
564+
* Notice that we have a special treatment for multi because of
565+
* backward compatibility
566+
*/
567+
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
568+
return;
569+
}
570+
addPubSubChannel(c, pubSubType);
567571

568572
markClientAsPubSub(c);
569573
}
@@ -729,24 +733,8 @@ void ssubscribeCommand(client *c) {
729733
addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
730734
return;
731735
}
732-
int j;
733-
734-
struct ClientFlags old_flags = c->flag;
735-
c->flag.pushing = 1;
736-
int number = (c->argc - 1) * 3;
737736

738-
if (c->resp == 2)
739-
addReply(c, shared.mbulkhdr[number]);
740-
else
741-
addReplyPushLen(c, number);
742-
743-
for (j = 1; j < c->argc; j++) {
744-
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
745-
addReply(c, *pubSubShardType.subscribeMsg);
746-
addReplyBulk(c, c->argv[j]);
747-
addReplyLongLong(c, pubSubShardType.subscriptionCount(c));
748-
}
749-
if (!old_flags.pushing) c->flag.pushing = 0;
737+
addPubSubChannel(c, pubSubShardType);
750738
markClientAsPubSub(c);
751739
}
752740

0 commit comments

Comments
 (0)