Skip to content

Commit cb91238

Browse files
committed
Add one helper function for subscribe and ssubscribe command
Signed-off-by: hwware <[email protected]>
1 parent bbc5235 commit cb91238

File tree

1 file changed

+22
-34
lines changed

1 file changed

+22
-34
lines changed

src/pubsub.c

+22-34
Original file line numberDiff line numberDiff line change
@@ -533,21 +533,8 @@ int pubsubPublishMessage(robj *channel, robj *message, int sharded) {
533533
* Pubsub commands implementation
534534
*----------------------------------------------------------------------------*/
535535

536-
/* SUBSCRIBE channel [channel ...] */
537-
void subscribeCommand(client *c) {
536+
void addPubSubChannel(client *c, pubsubtype type) {
538537
int j;
539-
if (c->flag.deny_blocking && !c->flag.multi) {
540-
/**
541-
* A client that has CLIENT_DENY_BLOCKING flag on
542-
* expect a reply per command and so can not execute subscribe.
543-
*
544-
* Notice that we have a special treatment for multi because of
545-
* backward compatibility
546-
*/
547-
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
548-
return;
549-
}
550-
551538
struct ClientFlags old_flags = c->flag;
552539
c->flag.pushing = 1;
553540
int number = (c->argc - 1) * 3;
@@ -558,12 +545,29 @@ void subscribeCommand(client *c) {
558545
addReplyPushLen(c, number);
559546

560547
for (j = 1; j < c->argc; j++) {
561-
pubsubSubscribeChannel(c, c->argv[j], pubSubType);
562-
addReply(c, *pubSubType.subscribeMsg);
548+
pubsubSubscribeChannel(c, c->argv[j], type);
549+
addReply(c, *type.subscribeMsg);
563550
addReplyBulk(c, c->argv[j]);
564-
addReplyLongLong(c, pubSubType.subscriptionCount(c));
551+
addReplyLongLong(c, type.subscriptionCount(c));
565552
}
566553
if (!old_flags.pushing) c->flag.pushing = 0;
554+
}
555+
556+
557+
/* SUBSCRIBE channel [channel ...] */
558+
void subscribeCommand(client *c) {
559+
if (c->flag.deny_blocking && !c->flag.multi) {
560+
/**
561+
* A client that has CLIENT_DENY_BLOCKING flag on
562+
* expect a reply per command and so can not execute subscribe.
563+
*
564+
* Notice that we have a special treatment for multi because of
565+
* backward compatibility
566+
*/
567+
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
568+
return;
569+
}
570+
addPubSubChannel(c, pubSubType);
567571

568572
markClientAsPubSub(c);
569573
}
@@ -730,24 +734,8 @@ void ssubscribeCommand(client *c) {
730734
addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
731735
return;
732736
}
733-
int j;
734-
735-
struct ClientFlags old_flags = c->flag;
736-
c->flag.pushing = 1;
737-
int number = (c->argc - 1) * 3;
738737

739-
if (c->resp == 2)
740-
addReply(c, shared.mbulkhdr[number]);
741-
else
742-
addReplyPushLen(c, number);
743-
744-
for (j = 1; j < c->argc; j++) {
745-
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
746-
addReply(c, *pubSubShardType.subscribeMsg);
747-
addReplyBulk(c, c->argv[j]);
748-
addReplyLongLong(c, pubSubShardType.subscriptionCount(c));
749-
}
750-
if (!old_flags.pushing) c->flag.pushing = 0;
738+
addPubSubChannel(c, pubSubShardType);
751739
markClientAsPubSub(c);
752740
}
753741

0 commit comments

Comments
 (0)