Skip to content

Commit bfecfcb

Browse files
committed
Merge pull request #523 from xetorthio/pubsub
Add pubsub commands
2 parents d5f984a + 4ab54d9 commit bfecfcb

File tree

6 files changed

+171
-3
lines changed

6 files changed

+171
-3
lines changed

src/main/java/redis/clients/jedis/BinaryClient.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,10 @@ public void punsubscribe() {
560560
public void punsubscribe(final byte[]... patterns) {
561561
sendCommand(PUNSUBSCRIBE, patterns);
562562
}
563-
563+
564+
public void pubsub(final byte[]... args) {
565+
sendCommand(PUBSUB, args);
566+
}
564567
public void zcount(final byte[] key, final double min, final double max) {
565568

566569
byte byteArrayMin[] = (min == Double.NEGATIVE_INFINITY) ? "-inf"

src/main/java/redis/clients/jedis/BuilderFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public String toString() {
103103
}
104104

105105
};
106+
106107
public static final Builder<Set<String>> STRING_SET = new Builder<Set<String>>() {
107108
@SuppressWarnings("unchecked")
108109
public Set<String> build(Object data) {

src/main/java/redis/clients/jedis/Client.java

+21
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,18 @@ public void subscribe(final String... channels) {
672672
}
673673
subscribe(cs);
674674
}
675+
676+
public void pubsubChannels(String pattern) {
677+
pubsub(Protocol.PUBSUB_CHANNELS, pattern);
678+
}
679+
680+
public void pubsubNumPat() {
681+
pubsub(Protocol.PUBSUB_NUM_PAT);
682+
}
683+
684+
public void pubsubNumSub(String... channels) {
685+
pubsub(Protocol.PUBSUB_NUMSUB, channels);
686+
}
675687

676688
public void configSet(String parameter, String value) {
677689
configSet(SafeEncoder.encode(parameter), SafeEncoder.encode(value));
@@ -841,6 +853,15 @@ public void cluster(final String subcommand, final int... args) {
841853
arg[0] = SafeEncoder.encode(subcommand);
842854
cluster(arg);
843855
}
856+
857+
public void pubsub(final String subcommand, final String... args) {
858+
final byte[][] arg = new byte[args.length+1][];
859+
for (int i = 1; i < arg.length; i++) {
860+
arg[i] = SafeEncoder.encode(args[i-1]);
861+
}
862+
arg[0] = SafeEncoder.encode(subcommand);
863+
pubsub(arg);
864+
}
844865

845866
public void cluster(final String subcommand, final String... args) {
846867
final byte[][] arg = new byte[args.length + 1][];

src/main/java/redis/clients/jedis/Jedis.java

+19
Original file line numberDiff line numberDiff line change
@@ -3232,4 +3232,23 @@ public String asking() {
32323232
client.asking();
32333233
return client.getStatusCodeReply();
32343234
}
3235+
3236+
public List<String> pubsubChannels(String pattern) {
3237+
checkIsInMulti();
3238+
client.pubsubChannels(pattern);
3239+
return client.getMultiBulkReply();
3240+
}
3241+
3242+
public Long pubsubNumPat() {
3243+
checkIsInMulti();
3244+
client.pubsubNumPat();
3245+
return client.getIntegerReply();
3246+
}
3247+
3248+
public Map<String, String> pubsubNumSub(String... channels) {
3249+
checkIsInMulti();
3250+
client.pubsubNumSub(channels);
3251+
return BuilderFactory.STRING_MAP
3252+
.build(client.getBinaryMultiBulkReply());
3253+
}
32353254
}

src/main/java/redis/clients/jedis/Protocol.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ public final class Protocol {
4444
public static final String CLUSTER_SETSLOT_NODE = "node";
4545
public static final String CLUSTER_SETSLOT_MIGRATING = "migrating";
4646
public static final String CLUSTER_SETSLOT_IMPORTING = "importing";
47+
public static final String PUBSUB_CHANNELS= "channels";
48+
public static final String PUBSUB_NUMSUB = "numsub";
49+
public static final String PUBSUB_NUM_PAT = "numpat";
4750

4851
private Protocol() {
4952
// this prevent the class from instantiation
@@ -181,7 +184,7 @@ public static Object read(final RedisInputStream is) {
181184
}
182185

183186
public static final byte[] toByteArray(final boolean value) {
184-
return toByteArray(value ? 1 : 0);
187+
return toByteArray(value ? 1 : 0);
185188
}
186189

187190
public static final byte[] toByteArray(final int value) {
@@ -197,7 +200,7 @@ public static final byte[] toByteArray(final double value) {
197200
}
198201

199202
public static enum Command {
200-
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING;
203+
PING, SET, GET, QUIT, EXISTS, DEL, TYPE, FLUSHDB, KEYS, RANDOMKEY, RENAME, RENAMENX, RENAMEX, DBSIZE, EXPIRE, EXPIREAT, TTL, SELECT, MOVE, FLUSHALL, GETSET, MGET, SETNX, SETEX, MSET, MSETNX, DECRBY, DECR, INCRBY, INCR, APPEND, SUBSTR, HSET, HGET, HSETNX, HMSET, HMGET, HINCRBY, HEXISTS, HDEL, HLEN, HKEYS, HVALS, HGETALL, RPUSH, LPUSH, LLEN, LRANGE, LTRIM, LINDEX, LSET, LREM, LPOP, RPOP, RPOPLPUSH, SADD, SMEMBERS, SREM, SPOP, SMOVE, SCARD, SISMEMBER, SINTER, SINTERSTORE, SUNION, SUNIONSTORE, SDIFF, SDIFFSTORE, SRANDMEMBER, ZADD, ZRANGE, ZREM, ZINCRBY, ZRANK, ZREVRANK, ZREVRANGE, ZCARD, ZSCORE, MULTI, DISCARD, EXEC, WATCH, UNWATCH, SORT, BLPOP, BRPOP, AUTH, SUBSCRIBE, PUBLISH, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB, ZCOUNT, ZRANGEBYSCORE, ZREVRANGEBYSCORE, ZREMRANGEBYRANK, ZREMRANGEBYSCORE, ZUNIONSTORE, ZINTERSTORE, SAVE, BGSAVE, BGREWRITEAOF, LASTSAVE, SHUTDOWN, INFO, MONITOR, SLAVEOF, CONFIG, STRLEN, SYNC, LPUSHX, PERSIST, RPUSHX, ECHO, LINSERT, DEBUG, BRPOPLPUSH, SETBIT, GETBIT, SETRANGE, GETRANGE, EVAL, EVALSHA, SCRIPT, SLOWLOG, OBJECT, BITCOUNT, BITOP, SENTINEL, DUMP, RESTORE, PEXPIRE, PEXPIREAT, PTTL, INCRBYFLOAT, PSETEX, CLIENT, TIME, MIGRATE, HINCRBYFLOAT, SCAN, HSCAN, SSCAN, ZSCAN, WAIT, CLUSTER, ASKING;
201204

202205
public final byte[] raw;
203206

src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java

+121
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import java.io.IOException;
44
import java.net.UnknownHostException;
55
import java.util.Arrays;
6+
import java.util.HashMap;
7+
import java.util.List;
8+
import java.util.Map;
69
import java.util.concurrent.atomic.AtomicBoolean;
710

811
import org.junit.Test;
@@ -61,6 +64,124 @@ public void onPMessage(String pattern, String channel,
6164
}
6265
}, "foo");
6366
}
67+
68+
69+
@Test
70+
public void pubSubChannels(){
71+
final List<String> expectedActiveChannels = Arrays.asList("testchan1", "testchan2", "testchan3");
72+
jedis.subscribe(new JedisPubSub() {
73+
private int count = 0;
74+
75+
@Override
76+
public void onUnsubscribe(String channel, int subscribedChannels) {
77+
}
78+
79+
@Override
80+
public void onSubscribe(String channel, int subscribedChannels) {
81+
count++;
82+
//All channels are subscribed
83+
if (count == 3) {
84+
Jedis otherJedis = createJedis();
85+
List<String> activeChannels = otherJedis.pubsubChannels("test*");
86+
assertTrue(expectedActiveChannels.containsAll(activeChannels));
87+
unsubscribe();
88+
}
89+
}
90+
91+
@Override
92+
public void onPUnsubscribe(String pattern, int subscribedChannels) {
93+
}
94+
95+
@Override
96+
public void onPSubscribe(String pattern, int subscribedChannels) {
97+
}
98+
99+
@Override
100+
public void onPMessage(String pattern, String channel, String message) {
101+
}
102+
103+
@Override
104+
public void onMessage(String channel, String message) {
105+
}
106+
}, "testchan1", "testchan2", "testchan3");
107+
}
108+
109+
@Test
110+
public void pubSubNumPat(){
111+
jedis.psubscribe(new JedisPubSub() {
112+
private int count=0;
113+
@Override
114+
public void onUnsubscribe(String channel, int subscribedChannels) {
115+
}
116+
117+
@Override
118+
public void onSubscribe(String channel, int subscribedChannels) {
119+
}
120+
121+
@Override
122+
public void onPUnsubscribe(String pattern, int subscribedChannels) {
123+
}
124+
125+
@Override
126+
public void onPSubscribe(String pattern, int subscribedChannels) {
127+
count++;
128+
if (count == 3) {
129+
Jedis otherJedis = createJedis();
130+
Long numPatterns = otherJedis.pubsubNumPat();
131+
assertEquals(new Long(2l), numPatterns);
132+
punsubscribe();
133+
}
134+
}
135+
136+
@Override
137+
public void onPMessage(String pattern, String channel, String message) {
138+
}
139+
140+
@Override
141+
public void onMessage(String channel, String message) {
142+
}
143+
}, "test*", "test*", "chan*");
144+
}
145+
146+
@Test
147+
public void pubSubNumSub(){
148+
final Map<String, String> expectedNumSub = new HashMap<String, String>();
149+
expectedNumSub.put("testchannel2", "1");
150+
expectedNumSub.put("testchannel1", "1");
151+
jedis.subscribe(new JedisPubSub() {
152+
private int count=0;
153+
@Override
154+
public void onUnsubscribe(String channel, int subscribedChannels) {
155+
}
156+
157+
@Override
158+
public void onSubscribe(String channel, int subscribedChannels) {
159+
count++;
160+
if (count == 2) {
161+
Jedis otherJedis = createJedis();
162+
Map<String, String> numSub = otherJedis.pubsubNumSub("testchannel1", "testchannel2");
163+
assertEquals(expectedNumSub, numSub);
164+
unsubscribe();
165+
}
166+
}
167+
168+
@Override
169+
public void onPUnsubscribe(String pattern, int subscribedChannels) {
170+
}
171+
172+
@Override
173+
public void onPSubscribe(String pattern, int subscribedChannels) {
174+
}
175+
176+
@Override
177+
public void onPMessage(String pattern, String channel, String message) {
178+
}
179+
180+
@Override
181+
public void onMessage(String channel, String message) {
182+
}
183+
}, "testchannel1", "testchannel2");
184+
}
64185

65186
@Test
66187
public void subscribeMany() throws UnknownHostException, IOException,

0 commit comments

Comments
 (0)