Skip to content

Latest commit

Β 

History

History
457 lines (380 loc) Β· 11.1 KB

File metadata and controls

457 lines (380 loc) Β· 11.1 KB

🐰 RabbitMQ Integration - User Registration

βœ… Đã Được Implement

Tính Năng

Khi user Δ‘Δƒng kΓ½ (register), hệ thα»‘ng tα»± Δ‘α»™ng push thΓ΄ng tin user lΓͺn RabbitMQ queue.

πŸ“‹ Flow HoαΊ‘t Động

1. User POST /auth/register
   ↓
2. Server tαΊ‘o user mα»›i
   ↓
3. Hash password vα»›i bcrypt
   ↓
4. LΖ°u user vΓ o database (in-memory)
   ↓
5. 🐰 PUSH DATA TO RABBITMQ
   ↓
6. Generate JWT token
   ↓
7. Return response to user

πŸ’» Implementation Code

File: src/auth/auth.service.ts

async register(registerDto: RegisterDto): Promise<AuthResponseDto> {
  this.logger.log(`New user registration: ${registerDto.email}`);

  // 1. Create user
  const user = await this.usersService.create(
    registerDto.email,
    registerDto.username,
    registerDto.password,
  );

  // 2. 🐰 PUSH USER INFO TO RABBITMQ
  try {
    const exchange = this.configService.get<string>(
      'rabbitmq.exchange.validateJson',
    );
    const routingKey = this.configService.get<string>(
      'rabbitmq.routing.topicValidateJson',
    );

    await this.rabbitmqProducerService.publish(exchange, routingKey, {
      event: 'user.registered',
      data: {
        id: user.id,
        email: user.email,
        username: user.username,
        createdAt: user.createdAt,
      },
      timestamp: new Date(),
    });

    this.logger.log(`User registration sent to RabbitMQ: ${user.email}`);
  } catch (error) {
    this.logger.error(
      `Failed to send registration to RabbitMQ: ${error.message}`,
    );
    // Continue even if RabbitMQ fails - khΓ΄ng block user registration
  }

  // 3. Generate JWT token and return
  const payload = { sub: user.id, email: user.email, username: user.username };
  const accessToken = this.jwtService.sign(payload);

  return {
    accessToken,
    user: {
      id: user.id,
      email: user.email,
      username: user.username,
    },
  };
}

πŸ“¦ Message Format

Khi user Δ‘Δƒng kΓ½, message được push lΓͺn RabbitMQ cΓ³ format:

{
  "event": "user.registered",
  "data": {
    "id": "user_1729363200000_abc123",
    "email": "user@example.com",
    "username": "johndoe",
    "createdAt": "2025-10-19T23:00:00.000Z"
  },
  "timestamp": "2025-10-19T23:00:00.000Z"
}

Chi tiαΊΏt fields:

Field Type Description
event string Event type: "user.registered"
data.id string User ID (unique)
data.email string User email
data.username string Username
data.createdAt Date Account creation timestamp
timestamp Date Event timestamp

πŸ”§ Configuration

Environment Variables (.env)

RABBITMQ_URL=amqps://your-rabbitmq-url
RABBITMQ_EXCHANGE_VALIDATE_JSON=validateJSON
RABBITMQ_TOPIC_EXCHANGE_VALIDATE_JSON=topic_validateJSON
RABBITMQ_ROUTING_KEY_VALIDATE_JSON=validateJSON
RABBITMQ_TOPIC_ROUTING_KEY_VALIDATE_JSON=topic_validateJSON
RABBITMQ_QUEUE_VALIDATE_JSON=topic_validateJSON

RabbitMQ Config (src/config/rabbitmq.config.ts)

import { registerAs } from '@nestjs/config';

export default registerAs('rabbitmq', () => ({
  server: process.env.RABBITMQ_URL,
  exchange: {
    validateJson: process.env.RABBITMQ_EXCHANGE_VALIDATE_JSON,
    topicValidateJson: process.env.RABBITMQ_TOPIC_EXCHANGE_VALIDATE_JSON,
  },
  routing: {
    validateJson: process.env.RABBITMQ_ROUTING_KEY_VALIDATE_JSON,
    topicValidateJson: process.env.RABBITMQ_TOPIC_ROUTING_KEY_VALIDATE_JSON,
  },
  queue: {
    topicValidateJson: process.env.RABBITMQ_QUEUE_VALIDATE_JSON,
  },
}));

πŸ§ͺ Testing

Test Register vα»›i RabbitMQ

# 1. Start server
yarn start:dev

# 2. Register new user
curl -X POST http://localhost:4000/auth/register \
  -H "Content-Type: application/json" \
  -d '{
    "email": "test@example.com",
    "username": "testuser",
    "password": "test123456"
  }'

# 3. Check server logs - you should see:
# [AuthService] New user registration: test@example.com
# [RabbitmqProducerService] Message sent successfully
# [AuthService] User registration sent to RabbitMQ: test@example.com
# [RabbitmqConsumerService] Received message: {"event":"user.registered",...}

Expected Logs

Producer (Auth Service):

[AuthService] New user registration: test@example.com
[AuthService] User registration sent to RabbitMQ: test@example.com

RabbitMQ Producer:

[RabbitmqProducerService] Message sent successfully

Consumer:

[RabbitmqConsumerService] Received message: {
  "event": "user.registered",
  "data": {
    "id": "user_xxx",
    "email": "test@example.com",
    "username": "testuser",
    "createdAt": "2025-10-19T23:00:00.000Z"
  },
  "timestamp": "2025-10-19T23:00:00.000Z"
}

πŸ“Š RabbitMQ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Auth Service  β”‚
β”‚   (Producer)    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β”‚ publish()
         ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   RabbitMQ Exchange         β”‚
β”‚   (validateJSON)            β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β”‚ route to queue
         ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   RabbitMQ Queue            β”‚
β”‚   (topic_validateJSON)      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β”‚ consume()
         ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Consumer       β”‚
β”‚  Service        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ” RabbitMQ Producer Service

File: src/rabbitmq/rabbitmq-producer.service.ts

@Injectable()
export class RabbitmqProducerService {
  private readonly logger = new Logger(RabbitmqProducerService.name);

  async publish(
    exchange: string,
    routingKey: string,
    payload: any,
  ): Promise<void> {
    try {
      const rabbitMqUrl = this.configService.get<string>('rabbitmq.server');
      const connection = await amqp.connect(rabbitMqUrl);
      const channel = await connection.createConfirmChannel();

      await channel.assertExchange(exchange, 'direct', {
        durable: true,
        autoDelete: false,
      });

      const messageBuffer = Buffer.from(JSON.stringify(payload));

      return new Promise((resolve, reject) => {
        channel.publish(
          '',
          routingKey,
          messageBuffer,
          { contentType: 'text/plain' },
          (err) => {
            if (err) {
              this.logger.error(`Failed to send message: ${err.message}`);
              reject(err);
            } else {
              this.logger.log('Message sent successfully');
              resolve();
            }
            connection.close();
          },
        );
      });
    } catch (error) {
      this.logger.error(`Error in publish: ${error.message}`);
      throw error;
    }
  }
}

🎯 Consumer Service

File: src/rabbitmq/rabbitmq-consumer.service.ts

Consumer tα»± Δ‘α»™ng lαΊ―ng nghe messages tα»« queue:

@Injectable()
export class RabbitmqConsumerService implements OnModuleInit {
  async onModuleInit() {
    await this.listen();
  }

  private async listen() {
    this.connection = await amqp.connect(rabbitMqUrl);
    this.channel = await this.connection.createChannel();

    // Assert queue
    await this.channel.assertQueue(topicRoutingKey, {
      durable: true,
      autoDelete: false,
      exclusive: false,
    });

    // Bind queue to exchange
    await this.channel.bindQueue(topicRoutingKey, routingKey, topicRoutingKey);

    // Start consuming
    this.channel.consume(
      topicRoutingKey,
      (message) => {
        if (message) {
          const content = message.content.toString();
          this.logger.log(`Received message: ${content}`);
          // Process the message here
        }
      },
      { noAck: true },
    );
  }
}

πŸš€ Demo Flow

Step-by-Step Example

1. User Registration Request:

POST /auth/register
{
  "email": "john@example.com",
  "username": "johndoe",
  "password": "securepass123"
}

2. Server Processes:

  • βœ… Validate input (email format, min length, etc.)
  • βœ… Check if email/username exists
  • βœ… Hash password with bcrypt
  • βœ… Create user record
  • βœ… Push to RabbitMQ
  • βœ… Generate JWT token

3. RabbitMQ Message:

{
  "event": "user.registered",
  "data": {
    "id": "user_1729363200000_xyz789",
    "email": "john@example.com",
    "username": "johndoe",
    "createdAt": "2025-10-19T23:00:00.000Z"
  },
  "timestamp": "2025-10-19T23:00:00.000Z"
}

4. Consumer Receives:

[RabbitmqConsumerService] Consumer connected successfully
[RabbitmqConsumerService] Received message: {"event":"user.registered",...}

5. Response to User:

{
  "accessToken": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "user": {
    "id": "user_1729363200000_xyz789",
    "email": "john@example.com",
    "username": "johndoe"
  }
}

⚑ Key Features

1. Non-Blocking

  • RabbitMQ publish khΓ΄ng block user registration
  • NαΊΏu RabbitMQ fail, user vαΊ«n register thΓ nh cΓ΄ng
  • Error được log nhΖ°ng khΓ΄ng throw exception

2. Automatic

  • KhΓ΄ng cαΊ§n manual trigger
  • Mα»—i registration tα»± Δ‘α»™ng push message
  • Consumer tα»± Δ‘α»™ng start khi app boot

3. Reliable

  • Durable queues
  • Persistent messages
  • Auto-reconnect on connection loss

4. Event-Driven

  • Event name: "user.registered"
  • Other services cΓ³ thể subscribe
  • Loosely coupled architecture

πŸ“ Use Cases

RabbitMQ message nΓ y cΓ³ thể được sα»­ dα»₯ng cho:

  1. Email Verification

    • Consumer nhαΊ­n message
    • Gα»­i verification email
    • Track verification status
  2. Welcome Email

    • Send welcome message
    • Onboarding instructions
  3. Analytics

    • Track user registrations
    • User demographics
    • Registration trends
  4. CRM Integration

    • Sync to CRM system
    • Create customer profile
    • Setup marketing campaigns
  5. Audit Logs

    • Log user activities
    • Compliance tracking
    • Security monitoring
  6. Notification Services

    • Push notifications
    • SMS alerts
    • Slack/Discord notifications

βœ… Summary

Feature Status Description
Auto Push on Register βœ… Messages sent automatically
Event Type βœ… "user.registered"
Message Format βœ… JSON vα»›i user data
Non-Blocking βœ… KhΓ΄ng αΊ£nh hưởng registration
Error Handling βœ… Graceful fallback
Consumer βœ… Auto-listening
Logs βœ… Detailed logging

πŸŽ‰ KαΊΏt LuαΊ­n

βœ… RabbitMQ Integration Δ‘Γ£ hoΓ n thiện!

Mα»—i khi user Δ‘Δƒng kΓ½:

  • βœ… Data được push lΓͺn RabbitMQ
  • βœ… Consumer nhαΊ­n được message
  • βœ… Event cΓ³ thể được xα»­ lΓ½ bởi cΓ‘c services khΓ‘c
  • βœ… System hoαΊ‘t Δ‘α»™ng async vΓ  reliable

Documentation Date: October 19, 2025
Feature: User Registration with RabbitMQ Integration
Status: βœ… Implemented & Working