Skip to content

Commit 61e0c36

Browse files
committed
fix: 优化 c++ bindings 的析构函数和生命周期管理
1 parent 76163ac commit 61e0c36

9 files changed

Lines changed: 481 additions & 56 deletions

File tree

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
"use strict";
2-
3-
module.exports = {
1+
export const config = {
42
nameServer: '127.0.0.1:9876'
53
// nameServer: 'onsaddr.cn-hangzhou.mq.aliyuncs.com:80'
64
// nameServer: 'v0-default-<instanceId>.mq.cloudrun.local:9515'
Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
1-
"use strict";
2-
3-
if (typeof Promise.withResolvers === 'undefined') {
4-
Promise.withResolvers = function () {
5-
let resolve, reject
1+
import * as readline from 'readline';
2+
import { config } from './common';
3+
import { Producer } from '../';
4+
5+
if (typeof (Promise as any).withResolvers === 'undefined') {
6+
(Promise as any).withResolvers = function () {
7+
let resolve: ((value?: any) => void) | null = null;
8+
let reject: ((reason?: any) => void) | null = null;
69
const promise = new Promise((res, rej) => {
7-
resolve = res
8-
reject = rej
9-
})
10-
return { promise, resolve, reject }
11-
}
10+
resolve = res;
11+
reject = rej;
12+
});
13+
return { promise, resolve, reject };
14+
};
1215
}
1316

14-
const readline = require('readline');
15-
const common = require('./common');
16-
const Producer = require('../').Producer;
17-
1817
const rl = readline.createInterface({
1918
input: process.stdin,
2019
output: process.stdout
2120
});
2221

2322
void async function () {
2423
const producer = new Producer('GID_GROUP', {
25-
nameServer: common.nameServer,
24+
nameServer: config.nameServer,
2625
});
2726

2827
// producer.setSessionCredentials('accessKey', 'secretKey', 'ALIYUN')
2928

3029
await producer.start();
3130

3231
while(1) {
33-
const { promise, resolve, reject } = Promise.withResolvers()
34-
rl.question('Enter input: ', async input => {
32+
const { promise, resolve, reject } = (Promise as any).withResolvers();
33+
rl.question('Enter input: ', async (input: string) => {
3534
await producer.send('TP_TOPIC', input).catch(reject);
3635
resolve();
3736
});
Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
1-
"use strict";
2-
3-
const common = require('./common');
4-
const PushConsumer = require('../').PushConsumer;
1+
import { config } from './common';
2+
import { PushConsumer, Message, ConsumerAck } from '../';
53

64
void function () {
75
const consumer = new PushConsumer('GID_GROUP', {
8-
nameServer: common.nameServer,
6+
nameServer: config.nameServer,
97
});
108

119
// consumer.setSessionCredentials('accessKey', 'secretKey', 'ALIYUN');
1210

1311
consumer.subscribe('TP_TOPIC', '*');
14-
consumer.on('message', function(msg, ack) {
15-
console.log(msg)
16-
ack.done();
17-
})
12+
consumer.on('message', function(msg: Message, ack: ConsumerAck) {
13+
console.log(msg);
14+
ack.done(true);
15+
});
1816

1917
consumer.start(function() {
2018
console.log('consumer started');
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
const { RocketMQProducer } = require('../src/producer');
2-
const { Status } = require('../src/constants');
1+
import { RocketMQProducer } from '../src/producer';
2+
import { Status } from '../src/constants';
33

4-
async function demonstrateStatusBehavior() {
4+
async function demonstrateStatusBehavior(): Promise<void> {
55
const producer = new RocketMQProducer('demo-group');
66

77
console.log('=== 新的状态行为演示 ===');
@@ -21,14 +21,14 @@ async function demonstrateStatusBehavior() {
2121
try {
2222
await promise1;
2323
console.log('第一个 start() 调用成功');
24-
} catch (err) {
24+
} catch (err: any) {
2525
console.log('第一个 start() 调用失败:', err.message);
2626
}
2727

2828
try {
2929
await promise2;
3030
console.log('第二个 start() 调用成功');
31-
} catch (err) {
31+
} catch (err: any) {
3232
console.log('第二个 start() 调用失败:', err.message);
3333
}
3434

@@ -37,7 +37,7 @@ async function demonstrateStatusBehavior() {
3737
console.log('最终状态:', getStatusName(producer.status)); // STOPPED
3838
}
3939

40-
function getStatusName(status) {
40+
function getStatusName(status: Status): string {
4141
switch (status) {
4242
case Status.STOPPED: return 'STOPPED';
4343
case Status.STARTED: return 'STARTED';

lib/producer.cpp

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,28 @@ RocketMQProducer::RocketMQProducer(const Napi::CallbackInfo& info)
8484
}
8585

8686
RocketMQProducer::~RocketMQProducer() {
87-
producer_.shutdown();
87+
SafeShutdown();
88+
}
89+
90+
void RocketMQProducer::SafeShutdown() {
91+
std::lock_guard<std::mutex> lock(state_mutex_);
92+
93+
if (is_destroyed_.exchange(true)) {
94+
return; // Already destroyed
95+
}
96+
97+
if (is_started_.load() && !is_shutting_down_.exchange(true)) {
98+
try {
99+
producer_.shutdown();
100+
} catch (const std::exception& e) {
101+
// Log error but don't throw in destructor
102+
fprintf(stderr, "[RocketMQ] Warning: Producer shutdown failed in destructor: %s\n", e.what());
103+
} catch (...) {
104+
fprintf(stderr, "[RocketMQ] Warning: Unknown error during producer shutdown in destructor\n");
105+
}
106+
}
107+
108+
is_started_.store(false);
88109
}
89110

90111
void RocketMQProducer::SetOptions(const Napi::Object& options) {
@@ -179,11 +200,30 @@ class ProducerStartWorker : public Napi::AsyncWorker {
179200
RocketMQProducer* wrapper)
180201
: Napi::AsyncWorker(callback),
181202
wrapper_ref_(Napi::Persistent(wrapper->Value())),
182-
producer_(&wrapper->producer_) {}
203+
producer_(&wrapper->producer_),
204+
wrapper_(wrapper) {}
183205

184206
void Execute() override {
207+
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
208+
209+
if (wrapper_->is_destroyed_.load()) {
210+
SetError("Producer has been destroyed");
211+
return;
212+
}
213+
214+
if (wrapper_->is_started_.load()) {
215+
SetError("Producer is already started");
216+
return;
217+
}
218+
219+
if (wrapper_->is_shutting_down_.load()) {
220+
SetError("Producer is shutting down");
221+
return;
222+
}
223+
185224
try {
186225
producer_->start();
226+
wrapper_->is_started_.store(true);
187227
} catch (const std::exception& e) {
188228
SetError(e.what());
189229
}
@@ -192,6 +232,7 @@ class ProducerStartWorker : public Napi::AsyncWorker {
192232
private:
193233
Napi::ObjectReference wrapper_ref_;
194234
rocketmq::DefaultMQProducer* producer_;
235+
RocketMQProducer* wrapper_;
195236
};
196237

197238
Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
@@ -216,19 +257,41 @@ class ProducerShutdownWorker : public Napi::AsyncWorker {
216257
RocketMQProducer* wrapper)
217258
: Napi::AsyncWorker(callback),
218259
wrapper_ref_(Napi::Persistent(wrapper->Value())),
219-
producer_(&wrapper->producer_) {}
260+
producer_(&wrapper->producer_),
261+
wrapper_(wrapper) {}
220262

221263
void Execute() override {
264+
std::lock_guard<std::mutex> lock(wrapper_->state_mutex_);
265+
266+
if (wrapper_->is_destroyed_.load()) {
267+
SetError("Producer has been destroyed");
268+
return;
269+
}
270+
271+
if (!wrapper_->is_started_.load()) {
272+
SetError("Producer is not started");
273+
return;
274+
}
275+
276+
if (wrapper_->is_shutting_down_.exchange(true)) {
277+
SetError("Producer is already shutting down");
278+
return;
279+
}
280+
222281
try {
223282
producer_->shutdown();
283+
wrapper_->is_started_.store(false);
284+
wrapper_->is_shutting_down_.store(false); // Reset shutdown flag after successful shutdown
224285
} catch (const std::exception& e) {
286+
wrapper_->is_shutting_down_.store(false); // Reset on error
225287
SetError(e.what());
226288
}
227289
}
228290

229291
private:
230292
Napi::ObjectReference wrapper_ref_;
231293
rocketmq::DefaultMQProducer* producer_;
294+
RocketMQProducer* wrapper_;
232295
};
233296

234297
Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
@@ -265,7 +328,8 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
265328
: prevent_gc_(new Napi::ObjectReference(std::move(producer_ref))),
266329
cleanup_ctx_(nullptr),
267330
callback_(),
268-
prevent_prevent_release_(false) {
331+
prevent_prevent_release_(false),
332+
callback_scheduled_(false) {
269333
std::unique_ptr<CleanupContext> ctx(new CleanupContext());
270334
callback_ = Callback::New(env,
271335
callback,
@@ -280,7 +344,13 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
280344

281345
~ProducerSendCallback() {
282346
if (!prevent_prevent_release_.exchange(true)) {
283-
callback_.Abort();
347+
try {
348+
callback_.Abort();
349+
} catch (const std::exception& e) {
350+
fprintf(stderr, "[RocketMQ] Warning: Error aborting send callback: %s\n", e.what());
351+
} catch (...) {
352+
fprintf(stderr, "[RocketMQ] Warning: Unknown error aborting send callback\n");
353+
}
284354
}
285355
}
286356

@@ -297,6 +367,11 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
297367
private:
298368
void ScheduleCallback(std::unique_ptr<rocketmq::SendResult> result,
299369
std::exception_ptr exception) {
370+
// Prevent multiple callback scheduling
371+
if (callback_scheduled_.exchange(true)) {
372+
return;
373+
}
374+
300375
auto prevent_gc = std::move(prevent_gc_);
301376
auto* data = new CallbackData{
302377
std::move(result),
@@ -315,12 +390,18 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
315390
status = callback_.BlockingCall(data);
316391
#endif
317392
if (status != napi_ok) {
318-
fprintf(stderr, "Failed to schedule JavaScript callback: %d\n", status);
393+
fprintf(stderr, "[RocketMQ] Failed to schedule JavaScript callback: %d\n", status);
319394
cleanup_ctx_->pending.reset(data);
320395
}
321396

322397
if (!prevent_prevent_release_.exchange(true)) {
323-
callback_.Release();
398+
try {
399+
callback_.Release();
400+
} catch (const std::exception& e) {
401+
fprintf(stderr, "[RocketMQ] Warning: Error releasing send callback: %s\n", e.what());
402+
} catch (...) {
403+
fprintf(stderr, "[RocketMQ] Warning: Unknown error releasing send callback\n");
404+
}
324405
}
325406
}
326407

@@ -357,6 +438,10 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
357438
}
358439
} catch (const Napi::Error& e) {
359440
e.ThrowAsJavaScriptException();
441+
} catch (const std::exception& e) {
442+
fprintf(stderr, "[RocketMQ] Warning: Exception in send callback: %s\n", e.what());
443+
} catch (...) {
444+
fprintf(stderr, "[RocketMQ] Warning: Unknown exception in send callback\n");
360445
}
361446
}
362447

@@ -372,12 +457,13 @@ class ProducerSendCallback : public rocketmq::AutoDeleteSendCallback {
372457
CleanupContext* cleanup_ctx_;
373458
Callback callback_;
374459
std::atomic<bool> prevent_prevent_release_;
460+
std::atomic<bool> callback_scheduled_;
375461
};
376462

377463
Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
378464
Napi::Env env = info.Env();
379465

380-
// Check if required parameters are provided
466+
// Check if required parameters are provided FIRST (before state checks)
381467
if (info.Length() < 4) {
382468
Napi::TypeError::New(env, "Wrong number of arguments").ThrowAsJavaScriptException();
383469
return env.Undefined();
@@ -393,6 +479,25 @@ Napi::Value RocketMQProducer::Send(const Napi::CallbackInfo& info) {
393479
return env.Undefined();
394480
}
395481

482+
// Check if producer is in valid state AFTER parameter validation
483+
{
484+
std::lock_guard<std::mutex> lock(state_mutex_);
485+
if (is_destroyed_.load()) {
486+
Napi::Error::New(env, "Producer has been destroyed").ThrowAsJavaScriptException();
487+
return env.Undefined();
488+
}
489+
490+
if (!is_started_.load()) {
491+
Napi::Error::New(env, "Producer is not started").ThrowAsJavaScriptException();
492+
return env.Undefined();
493+
}
494+
495+
if (is_shutting_down_.load()) {
496+
Napi::Error::New(env, "Producer is shutting down").ThrowAsJavaScriptException();
497+
return env.Undefined();
498+
}
499+
}
500+
396501
rocketmq::MQMessage message = [&]() {
397502
Napi::String topic = info[0].As<Napi::String>();
398503
Napi::Value body = info[1];

lib/producer.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#define __ROCKETMQ_PRODUCER_H__
1919

2020
#include <napi.h>
21+
#include <atomic>
22+
#include <mutex>
2123

2224
#include <DefaultMQProducer.h>
2325

@@ -46,9 +48,14 @@ class RocketMQProducer : public Napi::ObjectWrap<RocketMQProducer> {
4648

4749
private:
4850
void SetOptions(const Napi::Object& options);
51+
void SafeShutdown();
4952

5053
private:
5154
rocketmq::DefaultMQProducer producer_;
55+
std::atomic<bool> is_started_{false};
56+
std::atomic<bool> is_shutting_down_{false};
57+
std::atomic<bool> is_destroyed_{false};
58+
mutable std::mutex state_mutex_;
5259
};
5360

5461
} // namespace __node_rocketmq__

0 commit comments

Comments
 (0)