Skip to content

Commit 628650e

Browse files
committed
fix: C++ bindings 新增 utils,抽象日志等公共处理, ts 状态处理与 C++ 保持一致
1 parent c441b90 commit 628650e

8 files changed

Lines changed: 112 additions & 119 deletions

File tree

index.d.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

index.js

Lines changed: 0 additions & 4 deletions
This file was deleted.

lib/common_utils.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#include "common_utils.h"
18+
19+
#include <LoggerConfig.h>
20+
21+
namespace __node_rocketmq__ {
22+
namespace utils {
23+
24+
void SetLoggerOptions(const Napi::Object& options) {
25+
// set log level
26+
Napi::Value log_level = options.Get("logLevel");
27+
if (log_level.IsNumber()) {
28+
int32_t level = log_level.ToNumber();
29+
if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
30+
rocketmq::GetDefaultLoggerConfig().set_level(
31+
static_cast<rocketmq::LogLevel>(level));
32+
}
33+
}
34+
35+
// set log directory
36+
Napi::Value log_dir = options.Get("logDir");
37+
if (log_dir.IsString()) {
38+
rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
39+
}
40+
41+
// set log file size
42+
Napi::Value log_file_size = options.Get("logFileSize");
43+
if (log_file_size.IsNumber()) {
44+
rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
45+
}
46+
47+
// set log file num
48+
Napi::Value log_file_num = options.Get("logFileNum");
49+
if (log_file_num.IsNumber()) {
50+
rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
51+
}
52+
}
53+
54+
bool ValidateStringArguments(const Napi::CallbackInfo& info, size_t count, const char* error_msg) {
55+
if (info.Length() < count) {
56+
Napi::TypeError::New(info.Env(), "Wrong number of arguments").ThrowAsJavaScriptException();
57+
return false;
58+
}
59+
60+
for (size_t i = 0; i < count; ++i) {
61+
if (!info[i].IsString()) {
62+
Napi::TypeError::New(info.Env(), error_msg).ThrowAsJavaScriptException();
63+
return false;
64+
}
65+
}
66+
67+
return true;
68+
}
69+
70+
bool ValidateCallback(const Napi::CallbackInfo& info, size_t index, const char* error_msg) {
71+
if (info.Length() <= index || !info[index].IsFunction()) {
72+
Napi::TypeError::New(info.Env(), error_msg).ThrowAsJavaScriptException();
73+
return false;
74+
}
75+
return true;
76+
}
77+
78+
} // namespace utils
79+
} // namespace __node_rocketmq__

lib/common_utils.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <cstdlib>
2121
#include <chrono>
22+
#include <napi.h>
2223

2324
namespace __node_rocketmq__ {
2425

@@ -38,6 +39,16 @@ namespace config {
3839
constexpr int MAX_BACKTRACE_FRAMES = 64;
3940
}
4041

42+
// 通用工具函数
43+
namespace utils {
44+
// 设置日志配置的通用函数
45+
void SetLoggerOptions(const Napi::Object& options);
46+
47+
// 参数验证辅助函数
48+
bool ValidateStringArguments(const Napi::CallbackInfo& info, size_t count, const char* error_msg);
49+
bool ValidateCallback(const Napi::CallbackInfo& info, size_t index, const char* error_msg);
50+
}
51+
4152
} // namespace __node_rocketmq__
4253

4354
#endif

lib/producer.cpp

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -128,47 +128,16 @@ void RocketMQProducer::SetOptions(const Napi::Object& options) {
128128
producer_.set_send_msg_timeout(send_message_timeout.ToNumber());
129129
}
130130

131-
// set log level
132-
Napi::Value log_level = options.Get("logLevel");
133-
if (log_level.IsNumber()) {
134-
int32_t level = log_level.ToNumber();
135-
if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
136-
rocketmq::GetDefaultLoggerConfig().set_level(
137-
static_cast<rocketmq::LogLevel>(level));
138-
}
139-
}
140-
141-
// set log directory
142-
Napi::Value log_dir = options.Get("logDir");
143-
if (log_dir.IsString()) {
144-
rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
145-
}
146-
147-
// set log file size
148-
Napi::Value log_file_size = options.Get("logFileSize");
149-
if (log_file_size.IsNumber()) {
150-
rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
151-
}
152-
153-
// set log file num
154-
Napi::Value log_file_num = options.Get("logFileNum");
155-
if (log_file_num.IsNumber()) {
156-
rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
157-
}
131+
// 使用通用的日志配置函数
132+
utils::SetLoggerOptions(options);
158133
}
159134

160135
Napi::Value RocketMQProducer::SetSessionCredentials(
161136
const Napi::CallbackInfo& info) {
162137
Napi::Env env = info.Env();
163138

164-
// Check if required parameters are provided
165-
if (info.Length() < 3) {
166-
Napi::TypeError::New(env, "Wrong number of arguments").ThrowAsJavaScriptException();
167-
return env.Undefined();
168-
}
169-
170-
if (!info[0].IsString() || !info[1].IsString() || !info[2].IsString()) {
171-
Napi::TypeError::New(env, "All arguments must be strings").ThrowAsJavaScriptException();
139+
// 使用通用的参数验证函数
140+
if (!utils::ValidateStringArguments(info, 3, "All arguments must be strings")) {
172141
return env.Undefined();
173142
}
174143

@@ -228,9 +197,8 @@ class ProducerStartWorker : public Napi::AsyncWorker {
228197
Napi::Value RocketMQProducer::Start(const Napi::CallbackInfo& info) {
229198
Napi::Env env = info.Env();
230199

231-
// Check if callback is provided and is a function
232-
if (info.Length() < 1 || !info[0].IsFunction()) {
233-
Napi::TypeError::New(env, "Function expected as first argument").ThrowAsJavaScriptException();
200+
// 使用通用的回调验证函数
201+
if (!utils::ValidateCallback(info, 0, "Function expected as first argument")) {
234202
return env.Undefined();
235203
}
236204

@@ -288,9 +256,8 @@ class ProducerShutdownWorker : public Napi::AsyncWorker {
288256
Napi::Value RocketMQProducer::Shutdown(const Napi::CallbackInfo& info) {
289257
Napi::Env env = info.Env();
290258

291-
// Check if callback is provided and is a function
292-
if (info.Length() < 1 || !info[0].IsFunction()) {
293-
Napi::TypeError::New(env, "Function expected as first argument").ThrowAsJavaScriptException();
259+
// 使用通用的回调验证函数
260+
if (!utils::ValidateCallback(info, 0, "Function expected as first argument")) {
294261
return env.Undefined();
295262
}
296263

lib/push_consumer.cpp

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -132,47 +132,16 @@ void RocketMQPushConsumer::SetOptions(const Napi::Object& options) {
132132
consumer_.set_max_reconsume_times(max_reconsume_times.ToNumber());
133133
}
134134

135-
// set log level
136-
Napi::Value log_level = options.Get("logLevel");
137-
if (log_level.IsNumber()) {
138-
int32_t level = log_level.ToNumber();
139-
if (level >= 0 && level < rocketmq::LogLevel::LOG_LEVEL_LEVEL_NUM) {
140-
rocketmq::GetDefaultLoggerConfig().set_level(
141-
static_cast<rocketmq::LogLevel>(level));
142-
}
143-
}
144-
145-
// set log directory
146-
Napi::Value log_dir = options.Get("logDir");
147-
if (log_dir.IsString()) {
148-
rocketmq::GetDefaultLoggerConfig().set_path(log_dir.ToString());
149-
}
150-
151-
// set log file size
152-
Napi::Value log_file_size = options.Get("logFileSize");
153-
if (log_file_size.IsNumber()) {
154-
rocketmq::GetDefaultLoggerConfig().set_file_size(log_file_size.ToNumber());
155-
}
156-
157-
// set log file num
158-
Napi::Value log_file_num = options.Get("logFileNum");
159-
if (log_file_num.IsNumber()) {
160-
rocketmq::GetDefaultLoggerConfig().set_file_count(log_file_num.ToNumber());
161-
}
135+
// 使用通用的日志配置函数
136+
utils::SetLoggerOptions(options);
162137
}
163138

164139
Napi::Value RocketMQPushConsumer::SetSessionCredentials(
165140
const Napi::CallbackInfo& info) {
166141
Napi::Env env = info.Env();
167142

168-
// Check if required parameters are provided
169-
if (info.Length() < 3) {
170-
Napi::TypeError::New(env, "Wrong number of arguments").ThrowAsJavaScriptException();
171-
return env.Undefined();
172-
}
173-
174-
if (!info[0].IsString() || !info[1].IsString() || !info[2].IsString()) {
175-
Napi::TypeError::New(env, "All arguments must be strings").ThrowAsJavaScriptException();
143+
// 使用通用的参数验证函数
144+
if (!utils::ValidateStringArguments(info, 3, "All arguments must be strings")) {
176145
return env.Undefined();
177146
}
178147

@@ -232,9 +201,8 @@ class ConsumerStartWorker : public Napi::AsyncWorker {
232201
Napi::Value RocketMQPushConsumer::Start(const Napi::CallbackInfo& info) {
233202
Napi::Env env = info.Env();
234203

235-
// Check if callback is provided and is a function
236-
if (info.Length() < 1 || !info[0].IsFunction()) {
237-
Napi::TypeError::New(env, "Function expected as first argument").ThrowAsJavaScriptException();
204+
// 使用通用的回调验证函数
205+
if (!utils::ValidateCallback(info, 0, "Function expected as first argument")) {
238206
return env.Undefined();
239207
}
240208

@@ -297,9 +265,8 @@ class ConsumerShutdownWorker : public Napi::AsyncWorker {
297265
Napi::Value RocketMQPushConsumer::Shutdown(const Napi::CallbackInfo& info) {
298266
Napi::Env env = info.Env();
299267

300-
// Check if callback is provided and is a function
301-
if (info.Length() < 1 || !info[0].IsFunction()) {
302-
Napi::TypeError::New(env, "Function expected as first argument").ThrowAsJavaScriptException();
268+
// 使用通用的回调验证函数
269+
if (!utils::ValidateCallback(info, 0, "Function expected as first argument")) {
303270
return env.Undefined();
304271
}
305272

@@ -605,9 +572,8 @@ class ConsumerMessageListener : public rocketmq::MessageListenerConcurrently {
605572
Napi::Value RocketMQPushConsumer::SetListener(const Napi::CallbackInfo& info) {
606573
Napi::Env env = info.Env();
607574

608-
// Check parameters FIRST
609-
if (info.Length() < 1 || !info[0].IsFunction()) {
610-
Napi::TypeError::New(env, "Function expected as first argument").ThrowAsJavaScriptException();
575+
// 使用通用的回调验证函数
576+
if (!utils::ValidateCallback(info, 0, "Function expected as first argument")) {
611577
return env.Undefined();
612578
}
613579

src/consumer.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -254,17 +254,13 @@ export class RocketMQPushConsumer extends EventEmitter {
254254
if (expression && typeof expression !== 'string') {
255255
throw new Error('Expression must be a string if provided');
256256
}
257-
// Allow subscribe in STOPPED and STARTED states
258-
// Prevent subscribe during state transitions (STARTING/STOPPING)
259-
if (this.status === Status.STARTING) {
260-
throw new Error('Cannot subscribe while consumer is starting, please wait for start to complete');
261-
}
257+
258+
// C++ 代码只检查 destroyed 和 shutting_down 状态
259+
// 允许在 STOPPED 和 STARTED 状态下订阅
262260
if (this.status === Status.STOPPING) {
263261
throw new Error('Cannot subscribe while consumer is stopping');
264262
}
265-
if (this.status === Status.STARTED) {
266-
throw new Error('Cannot subscribe while consumer is started, please shutdown first');
267-
}
263+
268264
this.core.subscribe(topic, expression);
269265
}
270266
}

src/producer.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,12 +290,6 @@ export class RocketMQProducer {
290290

291291
this.core.send(topic, body, actualOptions, (err, status, msgId, offset) => {
292292
if (err) {
293-
// 在发送回调中也检查状态
294-
if (this.status !== Status.STARTED) {
295-
const statusName = this.getStatusName();
296-
const stateErr = new Error(`Producer state changed during send operation. Current status: ${statusName}`);
297-
return reject(stateErr);
298-
}
299293
return reject(err);
300294
}
301295

0 commit comments

Comments
 (0)