Skip to content

Commit 6d328b9

Browse files
authored
Merge pull request #344 from red-kite-solutions/feature/cancel_running_job
Cancel running job
2 parents 31a58c2 + 1f88a82 commit 6d328b9

File tree

71 files changed

+728
-152
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+728
-152
lines changed

devspace.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -587,15 +587,23 @@ deployments:
587587
config:
588588
max.message.bytes: 64000
589589
flush.messages: 1
590+
- name: stalker.jobs.management
591+
partitions: 1
592+
replicationFactor: 1
593+
config:
594+
max.message.bytes: 64000
595+
flush.messages: 1
590596
extraProvisioningCommands: # Setting the ACLs
591597
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.findings"
592598
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation write --operation describe --topic stalker.jobs.requests"
593599
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --topic stalker.jobs.logs"
594600
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.models"
601+
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.management"
595602
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.findings"
596603
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.requests"
597604
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.logs"
598605
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.models"
606+
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.management"
599607
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager"
600608
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager-job-logs"
601609
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --group stalker"

packages/backend/jobs-manager/service/src/modules/auth/constants.ts

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export const orchestratorConstants = {
2222
findings: 'stalker.jobs.findings',
2323
jobLogs: 'stalker.jobs.logs',
2424
jobModels: 'stalker.jobs.models',
25+
jobManagement: 'stalker.jobs.management',
2526
},
2627
};
2728

packages/backend/jobs-manager/service/src/modules/database/custom-job-templates/custom-job-templates.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Module } from '@nestjs/common';
22
import { DataSourcesModule } from '../../datasources/data-sources.module';
3-
import { QueueModule } from '../../job-queue/queue.module';
3+
import { QueueModule } from '../../queues/queue.module';
44
import { DatalayerModule } from '../datalayer.module';
55
import { CustomJobTemplatesController } from './custom-job-templates.controller';
66
import { jobTemplatesInitProvider } from './custom-job-templates.provider';

packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Module } from '@nestjs/common';
22
import { MongooseModule } from '@nestjs/mongoose';
33
import { DataSourcesModule } from '../../datasources/data-sources.module';
4-
import { QueueModule } from '../../job-queue/queue.module';
4+
import { QueueModule } from '../../queues/queue.module';
55
import { ConfigService } from '../admin/config/config.service';
66
import { DatalayerModule } from '../datalayer.module';
77
import { CustomJobsController } from './custom-jobs.controller';

packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { DeleteResult } from 'mongodb';
44
import { Model, Types } from 'mongoose';
55
import { HttpNotFoundException } from '../../../exceptions/http.exceptions';
66
import { JobSummary } from '../../../types/job-summary.type';
7-
import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue';
7+
import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue';
88
import {
99
JobContainer,
1010
JobContainerDocument,

packages/backend/jobs-manager/service/src/modules/database/custom-jobs/jobs.provider.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { UpdateFilter } from 'mongodb';
44
import { Model } from 'mongoose';
55
import { isConsumerMode } from '../../app.constants';
66
import { DataSources } from '../../datasources/data-sources';
7-
import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue';
7+
import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue';
88
import { DATABASE_INIT } from '../admin/config/config.provider';
99
import { JobPodConfiguration } from '../admin/config/job-pod-config/job-pod-config.model';
1010
import { JobContainer } from '../container/job-container.model';

packages/backend/jobs-manager/service/src/modules/database/jobs/job-executions.service.ts

+17-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import {
1111
} from '../../../types/timestamped-string.type';
1212
import { ProjectUnassigned } from '../../../validators/is-project-id.validator';
1313
import { isTest } from '../../app.constants';
14-
import { JobQueue } from '../../job-queue/job-queue';
14+
import { JobManagementQueue } from '../../queues/job-management-queue/job-management-queue';
15+
import { KafkaJobManagementTask } from '../../queues/job-management-queue/kafka-job-management-queue';
16+
import { JobQueue } from '../../queues/job-queue/job-queue';
1517
import { ConfigService } from '../admin/config/config.service';
1618
import { Project } from '../reporting/project.model';
1719
import { JobExecutionsDto } from './jobs.dto';
@@ -25,6 +27,7 @@ export class JobExecutionsService {
2527
private jobQueue: JobQueue,
2628
@InjectModel('job') private readonly jobModel: Model<Job & Document>,
2729
@InjectModel('project') private readonly projectModel: Model<Project>,
30+
private jobManagementQueue: JobManagementQueue,
2831
) {}
2932

3033
public async getAll(dto: JobExecutionsDto): Promise<Page<JobDocument>> {
@@ -141,6 +144,19 @@ export class JobExecutionsService {
141144
};
142145
}
143146

147+
public async terminate(jobId: string) {
148+
const j = await this.jobModel.findById(jobId);
149+
150+
if (j.endTime) return;
151+
152+
const task: KafkaJobManagementTask = {
153+
jobId: j._id.toString(),
154+
task: 'TerminateJob',
155+
};
156+
157+
this.jobManagementQueue.publish(task);
158+
}
159+
144160
public async addJobOutputLine(
145161
jobId: string,
146162
timestamp: number,

packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.controller.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
Delete,
55
Get,
66
Param,
7+
Patch,
78
Post,
89
Query,
910
UseGuards,
@@ -13,6 +14,7 @@ import { isNotEmpty, isString } from 'class-validator';
1314
import {
1415
HttpBadRequestException,
1516
HttpNotFoundException,
17+
HttpNotImplementedException,
1618
} from '../../../exceptions/http.exceptions';
1719
import { MongoIdDto } from '../../../types/dto/mongo-id.dto';
1820
import { JobLog } from '../../../types/job-log.model';
@@ -32,7 +34,7 @@ import { SecretsService } from '../secrets/secrets.service';
3234
import { JobParameter } from '../subscriptions/subscriptions.type';
3335
import { JobExecutionsService } from './job-executions.service';
3436
import { JobDefinitions } from './job-model.module';
35-
import { JobExecutionsDto, StartJobDto } from './jobs.dto';
37+
import { JobExecutionsDto, JobManagementDto, StartJobDto } from './jobs.dto';
3638
import { JobFactory, JobFactoryUtils } from './jobs.factory';
3739
import { CustomJob } from './models/custom-job.model';
3840
import { JobDocument } from './models/jobs.model';
@@ -131,6 +133,19 @@ export class JobsController {
131133
return await this.jobsService.publish(job);
132134
}
133135

136+
@UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard)
137+
@Roles(Role.User)
138+
@Patch(':id')
139+
async stopJob(@Param() id: MongoIdDto, @Body() dto: JobManagementDto) {
140+
switch (dto.task) {
141+
case 'TerminateJob':
142+
await this.jobsService.terminate(id.id);
143+
break;
144+
default:
145+
throw new HttpNotImplementedException();
146+
}
147+
}
148+
134149
@UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard)
135150
@Roles(Role.User)
136151
@Delete()

packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.dto.ts

+10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { IntersectionType } from '@nestjs/swagger';
22
import { Type } from 'class-transformer';
33
import {
44
IsArray,
5+
IsIn,
56
IsMongoId,
67
IsNotEmpty,
78
IsOptional,
@@ -40,3 +41,12 @@ export class StartJobDto {
4041
@IsOptional()
4142
projectId?: string;
4243
}
44+
45+
export const jobManagementTasks = ['TerminateJob'] as const;
46+
export type JobManagementTask = (typeof jobManagementTasks)[number];
47+
48+
export class JobManagementDto {
49+
@IsNotEmpty()
50+
@IsIn(jobManagementTasks)
51+
task: JobManagementTask;
52+
}

packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Module } from '@nestjs/common';
22
import { JwtModule } from '@nestjs/jwt';
3-
import { QueueModule } from '../../job-queue/queue.module';
3+
import { QueueModule } from '../../queues/queue.module';
44
import { ConfigModule } from '../admin/config/config.module';
55
import { CustomJobsModule } from '../custom-jobs/custom-jobs.module';
66
import { DatalayerModule } from '../datalayer.module';

packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { forwardRef, Module } from '@nestjs/common';
2-
import { QueueModule } from '../../../job-queue/queue.module';
2+
import { QueueModule } from '../../../queues/queue.module';
33
import { ConfigModule } from '../../admin/config/config.module';
44
import { DatalayerModule } from '../../datalayer.module';
55
import { JobsModule } from '../../jobs/jobs.module';

packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
} from '../../../../exceptions/http.exceptions';
99
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
1010
import { HostnameFinding } from '../../../findings/findings.service';
11-
import { FindingsQueue } from '../../../job-queue/findings-queue';
11+
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
1212
import { ConfigService } from '../../admin/config/config.service';
1313
import { MONGO_DUPLICATE_ERROR } from '../../database.constants';
1414
import { JobExecutionsService } from '../../jobs/job-executions.service';

packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { forwardRef, Module } from '@nestjs/common';
2-
import { QueueModule } from '../../../job-queue/queue.module';
2+
import { QueueModule } from '../../../queues/queue.module';
33
import { ConfigModule } from '../../admin/config/config.module';
44
import { DatalayerModule } from '../../datalayer.module';
55
import { JobsModule } from '../../jobs/jobs.module';

packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { FilterQuery, Model, Types } from 'mongoose';
55
import { HttpNotFoundException } from '../../../../exceptions/http.exceptions';
66
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
77
import { IpFinding } from '../../../findings/findings.service';
8-
import { FindingsQueue } from '../../../job-queue/findings-queue';
8+
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
99
import { ConfigService } from '../../admin/config/config.service';
1010
import { TagsService } from '../../tags/tag.service';
1111
import { CorrelationKeyUtils } from '../correlation.utils';

packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Module } from '@nestjs/common';
2-
import { QueueModule } from '../../../job-queue/queue.module';
2+
import { QueueModule } from '../../../queues/queue.module';
33
import { DatalayerModule } from '../../datalayer.module';
44
import { TagsModule } from '../../tags/tag.module';
55
import { WebsiteController } from './website.controller';

packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.service.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
} from '../../../../exceptions/http.exceptions';
99
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
1010
import { WebsiteFinding } from '../../../findings/findings.service';
11-
import { FindingsQueue } from '../../../job-queue/findings-queue';
11+
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
1212
import { TagsService } from '../../tags/tag.service';
1313
import { CorrelationKeyUtils } from '../correlation.utils';
1414
import { Domain } from '../domain/domain.model';

packages/backend/jobs-manager/service/src/modules/findings/findings.module.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { SecretsModule } from '../database/secrets/secrets.module';
1616
import { EventSubscriptionsModule } from '../database/subscriptions/event-subscriptions/event-subscriptions.module';
1717
import { SubscriptionTriggersModule } from '../database/subscriptions/subscription-triggers/subscription-triggers.module';
1818
import { TagsModule } from '../database/tags/tag.module';
19-
import { kafkaConfig } from '../job-queue/queue.module';
19+
import { kafkaConfig } from '../queues/queue.module';
2020
import { FindingsHandlers } from './commands/findings-commands';
2121
import { FindingsConsumer } from './findings.consumer';
2222
import { FindingsController } from './findings.controller';

packages/backend/jobs-manager/service/src/modules/job-queue/findings-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/finding-queue/findings-queue.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Finding } from '../findings/findings.service';
1+
import { Finding } from '../../findings/findings.service';
22

33
export abstract class FindingsQueue {
44
public abstract publish(...findings: Finding[]): Promise<void>;

packages/backend/jobs-manager/service/src/modules/job-queue/kafka-findings-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/finding-queue/kafka-findings-queue.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import { Message, Producer } from 'kafkajs';
3-
import { orchestratorConstants } from '../auth/constants';
4-
import { Finding } from '../findings/findings.service';
3+
import { orchestratorConstants } from '../../auth/constants';
4+
import { Finding } from '../../findings/findings.service';
55
import { FindingsQueue } from './findings-queue';
66

77
@Injectable()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { KafkaJobManagementTask } from './kafka-job-management-queue';
2+
3+
export abstract class JobManagementQueue {
4+
public abstract publish(
5+
...jobManagementTasks: KafkaJobManagementTask[]
6+
): Promise<void>;
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { Message, Producer } from 'kafkajs';
3+
import { orchestratorConstants } from '../../auth/constants';
4+
import { JobManagementTask } from '../../database/jobs/jobs.dto';
5+
import { JobManagementQueue } from './job-management-queue';
6+
7+
export interface KafkaJobManagementTask {
8+
jobId: string;
9+
task: JobManagementTask;
10+
}
11+
12+
@Injectable()
13+
export class KafkaJobManagementQueue implements JobManagementQueue {
14+
private logger = new Logger(KafkaJobManagementQueue.name);
15+
16+
constructor(private producer: Producer) {}
17+
18+
public async publish(...jobManagementTasks: KafkaJobManagementTask[]) {
19+
this.logger.debug(
20+
`Publishing ${
21+
jobManagementTasks.length
22+
} tasks to the job management queue on topic ${
23+
orchestratorConstants.topics.jobManagement
24+
}: ${JSON.stringify(jobManagementTasks)}.`,
25+
);
26+
27+
for (const task of jobManagementTasks) {
28+
const serializedTasks: Message[] = [
29+
{
30+
value: JSON.stringify(task),
31+
},
32+
];
33+
34+
await this.producer.send({
35+
topic: orchestratorConstants.topics.jobManagement,
36+
messages: serializedTasks,
37+
});
38+
}
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { JobManagementQueue } from './job-management-queue';
3+
import { KafkaJobManagementTask } from './kafka-job-management-queue';
4+
5+
@Injectable()
6+
export class NullJobManagementQueue implements JobManagementQueue {
7+
private logger = new Logger(NullJobManagementQueue.name);
8+
9+
public async publish(...jobManagementTasks: KafkaJobManagementTask[]) {
10+
this.logger.debug('Job management not posted to queue');
11+
}
12+
}

packages/backend/jobs-manager/service/src/modules/job-queue/job-model-update-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/job-model-update-queue.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { CustomJobsDocument } from '../database/custom-jobs/custom-jobs.model';
1+
import { CustomJobsDocument } from '../../database/custom-jobs/custom-jobs.model';
22

33
export abstract class JobModelUpdateQueue {
44
public abstract publish(

packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-model-update-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/kafka-job-model-update-queue.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import { Message, Producer } from 'kafkajs';
3-
import { orchestratorConstants } from '../auth/constants';
3+
import { orchestratorConstants } from '../../auth/constants';
44
import {
55
CustomJobEntry,
66
CustomJobsDocument,
7-
} from '../database/custom-jobs/custom-jobs.model';
7+
} from '../../database/custom-jobs/custom-jobs.model';
88
import { JobModelUpdateQueue } from './job-model-update-queue';
99

1010
export interface KafkaJobModelUpdate extends CustomJobEntry {

packages/backend/jobs-manager/service/src/modules/job-queue/null-job-model-update-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/null-job-model-update-queue.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Injectable, Logger } from '@nestjs/common';
2-
import { CustomJobEntry } from '../database/custom-jobs/custom-jobs.model';
2+
import { CustomJobEntry } from '../../database/custom-jobs/custom-jobs.model';
33
import { JobModelUpdateQueue } from './job-model-update-queue';
44

55
@Injectable()

packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-queue.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/job-queue/kafka-job-queue.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Injectable, Logger } from '@nestjs/common';
22
import { Producer } from 'kafkajs';
3-
import { orchestratorConstants } from '../auth/constants';
3+
import { orchestratorConstants } from '../../auth/constants';
44
import { JobQueue } from './job-queue';
55

66
@Injectable()

packages/backend/jobs-manager/service/src/modules/job-queue/queue.module.ts renamed to packages/backend/jobs-manager/service/src/modules/queues/queue.module.ts

+26-10
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ import { Kafka, KafkaConfig } from 'kafkajs';
33
import { readFileSync } from 'node:fs';
44
import { isTest } from '../app.constants';
55
import { orchestratorConstants } from '../auth/constants';
6-
import { FindingsQueue } from './findings-queue';
7-
import { JobModelUpdateQueue } from './job-model-update-queue';
8-
import { JobQueue } from './job-queue';
9-
import { KafkaFindingsQueue } from './kafka-findings-queue';
10-
import { KafkaJobModelUpdateQueue } from './kafka-job-model-update-queue';
11-
import { KafkaJobQueue } from './kafka-job-queue';
12-
import { NullFindingsQueue } from './null-findings-queue';
13-
import { NullJobModelUpdateQueue } from './null-job-model-update-queue';
14-
import { NullJobQueue } from './null-job-queue';
6+
import { FindingsQueue } from './finding-queue/findings-queue';
7+
import { KafkaFindingsQueue } from './finding-queue/kafka-findings-queue';
8+
import { NullFindingsQueue } from './finding-queue/null-findings-queue';
9+
import { JobManagementQueue } from './job-management-queue/job-management-queue';
10+
import { KafkaJobManagementQueue } from './job-management-queue/kafka-job-management-queue';
11+
import { NullJobManagementQueue } from './job-management-queue/null-job-management-queue';
12+
import { JobModelUpdateQueue } from './job-model-update-queue/job-model-update-queue';
13+
import { KafkaJobModelUpdateQueue } from './job-model-update-queue/kafka-job-model-update-queue';
14+
import { NullJobModelUpdateQueue } from './job-model-update-queue/null-job-model-update-queue';
15+
import { JobQueue } from './job-queue/job-queue';
16+
import { KafkaJobQueue } from './job-queue/kafka-job-queue';
17+
import { NullJobQueue } from './job-queue/null-job-queue';
1518

1619
const certFolder =
1720
isTest() && process.env.TEST_TYPE === 'unit' ? './' : '/certs';
@@ -82,8 +85,21 @@ export const kafkaConfig: KafkaConfig = {
8285
return new KafkaJobModelUpdateQueue(producer);
8386
},
8487
},
88+
{
89+
provide: JobManagementQueue,
90+
useFactory: async () => {
91+
if (isTest()) return new NullJobManagementQueue();
92+
93+
const kafka = new Kafka(kafkaConfig);
94+
95+
const producer = kafka.producer();
96+
await producer.connect();
97+
98+
return new KafkaJobManagementQueue(producer);
99+
},
100+
},
85101
],
86-
exports: [JobQueue, FindingsQueue, JobModelUpdateQueue],
102+
exports: [JobQueue, FindingsQueue, JobModelUpdateQueue, JobManagementQueue],
87103
})
88104
export class QueueModule {
89105
public constructor() {}

0 commit comments

Comments
 (0)