Skip to content

Commit 3e3faef

Browse files
committed
fix: 抽象 common_utils, 修复了线程安全问题以及内存泄漏风险
1 parent 04093a8 commit 3e3faef

5 files changed

Lines changed: 143 additions & 130 deletions

File tree

lib/common_utils.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef __ROCKETMQ_COMMON_UTILS_H__
18+
#define __ROCKETMQ_COMMON_UTILS_H__
19+
20+
#include <cstdlib>
21+
#include <chrono>
22+
23+
namespace __node_rocketmq__ {
24+
25+
#if defined(ROCKETMQ_COVERAGE) || defined(ROCKETMQ_USE_STUB)
26+
inline bool IsEnvEnabled(const char* name) {
27+
const char* value = std::getenv(name);
28+
if (value == nullptr) {
29+
return false;
30+
}
31+
return value[0] != '\0' && value[0] != '0';
32+
}
33+
#endif
34+
35+
// 配置常量
36+
namespace config {
37+
constexpr std::chrono::seconds DEFAULT_MESSAGE_TIMEOUT{30};
38+
constexpr int MAX_BACKTRACE_FRAMES = 64;
39+
}
40+
41+
} // namespace __node_rocketmq__
42+
43+
#endif

lib/consumer_ack.cpp

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,10 @@
2222
#include <napi.h>
2323

2424
#include "addon_data.h"
25+
#include "common_utils.h"
2526

2627
namespace __node_rocketmq__ {
2728

28-
#if defined(ROCKETMQ_COVERAGE) || defined(ROCKETMQ_USE_STUB)
29-
namespace {
30-
bool IsEnvEnabled(const char* name) {
31-
const char* value = std::getenv(name);
32-
if (value == nullptr) {
33-
return false;
34-
}
35-
return value[0] != '\0' && value[0] != '0';
36-
}
37-
}
38-
#endif
39-
4029
Napi::Object ConsumerAck::Init(Napi::Env env, Napi::Object exports, AddonData* addon_data) {
4130
Napi::Function func = DefineClass(
4231
env, "ConsumerAck", {InstanceMethod<&ConsumerAck::Done>("done")});
@@ -71,7 +60,11 @@ void ConsumerAck::Done(std::exception_ptr exception) {
7160
if (!done_called_.exchange(true)) {
7261
try {
7362
promise_.set_exception(exception);
74-
} catch (const std::future_error&) {
63+
} catch (const std::future_error& e) {
64+
// 记录错误但不抛出异常,因为这可能在析构函数中调用
65+
fprintf(stderr, "[RocketMQ] Warning: Failed to set exception in ConsumerAck::Done: %s\n", e.what());
66+
} catch (const std::exception& e) {
67+
fprintf(stderr, "[RocketMQ] Warning: Unexpected error in ConsumerAck::Done: %s\n", e.what());
7568
}
7669
}
7770
}
@@ -99,7 +92,10 @@ Napi::Value ConsumerAck::Done(const Napi::CallbackInfo& info) {
9992
#endif
10093
try {
10194
promise_.set_value(ack);
102-
} catch (const std::future_error&) {
95+
} catch (const std::future_error& e) {
96+
fprintf(stderr, "[RocketMQ] Warning: Failed to set value in ConsumerAck::Done: %s\n", e.what());
97+
} catch (const std::exception& e) {
98+
fprintf(stderr, "[RocketMQ] Warning: Unexpected error in ConsumerAck::Done: %s\n", e.what());
10399
}
104100
return info.Env().Undefined();
105101
}

lib/producer.cpp

Lines changed: 45 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,10 @@
3131
#include <SendCallback.h>
3232

3333
#include "addon_data.h"
34+
#include "common_utils.h"
3435

3536
namespace __node_rocketmq__ {
3637

37-
#if defined(ROCKETMQ_COVERAGE) || defined(ROCKETMQ_USE_STUB)
38-
namespace {
39-
bool IsEnvEnabled(const char* name) {
40-
const char* value = std::getenv(name);
41-
if (value == nullptr) {
42-
return false;
43-
}
44-
return value[0] != '\0' && value[0] != '0';
45-
}
46-
}
47-
#endif
48-
4938
Napi::Object RocketMQProducer::Init(Napi::Env env, Napi::Object exports, AddonData* addon_data) {
5039
Napi::Function func =
5140
DefineClass(env,
@@ -204,27 +193,24 @@ class ProducerStartWorker : public Napi::AsyncWorker {
204193
wrapper_(wrapper) {}
205194

206195
void Execute() override {
207-
// 只在状态检查时持有锁
208-
{
209-
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
210-
211-
if (wrapper_->is_destroyed_.load()) {
212-
SetError("Producer has been destroyed");
213-
return;
214-
}
215-
216-
if (wrapper_->is_started_.load()) {
217-
SetError("Producer is already started");
218-
return;
219-
}
220-
221-
if (wrapper_->is_shutting_down_.load()) {
222-
SetError("Producer is shutting down");
223-
return;
224-
}
196+
// 在整个操作期间持有锁以避免竞态条件
197+
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
198+
199+
if (wrapper_->is_destroyed_.load()) {
200+
SetError("Producer has been destroyed");
201+
return;
202+
}
203+
204+
if (wrapper_->is_started_.load()) {
205+
SetError("Producer is already started");
206+
return;
207+
}
208+
209+
if (wrapper_->is_shutting_down_.load()) {
210+
SetError("Producer is shutting down");
211+
return;
225212
}
226213

227-
// 锁释放后执行耗时操作
228214
try {
229215
producer_->start();
230216
wrapper_->is_started_.store(true);
@@ -265,27 +251,24 @@ class ProducerShutdownWorker : public Napi::AsyncWorker {
265251
wrapper_(wrapper) {}
266252

267253
void Execute() override {
268-
// 只在状态检查时持有锁
269-
{
270-
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
271-
272-
if (wrapper_->is_destroyed_.load()) {
273-
SetError("Producer has been destroyed");
274-
return;
275-
}
276-
277-
if (!wrapper_->is_started_.load()) {
278-
SetError("Producer is not started");
279-
return;
280-
}
281-
282-
if (wrapper_->is_shutting_down_.exchange(true)) {
283-
SetError("Producer is already shutting down");
284-
return;
285-
}
254+
// 在整个操作期间持有锁以避免竞态条件
255+
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
256+
257+
if (wrapper_->is_destroyed_.load()) {
258+
SetError("Producer has been destroyed");
259+
return;
260+
}
261+
262+
if (!wrapper_->is_started_.load()) {
263+
SetError("Producer is not started");
264+
return;
265+
}
266+
267+
if (wrapper_->is_shutting_down_.exchange(true)) {
268+
SetError("Producer is already shutting down");
269+
return;
286270
}
287271

288-
// 锁释放后执行耗时操作
289272
try {
290273
producer_->shutdown();
291274
wrapper_->is_started_.store(false);
@@ -337,7 +320,8 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
337320
cleanup_ctx_(nullptr),
338321
callback_(),
339322
prevent_prevent_release_(false),
340-
callback_scheduled_(false) {
323+
callback_scheduled_(false),
324+
callback_completed_(false) {
341325
std::unique_ptr<CleanupContext> ctx(new CleanupContext());
342326
callback_ = Callback::New(env,
343327
callback,
@@ -401,6 +385,8 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
401385
if (status != napi_ok) {
402386
fprintf(stderr, "[RocketMQ] Failed to schedule JavaScript callback: %d\n", status);
403387
cleanup_ctx_->pending.reset(data);
388+
} else {
389+
callback_completed_.store(true);
404390
}
405391

406392
if (!prevent_prevent_release_.exchange(true)) {
@@ -467,6 +453,7 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
467453
Callback callback_;
468454
std::atomic<bool> prevent_prevent_release_;
469455
std::atomic<bool> callback_scheduled_;
456+
std::atomic<bool> callback_completed_;
470457
};
471458

472459
Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
@@ -542,12 +529,15 @@ Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
542529
}
543530
}
544531

545-
auto* send_callback =
546-
new ProducerSendCallback(env, Napi::Persistent(Value()), info[3].As<Napi::Function>());
532+
// 使用智能指针管理回调对象的生命周期
533+
std::unique_ptr<ProducerSendCallback> send_callback(
534+
new ProducerSendCallback(env, Napi::Persistent(Value()), info[3].As<Napi::Function>()));
535+
547536
try {
548-
producer_.send(message, send_callback);
537+
// 转移所有权给 RocketMQ,成功后智能指针释放所有权
538+
producer_.send(message, send_callback.release());
549539
} catch (const std::exception& e) {
550-
delete send_callback;
540+
// 如果发送失败,智能指针会自动清理回调对象
551541
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
552542
return env.Undefined();
553543
}

0 commit comments

Comments
 (0)