Node.js client for Apache RocketMQ, implemented as native addons.
npm install rocketmq-nodejsNode.js 18.17.0+ / 20.3.0+ / 22.14.0+
const { Producer } = require('rocketmq-nodejs');
// Basic usage
const producer = new Producer('GID_GROUP', {
nameServer: '127.0.0.1:9876'
});
// With all options
const producer = new Producer('GID_GROUP', 'INSTANCE_NAME', {
nameServer: '127.0.0.1:9876',
groupName: 'GROUP_NAME',
compressLevel: 5, // 0-9, default 5
sendMessageTimeout: 3000, // ms, default 3000
maxMessageSize: 131072, // bytes, default 128KB
logDir: '$HOME/logs/rocketmq',
logFileNum: 3,
logFileSize: 104857600, // bytes, default 100MB
logLevel: 'info' // fatal|error|warn|info|debug|trace|num
});// With callback
producer.start((err) => {
if (err) {
console.error(err);
return;
}
console.log('Producer started');
});
// With Promise
await producer.start();// Basic send
producer.send('TP_TOPIC', 'Hello RocketMQ', (err, result) => {
if (err) {
console.error(err);
return;
}
console.log(result);
// Result example:
// {
// status: 0,
// statusStr: 'OK',
// msgId: '0101007F0000367E0000309DD68B0700',
// offset: 0
// }
});
// With options
producer.send('TP_TOPIC', 'Hello RocketMQ', {
keys: 'key1',
tags: 'TagA',
}, callback);
// With Promise
const result = await producer.send('TP_TOPIC', 'Hello RocketMQ');Send Status Codes:
status |
statusStr |
|---|---|
0 |
OK |
1 |
FLUSH_DISK_TIMEOUT |
2 |
FLUSH_SLAVE_TIMEOUT |
3 |
SLAVE_NOT_AVAILABLE |
// With callback
producer.shutdown((err) => {
if (err) {
console.error(err);
return;
}
console.log('Producer shutdown');
});
// With Promise
await producer.shutdown();const { PushConsumer } = require('rocketmq-nodejs');
// Basic usage
const consumer = new PushConsumer('GID_GROUP', {
nameServer: '127.0.0.1:9876'
});
// With all options
const consumer = new PushConsumer('GID_GROUP', 'INSTANCE_NAME', {
nameServer: '127.0.0.1:9876',
threadCount: 3, // Number of consumer threads
maxBatchSize: 32, // Max batch size for consuming messages
maxReconsumeTimes: 16, // Max retries before the message is dropped (default 16)
logDir: '$HOME/logs/rocketmq',
logFileNum: 3,
logFileSize: 104857600, // bytes, default 100MB
logLevel: 'info' // fatal|error|warn|info|debug|trace|num
});// With callback
consumer.start((err) => {
if (err) {
console.error(err);
return;
}
console.log('Consumer started');
});
// With Promise
await consumer.start();// Subscribe to all messages
consumer.subscribe('TP_TOPIC', '*');
// Subscribe with expression
consumer.subscribe('TP_TOPIC', 'TagA');consumer.on('message', (msg, ack) => {
console.log(msg);
// Message example:
// {
// topic: 'TopicTest',
// tags: 'TagA',
// keys: 'key1',
// body: 'Hello RocketMQ',
// msgId: '0101007F0000367E0000339DD68B0800'
// }
// Acknowledge success
ack.done();
// Or acknowledge failure
// ack.done(false);
});// With callback
consumer.shutdown((err) => {
if (err) {
console.error(err);
return;
}
console.log('Consumer shutdown');
});
// With Promise
await consumer.shutdown();For Aliyun RocketMQ, additional configuration is required:
// Producer
const producer = new Producer('GID_GROUP', {
nameServer: 'onsaddr.cn-hangzhou.mq.aliyuncs.com:80'
});
producer.setSessionCredentials(
'YOUR_ACCESS_KEY', // Access Key ID
'YOUR_SECRET_KEY', // Access Key Secret
'ALIYUN' // Channel
);
// Consumer
const consumer = new PushConsumer('GID_GROUP', {
nameServer: 'onsaddr.cn-hangzhou.mq.aliyuncs.com:80'
});
consumer.setSessionCredentials(
'YOUR_ACCESS_KEY', // Access Key ID
'YOUR_SECRET_KEY', // Access Key Secret
'ALIYUN' // Channel
);