diff --git a/integration/microservices/e2e/fanout-exchange-rmq.spec.ts b/integration/microservices/e2e/fanout-exchange-rmq.spec.ts new file mode 100644 index 00000000000..c271f88bda1 --- /dev/null +++ b/integration/microservices/e2e/fanout-exchange-rmq.spec.ts @@ -0,0 +1,46 @@ +import { INestApplication, INestMicroservice } from '@nestjs/common'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import * as request from 'supertest'; +import { RMQFanoutExchangeProducerController } from '../src/rmq/fanout-exchange-producer-rmq.controller'; +import { RMQFanoutExchangeConsumerController } from '../src/rmq/fanout-exchange-consumer-rmq.controller'; + +describe('RabbitMQ transport (Fanout Exchange)', () => { + let server: any; + let appProducer: INestApplication; + let appConsumer: INestMicroservice; + + beforeEach(async () => { + const producerModule = await Test.createTestingModule({ + controllers: [RMQFanoutExchangeProducerController], + }).compile(); + const consumerModule = await Test.createTestingModule({ + controllers: [RMQFanoutExchangeConsumerController], + }).compile(); + + appProducer = producerModule.createNestApplication(); + server = appProducer.getHttpAdapter().getInstance(); + + appConsumer = consumerModule.createNestMicroservice({ + transport: Transport.RMQ, + options: { + urls: [`amqp://0.0.0.0:5672`], + queue: '', + exchange: 'test.fanout', + exchangeType: 'fanout', + queueOptions: { + exclusive: true, + }, + }, + }); + await Promise.all([appProducer.init(), appConsumer.listen()]); + }); + + it(`should send message to fanout exchange`, async () => { + await request(server).get('/fanout-exchange').expect(200, 'ping/pong'); + }); + + afterEach(async () => { + await Promise.all([appProducer.close(), appConsumer.close()]); + }); +}); diff --git a/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts b/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts new file mode 100644 index 00000000000..ebfc62e6e29 --- /dev/null +++ b/integration/microservices/src/rmq/fanout-exchange-consumer-rmq.controller.ts @@ -0,0 +1,12 @@ +import { Controller } from '@nestjs/common'; +import { Ctx, MessagePattern, RmqContext } from '@nestjs/microservices'; + +@Controller() +export class RMQFanoutExchangeConsumerController { + constructor() {} + + @MessagePattern('ping') + handleTopicExchange(@Ctx() ctx: RmqContext): string { + return ctx.getPattern() + '/pong'; + } +} diff --git a/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts b/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts new file mode 100644 index 00000000000..78c1946919a --- /dev/null +++ b/integration/microservices/src/rmq/fanout-exchange-producer-rmq.controller.ts @@ -0,0 +1,28 @@ +import { Controller, Get } from '@nestjs/common'; +import { + ClientProxy, + ClientProxyFactory, + Transport, +} from '@nestjs/microservices'; +import { lastValueFrom } from 'rxjs'; + +@Controller() +export class RMQFanoutExchangeProducerController { + client: ClientProxy; + + constructor() { + this.client = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5672`], + exchange: 'test.fanout', + exchangeType: 'fanout', + }, + }); + } + + @Get('fanout-exchange') + async topicExchange() { + return lastValueFrom(this.client.send('ping', 1)); + } +} diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index 95de82dee58..b62602802c2 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -204,19 +204,19 @@ export class ClientRMQ extends ClientProxy { this.getOptionsProp(this.options, 'isGlobalPrefetchCount') || RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT; - if (!this.noAssert) { - await channel.assertQueue(this.queue, this.queueOptions); - } - - if (this.options.exchange && this.options.routingKey) { - await channel.bindQueue( - this.queue, - this.options.exchange, - this.options.routingKey, - ); - } + if (!this.options.wildcards && this.options.exchangeType !== 'fanout') { + if (!this.noAssert) { + await channel.assertQueue(this.queue, this.queueOptions); + } - if (this.options.wildcards) { + if (this.options.exchange && this.options.routingKey) { + await channel.bindQueue( + this.queue, + this.options.exchange, + this.options.exchangeType === 'fanout' ? '' : this.options.routingKey, + ); + } + } else { const exchange = this.getOptionsProp( this.options, 'exchange', @@ -391,7 +391,7 @@ export class ClientRMQ extends ClientProxy { correlationId, }; - if (this.options.wildcards) { + if (this.options.wildcards || this.options.exchangeType === 'fanout') { const stringifiedPattern = isString(message.pattern) ? message.pattern : JSON.stringify(message.pattern); @@ -443,7 +443,7 @@ export class ClientRMQ extends ClientProxy { const errorCallback = (err: unknown) => err ? reject(err as Error) : resolve(); - return this.options.wildcards + return this.options.wildcards || this.options.exchangeType === 'fanout' ? this.channel!.publish( // The exchange is the same as the queue when wildcards are enabled // and the exchange is not explicitly set diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 39a90f14485..102c6a81a0b 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -191,8 +191,16 @@ export class ServerRMQ extends Server { this.queueOptions.noAssert ?? RQM_DEFAULT_NO_ASSERT; - if (!noAssert) { - await channel.assertQueue(this.queue, this.queueOptions); + let createdQueue: string; + + if (this.queue === RQM_DEFAULT_QUEUE || !noAssert) { + const { queue } = await channel.assertQueue( + this.queue, + this.queueOptions, + ); + createdQueue = queue; + } else { + createdQueue = this.queue; } const isGlobalPrefetchCount = this.getOptionsProp( @@ -223,15 +231,19 @@ export class ServerRMQ extends Server { arguments: this.getOptionsProp(this.options, 'exchangeArguments', {}), }); - if (this.options.routingKey) { - await channel.bindQueue(this.queue, exchange, this.options.routingKey); + if (this.options.routingKey || this.options.exchangeType === 'fanout') { + await channel.bindQueue( + createdQueue, + exchange, + this.options.exchangeType === 'fanout' ? '' : this.options.routingKey, + ); } if (this.options.wildcards) { const routingKeys = Array.from(this.getHandlers().keys()); await Promise.all( routingKeys.map(routingKey => - channel.bindQueue(this.queue, exchange, routingKey), + channel.bindQueue(createdQueue, exchange, routingKey), ), ); @@ -243,7 +255,7 @@ export class ServerRMQ extends Server { await channel.prefetch(prefetchCount, isGlobalPrefetchCount); channel.consume( - this.queue, + createdQueue, (msg: Record | null) => this.handleMessage(msg!, channel), { noAck: this.noAck, diff --git a/packages/microservices/test/client/client-rmq.spec.ts b/packages/microservices/test/client/client-rmq.spec.ts index 7130d4d55e9..243ef89051e 100644 --- a/packages/microservices/test/client/client-rmq.spec.ts +++ b/packages/microservices/test/client/client-rmq.spec.ts @@ -119,6 +119,7 @@ describe('ClientRMQ', function () { describe('setupChannel', () => { const queue = 'test'; + const exchange = 'test.exchange'; const queueOptions = {}; const isGlobalPrefetchCount = true; const prefetchCount = 10; @@ -134,6 +135,8 @@ describe('ClientRMQ', function () { channel = { assertQueue: sinon.spy(() => ({})), prefetch: sinon.spy(), + bindQueue: sinon.spy(), + assertExchange: sinon.spy(), }; consumeStub = sinon.stub(client, 'consumeChannel').callsFake(() => null!); }); @@ -152,6 +155,28 @@ describe('ClientRMQ', function () { await client.setupChannel(channel, () => null); expect(channel.assertQueue.called).not.to.be.true; }); + it('should not call "assertQueue" when exchangeType is fanout', async () => { + untypedClient['options']['exchangeType'] = 'fanout'; + untypedClient['options']['exchange'] = exchange; + await client.setupChannel(channel, () => null); + expect(channel.assertQueue.called).not.to.be.true; + }); + it('should not call "assertQueue" when wildcards is true', async () => { + untypedClient['options']['wildcards'] = true; + await client.setupChannel(channel, () => null); + expect(channel.assertQueue.called).not.to.be.true; + }); + it('should not call "bindQueue" when exchangeType is fanout', async () => { + untypedClient['options']['exchangeType'] = 'fanout'; + untypedClient['options']['exchange'] = exchange; + await client.setupChannel(channel, () => null); + expect(channel.bindQueue.called).not.to.be.true; + }); + it('should not call "bindQueue" when wildcards is true', async () => { + untypedClient['options']['wildcards'] = true; + await client.setupChannel(channel, () => null); + expect(channel.bindQueue.called).not.to.be.true; + }); it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => { await client.setupChannel(channel, () => null); expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount)) @@ -183,9 +208,11 @@ describe('ClientRMQ', function () { describe('publish', () => { const pattern = 'test'; + const exchange = 'test.exchange'; let msg: ReadPacket; let connectSpy: sinon.SinonSpy, sendToQueueStub: sinon.SinonStub, + publishStub: sinon.SinonStub, eventSpy: sinon.SinonSpy; beforeEach(() => { @@ -196,9 +223,11 @@ describe('ClientRMQ', function () { connectSpy = sinon.spy(client, 'connect'); eventSpy = sinon.spy(); sendToQueueStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() })); + publishStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() })); client['channel'] = { sendToQueue: sendToQueueStub, + publish: publishStub, }; client['responseEmitter'] = new EventEmitter(); client['responseEmitter'].on(pattern, eventSpy); @@ -214,6 +243,14 @@ describe('ClientRMQ', function () { expect(sendToQueueStub.getCall(0).args[0]).to.be.eql(client['queue']); }); }); + it('should send message to exchange when exchangeType is fanout', async () => { + untypedClient['options']['exchangeType'] = 'fanout'; + untypedClient['options']['exchange'] = exchange; + client['publish'](msg, () => { + expect(publishStub.called).to.be.true; + expect(publishStub.getCall(0).args[0]).to.be.eql(exchange); + }); + }); it('should send buffer from stringified message', () => { client['publish'](msg, () => { @@ -380,7 +417,8 @@ describe('ClientRMQ', function () { }); describe('dispatchEvent', () => { let msg: ReadPacket; - let sendToQueueStub: sinon.SinonStub, channel; + const exchange = 'test.exchange'; + let sendToQueueStub: sinon.SinonStub, publishStub: sinon.SinonStub, channel; beforeEach(() => { client = new ClientRMQ({}); @@ -388,8 +426,10 @@ describe('ClientRMQ', function () { msg = { pattern: 'pattern', data: 'data' }; sendToQueueStub = sinon.stub(); + publishStub = sinon.stub(); channel = { sendToQueue: sendToQueueStub, + publish: publishStub, }; untypedClient.channel = channel; }); @@ -400,6 +440,15 @@ describe('ClientRMQ', function () { expect(sendToQueueStub.called).to.be.true; }); + it('should publish packet to exchange when exchangeType is fanout', async () => { + untypedClient['options']['exchangeType'] = 'fanout'; + untypedClient['options']['exchange'] = exchange; + publishStub.callsFake((a, b, c, d, f) => f()); + await client['dispatchEvent'](msg); + + expect(publishStub.called).to.be.true; + expect(publishStub.getCall(0).args[0]).to.be.eql(exchange); + }); it('should throw error', async () => { sendToQueueStub.callsFake((a, b, c, d) => d(new Error())); client['dispatchEvent'](msg).catch(err => diff --git a/packages/microservices/test/server/server-rmq.spec.ts b/packages/microservices/test/server/server-rmq.spec.ts index d3fa260551b..2872891bc60 100644 --- a/packages/microservices/test/server/server-rmq.spec.ts +++ b/packages/microservices/test/server/server-rmq.spec.ts @@ -1,6 +1,6 @@ import { assert, expect } from 'chai'; import * as sinon from 'sinon'; -import { NO_MESSAGE_HANDLER } from '../../constants'; +import { NO_MESSAGE_HANDLER, RQM_DEFAULT_QUEUE } from '../../constants'; import { RmqContext } from '../../ctx-host'; import { ServerRMQ } from '../../server/server-rmq'; import { objectToMap } from './utils/object-to-map'; @@ -179,6 +179,7 @@ describe('ServerRMQ', () => { }); describe('setupChannel', () => { const queue = 'test'; + const exchange = 'test.exchange'; const queueOptions = {}; const isGlobalPrefetchCount = true; const prefetchCount = 10; @@ -194,9 +195,11 @@ describe('ServerRMQ', () => { }; channel = { - assertQueue: sinon.spy(() => ({})), + assertQueue: sinon.spy(() => ({ queue })), prefetch: sinon.spy(), consume: sinon.spy(), + assertExchange: sinon.spy(() => ({})), + bindQueue: sinon.spy(), }; }); it('should call "assertQueue" with queue and queue options when noAssert is false', async () => { @@ -205,6 +208,13 @@ describe('ServerRMQ', () => { await server.setupChannel(channel, () => null); expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true; }); + it('should call "assertQueue" with queue and queue options when queue is default queue', async () => { + server['queue' as any] = RQM_DEFAULT_QUEUE; + + await server.setupChannel(channel, () => null); + expect(channel.assertQueue.calledWith(RQM_DEFAULT_QUEUE, queueOptions)).to + .be.true; + }); it('should not call "assertQueue" when noAssert is true', async () => { server['options' as any] = { ...(server as any)['options'], @@ -214,6 +224,18 @@ describe('ServerRMQ', () => { await server.setupChannel(channel, () => null); expect(channel.assertQueue.called).not.to.be.true; }); + it('should call "bindQueue" with exchangeType is fanout', async () => { + const namedQueue = 'exclusive-queue-name'; + channel.assertQueue = sinon.spy(() => ({ queue: namedQueue })); + server['queue' as any] = RQM_DEFAULT_QUEUE; + server['options' as any] = { + ...(server as any)['options'], + exchangeType: 'fanout', + exchange: exchange, + }; + await server.setupChannel(channel, () => null); + expect(channel.bindQueue.calledWith(namedQueue, exchange, '')).to.be.true; + }); it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => { await server.setupChannel(channel, () => null); expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))