@@ -12,16 +12,6 @@ module.exports = function rabbitTopology() {
1212 msqQueue = QueueModel . rabbit . exchange . queue ( { name, durable : true } )
1313 }
1414
15- function setupQueues ( Model ) {
16- QueueModel = Model
17-
18- // Loop through all the defined queues
19- _ . forEach ( QueueModel . topology , ( handlers , queue ) => {
20- // Setup the actual queue on RabbitMQ
21- setupQueue ( queue )
22- } )
23- }
24-
2515 function setupQueueConsumer ( app , queue , definition ) {
2616 debug ( 'setupQueueConsumer' )
2717 const modelName = definition . model
@@ -41,10 +31,7 @@ module.exports = function rabbitTopology() {
4131 }
4232
4333 // Start consuming the queue
44- if ( msqQueue ) {
45- msqQueue . consume ( Method )
46- }
47-
34+ msqQueue . consume ( Method )
4835
4936 debug ( 'setupQueueConsumer: queue: %s, model: %s, method: %s' , queue , modelName , methodName )
5037 }
@@ -62,12 +49,7 @@ module.exports = function rabbitTopology() {
6249
6350 Model [ methodName ] = function queueProducer ( params ) {
6451 debug ( `${ modelName } .${ methodName } (%o)` , params )
65- if ( QueueModel . rabbit && QueueModel . rabbit . exchange ) {
66- QueueModel . rabbit . exchange . publish ( params , { key : queue } )
67- }
68- else {
69- debug ( 'setupQueueProducer: queue %s is not yet initialised' , queue )
70- }
52+ QueueModel . rabbit . exchange . publish ( params , { key : queue } )
7153 }
7254 }
7355
@@ -77,6 +59,9 @@ module.exports = function rabbitTopology() {
7759 // Loop through all the defined queues
7860 _ . forEach ( QueueModel . topology , ( handlers , queue ) => {
7961
62+ // Setup the actual queue on RabbitMQ
63+ setupQueue ( queue )
64+
8065 // Setup the consumer of this queue
8166 if ( handlers . consumer ) {
8267 setupQueueConsumer ( QueueModel . app , queue , handlers . consumer )
@@ -112,7 +97,6 @@ module.exports = function rabbitTopology() {
11297
11398 return {
11499 setupQueue,
115- setupQueues,
116100 setupTopology,
117101 setupQueueProducer,
118102 setupQueueConsumer,
0 commit comments