44import org .smartboot .socket .extension .multiplex .MultiplexClient ;
55import org .smartboot .socket .transport .AioQuickClient ;
66import org .smartboot .socket .transport .AioSession ;
7- import tech .smartboot .redisun .cmd .AppendCommand ;
8- import tech .smartboot .redisun .cmd .DBSizeCommand ;
9- import tech .smartboot .redisun .cmd .DecrByCommand ;
10- import tech .smartboot .redisun .cmd .DecrCommand ;
11- import tech .smartboot .redisun .cmd .DelCommand ;
12- import tech .smartboot .redisun .cmd .ExistsCommand ;
13- import tech .smartboot .redisun .cmd .ExpireCommand ;
14- import tech .smartboot .redisun .cmd .FlushAllCommand ;
15- import tech .smartboot .redisun .cmd .FlushDbCommand ;
16- import tech .smartboot .redisun .cmd .GetCommand ;
17- import tech .smartboot .redisun .cmd .HGetCommand ;
18- import tech .smartboot .redisun .cmd .HSetCommand ;
19- import tech .smartboot .redisun .cmd .HelloCommand ;
20- import tech .smartboot .redisun .cmd .IncrByCommand ;
21- import tech .smartboot .redisun .cmd .IncrCommand ;
22- import tech .smartboot .redisun .cmd .LPopCommand ;
23- import tech .smartboot .redisun .cmd .LPushCommand ;
24- import tech .smartboot .redisun .cmd .MGetCommand ;
25- import tech .smartboot .redisun .cmd .MSetCommand ;
26- import tech .smartboot .redisun .cmd .RPopCommand ;
27- import tech .smartboot .redisun .cmd .RPushCommand ;
28- import tech .smartboot .redisun .cmd .SAddCommand ;
29- import tech .smartboot .redisun .cmd .SelectCommand ;
30- import tech .smartboot .redisun .cmd .SetCommand ;
31- import tech .smartboot .redisun .cmd .StrlenCommand ;
32- import tech .smartboot .redisun .cmd .TtlCommand ;
33- import tech .smartboot .redisun .cmd .TypeCommand ;
34- import tech .smartboot .redisun .cmd .ZAddCommand ;
35- import tech .smartboot .redisun .cmd .ZRangeCommand ;
36- import tech .smartboot .redisun .cmd .ZRemCommand ;
37- import tech .smartboot .redisun .cmd .ZScoreCommand ;
7+ import tech .smartboot .redisun .cmd .*;
388import tech .smartboot .redisun .resp .Arrays ;
399import tech .smartboot .redisun .resp .BulkStrings ;
4010import tech .smartboot .redisun .resp .Doubles ;
@@ -253,8 +223,8 @@ public CompletableFuture<List<ZRangeCommand.Tuple>> asyncZrange(String key, long
253223 options .accept (cmd );
254224 }
255225 return execute (cmd ).thenApply (resp -> {
256- if (resp instanceof tech . smartboot . redisun . resp . Arrays ) {
257- List <RESP > resps = ((tech . smartboot . redisun . resp . Arrays ) resp ).getValue ();
226+ if (resp instanceof Arrays ) {
227+ List <RESP > resps = ((Arrays ) resp ).getValue ();
258228 List <ZRangeCommand .Tuple > result = new ArrayList <>(resps .size ());
259229 for (RESP r : resps ) {
260230 ZRangeCommand .Tuple tuple = new ZRangeCommand .Tuple ();
@@ -425,8 +395,8 @@ public List<String> mget(List<String> keys) {
425395 */
426396 public CompletableFuture <List <String >> asyncMget (List <String > keys ) {
427397 return execute (new MGetCommand (keys )).thenApply (resp -> {
428- if (resp instanceof tech . smartboot . redisun . resp . Arrays ) {
429- List <RESP > resps = ((tech . smartboot . redisun . resp . Arrays ) resp ).getValue ();
398+ if (resp instanceof Arrays ) {
399+ List <RESP > resps = ((Arrays ) resp ).getValue ();
430400 List <String > result = new ArrayList <>(resps .size ());
431401 for (RESP r : resps ) {
432402 if (r instanceof Nulls ) {
@@ -1173,4 +1143,108 @@ public CompletableFuture<String> asyncRpop(String key) {
11731143 throw new RedisunException ("invalid response:" + resp );
11741144 });
11751145 }
1176- }
1146+
1147+ /**
1148+ * 发布消息到指定频道
1149+ *
1150+ * @param channel 频道名称
1151+ * @param message 要发布的消息
1152+ * @return 接收到此消息的客户端数量
1153+ */
1154+ public int publish (String channel , String message ) {
1155+ try {
1156+ return asyncPublish (channel , message ).get ();
1157+ } catch (Exception e ) {
1158+ throw new RedisunException (e );
1159+ }
1160+ }
1161+
1162+ /**
1163+ * 异步发布消息到指定频道
1164+ *
1165+ * @param channel 频道名称
1166+ * @param message 要发布的消息
1167+ * @return 接收到此消息的客户端数量
1168+ */
1169+ public CompletableFuture <Integer > asyncPublish (String channel , String message ) {
1170+ return execute (new PublishCommand (channel , message )).thenApply (resp -> {
1171+ if (resp instanceof Integers ) {
1172+ return ((Integers ) resp ).getValue ();
1173+ }
1174+ throw new RedisunException ("invalid response:" + resp );
1175+ });
1176+ }
1177+
1178+ /**
1179+ * 订阅给定的一个或多个频道
1180+ * 注意:一旦进入订阅状态,连接就不能用于执行其他命令,直到取消订阅
1181+ *
1182+ * @param pubsub 消息回调处理类
1183+ * @param channels 要订阅的频道列表
1184+ * @return 订阅对象
1185+ */
1186+ public RedisunPubSub subscribe (RedisunPubSub pubsub , String ... channels ) {
1187+ // 获取零负载的连接,用于独占连接
1188+ AioQuickClient client = findFirstZeroLoadClient ();
1189+ client .connectTimeout (0 );
1190+ AioSession session = client .getSession ();
1191+ RedisSession redisSession = session .getAttachment ();
1192+ redisSession .setPubSub (pubsub );
1193+ // 设置取消订阅回调
1194+ pubsub .setUnsubscribe (uChannels -> {
1195+ try {
1196+ new UnsubscribeCommand (uChannels ).writeTo (session .writeBuffer ());
1197+ session .writeBuffer ().flush ();
1198+ } catch (IOException e ) {
1199+ throw new RedisunException (e );
1200+ }
1201+ });
1202+ // 设置异常关闭时继续订阅
1203+ pubsub .setSubscribe (this ::subscribe );
1204+ // 设置释放连接回调
1205+ pubsub .setReleaseClient (() -> {
1206+ multiplexClient .release (client );
1207+ redisSession .setPubSub (null );
1208+ });
1209+ // 执行订阅命令
1210+ try {
1211+ new SubscribeCommand (channels ).writeTo (session .writeBuffer ());
1212+ session .writeBuffer ().flush ();
1213+ } catch (IOException e ) {
1214+ multiplexClient .release (client );
1215+ redisSession .setPubSub (null );
1216+ throw new RuntimeException (e );
1217+ }
1218+ return pubsub ;
1219+ }
1220+
1221+ /**
1222+ * 寻找一个零负载的连接
1223+ * @return 零负载的连接
1224+ */
1225+ private AioQuickClient findFirstZeroLoadClient () {
1226+ List <AioQuickClient > nonZeroClients = new ArrayList <>();
1227+ try {
1228+ while (true ) {
1229+ AioQuickClient client = multiplexClient .acquire ();
1230+ AioSession session = client .getSession ();
1231+ RedisSession redisSession = session .getAttachment ();
1232+ if (redisSession .load () == 0 ) {
1233+ // 找到零负载连接,先把其他连接归还
1234+ for (int i = nonZeroClients .size () - 1 ; i >= 0 ; i --) {
1235+ multiplexClient .reuse (nonZeroClients .get (i ));
1236+ }
1237+ if (client != currentClient ) {
1238+ return client ;
1239+ }
1240+ }
1241+ nonZeroClients .add (client );
1242+ }
1243+ } catch (Throwable e ) {
1244+ for (AioQuickClient c : nonZeroClients ) {
1245+ multiplexClient .reuse (c );
1246+ }
1247+ throw new RedisunException (e );
1248+ }
1249+ }
1250+ }
0 commit comments