Skip to content

Commit f517c05

Browse files
authored
Support Sharded PubSub (#3396)
1 parent 3f695a9 commit f517c05

10 files changed

+368
-30
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package redis.clients.jedis;
2+
3+
public abstract class BinaryJedisShardedPubSub extends JedisShardedPubSubBase<byte[]> {
4+
5+
@Override
6+
protected final byte[] encode(byte[] raw) {
7+
return raw;
8+
}
9+
}

src/main/java/redis/clients/jedis/CommandObjects.java

+8
Original file line numberDiff line numberDiff line change
@@ -3106,6 +3106,14 @@ public final CommandObject<Long> publish(String channel, String message) {
31063106
public final CommandObject<Long> publish(byte[] channel, byte[] message) {
31073107
return new CommandObject<>(commandArguments(PUBLISH).add(channel).add(message), BuilderFactory.LONG);
31083108
}
3109+
3110+
public final CommandObject<Long> spublish(String channel, String message) {
3111+
return new CommandObject<>(commandArguments(SPUBLISH).key(channel).add(message), BuilderFactory.LONG);
3112+
}
3113+
3114+
public final CommandObject<Long> spublish(byte[] channel, byte[] message) {
3115+
return new CommandObject<>(commandArguments(SPUBLISH).key(channel).add(message), BuilderFactory.LONG);
3116+
}
31093117
// Miscellaneous commands
31103118

31113119
// RediSearch commands

src/main/java/redis/clients/jedis/Jedis.java

+18
Original file line numberDiff line numberDiff line change
@@ -7935,6 +7935,24 @@ public Map<String, Long> pubsubNumSub(String... channels) {
79357935
return BuilderFactory.PUBSUB_NUMSUB_MAP.build(connection.getOne());
79367936
}
79377937

7938+
public List<String> pubsubShardChannels() {
7939+
checkIsInMultiOrPipeline();
7940+
connection.sendCommand(PUBSUB, SHARDCHANNELS);
7941+
return connection.getMultiBulkReply();
7942+
}
7943+
7944+
public List<String> pubsubShardChannels(final String pattern) {
7945+
checkIsInMultiOrPipeline();
7946+
connection.sendCommand(PUBSUB, SHARDCHANNELS.name(), pattern);
7947+
return connection.getMultiBulkReply();
7948+
}
7949+
7950+
public Map<String, Long> pubsubShardNumSub(String... channels) {
7951+
checkIsInMultiOrPipeline();
7952+
connection.sendCommand(PUBSUB, joinParameters(SHARDNUMSUB.name(), channels));
7953+
return BuilderFactory.PUBSUB_NUMSUB_MAP.build(connection.getOne());
7954+
}
7955+
79387956
@Override
79397957
public Object eval(final String script, final int keyCount, final String... params) {
79407958
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/JedisCluster.java

+25
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
import java.util.Collections;
55
import java.util.Map;
66
import java.util.Set;
7+
78
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
9+
810
import redis.clients.jedis.providers.ClusterConnectionProvider;
11+
import redis.clients.jedis.util.JedisClusterCRC16;
912

1013
public class JedisCluster extends UnifiedJedis {
1114

@@ -205,6 +208,28 @@ public Connection getConnectionFromSlot(int slot) {
205208
return ((ClusterConnectionProvider) provider).getConnectionFromSlot(slot);
206209
}
207210

211+
// commands
212+
public long spublish(String channel, String message) {
213+
return executeCommand(commandObjects.spublish(channel, message));
214+
}
215+
216+
public long spublish(byte[] channel, byte[] message) {
217+
return executeCommand(commandObjects.spublish(channel, message));
218+
}
219+
220+
public void ssubscribe(final JedisShardedPubSub jedisPubSub, final String... channels) {
221+
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
222+
jedisPubSub.proceed(connection, channels);
223+
}
224+
}
225+
226+
public void ssubscribe(BinaryJedisShardedPubSub jedisPubSub, final byte[]... channels) {
227+
try (Connection connection = getConnectionFromSlot(JedisClusterCRC16.getSlot(channels[0]))) {
228+
jedisPubSub.proceed(connection, channels);
229+
}
230+
}
231+
// commands
232+
208233
@Override
209234
public ClusterPipeline pipelined() {
210235
return new ClusterPipeline((ClusterConnectionProvider) provider, (ClusterCommandObjects) commandObjects);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package redis.clients.jedis;
2+
3+
import redis.clients.jedis.util.SafeEncoder;
4+
5+
public abstract class JedisShardedPubSub extends JedisShardedPubSubBase<String> {
6+
7+
@Override
8+
protected final String encode(byte[] raw) {
9+
return SafeEncoder.encode(raw);
10+
}
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package redis.clients.jedis;
2+
3+
import static redis.clients.jedis.Protocol.ResponseKeyword.*;
4+
5+
import java.util.Arrays;
6+
import java.util.List;
7+
8+
import redis.clients.jedis.Protocol.Command;
9+
import redis.clients.jedis.exceptions.JedisException;
10+
11+
public abstract class JedisShardedPubSubBase<T> {
12+
13+
private int subscribedChannels = 0;
14+
private volatile Connection client;
15+
16+
public void onSMessage(T channel, T message) {
17+
}
18+
19+
public void onSSubscribe(T channel, int subscribedChannels) {
20+
}
21+
22+
public void onSUnsubscribe(T channel, int subscribedChannels) {
23+
}
24+
25+
private void sendAndFlushCommand(Command command, T... args) {
26+
if (client == null) {
27+
throw new JedisException(getClass() + " is not connected to a Connection.");
28+
}
29+
CommandArguments cargs = new CommandArguments(command).addObjects(args);
30+
client.sendCommand(cargs);
31+
client.flush();
32+
}
33+
34+
public final void sunsubscribe() {
35+
sendAndFlushCommand(Command.SUNSUBSCRIBE);
36+
}
37+
38+
public final void sunsubscribe(T... channels) {
39+
sendAndFlushCommand(Command.SUNSUBSCRIBE, channels);
40+
}
41+
42+
public final void ssubscribe(T... channels) {
43+
sendAndFlushCommand(Command.SSUBSCRIBE, channels);
44+
}
45+
46+
public final boolean isSubscribed() {
47+
return subscribedChannels > 0;
48+
}
49+
50+
public final int getSubscribedChannels() {
51+
return subscribedChannels;
52+
}
53+
54+
public final void proceed(Connection client, T... channels) {
55+
this.client = client;
56+
this.client.setTimeoutInfinite();
57+
try {
58+
ssubscribe(channels);
59+
process();
60+
} finally {
61+
this.client.rollbackTimeout();
62+
}
63+
}
64+
65+
protected abstract T encode(byte[] raw);
66+
67+
// private void process(Client client) {
68+
private void process() {
69+
70+
do {
71+
Object reply = client.getUnflushedObject();
72+
73+
if (reply instanceof List) {
74+
List<Object> listReply = (List<Object>) reply;
75+
final Object firstObj = listReply.get(0);
76+
if (!(firstObj instanceof byte[])) {
77+
throw new JedisException("Unknown message type: " + firstObj);
78+
}
79+
final byte[] resp = (byte[]) firstObj;
80+
if (Arrays.equals(SSUBSCRIBE.getRaw(), resp)) {
81+
subscribedChannels = ((Long) listReply.get(2)).intValue();
82+
final byte[] bchannel = (byte[]) listReply.get(1);
83+
final T enchannel = (bchannel == null) ? null : encode(bchannel);
84+
onSSubscribe(enchannel, subscribedChannels);
85+
} else if (Arrays.equals(SUNSUBSCRIBE.getRaw(), resp)) {
86+
subscribedChannels = ((Long) listReply.get(2)).intValue();
87+
final byte[] bchannel = (byte[]) listReply.get(1);
88+
final T enchannel = (bchannel == null) ? null : encode(bchannel);
89+
onSUnsubscribe(enchannel, subscribedChannels);
90+
} else if (Arrays.equals(SMESSAGE.getRaw(), resp)) {
91+
final byte[] bchannel = (byte[]) listReply.get(1);
92+
final byte[] bmesg = (byte[]) listReply.get(2);
93+
final T enchannel = (bchannel == null) ? null : encode(bchannel);
94+
final T enmesg = (bmesg == null) ? null : encode(bmesg);
95+
onSMessage(enchannel, enmesg);
96+
} else {
97+
System.out.println(redis.clients.jedis.util.SafeEncoder.encodeObject(resp));
98+
throw new JedisException("Unknown message type: " + firstObj);
99+
}
100+
} else {
101+
throw new JedisException("Unknown message type: " + reply);
102+
}
103+
} while (isSubscribed());
104+
105+
// /* Invalidate instance since this thread is no longer listening */
106+
// this.client = null;
107+
}
108+
}

src/main/java/redis/clients/jedis/Protocol.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,12 @@ public static enum Keyword implements Rawable {
287287
STREAMS, CREATE, MKSTREAM, SETID, DESTROY, DELCONSUMER, MAXLEN, GROUP, IDLE, TIME, BLOCK, NOACK,
288288
RETRYCOUNT, STREAM, GROUPS, CONSUMERS, JUSTID, WITHVALUES, NOMKSTREAM, MINID, CREATECONSUMER,
289289
SETUSER, GETUSER, DELUSER, WHOAMI, USERS, CAT, GENPASS, LOG, SAVE, DRYRUN, COPY, AUTH, AUTH2,
290-
NX, XX, EX, PX, EXAT, PXAT, CH, ABSTTL, KEEPTTL, INCR, INFO, CHANNELS, NUMPAT, NUMSUB, NOW, REV,
290+
NX, XX, EX, PX, EXAT, PXAT, CH, ABSTTL, KEEPTTL, INCR, INFO, NOW, REV,
291291
WITHCOORD, WITHDIST, WITHHASH, ANY, FROMMEMBER, FROMLONLAT, BYRADIUS, BYBOX, BYLEX, BYSCORE,
292292
STOREDIST, TO, FORCE, TIMEOUT, DB, UNLOAD, ABORT, IDX, MINMATCHLEN, WITHMATCHLEN, FULL,
293293
DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP,
294-
MODULE, ACLCAT, PATTERN, DOCTOR, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS;
294+
MODULE, ACLCAT, PATTERN, DOCTOR, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS,
295+
CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB;
295296

296297
private final byte[] raw;
297298

@@ -328,7 +329,8 @@ public byte[] getRaw() {
328329

329330
public static enum ResponseKeyword implements Rawable {
330331

331-
SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, MESSAGE, PMESSAGE, PONG;
332+
SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, MESSAGE, PMESSAGE, PONG,
333+
SSUBSCRIBE, SUNSUBSCRIBE, SMESSAGE;
332334

333335
private final byte[] raw;
334336

src/main/java/redis/clients/jedis/util/JedisClusterCRC16.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package redis.clients.jedis.util;
22

3-
import redis.clients.jedis.exceptions.JedisClusterOperationException;
4-
53
/**
64
* CRC16 Implementation according to CCITT standard Polynomial : 1021 (x^16 + x^12 + x^5 + 1) See <a
75
* href="http://redis.io/topics/cluster-spec">Appendix A. CRC16 reference implementation in ANSI
86
* C</a>
97
*/
108
public final class JedisClusterCRC16 {
9+
1110
private static final int[] LOOKUP_TABLE = { 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5,
1211
0x60C6, 0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231,
1312
0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A,
@@ -33,13 +32,9 @@ public final class JedisClusterCRC16 {
3332
0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
3433
0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0, };
3534

36-
private JedisClusterCRC16() {
37-
throw new InstantiationError("Must not instantiate this class");
38-
}
39-
4035
public static int getSlot(String key) {
4136
if (key == null) {
42-
throw new JedisClusterOperationException("Slot calculation of null is impossible");
37+
throw new NullPointerException("Slot calculation of null is impossible");
4338
}
4439

4540
key = JedisClusterHashTag.getHashTag(key);
@@ -49,7 +44,7 @@ public static int getSlot(String key) {
4944

5045
public static int getSlot(byte[] key) {
5146
if (key == null) {
52-
throw new JedisClusterOperationException("Slot calculation of null is impossible");
47+
throw new NullPointerException("Slot calculation of null is impossible");
5348
}
5449

5550
int s = -1;
@@ -97,4 +92,8 @@ public static int getCRC16(String key) {
9792
byte[] bytesKey = SafeEncoder.encode(key);
9893
return getCRC16(bytesKey, 0, bytesKey.length);
9994
}
95+
96+
private JedisClusterCRC16() {
97+
throw new InstantiationError("Must not instantiate this class");
98+
}
10099
}

0 commit comments

Comments
 (0)