Skip to content

Commit b15bee8

Browse files
authored
Merge pull request #20 from p-society/bull
2 parents 950d884 + 426386a commit b15bee8

33 files changed

+462
-855
lines changed

docker/docker-compose.dev.yml

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,6 @@
11
version: '2'
22

33
services:
4-
zookeeper:
5-
image: zookeeper:latest
6-
ports:
7-
- "2181:2181"
8-
9-
kafka:
10-
image: confluentinc/cp-kafka:latest
11-
ports:
12-
- "9092:9092"
13-
environment:
14-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
15-
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://$PRIVATE_IP:9092 # Use the dynamic private IP here
16-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
17-
depends_on:
18-
- zookeeper
19-
20-
kafka-ui:
21-
container_name: kafka-ui
22-
image: provectuslabs/kafka-ui:latest
23-
ports:
24-
- 8080:8080
25-
depends_on:
26-
- kafka
27-
environment:
28-
DYNAMIC_CONFIG_ENABLED: 'true'
29-
KAFKA_CLUSTERS_0_NAME: broadcast_cluster_1
30-
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
31-
324
redis:
335
image: redis/redis-stack-server:latest
346
ports:

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"@faker-js/faker": "^8.3.1",
2929
"@kafkajs/confluent-schema-registry": "^2.0.1",
3030
"@nestjs-modules/mailer": "^2.0.2",
31+
"@nestjs/bullmq": "^10.2.3",
3132
"@nestjs/common": "^10.4.3",
3233
"@nestjs/config": "^3.3.0",
3334
"@nestjs/core": "^10.4.3",
@@ -43,6 +44,7 @@
4344
"@socket.io/redis-adapter": "^8.3.0",
4445
"@types/nodemailer": "^6.4.17",
4546
"bcrypt": "^5.1.1",
47+
"bullmq": "^5.34.3",
4648
"class-transformer": "^0.5.1",
4749
"class-validator": "^0.14.1",
4850
"kafkajs": "^2.2.4",

src/app.module.ts

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@ import { EventEmitterModule } from '@nestjs/event-emitter';
1010
import { RedisModule } from './services/redis/redis.module';
1111
import { UsersModule } from './services/apis/users/users.module';
1212
import { ConfigModule, ConfigService } from '@nestjs/config';
13-
import { SendOtpModule } from './services/apis/sendOtp/sendOtp.module';
1413
import { ReactionsModule } from './services/apis/reactions/reactions.module';
15-
import { KafkaModulev2 } from './services/kafka/kafka.module';
1614
import { ProfilesModule } from './services/apis/profiles/profiles.module';
17-
import { TOPIC_NAME } from './services/kafka/consumer/consumer.service';
18-
import fs from 'fs';
19-
import os from 'os';
15+
import { QueueModule } from './services/bullmq/queue.module';
16+
import { BullModule } from '@nestjs/bullmq';
2017

2118
@Module({
2219
imports: [
@@ -25,6 +22,14 @@ import os from 'os';
2522
envFilePath: `.env`,
2623
}),
2724
RedisModule,
25+
BullModule.forRoot({
26+
connection: {
27+
host: process.env.REDIS_HOST,
28+
port: parseInt(process.env.REDIS_PORT),
29+
password: process.env.REDIS_PASSWORD,
30+
},
31+
}),
32+
QueueModule,
2833
EventEmitterModule.forRoot(),
2934
MongooseModule.forRootAsync({
3035
imports: [ConfigModule],
@@ -35,32 +40,9 @@ import os from 'os';
3540
}),
3641
AuthModule,
3742
UsersModule,
38-
SendOtpModule,
3943
AdapterModule,
4044
ProfilesModule,
4145
ReactionsModule,
42-
KafkaModulev2.register([
43-
{
44-
name: 'KAFKA_SERVICE_1',
45-
options: {
46-
client: {
47-
clientId: 'gc-broadcast-server',
48-
brokers: ['localhost:9092'],
49-
retry: {
50-
retries: 2,
51-
initialRetryTime: 30,
52-
},
53-
},
54-
consumer: {
55-
groupId: 'notification',
56-
allowAutoTopicCreation: true,
57-
},
58-
seek: {
59-
[TOPIC_NAME]: new Date('2020-05-21'),
60-
},
61-
},
62-
},
63-
]),
6446
],
6547
controllers: [AppController],
6648
providers: [
Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { IsNotEmpty, IsString } from 'class-validator';
2-
31
/**
42
* DTO for incoming reaction payload.
53
* This ensures the payload structure and validates fields.
@@ -9,15 +7,11 @@ export class ReactionDto {
97
* The emoji representing the reaction.
108
* @example "👍"
119
*/
12-
@IsNotEmpty({ message: 'Emoji must not be empty' })
13-
@IsString({ message: 'Emoji must be a string' })
1410
emoji: string;
1511

1612
/**
1713
* The sport associated with the reaction.
1814
* @example "football"
1915
*/
20-
@IsNotEmpty({ message: 'Sport must not be empty' })
21-
@IsString({ message: 'Sport must be a string' })
2216
sport: string;
2317
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { Controller, Post } from '@nestjs/common';
2+
import { ModifyBody, setCreatedBy } from 'src/decorators/ModifyBody.decorator';
3+
import { ReactionService } from './reactions.service';
4+
import { ReactionDto } from './dto/reactions.dto';
5+
@Controller('reactions')
6+
export class ReactionController {
7+
constructor(private readonly reactionService: ReactionService) {}
8+
9+
@Post()
10+
async create(@ModifyBody(setCreatedBy()) createReactionsDto: ReactionDto) {
11+
return await this.reactionService.enqueueReactions(createReactionsDto);
12+
}
13+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import { Module } from '@nestjs/common';
22
import { ReactionGateway } from '../../gateways/reactions/reactions.gateway';
33
import { ReactionService } from './reactions.service';
4+
import { PresenceModule } from 'src/services/gateways/presence/presence.module';
5+
import { ReactionController } from './reactions.controller';
46

57
@Module({
8+
imports: [PresenceModule],
69
providers: [ReactionGateway, ReactionService],
10+
exports: [ReactionService],
11+
controllers: [ReactionController],
712
})
813
export class ReactionsModule {}
Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import { Injectable } from '@nestjs/common';
2+
import { ReactionDto } from './dto/reactions.dto';
3+
import { processReaction } from './reaction.helper';
4+
import { ReactionStreamProducer } from 'src/services/bullmq/producers/reaction-stream.producer';
25

36
@Injectable()
47
export class ReactionService {
5-
/**
6-
* Save the reaction or perform other business logic.
7-
* @param reaction - Processed reaction data (already validated and transformed).
8-
*/
9-
saveReaction(reaction: { emoji: string; sport: string }): string {
10-
// Simulate saving to a database or performing other business logic
11-
console.log('Saving reaction to database:', reaction);
12-
return 'Reaction saved successfully!';
8+
constructor(
9+
private readonly reactionStreamProducer: ReactionStreamProducer,
10+
) {}
11+
async enqueueReactions(createReactionsDto: ReactionDto) {
12+
const reaction = processReaction(createReactionsDto);
13+
await this.reactionStreamProducer.pushForAsyncStream(
14+
`process-reaction`,
15+
reaction,
16+
{
17+
removeOnComplete: true,
18+
},
19+
);
1320
}
1421
}

src/services/apis/sendOtp/dto/sendOtp.dto.ts

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/services/apis/sendOtp/sendOtp.controller.spec.ts

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/services/apis/sendOtp/sendOtp.controller.ts

Lines changed: 0 additions & 77 deletions
This file was deleted.

0 commit comments

Comments
 (0)