Skip to content

Commit 698ca5d

Browse files
committed
refactor(core):优化Redis会话处理逻辑
- 引入线程持有机制,确保数据写入的线程安全性- 移除冗余的异常打印代码,统一异常处理方式- 优化空字符串处理,复用空输出流实例- 调整客户端连接的获取与释放时机,提高资源利用率 - 添加线程安全检查,防止跨线程操作缓冲区
1 parent c6a0ec7 commit 698ca5d

4 files changed

Lines changed: 24 additions & 10 deletions

File tree

src/main/java/tech/smartboot/redisun/RedisMessageProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void stateEvent0(AioSession session, StateMachineEnum stateMachineEnum, T
126126

127127
// 如果有异常发生,打印异常堆栈信息
128128
if (throwable != null) {
129-
throwable.printStackTrace();
129+
// throwable.printStackTrace();
130130
}
131131
}
132132
}

src/main/java/tech/smartboot/redisun/RedisSession.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ final class RedisSession {
3434
*/
3535
private RESP decodingResponse;
3636

37+
private Thread holdThread;
38+
39+
public Thread getHoldThread() {
40+
return holdThread;
41+
}
42+
43+
public void setHoldThread(Thread holdThread) {
44+
this.holdThread = holdThread;
45+
}
3746

3847
/**
3948
* 获取正在解码的响应对象

src/main/java/tech/smartboot/redisun/Redisun.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -510,10 +510,10 @@ private CompletableFuture<RESP> execute(Command command) {
510510
// 创建用于接收结果的CompletableFuture
511511
CompletableFuture<RESP> future = new CompletableFuture<>();
512512
// 设置异常处理
513-
future.exceptionally(t -> {
514-
t.printStackTrace();
515-
return null;
516-
});
513+
// future.exceptionally(t -> {
514+
// t.printStackTrace();
515+
// return null;
516+
// });
517517
AioQuickClient client = null;
518518
try {
519519
// 构建命令参数
@@ -524,17 +524,21 @@ private CompletableFuture<RESP> execute(Command command) {
524524
// 获取可用的客户端连接
525525
client = multiplexClient.acquire();
526526
AioSession session = client.getSession();
527-
527+
RedisSession redisSession = session.getAttachment();
528+
redisSession.setHoldThread(Thread.currentThread());
529+
multiplexClient.reuse(client);
528530
synchronized (client) {
529-
RedisSession redisSession = session.getAttachment();
531+
530532
// 设置当前命令的future
531533
redisSession.offer(future);
532534
// 将命令编码为RESP格式并写入缓冲区
533535
arrays.writeTo(session.writeBuffer());
534536
}
535-
multiplexClient.reuse(client);
537+
536538
// 刷新缓冲区,发送数据
537-
session.writeBuffer().flush();
539+
if (redisSession.getHoldThread() == Thread.currentThread()) {
540+
session.writeBuffer().flush();
541+
}
538542
} catch (Throwable e) {
539543
// 发生异常时完成future
540544
if (client != null) {

src/main/java/tech/smartboot/redisun/resp/BulkStrings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class BulkStrings extends RESP<String> {
4343

4444
// 用于存储字符串数据的输出流
4545
private ByteArrayOutputStream out;
46+
private final static ByteArrayOutputStream EMPTY_OUTPUT_STREAM = new ByteArrayOutputStream(0);
4647

4748
/**
4849
* 私有构造函数,防止外部直接实例化
@@ -73,7 +74,7 @@ public boolean decode(ByteBuffer readBuffer) {
7374
if (length == 0) {
7475
// 空字符串
7576
state = DECODE_STATE_END;
76-
out = new ByteArrayOutputStream(0);
77+
out = EMPTY_OUTPUT_STREAM;
7778
} else if (length > 0) {
7879
// 初始化输出流以存储字符串数据
7980
out = new ByteArrayOutputStream(length);

0 commit comments

Comments
 (0)