-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.js
172 lines (143 loc) · 4.18 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/* jshint node:true */
'use strict';
var redis = require('redis');
var debug = require('debug')('redis-events');
var slice = Array.prototype.slice;
var EventEmitter = require('events').EventEmitter;
function extend(destination, source) {
Object.keys(source).forEach(function(i) {
destination[i] = destination[i] || source[i];
});
return destination;
}
function RedisEventEmitter(options) {
if (!(this instanceof RedisEventEmitter)) {
return new RedisEventEmitter(options);
}
EventEmitter.call(this);
this.options = extend(options || {}, RedisEventEmitter.defaults);
debug('Created event emitter with options: %j', options);
}
/**
Default redis options - exposed so they can be edited as a means of setting global config
*/
RedisEventEmitter.defaults = {
auth: null,
port: 6379,
host: 'localhost'
};
// node events module compatibility
RedisEventEmitter.EventEmitter = RedisEventEmitter;
// proxy to EventEmitter
['usingDomains', 'defaultMaxListeners', 'init', 'listenerCount'].forEach(function(property) {
Object.defineProperty(RedisEventEmitter, property, {
get: function() {
return EventEmitter[property];
},
set: function(val) {
EventEmitter[property] = val;
}
});
});
var proto = RedisEventEmitter.prototype = Object.create(EventEmitter.prototype);
/**
Redis and pattern options
*/
proto.options = null;
/**
Redis client used by .emit
*/
proto.publisher = null;
/**
Redis client used by .on and .once
*/
proto.subscriber = null;
/**
Creates a new redis client
*/
proto.createClient = function() {
debug('Creating redis client');
var client = redis.createClient(this.options.port, this.options.host);
if (this.options.auth) {
client.auth(this.options.auth);
}
return client;
};
/**
Creates a redis subscriber if it doesn't exist and subscribes to the given event
*/
proto._subscribe = function(event) {
// lazily creates a redis client
if (!this.subscriber) {
debug('Creating subscriber');
this.subscriber = this.createClient();
// add a listener for incoming messages and emit them to local listeners
this._emitLocally();
// listen for new subscribe event on redis client and adds a message handler for that event
this.subscriber.on('subscribe', function(event) {
debug('New subscription for event: %s', event);
});
}
// only subscribes once
var listeners = EventEmitter.listenerCount(this, event);
if (!listeners) {
debug('No existing listeners - sending subscribe command');
this.subscriber.subscribe(event);
}
};
/**
Adds redis subscriber, then proxies to event emitter
*/
proto.addListener = function(event, listener) {
debug('Adding listener for %s', event);
this._subscribe(event);
EventEmitter.prototype.on.call(this, event, listener);
};
proto.on = proto.addListener;
/**
Adds redis subscriber, then proxies to event emitter
*/
proto.once = function(event, listener) {
debug('Adding one time listener for %s', event);
this._subscribe(event);
EventEmitter.prototype.once.call(this, event, listener);
};
/**
Emits an event to all the in-process attached listeners
*/
proto._emitLocally = function() {
this.subscriber.on('message', function(event, msg) {
var args = null, error = null;
debug('Received message - event: %s, msg: %s', event, msg);
try {
args = JSON.parse(msg);
} catch(e) {
error = e;
error.event = event;
error.msg = msg;
debug('Error while parsing message');
EventEmitter.prototype.emit.call(this, 'error', error);
}
// emit to all the attached listeners
args.unshift(event);
EventEmitter.prototype.emit.apply(this, args);
}.bind(this));
};
/**
Adds a publisher, then proxies to event emitter
*/
proto.emit = function(event) {
debug('Emitting event: %s', event);
// following events dont make sense to be passed onto redis subscribers
if (['error', 'newListener', 'removeListener'].indexOf(event) > -1) {
EventEmitter.prototype.emit.apply(this, arguments);
} else {
// lazily creates a redis client
this.publisher = this.publisher || this.createClient();
var args = slice.call(arguments);
args.shift(); // remove event from args
debug('Publishing event: %s, args: %j', event, args);
this.publisher.publish(event, JSON.stringify(args));
}
};
module.exports = RedisEventEmitter;