Skip to content

Commit 84f76d2

Browse files
smthinggitee-org
authored andcommitted
!3 feat: 发布消息、订阅消息
Merge pull request !3 from 杜福忠/master
2 parents 9d6e6dc + b71bdd5 commit 84f76d2

10 files changed

Lines changed: 866 additions & 40 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target/
2+
/.idea/

src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ public RESP decode(ByteBuffer readBuffer, AioSession session) {
8585
public void process0(AioSession session, RESP msg) {
8686
// 获取当前会话关联的Redis会话对象
8787
RedisSession redisSession = session.getAttachment();
88+
if (redisSession.isPubSub()){
89+
redisSession.getPubSub().handleMessage(msg);
90+
return;
91+
}
8892
CompletableFuture<RESP> future = redisSession.poll();
8993
if (future == null) {
9094
// 如果没有等待的CompletableFuture,则将消息记录为错误并返回
@@ -128,6 +132,11 @@ public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, T
128132
}
129133
case SESSION_CLOSED: {
130134
RedisSession redisSession = session.getAttachment();
135+
if (redisSession.isPubSub()){
136+
// 通知订阅会话
137+
redisSession.getPubSub().onSessionClosed(throwable);
138+
break;
139+
}
131140
CompletableFuture<RESP> future;
132141
while ((future = redisSession.poll()) != null) {
133142
future.completeExceptionally(new RedisunException("session closed"));
@@ -141,4 +150,4 @@ public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, T
141150
// throwable.printStackTrace();
142151
}
143152
}
144-
}
153+
}

src/main/java/tech/smartboot/redisun/RedisSession.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ final class RedisSession {
3737
private int offerCount = 0;
3838
private int pollCount = 0;
3939

40+
/**
41+
* 订阅会话
42+
*/
43+
private volatile RedisunPubSub pubSub;
44+
4045
public int incrOfferCount() {
4146
return ++offerCount;
4247
}
@@ -82,4 +87,19 @@ int load() {
8287
return size >= 0 ? size : -size;
8388
}
8489

85-
}
90+
/**
91+
* 设置订阅会话
92+
* @param pubSub 订阅会话
93+
*/
94+
void setPubSub(RedisunPubSub pubSub) {
95+
this.pubSub = pubSub;
96+
}
97+
98+
RedisunPubSub getPubSub() {
99+
return pubSub;
100+
}
101+
102+
boolean isPubSub() {
103+
return this.pubSub != null;
104+
}
105+
}

src/main/java/tech/smartboot/redisun/Redisun.java

Lines changed: 110 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,7 @@
44
import org.smartboot.socket.extension.multiplex.MultiplexClient;
55
import org.smartboot.socket.transport.AioQuickClient;
66
import org.smartboot.socket.transport.AioSession;
7-
import tech.smartboot.redisun.cmd.AppendCommand;
8-
import tech.smartboot.redisun.cmd.DBSizeCommand;
9-
import tech.smartboot.redisun.cmd.DecrByCommand;
10-
import tech.smartboot.redisun.cmd.DecrCommand;
11-
import tech.smartboot.redisun.cmd.DelCommand;
12-
import tech.smartboot.redisun.cmd.ExistsCommand;
13-
import tech.smartboot.redisun.cmd.ExpireCommand;
14-
import tech.smartboot.redisun.cmd.FlushAllCommand;
15-
import tech.smartboot.redisun.cmd.FlushDbCommand;
16-
import tech.smartboot.redisun.cmd.GetCommand;
17-
import tech.smartboot.redisun.cmd.HGetCommand;
18-
import tech.smartboot.redisun.cmd.HSetCommand;
19-
import tech.smartboot.redisun.cmd.HelloCommand;
20-
import tech.smartboot.redisun.cmd.IncrByCommand;
21-
import tech.smartboot.redisun.cmd.IncrCommand;
22-
import tech.smartboot.redisun.cmd.LPopCommand;
23-
import tech.smartboot.redisun.cmd.LPushCommand;
24-
import tech.smartboot.redisun.cmd.MGetCommand;
25-
import tech.smartboot.redisun.cmd.MSetCommand;
26-
import tech.smartboot.redisun.cmd.RPopCommand;
27-
import tech.smartboot.redisun.cmd.RPushCommand;
28-
import tech.smartboot.redisun.cmd.SAddCommand;
29-
import tech.smartboot.redisun.cmd.SelectCommand;
30-
import tech.smartboot.redisun.cmd.SetCommand;
31-
import tech.smartboot.redisun.cmd.StrlenCommand;
32-
import tech.smartboot.redisun.cmd.TtlCommand;
33-
import tech.smartboot.redisun.cmd.TypeCommand;
34-
import tech.smartboot.redisun.cmd.ZAddCommand;
35-
import tech.smartboot.redisun.cmd.ZRangeCommand;
36-
import tech.smartboot.redisun.cmd.ZRemCommand;
37-
import tech.smartboot.redisun.cmd.ZScoreCommand;
7+
import tech.smartboot.redisun.cmd.*;
388
import tech.smartboot.redisun.resp.Arrays;
399
import tech.smartboot.redisun.resp.BulkStrings;
4010
import tech.smartboot.redisun.resp.Doubles;
@@ -253,8 +223,8 @@ public CompletableFuture<List<ZRangeCommand.Tuple>> asyncZrange(String key, long
253223
options.accept(cmd);
254224
}
255225
return execute(cmd).thenApply(resp -> {
256-
if (resp instanceof tech.smartboot.redisun.resp.Arrays) {
257-
List<RESP> resps = ((tech.smartboot.redisun.resp.Arrays) resp).getValue();
226+
if (resp instanceof Arrays) {
227+
List<RESP> resps = ((Arrays) resp).getValue();
258228
List<ZRangeCommand.Tuple> result = new ArrayList<>(resps.size());
259229
for (RESP r : resps) {
260230
ZRangeCommand.Tuple tuple = new ZRangeCommand.Tuple();
@@ -425,8 +395,8 @@ public List<String> mget(List<String> keys) {
425395
*/
426396
public CompletableFuture<List<String>> asyncMget(List<String> keys) {
427397
return execute(new MGetCommand(keys)).thenApply(resp -> {
428-
if (resp instanceof tech.smartboot.redisun.resp.Arrays) {
429-
List<RESP> resps = ((tech.smartboot.redisun.resp.Arrays) resp).getValue();
398+
if (resp instanceof Arrays) {
399+
List<RESP> resps = ((Arrays) resp).getValue();
430400
List<String> result = new ArrayList<>(resps.size());
431401
for (RESP r : resps) {
432402
if (r instanceof Nulls) {
@@ -1173,4 +1143,108 @@ public CompletableFuture<String> asyncRpop(String key) {
11731143
throw new RedisunException("invalid response:" + resp);
11741144
});
11751145
}
1176-
}
1146+
1147+
/**
1148+
* 发布消息到指定频道
1149+
*
1150+
* @param channel 频道名称
1151+
* @param message 要发布的消息
1152+
* @return 接收到此消息的客户端数量
1153+
*/
1154+
public int publish(String channel, String message) {
1155+
try {
1156+
return asyncPublish(channel, message).get();
1157+
} catch (Exception e) {
1158+
throw new RedisunException(e);
1159+
}
1160+
}
1161+
1162+
/**
1163+
* 异步发布消息到指定频道
1164+
*
1165+
* @param channel 频道名称
1166+
* @param message 要发布的消息
1167+
* @return 接收到此消息的客户端数量
1168+
*/
1169+
public CompletableFuture<Integer> asyncPublish(String channel, String message) {
1170+
return execute(new PublishCommand(channel, message)).thenApply(resp -> {
1171+
if (resp instanceof Integers) {
1172+
return ((Integers) resp).getValue();
1173+
}
1174+
throw new RedisunException("invalid response:" + resp);
1175+
});
1176+
}
1177+
1178+
/**
1179+
* 订阅给定的一个或多个频道
1180+
* 注意:一旦进入订阅状态,连接就不能用于执行其他命令,直到取消订阅
1181+
*
1182+
* @param pubsub 消息回调处理类
1183+
* @param channels 要订阅的频道列表
1184+
* @return 订阅对象
1185+
*/
1186+
public RedisunPubSub subscribe(RedisunPubSub pubsub, String... channels) {
1187+
// 获取零负载的连接,用于独占连接
1188+
AioQuickClient client = findFirstZeroLoadClient();
1189+
client.connectTimeout(0);
1190+
AioSession session = client.getSession();
1191+
RedisSession redisSession = session.getAttachment();
1192+
redisSession.setPubSub(pubsub);
1193+
// 设置取消订阅回调
1194+
pubsub.setUnsubscribe(uChannels -> {
1195+
try {
1196+
new UnsubscribeCommand(uChannels).writeTo(session.writeBuffer());
1197+
session.writeBuffer().flush();
1198+
} catch (IOException e) {
1199+
throw new RedisunException(e);
1200+
}
1201+
});
1202+
// 设置异常关闭时继续订阅
1203+
pubsub.setSubscribe(this::subscribe);
1204+
// 设置释放连接回调
1205+
pubsub.setReleaseClient(() -> {
1206+
multiplexClient.release(client);
1207+
redisSession.setPubSub(null);
1208+
});
1209+
// 执行订阅命令
1210+
try {
1211+
new SubscribeCommand(channels).writeTo(session.writeBuffer());
1212+
session.writeBuffer().flush();
1213+
} catch (IOException e) {
1214+
multiplexClient.release(client);
1215+
redisSession.setPubSub(null);
1216+
throw new RuntimeException(e);
1217+
}
1218+
return pubsub;
1219+
}
1220+
1221+
/**
1222+
* 寻找一个零负载的连接
1223+
* @return 零负载的连接
1224+
*/
1225+
private AioQuickClient findFirstZeroLoadClient() {
1226+
List<AioQuickClient> nonZeroClients = new ArrayList<>();
1227+
try {
1228+
while (true) {
1229+
AioQuickClient client = multiplexClient.acquire();
1230+
AioSession session = client.getSession();
1231+
RedisSession redisSession = session.getAttachment();
1232+
if (redisSession.load() == 0) {
1233+
// 找到零负载连接,先把其他连接归还
1234+
for (int i = nonZeroClients.size() - 1; i >= 0; i--) {
1235+
multiplexClient.reuse(nonZeroClients.get(i));
1236+
}
1237+
if (client != currentClient) {
1238+
return client;
1239+
}
1240+
}
1241+
nonZeroClients.add(client);
1242+
}
1243+
} catch (Throwable e) {
1244+
for (AioQuickClient c : nonZeroClients) {
1245+
multiplexClient.reuse(c);
1246+
}
1247+
throw new RedisunException(e);
1248+
}
1249+
}
1250+
}

0 commit comments

Comments
 (0)