-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathkafka-job-management-queue.ts
40 lines (34 loc) · 1.16 KB
/
kafka-job-management-queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import { Injectable, Logger } from '@nestjs/common';
import { Message, Producer } from 'kafkajs';
import { orchestratorConstants } from '../../auth/constants';
import { JobManagementTask } from '../../database/jobs/jobs.dto';
import { JobManagementQueue } from './job-management-queue';
/**
 * Payload published to the job management topic.
 *
 * Instances are serialized as a whole with `JSON.stringify` and used as the
 * value of a Kafka message.
 */
export interface KafkaJobManagementTask {
/** Job identifier carried alongside the task. */
jobId: string;
/** The management task itself; its shape is defined by `JobManagementTask` in `jobs.dto`. */
task: JobManagementTask;
}
/**
 * `JobManagementQueue` implementation backed by a Kafka producer.
 *
 * Tasks are serialized to JSON and published to the topic named by
 * `orchestratorConstants.topics.jobManagement`.
 */
@Injectable()
export class KafkaJobManagementQueue implements JobManagementQueue {
  private readonly logger = new Logger(KafkaJobManagementQueue.name);

  constructor(private readonly producer: Producer) {}

  /**
   * Publishes the given tasks to the job management topic.
   *
   * All tasks are sent in a single producer request, one Kafka message per
   * task, in the order they were passed. Calling this with no tasks only
   * emits the debug log and does not contact the producer.
   *
   * @param jobManagementTasks Tasks to publish; each becomes the JSON value
   *   of one message.
   * @returns A promise that resolves once the producer has accepted the batch.
   * @throws Whatever `producer.send` rejects with; errors are not caught here.
   */
  public async publish(
    ...jobManagementTasks: KafkaJobManagementTask[]
  ): Promise<void> {
    const topic = orchestratorConstants.topics.jobManagement;

    this.logger.debug(
      `Publishing ${
        jobManagementTasks.length
      } tasks to the job management queue on topic ${topic}: ${JSON.stringify(
        jobManagementTasks,
      )}.`,
    );

    // Nothing to send: avoid handing the producer an empty batch.
    if (jobManagementTasks.length === 0) {
      return;
    }

    // NOTE(review): messages carry no key, so partition selection is left to
    // the producer's partitioner — confirm that per-job ordering across
    // partitions is not required by consumers.
    const messages: Message[] = jobManagementTasks.map((task) => ({
      value: JSON.stringify(task),
    }));

    // One request for the whole batch instead of one awaited round trip per task.
    await this.producer.send({ topic, messages });
  }
}