Skip to content

Commit c441b90

Browse files
committed
fix: ProducerSendCallback 构造函数内存泄漏问题,ConsumerMessageListener 内存管理
1 parent 3e0a4b9 commit c441b90

3 files changed

Lines changed: 105 additions & 13 deletions

File tree

AGENTS.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# AI 编码助手项目指令
2+
3+
> **语言要求**: 始终使用中文回复
4+
5+
---
6+
7+
## 🔴 强制规则 (MUST)
8+
9+
以下规则必须严格遵守,违反任何一条都是不可接受的:
10+
11+
12+
### 代码质量检查
13+
编码任务完成前,**必须按顺序执行以下检查**
14+
1. TypeScript 检查必须通过
15+
2. Lint 检查必须通过
16+
3. 单元测试必须全部通过
17+
4. Code Review 必须完成
18+
19+
### 单元测试要求
20+
- **服务端代码必须编写单测**
21+
- **单测覆盖率必须达到 100%**
22+
- **禁止 skip 任何测试用例**
23+
- **必须使用终端工具实际运行单测** - 不能假装运行
24+
25+
---
26+
27+
## 📋 命令参考
28+
29+
### TypeScript 检查
30+
31+
## TIPS
32+
- ts check
33+
```shell
34+
npm run build:ts
35+
```
36+
37+
### Lint 检查
38+
```shell
39+
# TypeScript/JavaScript 文件
40+
npm run lint
41+
42+
```
43+
44+
### 单元测试
45+
```shell
46+
## 可以有选择执行,对 lib 下的 C++ 文件进行编译
47+
./build.sh local
48+
## 然后执行
49+
npm run test <文件路径>
50+
```
51+
52+
53+
## ⚡ 执行流程
54+
55+
完成编码任务的标准流程:
56+
57+
```
58+
1. 理解需求
59+
60+
2. 编写/修改代码
61+
62+
3. 运行 TypeScript 检查
63+
64+
4. 运行 Lint 检查
65+
66+
5. 编写单测 (如需要)
67+
68+
6. 运行单测并等待完成
69+
70+
7. Code Review
71+
72+
8. 如有问题,返回步骤 2 修复
73+
74+
9. 完成
75+
```
76+
77+
---
78+
79+
## ⏳ 单测执行注意事项
80+
81+
- 单测启动和执行需要较长时间,**耐心等待**
82+
- 如支持后台进程读取,可使用 Background process 异步等待
83+
- **绝对禁止使用 sleep 命令等待**
84+

lib/producer.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,21 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
323323
callback_scheduled_(false),
324324
callback_completed_(false) {
325325
std::unique_ptr<CleanupContext> ctx(new CleanupContext());
326-
callback_ = Callback::New(env,
327-
callback,
328-
"RocketMQ Send Callback",
329-
0,
330-
1,
331-
ctx.get(),
332-
&Finalize,
333-
static_cast<void*>(nullptr));
334-
cleanup_ctx_ = ctx.release();
326+
try {
327+
callback_ = Callback::New(env,
328+
callback,
329+
"RocketMQ Send Callback",
330+
0,
331+
1,
332+
ctx.get(),
333+
&Finalize,
334+
static_cast<void*>(nullptr));
335+
// 只有在 Callback::New 成功后才释放所有权
336+
cleanup_ctx_ = ctx.release();
337+
} catch (...) {
338+
// 如果 Callback::New 失败,智能指针会自动清理
339+
throw;
340+
}
335341
}
336342

337343
~ProducerSendCallback() {

lib/push_consumer.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,9 @@ class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
506506
return rocketmq::ConsumeStatus::RECONSUME_LATER;
507507
}
508508

509-
// 使用原始指针管理内存,确保异常安全
510-
auto* data = new MessageAndPromise{msg, std::promise<bool>()};
509+
// 使用智能指针管理内存,确保异常安全
510+
std::unique_ptr<MessageAndPromise> data_ptr(new MessageAndPromise{msg, std::promise<bool>()});
511+
auto* data = data_ptr.get();
511512
auto future = data->promise.get_future();
512513

513514
#if defined(ROCKETMQ_COVERAGE) || defined(ROCKETMQ_USE_STUB)
@@ -548,17 +549,18 @@ class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
548549
#else
549550
// Check if we're already aborted before making the call
550551
if (aborted_.load() || shutdown_requested_.load()) {
551-
delete data;
552552
return rocketmq::ConsumeStatus::RECONSUME_LATER;
553553
}
554554

555555
status = listener_.BlockingCall(data);
556556
#endif
557557
if (status != napi_ok) {
558-
delete data;
559558
return rocketmq::ConsumeStatus::RECONSUME_LATER;
560559
}
561560

561+
// 成功调用后释放智能指针的所有权,让 CallConsumerMessageJsListener 管理内存
562+
data_ptr.release();
563+
562564
try {
563565
#if defined(ROCKETMQ_COVERAGE) || defined(ROCKETMQ_USE_STUB)
564566
if (IsEnvEnabled("ROCKETMQ_STUB_CONSUMER_FORCE_FUTURE_ERROR")) {

0 commit comments

Comments
 (0)