Skip to content

Feature Request: Efficient Event Publishing with NestJS and RabbitMQ #1940

@masterj3y

Description

@masterj3y

Goal

I want to implement an event publisher in NestJS that can:

  1. Publish events internally to local event handlers.
  2. Publish events externally using RabbitMQ for microservices communication.

Current Implementation

I wrote the following custom event publisher that:

  • Scans for all IEventHandler implementations using ExplorerService.
  • Registers event handlers dynamically.
  • Routes events internally or publishes them to RabbitMQ based on metadata.

Here’s the current implementation of the RabbitMQEventPublisher:

import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { AsyncContext, IEvent, IEventHandler, IEventPublisher } from '@nestjs/cqrs';
import { CqrsEventMetaData } from '../decorators/cqrs-event.decorator';
import { RMQ_EVENTS_EXCHANGE_NAME } from 'infrastructure/config';
import { Logger, Type } from '@nestjs/common';
import { ExplorerService } from '@nestjs/cqrs/dist/services/explorer.service';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { defaultEventIdProvider } from '../helpers/default-id-provder.helper';

export class RabbitMQEventPublisher<EventBase extends IEvent = IEvent>
  implements IEventPublisher<EventBase>
{
  private readonly logger = new Logger(RabbitMQEventPublisher.name);
  private readonly eventIdProvider = defaultEventIdProvider;
  private readonly eventHandlers = new Map<
    string,
    (eventBase: EventBase) => Promise<void>
  >();

  constructor(
    private readonly reflector: Reflector,
    modulesContainer: ModulesContainer,
    private readonly moduleRef: ModuleRef,
    explorerService: ExplorerService,
    private readonly amqpConnection: AmqpConnection,
  ) {
    const modules = [...modulesContainer.values()];
    explorerService
      .flatMap<IEventHandler>(modules, (instance) =>
        explorerService.filterByMetadataKey(instance, '__eventsHandler__'),
      )
      .forEach((handler) => this.registerHandler(handler));
  }

  private registerHandler(handler: InstanceWrapper<IEventHandler<EventBase>>) {
    const typeRef = handler.metatype as Type<IEventHandler<EventBase>>;
    const events = this.reflectEvents(typeRef);

    if (!events || events.length === 0) {
      this.logger.error(`No events found for handler: ${typeRef.name}`);
      return;
    }

    events.forEach((event) => {
      const eventId = this.eventIdProvider.getEventId(event);
      const boundHandler = this.bind(handler, eventId!);
      this.eventHandlers.set(event.name, boundHandler);
      this.logger.log(`Registered event handler for: ${event.name}`);
    });
  }

  private reflectEvents(handler: Type<IEventHandler<EventBase>>): Type<EventBase>[] {
    const events = Reflect.getMetadata('__eventsHandler__', handler) || [];
    if (events.length === 0) {
      this.logger.error(`No metadata found for handler: ${handler.name}`);
    }
    return events;
  }

  async publish<TEvent extends EventBase>(
    event: TEvent,
    _dispatcherContext?: unknown,
    _asyncContext?: AsyncContext,
  ) {
    const eventMetadata = this.reflector.get<CqrsEventMetaData>(
      'metadata',
      event.constructor,
    );

    const eventData = {
      message: event,
      event: event.constructor.name,
    };

    try {
      const handler = this.eventHandlers.get(event.constructor.name);

      if (!handler) {
        this.logger.error(`No handler found for event: ${event.constructor.name}`);
        return;
      }

      this.logger.log(`Executing handler for event: ${event.constructor.name}`);
      await handler(event);

      if (eventMetadata && eventMetadata.routingKeys.length > 0) {
        for (const routingKey of eventMetadata.routingKeys) {
          this.amqpConnection.publish(RMQ_EVENTS_EXCHANGE_NAME, routingKey, eventData);
        }
      }
    } catch (e) {
      this.logger.error(`Error publishing event <${eventData.event}>`, e);
    }
  }
}

Question

Is this the best way to retrieve event handlers dynamically, or is there an existing feature in NestJS CQRS that I should use instead?

  • I am currently scanning for IEventHandler instances manually using ExplorerService and ModulesContainer.
  • Does the NestJS CQRS module already provide a built-in way to retrieve event handlers dynamically without this manual lookup?

Would appreciate any insights or alternative approaches! 🚀

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions