Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions integration/microservices/e2e/fanout-exchange-rmq.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { INestApplication, INestMicroservice } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import * as request from 'supertest';
import { RMQFanoutExchangeProducerController } from '../src/rmq/fanout-exchange-producer-rmq.controller';
import { RMQFanoutExchangeConsumerController } from '../src/rmq/fanout-exchange-consumer-rmq.controller';

describe('RabbitMQ transport (Fanout Exchange)', () => {
let server: any;
let appProducer: INestApplication;
let appConsumer: INestMicroservice;

beforeEach(async () => {
const producerModule = await Test.createTestingModule({
controllers: [RMQFanoutExchangeProducerController],
}).compile();
const consumerModule = await Test.createTestingModule({
controllers: [RMQFanoutExchangeConsumerController],
}).compile();

appProducer = producerModule.createNestApplication();
server = appProducer.getHttpAdapter().getInstance();

appConsumer = consumerModule.createNestMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [`amqp://0.0.0.0:5672`],
queue: '',
exchange: 'test.fanout',
exchangeType: 'fanout',
queueOptions: {
exclusive: true,
},
},
});
await Promise.all([appProducer.init(), appConsumer.listen()]);
});

it(`should send message to fanout exchange`, async () => {
await request(server).get('/fanout-exchange').expect(200, 'ping/pong');
});

afterEach(async () => {
await Promise.all([appProducer.close(), appConsumer.close()]);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, RmqContext } from '@nestjs/microservices';

@Controller()
export class RMQFanoutExchangeConsumerController {
constructor() {}

@MessagePattern('ping')
handleTopicExchange(@Ctx() ctx: RmqContext): string {
return ctx.getPattern() + '/pong';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Controller, Get } from '@nestjs/common';
import {
ClientProxy,
ClientProxyFactory,
Transport,
} from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Controller()
export class RMQFanoutExchangeProducerController {
client: ClientProxy;

constructor() {
this.client = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://localhost:5672`],
exchange: 'test.fanout',
exchangeType: 'fanout',
},
});
}

@Get('fanout-exchange')
async topicExchange() {
return lastValueFrom(this.client.send<string>('ping', 1));
}
}
28 changes: 14 additions & 14 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,19 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}

if (this.options.exchange && this.options.routingKey) {
await channel.bindQueue(
this.queue,
this.options.exchange,
this.options.routingKey,
);
}
if (!this.options.wildcards && this.options.exchangeType !== 'fanout') {
if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
}

if (this.options.wildcards) {
if (this.options.exchange && this.options.routingKey) {
await channel.bindQueue(
this.queue,
this.options.exchange,
this.options.exchangeType === 'fanout' ? '' : this.options.routingKey,
);
}
} else {
const exchange = this.getOptionsProp(
this.options,
'exchange',
Expand Down Expand Up @@ -391,7 +391,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
correlationId,
};

if (this.options.wildcards) {
if (this.options.wildcards || this.options.exchangeType === 'fanout') {
const stringifiedPattern = isString(message.pattern)
? message.pattern
: JSON.stringify(message.pattern);
Expand Down Expand Up @@ -443,7 +443,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
const errorCallback = (err: unknown) =>
err ? reject(err as Error) : resolve();

return this.options.wildcards
return this.options.wildcards || this.options.exchangeType === 'fanout'
? this.channel!.publish(
// The exchange is the same as the queue when wildcards are enabled
// and the exchange is not explicitly set
Expand Down
24 changes: 18 additions & 6 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,16 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
this.queueOptions.noAssert ??
RQM_DEFAULT_NO_ASSERT;

if (!noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
let createdQueue: string;

if (this.queue === RQM_DEFAULT_QUEUE || !noAssert) {
const { queue } = await channel.assertQueue(
this.queue,
this.queueOptions,
);
createdQueue = queue;
} else {
createdQueue = this.queue;
}

const isGlobalPrefetchCount = this.getOptionsProp(
Expand Down Expand Up @@ -223,15 +231,19 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
arguments: this.getOptionsProp(this.options, 'exchangeArguments', {}),
});

if (this.options.routingKey) {
await channel.bindQueue(this.queue, exchange, this.options.routingKey);
if (this.options.routingKey || this.options.exchangeType === 'fanout') {
await channel.bindQueue(
createdQueue,
exchange,
this.options.exchangeType === 'fanout' ? '' : this.options.routingKey,
);
}

if (this.options.wildcards) {
const routingKeys = Array.from(this.getHandlers().keys());
await Promise.all(
routingKeys.map(routingKey =>
channel.bindQueue(this.queue, exchange, routingKey),
channel.bindQueue(createdQueue, exchange, routingKey),
),
);

Expand All @@ -243,7 +255,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {

await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
channel.consume(
this.queue,
createdQueue,
(msg: Record<string, any> | null) => this.handleMessage(msg!, channel),
{
noAck: this.noAck,
Expand Down
51 changes: 50 additions & 1 deletion packages/microservices/test/client/client-rmq.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ describe('ClientRMQ', function () {

describe('setupChannel', () => {
const queue = 'test';
const exchange = 'test.exchange';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
Expand All @@ -134,6 +135,8 @@ describe('ClientRMQ', function () {
channel = {
assertQueue: sinon.spy(() => ({})),
prefetch: sinon.spy(),
bindQueue: sinon.spy(),
assertExchange: sinon.spy(),
};
consumeStub = sinon.stub(client, 'consumeChannel').callsFake(() => null!);
});
Expand All @@ -152,6 +155,28 @@ describe('ClientRMQ', function () {
await client.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should not call "assertQueue" when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
await client.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should not call "assertQueue" when wildcards is true', async () => {
untypedClient['options']['wildcards'] = true;
await client.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should not call "bindQueue" when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
await client.setupChannel(channel, () => null);
expect(channel.bindQueue.called).not.to.be.true;
});
it('should not call "bindQueue" when wildcards is true', async () => {
untypedClient['options']['wildcards'] = true;
await client.setupChannel(channel, () => null);
expect(channel.bindQueue.called).not.to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await client.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
Expand Down Expand Up @@ -183,9 +208,11 @@ describe('ClientRMQ', function () {

describe('publish', () => {
const pattern = 'test';
const exchange = 'test.exchange';
let msg: ReadPacket;
let connectSpy: sinon.SinonSpy,
sendToQueueStub: sinon.SinonStub,
publishStub: sinon.SinonStub,
eventSpy: sinon.SinonSpy;

beforeEach(() => {
Expand All @@ -196,9 +223,11 @@ describe('ClientRMQ', function () {
connectSpy = sinon.spy(client, 'connect');
eventSpy = sinon.spy();
sendToQueueStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() }));
publishStub = sinon.stub().callsFake(() => ({ catch: sinon.spy() }));

client['channel'] = {
sendToQueue: sendToQueueStub,
publish: publishStub,
};
client['responseEmitter'] = new EventEmitter();
client['responseEmitter'].on(pattern, eventSpy);
Expand All @@ -214,6 +243,14 @@ describe('ClientRMQ', function () {
expect(sendToQueueStub.getCall(0).args[0]).to.be.eql(client['queue']);
});
});
it('should send message to exchange when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
client['publish'](msg, () => {
expect(publishStub.called).to.be.true;
expect(publishStub.getCall(0).args[0]).to.be.eql(exchange);
});
});

it('should send buffer from stringified message', () => {
client['publish'](msg, () => {
Expand Down Expand Up @@ -380,16 +417,19 @@ describe('ClientRMQ', function () {
});
describe('dispatchEvent', () => {
let msg: ReadPacket;
let sendToQueueStub: sinon.SinonStub, channel;
const exchange = 'test.exchange';
let sendToQueueStub: sinon.SinonStub, publishStub: sinon.SinonStub, channel;

beforeEach(() => {
client = new ClientRMQ({});
untypedClient = client as any;

msg = { pattern: 'pattern', data: 'data' };
sendToQueueStub = sinon.stub();
publishStub = sinon.stub();
channel = {
sendToQueue: sendToQueueStub,
publish: publishStub,
};
untypedClient.channel = channel;
});
Expand All @@ -400,6 +440,15 @@ describe('ClientRMQ', function () {

expect(sendToQueueStub.called).to.be.true;
});
it('should publish packet to exchange when exchangeType is fanout', async () => {
untypedClient['options']['exchangeType'] = 'fanout';
untypedClient['options']['exchange'] = exchange;
publishStub.callsFake((a, b, c, d, f) => f());
await client['dispatchEvent'](msg);

expect(publishStub.called).to.be.true;
expect(publishStub.getCall(0).args[0]).to.be.eql(exchange);
});
it('should throw error', async () => {
sendToQueueStub.callsFake((a, b, c, d) => d(new Error()));
client['dispatchEvent'](msg).catch(err =>
Expand Down
26 changes: 24 additions & 2 deletions packages/microservices/test/server/server-rmq.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { assert, expect } from 'chai';
import * as sinon from 'sinon';
import { NO_MESSAGE_HANDLER } from '../../constants';
import { NO_MESSAGE_HANDLER, RQM_DEFAULT_QUEUE } from '../../constants';
import { RmqContext } from '../../ctx-host';
import { ServerRMQ } from '../../server/server-rmq';
import { objectToMap } from './utils/object-to-map';
Expand Down Expand Up @@ -179,6 +179,7 @@ describe('ServerRMQ', () => {
});
describe('setupChannel', () => {
const queue = 'test';
const exchange = 'test.exchange';
const queueOptions = {};
const isGlobalPrefetchCount = true;
const prefetchCount = 10;
Expand All @@ -194,9 +195,11 @@ describe('ServerRMQ', () => {
};

channel = {
assertQueue: sinon.spy(() => ({})),
assertQueue: sinon.spy(() => ({ queue })),
prefetch: sinon.spy(),
consume: sinon.spy(),
assertExchange: sinon.spy(() => ({})),
bindQueue: sinon.spy(),
};
});
it('should call "assertQueue" with queue and queue options when noAssert is false', async () => {
Expand All @@ -205,6 +208,13 @@ describe('ServerRMQ', () => {
await server.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(queue, queueOptions)).to.be.true;
});
it('should call "assertQueue" with queue and queue options when queue is default queue', async () => {
server['queue' as any] = RQM_DEFAULT_QUEUE;

await server.setupChannel(channel, () => null);
expect(channel.assertQueue.calledWith(RQM_DEFAULT_QUEUE, queueOptions)).to
.be.true;
});
it('should not call "assertQueue" when noAssert is true', async () => {
server['options' as any] = {
...(server as any)['options'],
Expand All @@ -214,6 +224,18 @@ describe('ServerRMQ', () => {
await server.setupChannel(channel, () => null);
expect(channel.assertQueue.called).not.to.be.true;
});
it('should call "bindQueue" with exchangeType is fanout', async () => {
const namedQueue = 'exclusive-queue-name';
channel.assertQueue = sinon.spy(() => ({ queue: namedQueue }));
server['queue' as any] = RQM_DEFAULT_QUEUE;
server['options' as any] = {
...(server as any)['options'],
exchangeType: 'fanout',
exchange: exchange,
};
await server.setupChannel(channel, () => null);
expect(channel.bindQueue.calledWith(namedQueue, exchange, '')).to.be.true;
});
it('should call "prefetch" with prefetchCount and "isGlobalPrefetchCount"', async () => {
await server.setupChannel(channel, () => null);
expect(channel.prefetch.calledWith(prefetchCount, isGlobalPrefetchCount))
Expand Down