Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions integration/microservices/e2e/binary-redis.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { INestApplication } from '@nestjs/common';
import {
ClientRedis,
ClientsModule,
Deserializer,
IncomingResponse,
Transport,
} from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import { lastValueFrom } from 'rxjs';
import Redis from 'ioredis';

class BinaryDeserializer implements Deserializer<Buffer, IncomingResponse> {
deserialize(value: Buffer): IncomingResponse {
const firstSeparatorIndex = value.indexOf(':');
const secondSeparatorIndex = value.indexOf(':', firstSeparatorIndex + 1);

return {
id: value.subarray(0, firstSeparatorIndex).toString(),
isDisposed: true,
err: null,
response: value.subarray(secondSeparatorIndex + 1),
};
}
}

describe('REDIS transport', () => {
let app: INestApplication;
let client: ClientRedis;
let pub: Redis;
let sub: Redis;

beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [
ClientsModule.register({
clients: [
{
name: 'REDIS_SERVICE',
transport: Transport.REDIS,
options: {
returnBuffers: true,
host: '0.0.0.0',
port: 6379,
deserializer: new BinaryDeserializer(),
},
},
],
}),
],
}).compile();

app = module.createNestApplication();

pub = new Redis();
sub = new Redis();

await sub.subscribe('binary', (_, __) => {});

sub.on('message', async (channel, message) => {
const data = JSON.parse(message);
const delay = data.data === 'slow' ? 25 : 0;
const responseBody =
data.data === 'bytes'
? Buffer.from([0, 1, 2, 3, 255])
: Buffer.from(`${data.data}-replied`);
const response = Buffer.concat([
Buffer.from(`${data.id}:${channel}:`),
responseBody,
]);

if (delay > 0) {
await new Promise(resolve => setTimeout(resolve, delay));
}

await pub.publish(`${channel}.reply`, response);
});

client = app.get<ClientRedis>('REDIS_SERVICE');

await client.connect();

await app.init();
});

it('should return a raw binary payload', async () => {
const data = await lastValueFrom(client.send('binary', 'data'));
expect(Buffer.isBuffer(data)).to.be.true;
expect(data).to.deep.equal(Buffer.from('data-replied'));
});

it('should route concurrent raw binary replies to the matching request', async () => {
const [slowResponse, fastResponse] = await Promise.all([
lastValueFrom(client.send('binary', 'slow')),
lastValueFrom(client.send('binary', 'fast')),
]);
expect(Buffer.isBuffer(slowResponse)).to.be.true;
expect(Buffer.isBuffer(fastResponse)).to.be.true;
expect(slowResponse).to.deep.equal(Buffer.from('slow-replied'));
expect(fastResponse).to.deep.equal(Buffer.from('fast-replied'));
});

it('should preserve non-utf8 bytes in the raw binary payload', async () => {
const data = await lastValueFrom(client.send('binary', 'bytes'));
expect(Buffer.isBuffer(data)).to.be.true;
expect(data).to.deep.equal(Buffer.from([0, 1, 2, 3, 255]));
});

afterEach(async () => {
await app.close();
await client.close();
await pub.quit();
await sub.quit();
});
});
31 changes: 28 additions & 3 deletions packages/microservices/client/client-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import { ClientProxy } from './client-proxy';
// type Redis = import('ioredis').Redis;
type Redis = any;

type RedisOutputOptions = {
returnBuffers?: boolean;
};

let redisPackage = {} as any;

/**
Expand All @@ -34,7 +38,10 @@ export class ClientRedis extends ClientProxy<RedisEvents, RedisStatus> {
callback: RedisEvents[keyof RedisEvents];
}> = [];

constructor(protected readonly options: Required<RedisOptions>['options']) {
constructor(
protected readonly options: Required<RedisOptions>['options'] &
RedisOutputOptions,
) {
super();

redisPackage = loadPackage('ioredis', ClientRedis.name, () =>
Expand Down Expand Up @@ -139,7 +146,10 @@ export class ClientRedis extends ClientProxy<RedisEvents, RedisStatus> {

if (!this.wasInitialConnectionSuccessful) {
this.wasInitialConnectionSuccessful = true;
this.subClient.on('message', this.createResponseCallback());
this.subClient.on(
this.options.returnBuffers ? 'messageBuffer' : 'message',
this.createResponseCallback(),
);
}
});
}
Expand Down Expand Up @@ -223,12 +233,27 @@ export class ClientRedis extends ClientProxy<RedisEvents, RedisStatus> {
buffer: string,
) => Promise<void> {
return async (channel: string, buffer: string) => {
const packet = JSON.parse(buffer);
let packet: any;
try {
packet = JSON.parse(buffer);
} catch (err) {
this.logger.debug(
'Redis response packet is not in json format, bypassing...',
);
packet = buffer;
}
const { err, response, isDisposed, id } =
await this.deserializer.deserialize(packet);

const callback = this.routingMap.get(id);
if (!callback) {
if (Buffer.isBuffer(buffer))
this.logger.debug(
'You have to parse your buffer on your own to get id from it, because it is not in json format',
);
this.logger.debug(
'No matching callback found for Redis response packet with id: ' + id,
);
return;
}
if (isDisposed || err) {
Expand Down
82 changes: 82 additions & 0 deletions packages/microservices/test/client/client-redis.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,32 @@ describe('ClientRedis', () => {
expect(callback.called).to.be.false;
});
});
describe('custom binary format (not json)', () => {
it('should use buffer directly without parsing it as json', async () => {
const clientWithBuffers = new ClientRedis({ returnBuffers: true });
const callback = sinon.spy();
const str = `${responseMessage.id}|${responseMessage.response}`;
const bufferMessage = Buffer.from(str);
sinon
.stub(Reflect.get(clientWithBuffers, 'deserializer'), 'deserialize')
.resolves({
...responseMessage,
response: bufferMessage,
});
const subscription = clientWithBuffers.createResponseCallback();

clientWithBuffers['routingMap'].set(responseMessage.id, callback);
await subscription('channel', bufferMessage as any);

expect(callback.called).to.be.true;
expect(
callback.calledWith({
err: undefined,
response: bufferMessage,
}),
).to.be.true;
});
});
});
describe('close', () => {
const untypedClient = client as any;
Expand Down Expand Up @@ -303,6 +329,62 @@ describe('ClientRedis', () => {
client.registerReadyListener(emitter as any);
expect(callback.getCall(0).args[0]).to.be.eql(RedisEventsMap.READY);
});
it('should register "message" event when returnBuffers is not set', () => {
const onSpy = sinon.spy();
const client = new ClientRedis({});
const untypedClient = client as any;
const emitter = {
on: onSpy,
};

untypedClient.wasInitialConnectionSuccessful = false;
untypedClient.subClient = emitter;

client.registerReadyListener(emitter as any);
const readyHandler = onSpy.getCall(0).args[1];
readyHandler();

expect(onSpy.calledTwice).to.be.true;
expect(onSpy.getCall(1).args[0]).to.equal('message');
});
it('should register "message" event when returnBuffers is false', () => {
const onSpy = sinon.spy();
const client = new ClientRedis({ returnBuffers: false });
const untypedClient = client as any;

const emitter = {
on: onSpy,
};

untypedClient.wasInitialConnectionSuccessful = false;
untypedClient.subClient = emitter;

client.registerReadyListener(emitter as any);
const readyHandler = onSpy.getCall(0).args[1];
readyHandler();

expect(onSpy.calledTwice).to.be.true;
expect(onSpy.getCall(1).args[0]).to.equal('message');
});
it('should register "messageBuffer" event when returnBuffers is true', () => {
const onSpy = sinon.spy();
const clientWithBuffers = new ClientRedis({ returnBuffers: true });
const untypedClientWithBuffers = clientWithBuffers as any;

const emitter = {
on: onSpy,
};

untypedClientWithBuffers.wasInitialConnectionSuccessful = false;
untypedClientWithBuffers.subClient = emitter;

clientWithBuffers.registerReadyListener(emitter as any);
const readyHandler = onSpy.getCall(0).args[1];
readyHandler();

expect(onSpy.calledTwice).to.be.true;
expect(onSpy.getCall(1).args[0]).to.equal('messageBuffer');
});
});
describe('registerReconnectListener', () => {
it('should bind reconnect event handler', () => {
Expand Down