@@ -39,8 +39,9 @@ class Publisher {
39
39
40
40
class Subscriber extends events . EventEmitter {
41
41
42
- constructor ( ) {
42
+ constructor ( options ) {
43
43
super ( ) ;
44
+ this . disableFanOut = options . disableFanOut || false ;
44
45
}
45
46
46
47
subscribe ( channel ) {
@@ -72,27 +73,32 @@ class Subscriber extends events.EventEmitter {
72
73
}
73
74
74
75
_createSubscription ( topic , channel ) {
75
-
76
- const subscriptionUUID = uuid . v1 ( ) ;
77
- var subscriptionName = `${ namePrefix } -${ channel } -${ subscriptionUUID } ` ;
76
+ let subscriptionName ;
77
+ if ( ! this . disableFanOut ) {
78
+ const subscriptionUUID = uuid . v1 ( ) ;
79
+ subscriptionName = `${ namePrefix } -${ channel } -${ subscriptionUUID } ` ;
80
+ } else {
81
+ subscriptionName = `${ namePrefix } -${ channel } ` ;
82
+ }
78
83
79
84
topic . subscribe ( subscriptionName , ( err , subscription ) => {
80
-
81
85
if ( err ) {
82
86
console . error ( `Failed to create subscription ${ err } ` ) ;
83
87
return ;
84
88
}
85
89
86
90
console . log ( `Subscription ${ subscription . name } created.` ) ;
87
91
88
- function deleteSubscription ( ) {
92
+ const deleteSubscription = ( ) => {
89
93
removeListeners ( ) ;
90
- console . log ( 'Subscriber: Signal received, deleting subscription' ) ;
91
- subscription . delete ( ) . then ( ( ) => {
92
- console . log ( 'Subscriber: subscription deleted...' ) ;
93
- } , ( err ) => {
94
- console . error ( `Subscriber: Error deleting subscription` , err ) ;
95
- } ) ;
94
+ if ( ! this . disableFanOut ) {
95
+ console . log ( 'Subscriber: Signal received, deleting subscription' ) ;
96
+ subscription . delete ( ) . then ( ( ) => {
97
+ console . log ( 'Subscriber: subscription deleted...' ) ;
98
+ } , ( err ) => {
99
+ console . error ( `Subscriber: Error deleting subscription` , err ) ;
100
+ } ) ;
101
+ }
96
102
}
97
103
98
104
function messageHandler ( message ) {
@@ -121,6 +127,7 @@ class Subscriber extends events.EventEmitter {
121
127
function removeListeners ( ) {
122
128
subscription . removeListener ( 'message' , onMessage ) ;
123
129
subscription . removeListener ( 'error' , onError ) ;
130
+
124
131
process . removeListener ( 'SIGTERM' , deleteSubscription ) ;
125
132
process . removeListener ( 'SIGINT' , deleteSubscription ) ;
126
133
}
@@ -140,8 +147,8 @@ function createPublisher() {
140
147
return new Publisher ( emitter ) ;
141
148
}
142
149
143
- function createSubscriber ( ) {
144
- return new Subscriber ( ) ;
150
+ function createSubscriber ( options ) {
151
+ return new Subscriber ( options ) ;
145
152
}
146
153
147
154
module . exports = {
0 commit comments