This repository is a fork of p-fedyukovich/nestjs-google-pubsub-microservice
Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.
You can use Pub/Sub as messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines.
Pub/Sub offers durable message storage and real-time message delivery with high availability and consistent performance at scale.
To start building Pub/Sub-based microservices, first install the required packages:
$ npm i --save @google-cloud/pubsub nestjs-google-pubsub-microservice
To use the Pub/Sub transporter, pass the following options object to the createMicroservice() method:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  ApplicationModule,
  {
    // The transporter is supplied as a custom strategy instance.
    strategy: new GCPubSubServer({
      topic: 'cats_topic',
      subscription: 'cats_subscription',
      client: {
        projectId: 'microservice',
      },
    }),
  },
);
The options passed to GCPubSubServer are specific to this transporter. The GCloud Pub/Sub transporter exposes the properties described below.
topic | Name of the topic that your server subscription belongs to |
subscription | Name of the subscription that your server listens to |
replyTopic | Name of the topic that your client subscription belongs to |
replySubscription | Name of the subscription that your client listens to |
noAck | If false, manual acknowledgment mode is enabled |
init | If false, topics and subscriptions will not be created, only validated |
checkExistence | If false, topics and subscriptions will not be checked, only used. This only applies when init is false |
client | Additional client options (read more here) |
publisher | Additional topic publisher options (read more here) |
subscriber | Additional subscriber options (read more here) |
createSubscriptionOptions | Options used to create a subscription when init is set to true and a subscription needs to be created (read more here) |
autoResume | Automatically resume publishing messages with an ordering key if publishing fails (read more here) |
autoDeleteSubscriptionOnShutdown | Automatically delete the subscription that the client is connected to on shutdown. If the deletion fails, the subscription is closed instead |
clientIdFilter | Allows a client to receive only the responses to its own requests |
appendClientIdtoSubscription | Append the client id to the name of the subscription on init |
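The interaction between init and checkExistence can be easier to follow as a small decision table. The sketch below is only an illustration of the descriptions above: resolveSetupMode, SetupFlags and SetupMode are hypothetical names and are not part of the package, and the sketch makes no assumption about the default values of either option.

```typescript
// Hypothetical helper, not part of the library. It only restates the
// documented meaning of `init` and `checkExistence` as a decision table.
type SetupMode = 'create' | 'validate' | 'use-as-is';

interface SetupFlags {
  init: boolean; // when true, topics and subscriptions may be created
  checkExistence: boolean; // only consulted when init is false
}

function resolveSetupMode(flags: SetupFlags): SetupMode {
  if (flags.init) {
    // checkExistence is documented as applying only when init is false.
    return 'create';
  }
  return flags.checkExistence ? 'validate' : 'use-as-is';
}

console.log(resolveSetupMode({ init: false, checkExistence: true }));
```

In other words, checkExistence only changes the outcome once init has been turned off.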
A client can be instantiated by importing GCPubSubClientModule into the root module. Clients can be registered with either the register method or the registerAsync method via useFactory.
@Module({
  imports: [
    GCPubSubClientModule.register([
      {
        name: 'client-1',
        config: options,
      },
    ]),
    GCPubSubClientModule.registerAsync([
      {
        name: 'client-2',
        imports: [ConfigModule],
        inject: [ConfigService],
        // Use the injected parameter; `this` is not bound inside the factory.
        useFactory: (configService: ConfigService) => {
          return configService.get('GCPubSubClientOptions');
        },
      },
    ]),
  ],
})
export class AppModule {}
The client can then be injected with the @InjectGCPubSubClient decorator:
@Injectable()
export class AppService {
  constructor(
    @InjectGCPubSubClient('client-1') private readonly client: GCPubSubClient,
  ) {}
}
If the token for the client is needed for tests, the package provides a utility function, getGCPubSubClientToken, to retrieve the provider token of the client.
const token = getGCPubSubClientToken('client-1');
To make full use of the features provided by Google Pub/Sub, messages need to be serialized and deserialized in a specific way to preserve the integrity of the data. A helper class, GCPubSubMessageBuilder, is therefore available to build messages with features such as attributes, ordering keys and timeouts.
Usage
this.client.send(
  'pattern',
  new GCPubSubMessageBuilder(data)
    .setAttributes(attrs)
    .setOrderingKey('orderingKey')
    .setTimeout(12000)
    .build(),
);
data? | TData | Data of the message payload |
attributes? | TAttrs | Attributes of the payload |
orderingKey? | string | Ordering key of the message |
timeout? | number | Timeout of the message; the request will not be processed if it has exceeded the timeout by the time it reaches the server |
setData | (data: TData) => this | Sets the data of the message |
setAttributes | (attributes: TAttrs) => this | Sets the attributes of the payload |
setOrderingKey | (orderingKey: string) => this | Sets the ordering key of the message |
setTimeout | (timeout: number) => this | Sets the timeout value of the payload |
build | () => GCPubSubMessage | Builds the message; throws an error if data is empty |
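To make the builder contract in the table concrete, here is a minimal, self-contained sketch of a fluent builder with the same method shapes. It is not the package's implementation: SketchMessage, SketchMessageBuilder and isExpired are hypothetical names, "empty" data is assumed to mean undefined or null, and the timeout is assumed to be in milliseconds measured from a publish timestamp.

```typescript
// Illustrative stand-ins only; these are NOT the library's
// GCPubSubMessage or GCPubSubMessageBuilder.
interface SketchMessage<TData, TAttrs> {
  data: TData;
  attributes: TAttrs | undefined;
  orderingKey: string | undefined;
  timeout: number | undefined;
}

class SketchMessageBuilder<TData, TAttrs = Record<string, string>> {
  private data: TData | undefined;
  private attributes: TAttrs | undefined;
  private orderingKey: string | undefined;
  private timeout: number | undefined;

  constructor(data?: TData) {
    this.data = data;
  }

  // Each setter returns `this` so that calls can be chained.
  setData(data: TData): this {
    this.data = data;
    return this;
  }

  setAttributes(attributes: TAttrs): this {
    this.attributes = attributes;
    return this;
  }

  setOrderingKey(orderingKey: string): this {
    this.orderingKey = orderingKey;
    return this;
  }

  setTimeout(timeout: number): this {
    this.timeout = timeout;
    return this;
  }

  build(): SketchMessage<TData, TAttrs> {
    const data = this.data;
    // Assumption: "empty" means undefined or null.
    if (data === undefined || data === null) {
      throw new Error('Cannot build a message without data');
    }
    return {
      data,
      attributes: this.attributes,
      orderingKey: this.orderingKey,
      timeout: this.timeout,
    };
  }
}

// Assumption: a server-side check of this kind decides whether a request
// is still worth processing. All values are in milliseconds.
function isExpired(
  publishedAtMs: number,
  timeoutMs: number | undefined,
  nowMs: number,
): boolean {
  return timeoutMs !== undefined && nowMs - publishedAtMs > timeoutMs;
}

const sketch = new SketchMessageBuilder({ text: 'hello' })
  .setAttributes({ source: 'docs' })
  .setOrderingKey('orderingKey')
  .setTimeout(12000)
  .build();
console.log(sketch.orderingKey, isExpired(0, sketch.timeout, 5000));
```

Returning this from every setter is what allows the chained style shown in the usage snippet, and deferring validation to build() keeps the setters order-independent.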
In more sophisticated scenarios, you may want to access more information about the incoming request. When using the Pub/Sub transporter, you can access the GCPubSubContext object.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
To access the original Pub/Sub message (with the attributes, data, ack and nack), use the getMessage() method of the GCPubSubContext object, as follows:
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  console.log(context.getMessage());
}
To make sure a message is never lost, Pub/Sub supports message acknowledgements. An acknowledgement is sent back by the consumer to tell Pub/Sub that a particular message has been received and processed, and that Pub/Sub is free to delete it. If a consumer dies (its subscription is closed, its connection is closed, or the TCP connection is lost) without sending an ack, Pub/Sub will conclude that the message wasn't fully processed and will redeliver it.
To enable manual acknowledgment mode, set the noAck property to false:
{
  replyTopic: 'cats_topic_reply',
  replySubscription: 'cats_subscription_reply',
  noAck: false,
  client: {
    projectId: 'microservice',
  },
},
When manual consumer acknowledgements are turned on, we must send a proper acknowledgement from the worker to signal that we are done with a task.
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: GCPubSubContext) {
  const originalMsg = context.getMessage();
  // Acknowledge only once the task is done.
  originalMsg.ack();
}
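In manual acknowledgment mode it is common to ack only after the work succeeds and to nack when it fails, so that Pub/Sub can redeliver the message. The sketch below shows one way to wrap that pattern. AckableMessage and processWithAck are hypothetical names, not part of the package; the interface covers only the ack() and nack() methods that the original message is described as exposing.

```typescript
// Hypothetical minimal shape of the original Pub/Sub message:
// only the two methods this sketch relies on.
interface AckableMessage {
  ack(): void;
  nack(): void;
}

// Runs `work`, then acks on success. On failure the message is nacked
// and the error is rethrown so the caller can still observe it.
async function processWithAck<T>(
  message: AckableMessage,
  work: () => Promise<T>,
): Promise<T> {
  let result: T;
  try {
    result = await work();
  } catch (err) {
    message.nack();
    throw err;
  }
  message.ack();
  return result;
}

// Usage with a fake message that records which method was called.
const recorded: string[] = [];
const fakeMessage: AckableMessage = {
  ack: () => {
    recorded.push('ack');
  },
  nack: () => {
    recorded.push('nack');
  },
};

processWithAck(fakeMessage, async () => 'done').then((value) => {
  console.log(value, recorded);
});
```

Inside a handler, the value returned by context.getMessage() would take the place of fakeMessage. Keeping ack() outside the try block means an error thrown by ack() itself does not trigger a nack.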