Skip to content

Commit b0d26f4

Browse files
committed
connect/disconnect at each flush, tested with 30K metric/second, fix #4
1 parent d62c8ff commit b0d26f4

File tree

2 files changed

+19
-18
lines changed

2 files changed

+19
-18
lines changed

README.md

+6-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ Clone `statsd` and our new backend
1212
$ git clone https://github.com/statsd/statsd.git
1313
$ git clone https://github.com/uptimedog/statsd-rabbitmq-backend.git statsd/backends/statsd-rabbitmq-backend
1414
15-
$ cd statsd/backends/statsd-rabbitmq-backend
15+
$ cd statsd
16+
$ npm install
17+
$ cd backends/statsd-rabbitmq-backend
1618
$ npm install
1719
$ cd ../..
1820
```
@@ -23,7 +25,8 @@ Create a config file `config.js` like the following to use rabbitmq backend. Mor
2325
{
2426
amqp: {
2527
connection: "amqp://guest:guest@localhost:5672",
26-
queue: "metrics"
28+
queue: "metrics",
29+
durable: false
2730
},
2831

2932
backends: [ "./backends/statsd-rabbitmq-backend" ]
@@ -57,7 +60,7 @@ var open = require('amqplib').connect("amqp://guest:guest@localhost:5672");
5760
open.then(function(conn) {
5861
return conn.createChannel();
5962
}).then(function(ch) {
60-
return ch.assertQueue(q).then(function(ok) {
63+
return ch.assertQueue(q, {durable: false}).then(function(ok) {
6164
return ch.consume(q, function(msg) {
6265
if (msg !== null) {
6366
console.log(msg.content.toString());

statsd-rabbitmq-backend/index.js

+13-15
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
*
1111
* amqp.connection: RabbitMQ Connection eg. amqp://guest:guest@localhost:5672
1212
* amqp.queue: RabbitMQ Exchange eg. metrics
13+
* ampq.durable: Whether the queue will survive a broker restart
1314
*
1415
*/
15-
var util = require('util');
1616

1717

1818
function RabbitmqBackend(startupTime, config, emitter) {
@@ -21,8 +21,6 @@ function RabbitmqBackend(startupTime, config, emitter) {
2121
this.lastException = startupTime;
2222
this.config = config;
2323

24-
this.connection = require('amqplib').connect(this.config.amqp.connection);
25-
2624
// attach
2725
emitter.on('flush', function(timestamp, metrics) {
2826
self.flush(timestamp, metrics);
@@ -73,21 +71,21 @@ RabbitmqBackend.prototype.flush = function(timestamp, metrics) {
7371

7472
var queue = this.config.amqp.queue;
7573
var msg = JSON.stringify(metric);
74+
var durable = this.config.amqp.durable;
7675

77-
console.log('Sending metrics ', msg);
76+
console.log('Attempt to send metrics ', msg);
7877

7978
// Publish
80-
this.connection.then(function(conn) {
81-
if (this.ch) {
82-
return this.ch;
83-
}
84-
this.ch = conn.createChannel();
85-
return this.ch;
86-
}).then(function(ch) {
87-
return ch.assertQueue(queue).then(function(ok) {
88-
console.log('Sent ', msg);
89-
return ch.sendToQueue(queue, Buffer.from(msg));
90-
});
79+
require('amqplib').connect(this.config.amqp.connection).then(function(conn) {
80+
return conn.createChannel().then(function(ch) {
81+
var ok = ch.assertQueue(queue, {durable: durable});
82+
83+
return ok.then(function(_qok) {
84+
ch.sendToQueue(queue, Buffer.from(msg));
85+
console.log("Sent ", msg);
86+
return ch.close();
87+
});
88+
}).finally(function() { conn.close(); });
9189
}).catch(console.warn);
9290
};
9391

0 commit comments

Comments
 (0)