diff --git a/devspace.yaml b/devspace.yaml index c66577dc..88d3b920 100644 --- a/devspace.yaml +++ b/devspace.yaml @@ -587,15 +587,23 @@ deployments: config: max.message.bytes: 64000 flush.messages: 1 + - name: stalker.jobs.management + partitions: 1 + replicationFactor: 1 + config: + max.message.bytes: 64000 + flush.messages: 1 extraProvisioningCommands: # Setting the ACLs - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.findings" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation write --operation describe --topic stalker.jobs.requests" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --topic stalker.jobs.logs" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.models" + - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.management" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.findings" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.requests" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.logs" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.models" + - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.management" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager-job-logs" - "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --group stalker" diff --git a/packages/backend/jobs-manager/service/src/modules/auth/constants.ts b/packages/backend/jobs-manager/service/src/modules/auth/constants.ts index 22af1776..ca3db28e 100644 --- a/packages/backend/jobs-manager/service/src/modules/auth/constants.ts +++ b/packages/backend/jobs-manager/service/src/modules/auth/constants.ts @@ -22,6 +22,7 @@ export const orchestratorConstants = { findings: 'stalker.jobs.findings', jobLogs: 'stalker.jobs.logs', jobModels: 'stalker.jobs.models', + jobManagement: 'stalker.jobs.management', }, }; diff --git a/packages/backend/jobs-manager/service/src/modules/database/custom-job-templates/custom-job-templates.module.ts b/packages/backend/jobs-manager/service/src/modules/database/custom-job-templates/custom-job-templates.module.ts index ee488161..5dc891b5 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/custom-job-templates/custom-job-templates.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/custom-job-templates/custom-job-templates.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; import { DataSourcesModule } from '../../datasources/data-sources.module'; -import { QueueModule } from '../../job-queue/queue.module'; +import { QueueModule } from '../../queues/queue.module'; import { DatalayerModule } from '../datalayer.module'; import { CustomJobTemplatesController } from './custom-job-templates.controller'; import { jobTemplatesInitProvider } from './custom-job-templates.provider'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.module.ts b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.module.ts index ab1ee9c4..286d3e4b 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.module.ts @@ -1,7 +1,7 @@ import { Module } from '@nestjs/common'; import { MongooseModule } from '@nestjs/mongoose'; import { DataSourcesModule } from '../../datasources/data-sources.module'; -import { QueueModule } from '../../job-queue/queue.module'; +import { QueueModule } from '../../queues/queue.module'; import { ConfigService } from '../admin/config/config.service'; import { DatalayerModule } from '../datalayer.module'; import { CustomJobsController } from './custom-jobs.controller'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.service.ts b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.service.ts index fa6fd7fd..5f7db15d 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.service.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/custom-jobs.service.ts @@ -4,7 +4,7 @@ import { DeleteResult } from 'mongodb'; import { Model, Types } from 'mongoose'; import { HttpNotFoundException } from '../../../exceptions/http.exceptions'; import { JobSummary } from '../../../types/job-summary.type'; -import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue'; +import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue'; import { JobContainer, JobContainerDocument, diff --git a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/jobs.provider.ts b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/jobs.provider.ts index 18f48050..2a2a52c1 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/jobs.provider.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/custom-jobs/jobs.provider.ts @@ -4,7 +4,7 @@ import { UpdateFilter } from 'mongodb'; import { Model } from 'mongoose'; import { isConsumerMode } from '../../app.constants'; import { DataSources } from '../../datasources/data-sources'; -import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue'; +import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue'; import { DATABASE_INIT } from '../admin/config/config.provider'; import { JobPodConfiguration } from '../admin/config/job-pod-config/job-pod-config.model'; import { JobContainer } from '../container/job-container.model'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/jobs/job-executions.service.ts b/packages/backend/jobs-manager/service/src/modules/database/jobs/job-executions.service.ts index cb9d0248..a95e569a 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/jobs/job-executions.service.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/jobs/job-executions.service.ts @@ -11,7 +11,9 @@ import { } from '../../../types/timestamped-string.type'; import { ProjectUnassigned } from '../../../validators/is-project-id.validator'; import { isTest } from '../../app.constants'; -import { JobQueue } from '../../job-queue/job-queue'; +import { JobManagementQueue } from '../../queues/job-management-queue/job-management-queue'; +import { KafkaJobManagementTask } from '../../queues/job-management-queue/kafka-job-management-queue'; +import { JobQueue } from '../../queues/job-queue/job-queue'; import { ConfigService } from '../admin/config/config.service'; import { Project } from '../reporting/project.model'; import { JobExecutionsDto } from './jobs.dto'; @@ -25,6 +27,7 @@ export class JobExecutionsService { private jobQueue: JobQueue, @InjectModel('job') private readonly jobModel: Model, @InjectModel('project') private readonly projectModel: Model, + private jobManagementQueue: JobManagementQueue, ) {} public async getAll(dto: JobExecutionsDto): Promise> { @@ -141,6 +144,19 @@ export class JobExecutionsService { }; } + public async terminate(jobId: string) { + const j = await this.jobModel.findById(jobId); + + if (j.endTime) return; + + const task: KafkaJobManagementTask = { + jobId: j._id.toString(), + task: 'TerminateJob', + }; + + this.jobManagementQueue.publish(task); + } + public async addJobOutputLine( jobId: string, timestamp: number, diff --git a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.controller.ts b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.controller.ts index b1aafe47..47b8ff1f 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.controller.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.controller.ts @@ -4,6 +4,7 @@ import { Delete, Get, Param, + Patch, Post, Query, UseGuards, @@ -13,6 +14,7 @@ import { isNotEmpty, isString } from 'class-validator'; import { HttpBadRequestException, HttpNotFoundException, + HttpNotImplementedException, } from '../../../exceptions/http.exceptions'; import { MongoIdDto } from '../../../types/dto/mongo-id.dto'; import { JobLog } from '../../../types/job-log.model'; @@ -32,7 +34,7 @@ import { SecretsService } from '../secrets/secrets.service'; import { JobParameter } from '../subscriptions/subscriptions.type'; import { JobExecutionsService } from './job-executions.service'; import { JobDefinitions } from './job-model.module'; -import { JobExecutionsDto, StartJobDto } from './jobs.dto'; +import { JobExecutionsDto, JobManagementDto, StartJobDto } from './jobs.dto'; import { JobFactory, JobFactoryUtils } from './jobs.factory'; import { CustomJob } from './models/custom-job.model'; import { JobDocument } from './models/jobs.model'; @@ -131,6 +133,19 @@ export class JobsController { return await this.jobsService.publish(job); } + @UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard) + @Roles(Role.User) + @Patch(':id') + async stopJob(@Param() id: MongoIdDto, @Body() dto: JobManagementDto) { + switch (dto.task) { + case 'TerminateJob': + await this.jobsService.terminate(id.id); + break; + default: + throw new HttpNotImplementedException(); + } + } + @UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard) @Roles(Role.User) @Delete() diff --git a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.dto.ts b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.dto.ts index 256a61f1..6cd74160 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.dto.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.dto.ts @@ -2,6 +2,7 @@ import { IntersectionType } from '@nestjs/swagger'; import { Type } from 'class-transformer'; import { IsArray, + IsIn, IsMongoId, IsNotEmpty, IsOptional, @@ -40,3 +41,12 @@ export class StartJobDto { @IsOptional() projectId?: string; } + +export const jobManagementTasks = ['TerminateJob'] as const; +export type JobManagementTask = (typeof jobManagementTasks)[number]; + +export class JobManagementDto { + @IsNotEmpty() + @IsIn(jobManagementTasks) + task: JobManagementTask; +} diff --git a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.module.ts b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.module.ts index 19ed00f5..7fe0741d 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/jobs/jobs.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; import { JwtModule } from '@nestjs/jwt'; -import { QueueModule } from '../../job-queue/queue.module'; +import { QueueModule } from '../../queues/queue.module'; import { ConfigModule } from '../admin/config/config.module'; import { CustomJobsModule } from '../custom-jobs/custom-jobs.module'; import { DatalayerModule } from '../datalayer.module'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.module.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.module.ts index 68cefbb1..bdb97e3c 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.module.ts @@ -1,5 +1,5 @@ import { forwardRef, Module } from '@nestjs/common'; -import { QueueModule } from '../../../job-queue/queue.module'; +import { QueueModule } from '../../../queues/queue.module'; import { ConfigModule } from '../../admin/config/config.module'; import { DatalayerModule } from '../../datalayer.module'; import { JobsModule } from '../../jobs/jobs.module'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.service.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.service.ts index 96c7a991..ffc15f72 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.service.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/domain/domain.service.ts @@ -8,7 +8,7 @@ import { } from '../../../../exceptions/http.exceptions'; import escapeStringRegexp from '../../../../utils/escape-string-regexp'; import { HostnameFinding } from '../../../findings/findings.service'; -import { FindingsQueue } from '../../../job-queue/findings-queue'; +import { FindingsQueue } from '../../../queues/finding-queue/findings-queue'; import { ConfigService } from '../../admin/config/config.service'; import { MONGO_DUPLICATE_ERROR } from '../../database.constants'; import { JobExecutionsService } from '../../jobs/job-executions.service'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.module.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.module.ts index cd4577e6..e83deda5 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.module.ts @@ -1,5 +1,5 @@ import { forwardRef, Module } from '@nestjs/common'; -import { QueueModule } from '../../../job-queue/queue.module'; +import { QueueModule } from '../../../queues/queue.module'; import { ConfigModule } from '../../admin/config/config.module'; import { DatalayerModule } from '../../datalayer.module'; import { JobsModule } from '../../jobs/jobs.module'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.service.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.service.ts index f38df92b..d3006995 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.service.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/host/host.service.ts @@ -5,7 +5,7 @@ import { FilterQuery, Model, Types } from 'mongoose'; import { HttpNotFoundException } from '../../../../exceptions/http.exceptions'; import escapeStringRegexp from '../../../../utils/escape-string-regexp'; import { IpFinding } from '../../../findings/findings.service'; -import { FindingsQueue } from '../../../job-queue/findings-queue'; +import { FindingsQueue } from '../../../queues/finding-queue/findings-queue'; import { ConfigService } from '../../admin/config/config.service'; import { TagsService } from '../../tags/tag.service'; import { CorrelationKeyUtils } from '../correlation.utils'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.module.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.module.ts index 2e946fb4..d4e31668 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.module.ts @@ -1,5 +1,5 @@ import { Module } from '@nestjs/common'; -import { QueueModule } from '../../../job-queue/queue.module'; +import { QueueModule } from '../../../queues/queue.module'; import { DatalayerModule } from '../../datalayer.module'; import { TagsModule } from '../../tags/tag.module'; import { WebsiteController } from './website.controller'; diff --git a/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.service.ts b/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.service.ts index f23dbe99..7d0c131d 100644 --- a/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.service.ts +++ b/packages/backend/jobs-manager/service/src/modules/database/reporting/websites/website.service.ts @@ -8,7 +8,7 @@ import { } from '../../../../exceptions/http.exceptions'; import escapeStringRegexp from '../../../../utils/escape-string-regexp'; import { WebsiteFinding } from '../../../findings/findings.service'; -import { FindingsQueue } from '../../../job-queue/findings-queue'; +import { FindingsQueue } from '../../../queues/finding-queue/findings-queue'; import { TagsService } from '../../tags/tag.service'; import { CorrelationKeyUtils } from '../correlation.utils'; import { Domain } from '../domain/domain.model'; diff --git a/packages/backend/jobs-manager/service/src/modules/findings/findings.module.ts b/packages/backend/jobs-manager/service/src/modules/findings/findings.module.ts index 1c0c0b3b..f42b475d 100644 --- a/packages/backend/jobs-manager/service/src/modules/findings/findings.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/findings/findings.module.ts @@ -16,7 +16,7 @@ import { SecretsModule } from '../database/secrets/secrets.module'; import { EventSubscriptionsModule } from '../database/subscriptions/event-subscriptions/event-subscriptions.module'; import { SubscriptionTriggersModule } from '../database/subscriptions/subscription-triggers/subscription-triggers.module'; import { TagsModule } from '../database/tags/tag.module'; -import { kafkaConfig } from '../job-queue/queue.module'; +import { kafkaConfig } from '../queues/queue.module'; import { FindingsHandlers } from './commands/findings-commands'; import { FindingsConsumer } from './findings.consumer'; import { FindingsController } from './findings.controller'; diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/findings-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/finding-queue/findings-queue.ts similarity index 77% rename from packages/backend/jobs-manager/service/src/modules/job-queue/findings-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/finding-queue/findings-queue.ts index 0e3f07ec..5562bbd6 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/findings-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/finding-queue/findings-queue.ts @@ -1,4 +1,4 @@ -import { Finding } from '../findings/findings.service'; +import { Finding } from '../../findings/findings.service'; export abstract class FindingsQueue { public abstract publish(...findings: Finding[]): Promise; diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-findings-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/finding-queue/kafka-findings-queue.ts similarity index 90% rename from packages/backend/jobs-manager/service/src/modules/job-queue/kafka-findings-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/finding-queue/kafka-findings-queue.ts index 81cc3710..03ed7bb4 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-findings-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/finding-queue/kafka-findings-queue.ts @@ -1,7 +1,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { Message, Producer } from 'kafkajs'; -import { orchestratorConstants } from '../auth/constants'; -import { Finding } from '../findings/findings.service'; +import { orchestratorConstants } from '../../auth/constants'; +import { Finding } from '../../findings/findings.service'; import { FindingsQueue } from './findings-queue'; @Injectable() diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/null-findings-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/finding-queue/null-findings-queue.ts similarity index 100% rename from packages/backend/jobs-manager/service/src/modules/job-queue/null-findings-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/finding-queue/null-findings-queue.ts diff --git a/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/job-management-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/job-management-queue.ts new file mode 100644 index 00000000..130652b8 --- /dev/null +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/job-management-queue.ts @@ -0,0 +1,7 @@ +import { KafkaJobManagementTask } from './kafka-job-management-queue'; + +export abstract class JobManagementQueue { + public abstract publish( + ...jobManagementTasks: KafkaJobManagementTask[] + ): Promise; +} diff --git a/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/kafka-job-management-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/kafka-job-management-queue.ts new file mode 100644 index 00000000..4bed51e3 --- /dev/null +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/kafka-job-management-queue.ts @@ -0,0 +1,40 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Message, Producer } from 'kafkajs'; +import { orchestratorConstants } from '../../auth/constants'; +import { JobManagementTask } from '../../database/jobs/jobs.dto'; +import { JobManagementQueue } from './job-management-queue'; + +export interface KafkaJobManagementTask { + jobId: string; + task: JobManagementTask; +} + +@Injectable() +export class KafkaJobManagementQueue implements JobManagementQueue { + private logger = new Logger(KafkaJobManagementQueue.name); + + constructor(private producer: Producer) {} + + public async publish(...jobManagementTasks: KafkaJobManagementTask[]) { + this.logger.debug( + `Publishing ${ + jobManagementTasks.length + } tasks to the job management queue on topic ${ + orchestratorConstants.topics.jobManagement + }: ${JSON.stringify(jobManagementTasks)}.`, + ); + + for (const task of jobManagementTasks) { + const serializedTasks: Message[] = [ + { + value: JSON.stringify(task), + }, + ]; + + await this.producer.send({ + topic: orchestratorConstants.topics.jobManagement, + messages: serializedTasks, + }); + } + } +} diff --git a/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/null-job-management-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/null-job-management-queue.ts new file mode 100644 index 00000000..e281fa9e --- /dev/null +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-management-queue/null-job-management-queue.ts @@ -0,0 +1,12 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { JobManagementQueue } from './job-management-queue'; +import { KafkaJobManagementTask } from './kafka-job-management-queue'; + +@Injectable() +export class NullJobManagementQueue implements JobManagementQueue { + private logger = new Logger(NullJobManagementQueue.name); + + public async publish(...jobManagementTasks: KafkaJobManagementTask[]) { + this.logger.debug('Job management not posted to queue'); + } +} diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/job-model-update-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/job-model-update-queue.ts similarity index 62% rename from packages/backend/jobs-manager/service/src/modules/job-queue/job-model-update-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/job-model-update-queue.ts index 117a5653..bc56a617 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/job-model-update-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/job-model-update-queue.ts @@ -1,4 +1,4 @@ -import { CustomJobsDocument } from '../database/custom-jobs/custom-jobs.model'; +import { CustomJobsDocument } from '../../database/custom-jobs/custom-jobs.model'; export abstract class JobModelUpdateQueue { public abstract publish( diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-model-update-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/kafka-job-model-update-queue.ts similarity index 93% rename from packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-model-update-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/kafka-job-model-update-queue.ts index c02d3a1f..dbf94724 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-model-update-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/kafka-job-model-update-queue.ts @@ -1,10 +1,10 @@ import { Injectable, Logger } from '@nestjs/common'; import { Message, Producer } from 'kafkajs'; -import { orchestratorConstants } from '../auth/constants'; +import { orchestratorConstants } from '../../auth/constants'; import { CustomJobEntry, CustomJobsDocument, -} from '../database/custom-jobs/custom-jobs.model'; +} from '../../database/custom-jobs/custom-jobs.model'; import { JobModelUpdateQueue } from './job-model-update-queue'; export interface KafkaJobModelUpdate extends CustomJobEntry { diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/null-job-model-update-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/null-job-model-update-queue.ts similarity index 83% rename from packages/backend/jobs-manager/service/src/modules/job-queue/null-job-model-update-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/null-job-model-update-queue.ts index 0f2d4eb5..5d137648 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/null-job-model-update-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-model-update-queue/null-job-model-update-queue.ts @@ -1,5 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; -import { CustomJobEntry } from '../database/custom-jobs/custom-jobs.model'; +import { CustomJobEntry } from '../../database/custom-jobs/custom-jobs.model'; import { JobModelUpdateQueue } from './job-model-update-queue'; @Injectable() diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/job-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-queue/job-queue.ts similarity index 100% rename from packages/backend/jobs-manager/service/src/modules/job-queue/job-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-queue/job-queue.ts diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-queue/kafka-job-queue.ts similarity index 90% rename from packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-queue/kafka-job-queue.ts index 5b1dbf6a..2ca6d52a 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/kafka-job-queue.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/job-queue/kafka-job-queue.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; import { Producer } from 'kafkajs'; -import { orchestratorConstants } from '../auth/constants'; +import { orchestratorConstants } from '../../auth/constants'; import { JobQueue } from './job-queue'; @Injectable() diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/null-job-queue.ts b/packages/backend/jobs-manager/service/src/modules/queues/job-queue/null-job-queue.ts similarity index 100% rename from packages/backend/jobs-manager/service/src/modules/job-queue/null-job-queue.ts rename to packages/backend/jobs-manager/service/src/modules/queues/job-queue/null-job-queue.ts diff --git a/packages/backend/jobs-manager/service/src/modules/job-queue/queue.module.ts b/packages/backend/jobs-manager/service/src/modules/queues/queue.module.ts similarity index 60% rename from packages/backend/jobs-manager/service/src/modules/job-queue/queue.module.ts rename to packages/backend/jobs-manager/service/src/modules/queues/queue.module.ts index a64abc3f..dcedd687 100644 --- a/packages/backend/jobs-manager/service/src/modules/job-queue/queue.module.ts +++ b/packages/backend/jobs-manager/service/src/modules/queues/queue.module.ts @@ -3,15 +3,18 @@ import { Kafka, KafkaConfig } from 'kafkajs'; import { readFileSync } from 'node:fs'; import { isTest } from '../app.constants'; import { orchestratorConstants } from '../auth/constants'; -import { FindingsQueue } from './findings-queue'; -import { JobModelUpdateQueue } from './job-model-update-queue'; -import { JobQueue } from './job-queue'; -import { KafkaFindingsQueue } from './kafka-findings-queue'; -import { KafkaJobModelUpdateQueue } from './kafka-job-model-update-queue'; -import { KafkaJobQueue } from './kafka-job-queue'; -import { NullFindingsQueue } from './null-findings-queue'; -import { NullJobModelUpdateQueue } from './null-job-model-update-queue'; -import { NullJobQueue } from './null-job-queue'; +import { FindingsQueue } from './finding-queue/findings-queue'; +import { KafkaFindingsQueue } from './finding-queue/kafka-findings-queue'; +import { NullFindingsQueue } from './finding-queue/null-findings-queue'; +import { JobManagementQueue } from './job-management-queue/job-management-queue'; +import { KafkaJobManagementQueue } from './job-management-queue/kafka-job-management-queue'; +import { NullJobManagementQueue } from './job-management-queue/null-job-management-queue'; +import { JobModelUpdateQueue } from './job-model-update-queue/job-model-update-queue'; +import { KafkaJobModelUpdateQueue } from './job-model-update-queue/kafka-job-model-update-queue'; +import { NullJobModelUpdateQueue } from './job-model-update-queue/null-job-model-update-queue'; +import { JobQueue } from './job-queue/job-queue'; +import { KafkaJobQueue } from './job-queue/kafka-job-queue'; +import { NullJobQueue } from './job-queue/null-job-queue'; const certFolder = isTest() && process.env.TEST_TYPE === 'unit' ? './' : '/certs'; @@ -82,8 +85,21 @@ export const kafkaConfig: KafkaConfig = { return new KafkaJobModelUpdateQueue(producer); }, }, + { + provide: JobManagementQueue, + useFactory: async () => { + if (isTest()) return new NullJobManagementQueue(); + + const kafka = new Kafka(kafkaConfig); + + const producer = kafka.producer(); + await producer.connect(); + + return new KafkaJobManagementQueue(producer); + }, + }, ], - exports: [JobQueue, FindingsQueue, JobModelUpdateQueue], + exports: [JobQueue, FindingsQueue, JobModelUpdateQueue, JobManagementQueue], }) export class QueueModule { public constructor() {} diff --git a/packages/backend/orchestrator/service/Orchestrator/Constants.cs b/packages/backend/orchestrator/service/Orchestrator/Constants.cs index f9e095dd..bd651991 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Constants.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Constants.cs @@ -6,4 +6,5 @@ public static class Constants public static readonly string JobFindingsTopic = "stalker.jobs.findings"; public static readonly string JobLogsTopic = "stalker.jobs.logs"; public static readonly string JobModelsTopic = "stalker.jobs.models"; + public static readonly string JobManagementTopic = "stalker.jobs.management"; } \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Controllers/JobsController.cs b/packages/backend/orchestrator/service/Orchestrator/Controllers/JobsController.cs index e6429349..297d518e 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Controllers/JobsController.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Controllers/JobsController.cs @@ -8,7 +8,7 @@ namespace Orchestrator.Controllers { public class JobsController : Controller { - private IMessagesProducer EventsProducer { get; set; } + private JobEventsProducer EventsProducer { get; set; } private IMessagesProducer LogsProducer { get; set; } private IFindingsParser Parser { get; set; } @@ -35,7 +35,7 @@ private static bool IsValidJobId(string jobId) public JobsController(IMessagesProducer eventsProducer, IMessagesProducer jobLogsProducer, IFindingsParser parser) { - EventsProducer = eventsProducer; + EventsProducer = eventsProducer as JobEventsProducer; LogsProducer = jobLogsProducer; Parser = parser; } @@ -92,17 +92,12 @@ public async Task Status([FromBody] StatusUpdateDto dto, string id if (!acceptableStatuses.Contains(dto.Status)) { Console.WriteLine("bad status"); - return BadRequest("Status should be Success or Failed"); + return BadRequest("Status should be Success, Failed or Ended"); } if (!IsValidJobId(id)) return BadRequest("Job id is invalid"); - await EventsProducer.Produce(new JobEventMessage - { - JobId = id, - FindingsJson = $"{{ \"findings\": [{{ \"type\": \"JobStatusFinding\", \"status\": \"{dto.Status}\" }}]}}", - Timestamp = CurrentTimeMs, - }); + await EventsProducer.LogStatus(id, dto.Status); return Ok(); } diff --git a/packages/backend/orchestrator/service/Orchestrator/Events/JobEventsProducer.cs b/packages/backend/orchestrator/service/Orchestrator/Events/JobEventsProducer.cs index 9303938a..fe7a4075 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Events/JobEventsProducer.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Events/JobEventsProducer.cs @@ -1,4 +1,8 @@ -using Orchestrator.Queue; +using System.Collections.ObjectModel; +using System.Linq; +using Confluent.Kafka; +using k8s.Models; +using Orchestrator.Queue; namespace Orchestrator.Events; @@ -6,6 +10,54 @@ public class JobEventsProducer : MessagesProducer { protected override string Topic => Constants.JobFindingsTopic; + public enum JobStatus + { + Success, + Failed, + Ended, + Started + } + + private static readonly Dictionary JobStatusMapping = new Dictionary() { + { JobStatus.Success, "Success" }, + { JobStatus.Failed, "Failed" }, + { JobStatus.Ended, "Ended" }, + { JobStatus.Started, "Started" } + }; + + private static Dictionary InvertStatusMapping(Dictionary jobStatusMapping) + { + Dictionary dict = new(); + foreach (var kvp in jobStatusMapping) + { + dict[kvp.Value] = kvp.Key; + } + + return dict; + } + + private static readonly Dictionary JobStatusInvertedMapping = InvertStatusMapping(JobStatusMapping); + + public JobEventsProducer(IConfiguration config, ILogger> logger) : base(new JsonSerializer(), config, logger) { } + + + public async Task LogStatus(string jobId, JobStatus status) + { + await LogStatus(jobId, JobStatusMapping[status]); + } + + public async Task LogStatus(string jobId, string status) + { + if (!JobStatusInvertedMapping.ContainsKey(status)) + throw new ArgumentException("Invalid job status"); + + await Produce(new JobEventMessage + { + JobId = jobId, + FindingsJson = $"{{ \"findings\": [{{ \"type\": \"JobStatusFinding\", \"status\": \"{status}\" }}]}}", + Timestamp = TimeUtils.CurrentTimeMs(), + }); + } } \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Events/JobLogsProducer.cs b/packages/backend/orchestrator/service/Orchestrator/Events/JobLogsProducer.cs index a4a349ab..1ad7300a 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Events/JobLogsProducer.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Events/JobLogsProducer.cs @@ -1,4 +1,5 @@ -using Orchestrator.Queue; +using Confluent.Kafka; +using Orchestrator.Queue; namespace Orchestrator.Events; diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/NucleiCustomJobCommand.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/NucleiCustomJobCommand.cs index b8b48609..acce9ecf 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/NucleiCustomJobCommand.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/NucleiCustomJobCommand.cs @@ -7,11 +7,11 @@ namespace Orchestrator.Jobs.Commands; -public class NucleiCustomJobCommand : KubernetesCommand +public class NucleiCustomJobCommand : KubernetesJobCommand { protected override KubernetesJobTemplate JobTemplate { get; } - public NucleiCustomJobCommand(CustomJobRequest request, JobModel model, IKubernetesFacade kubernetes, IMessagesProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) + public NucleiCustomJobCommand(CustomJobRequest request, JobModel model, IKubernetesFacade kubernetes, JobEventsProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) : base(request, kubernetes, eventsProducer, jobLogsProducer, parser, logger) { JobTemplate = new NucleiCustomJobTemplate(request.JobId, config, request.CustomJobParameters, model.Code, request.JobPodMilliCpuLimit, request.JobPodMemoryKbLimit, model.FindingHandler, model.Image); diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/PythonCustomJobCommand.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/PythonCustomJobCommand.cs index 05ca35a7..fbde5c22 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/PythonCustomJobCommand.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/Commands/PythonCustomJobCommand.cs @@ -7,11 +7,11 @@ namespace Orchestrator.Jobs.Commands; -public class PythonCustomJobCommand : KubernetesCommand +public class PythonCustomJobCommand : KubernetesJobCommand { protected override KubernetesJobTemplate JobTemplate { get; } - public PythonCustomJobCommand(CustomJobRequest request, JobModel model, IKubernetesFacade kubernetes, IMessagesProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) + public PythonCustomJobCommand(CustomJobRequest request, JobModel model, IKubernetesFacade kubernetes, JobEventsProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) : base(request, kubernetes, eventsProducer, jobLogsProducer, parser, logger) { JobTemplate = new PythonCustomJobTemplate(request.JobId, config, request.CustomJobParameters, model.Code, request.JobPodMilliCpuLimit, request.JobPodMemoryKbLimit, model.Image); diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/IJobFactory.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/IJobFactory.cs index 9ef71b23..34a2c239 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Jobs/IJobFactory.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/IJobFactory.cs @@ -1,8 +1,10 @@ -using Orchestrator.Queue.JobsConsumer; +using Orchestrator.Queue.JobManagementConsumer; +using Orchestrator.Queue.JobsConsumer; namespace Orchestrator.Jobs; public interface IJobFactory { JobCommand Create(JobRequest request); + JobCommand Create(JobManagementRequest request); } \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/JobFactory.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/JobFactory.cs index 0dac4fd7..64cb75e8 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Jobs/JobFactory.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/JobFactory.cs @@ -1,9 +1,12 @@ using Microsoft.IdentityModel.Tokens; using Orchestrator.Events; using Orchestrator.Jobs.Commands; +using Orchestrator.Jobs.JobManagementCommand; using Orchestrator.Jobs.JobModelCache; using Orchestrator.K8s; using Orchestrator.Queue; +using Orchestrator.Queue.JobManagementConsumer; +using Orchestrator.Queue.JobManagementConsumer.JobManagementRequests; using Orchestrator.Queue.JobsConsumer; using Orchestrator.Queue.JobsConsumer.JobRequests; using Orchestrator.Utils; @@ -14,7 +17,7 @@ namespace Orchestrator.Jobs; public class JobFactory : IJobFactory { private IKubernetesFacade Kubernetes { get; } - private IMessagesProducer EventsProducer { get; } + private JobEventsProducer EventsProducer { get; } private JobLogsProducer JobLogsProducer { get; } private IFindingsParser Parser { get; } private ILoggerFactory LoggerFactory { get; } @@ -25,7 +28,7 @@ public JobFactory(IKubernetesFacade kubernetes, IMessagesProducer(); @@ -43,6 +46,17 @@ public JobCommand Create(JobRequest request) }; } + public JobCommand Create(JobManagementRequest request) + { + Logger.LogInformation(JsonSerializer.Serialize(request)); + + return request switch + { + TerminateJobRequest managementRequest => new TerminateJobCommand(managementRequest, Kubernetes, EventsProducer, JobLogsProducer, Parser, LoggerFactory.CreateLogger(), Config), + _ => throw new InvalidOperationException(), + }; + } + private JobCommand CreateCustomJobCommand(CustomJobRequest request) { if (request.JobModelId == null) throw new InvalidOperationException(); diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/JobManagementCommands/TerminateJobCommand.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/JobManagementCommands/TerminateJobCommand.cs new file mode 100644 index 00000000..ef5e6cbe --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/JobManagementCommands/TerminateJobCommand.cs @@ -0,0 +1,28 @@ +using k8s; +using Orchestrator.Events; +using Orchestrator.K8s; +using Orchestrator.Queue; +using Orchestrator.Queue.JobManagementConsumer.JobManagementRequests; + +namespace Orchestrator.Jobs.JobManagementCommand; + +public class TerminateJobCommand : KubernetesManagementCommand +{ + public TerminateJobCommand(TerminateJobRequest request, IKubernetesFacade kubernetes, JobEventsProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) + : base(request, kubernetes, eventsProducer, jobLogsProducer, parser, logger, config) + { } + + public async override Task Execute() + { + bool terminationSuccess = await Kubernetes.TerminateJob(Request.JobId, Namespace); + if (terminationSuccess) + { + LogsProducer.LogDebug(Request.JobId, $"Job terminated"); + } + else + { + LogsProducer.LogDebug(Request.JobId, "Tried to terminate, but no corresponding job was runnning"); + } + _ = EventsProducer.LogStatus(Request.JobId, JobEventsProducer.JobStatus.Ended); + } +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesCommand.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesJobCommand.cs similarity index 59% rename from packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesCommand.cs rename to packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesJobCommand.cs index 316efbc7..887754c3 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesCommand.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesJobCommand.cs @@ -5,10 +5,10 @@ namespace Orchestrator.Jobs; -public abstract class KubernetesCommand : JobCommand where T : JobRequest +public abstract class KubernetesJobCommand : JobCommand where T : JobRequest { private IKubernetesFacade Kubernetes { get; } - private IMessagesProducer EventsProducer { get; } + private JobEventsProducer EventsProducer { get; } private JobLogsProducer LogsProducer { get; } private IFindingsParser Parser { get; } private ILogger Logger { get; } @@ -16,7 +16,7 @@ public abstract class KubernetesCommand : JobCommand where T : JobRequest protected abstract KubernetesJobTemplate JobTemplate { get; } - protected KubernetesCommand(T request, IKubernetesFacade kubernetes, IMessagesProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger) + protected KubernetesJobCommand(T request, IKubernetesFacade kubernetes, JobEventsProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger) { Request = request; Kubernetes = kubernetes; @@ -34,11 +34,6 @@ public override async Task Execute() LogsProducer.LogDebug(Request.JobId, "Job picked up by orchestrator."); Logger.LogInformation(Request.JobId, "Job created, listening for events."); - await EventsProducer.Produce(new JobEventMessage - { - JobId = Request.JobId, - FindingsJson = "{ \"findings\": [{ \"type\": \"JobStatusFinding\", \"status\": \"Started\" }]}", - Timestamp = TimeUtils.CurrentTimeMs(), - }); + await EventsProducer.LogStatus(Request.JobId, JobEventsProducer.JobStatus.Started); } } \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesManagementCommand.cs b/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesManagementCommand.cs new file mode 100644 index 00000000..e5e77946 --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Jobs/KubernetesManagementCommand.cs @@ -0,0 +1,39 @@ +using Confluent.Kafka; +using Orchestrator.Events; +using Orchestrator.K8s; +using Orchestrator.Queue; +using Orchestrator.Queue.JobManagementConsumer; +using Orchestrator.Queue.JobsConsumer; + +namespace Orchestrator.Jobs; + +public abstract class KubernetesManagementCommand : JobCommand where T : JobManagementRequest +{ + protected IKubernetesFacade Kubernetes { get; } + protected JobEventsProducer EventsProducer { get; } + protected JobLogsProducer LogsProducer { get; } + private IFindingsParser Parser { get; } + private ILogger Logger { get; } + protected T Request { get; } + public string Namespace { get; set; } = "default"; + protected IConfiguration Config { get; init; } + + protected KubernetesManagementCommand(T request, IKubernetesFacade kubernetes, JobEventsProducer eventsProducer, JobLogsProducer jobLogsProducer, IFindingsParser parser, ILogger logger, IConfiguration config) + { + Request = request; + Kubernetes = kubernetes; + EventsProducer = eventsProducer; + LogsProducer = jobLogsProducer; + Parser = parser; + Logger = logger; + Config = config; + SetNamespace(); + } + + private void SetNamespace() + { + string? k8sNamespace = Config.GetSection("Jobs").GetValue("Namespace"); + if (k8sNamespace == null) throw new NullReferenceException("Setting Jobs Namespace is missing."); + Namespace = k8sNamespace; + } +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/K8s/IKubernetesFacade.cs b/packages/backend/orchestrator/service/Orchestrator/K8s/IKubernetesFacade.cs index c9809206..5e040f49 100644 --- a/packages/backend/orchestrator/service/Orchestrator/K8s/IKubernetesFacade.cs +++ b/packages/backend/orchestrator/service/Orchestrator/K8s/IKubernetesFacade.cs @@ -1,4 +1,6 @@ -namespace Orchestrator.K8s; +using k8s.Models; + +namespace Orchestrator.K8s; public interface IKubernetesFacade { @@ -8,6 +10,12 @@ public interface IKubernetesFacade /// Task CreateJob(KubernetesJobTemplate jobTemplate); + /// + /// Creates a jobTemplate. + /// + /// + Task TerminateJob(string jobId, string jobNamespace = "default"); + /// /// True if the pod is in the status "Failed" or "Succeeded", false otherwise /// diff --git a/packages/backend/orchestrator/service/Orchestrator/K8s/KubernetesFacade.cs b/packages/backend/orchestrator/service/Orchestrator/K8s/KubernetesFacade.cs index 301fb072..66921a7d 100644 --- a/packages/backend/orchestrator/service/Orchestrator/K8s/KubernetesFacade.cs +++ b/packages/backend/orchestrator/service/Orchestrator/K8s/KubernetesFacade.cs @@ -1,5 +1,7 @@ using k8s; using k8s.Models; +using Orchestrator.Events; +using Orchestrator.Queue; namespace Orchestrator.K8s; @@ -59,7 +61,8 @@ public async Task CreateJob(KubernetesJobTemplate jobTemplate) Name = jobName, Labels = new Dictionary() { - ["red-kite.io/component"] = "job" + ["red-kite.io/component"] = "job", + ["red-kite.io/jobid"] = jobTemplate.Id } }, new V1JobSpec @@ -70,7 +73,8 @@ public async Task CreateJob(KubernetesJobTemplate jobTemplate) { Labels = new Dictionary() { - ["red-kite.io/component"] = "job" + ["red-kite.io/component"] = "job", + ["red-kite.io/jobid"] = jobTemplate.Id } }, Spec = new V1PodSpec @@ -88,12 +92,13 @@ public async Task CreateJob(KubernetesJobTemplate jobTemplate) }, NodeSelector = nodeSelector, RestartPolicy = "Never", - TerminationGracePeriodSeconds = 100 + TerminationGracePeriodSeconds = 100, + }, }, BackoffLimit = jobTemplate.MaxRetries, ActiveDeadlineSeconds = jobTemplate.Timeout, - TtlSecondsAfterFinished = 1 + TtlSecondsAfterFinished = 1, }); Logger.LogInformation($"Creating job {jobName} in namespace {jobTemplate.Namespace}"); @@ -122,6 +127,30 @@ public async Task IsJobPodFinished(string jobName, string jobNamespace = " return RetryableCall(() => pods.Items.FirstOrDefault()?.Status?.Phase == "Succeeded" || pods.Items.FirstOrDefault()?.Status?.Phase == "Failed"); } + /// + /// Terminates a job. The pod will be deleted after its termination grace period. + /// + /// + /// + /// true if the job was running and terminated, false if it was not running. + public async Task TerminateJob(string jobId, string jobNamespace = "default") + { + using var client = new Kubernetes(KubernetesConfiguration); + string labelSelector = "red-kite.io/jobid=" + jobId; + var jobs = RetryableCall(() => client.ListNamespacedJob(jobNamespace, labelSelector: labelSelector)); + var runningJob = jobs.Items.FirstOrDefault(job => job.Status.Active > 0); + + if (runningJob != null) + { + var options = new V1DeleteOptions(gracePeriodSeconds: 0, propagationPolicy: "Foreground"); + + await RetryableCall(() => client.DeleteNamespacedJobAsync(runningJob.Metadata.Name, jobNamespace, options)); + return true; + } + + return false; + } + /// /// Allows to query a namespace for its memory and CPU limit and usage /// @@ -167,9 +196,6 @@ private static T RetryableCall(Func call, int maxRetry = 10) catch (k8s.Autorest.HttpOperationException e) { exceptions.Add(e); - } - finally - { RandomWait(); retryCount++; } diff --git a/packages/backend/orchestrator/service/Orchestrator/Program.cs b/packages/backend/orchestrator/service/Orchestrator/Program.cs index 409254d2..008df058 100644 --- a/packages/backend/orchestrator/service/Orchestrator/Program.cs +++ b/packages/backend/orchestrator/service/Orchestrator/Program.cs @@ -39,6 +39,7 @@ new TopicSpecification { Name = Constants.JobRequestsTopic, }, new TopicSpecification { Name = Constants.JobFindingsTopic, }, new TopicSpecification { Name = Constants.JobModelsTopic, }, + new TopicSpecification { Name = Constants.JobManagementTopic, }, }; try @@ -70,6 +71,7 @@ // Start consumer app.Services.GetService(); app.Services.GetService(); +app.Services.GetService(); app.MapGet("/version", () => "V1"); app.MapFallback(() => "V1"); @@ -91,6 +93,7 @@ void ConfigureServices(IServiceCollection services) .AddResponseCompression() .AddSingleton() .AddSingleton() + .AddSingleton() .AddSingleton, JobEventsProducer>() .AddSingleton, JobLogsProducer>() .AddTransient() diff --git a/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementConsumer.cs b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementConsumer.cs new file mode 100644 index 00000000..e283f661 --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementConsumer.cs @@ -0,0 +1,71 @@ +using Confluent.Kafka; +using k8s.Models; +using Orchestrator.Jobs; +using Orchestrator.K8s; +using Orchestrator.Queue.JobManagementConsumer; +using Orchestrator.Queue.JobModelsConsumer; +using YamlDotNet.Core.Tokens; +using YamlDotNet.Serialization; + +namespace Orchestrator.Queue.JobsConsumer; + +public class JobManagementConsumer : KafkaConsumer +{ + + protected override string GroupId => "stalker"; + + protected override string[] Topics => new[] { Constants.JobManagementTopic }; + private string? K8sNamespace = ""; + private IJobFactory JobFactory { get; } + + public JobManagementConsumer(IJobFactory jobFactory, IConfiguration config, ILogger logger) + : base(config, new JobManagementSerializer(), logger) + { + K8sNamespace = config.GetSection("Jobs").GetValue("Namespace"); + JobFactory = jobFactory; + } + + protected override CancellationToken SetupConsumer(ConsumerConfig consumerConfig, IDeserializer deserializer) + { + Consumer = new ConsumerBuilder(consumerConfig) + .SetValueDeserializer(deserializer) + .Build(); + + Logger.LogDebug($"Starting consumer. Subscribing to topics {string.Join(",", Topics)}"); + Consumer.Subscribe(Topics); + + foreach (var topic in Topics) + { + Consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(0)), Offset.End)); + } + + TokenSource = new CancellationTokenSource(); + CancellationToken ct = TokenSource.Token; + + return ct; + } + + protected override Task Consume(JobManagementRequest request) + { + + +#pragma warning disable CS4014 + HandleRequest(request); +#pragma warning restore CS4014 + + return Task.CompletedTask; + } + + private async Task HandleRequest(JobManagementRequest request) + { + try + { + var jobCommand = JobFactory.Create(request); + await jobCommand.Execute(); + } + catch (Exception ex) + { + Logger.LogError(ex, $"An error occurred while handling job mangement request for job {request.JobId}."); + } + } +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequest.cs b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequest.cs new file mode 100644 index 00000000..ab2ce00f --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequest.cs @@ -0,0 +1,8 @@ +namespace Orchestrator.Queue.JobManagementConsumer; + +public abstract class JobManagementRequest +{ + public abstract string Task { get; } + + public required string JobId { get; init; } +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequests/TerminateJobRequest.cs b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequests/TerminateJobRequest.cs new file mode 100644 index 00000000..3f08fa20 --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementRequests/TerminateJobRequest.cs @@ -0,0 +1,8 @@ +namespace Orchestrator.Queue.JobManagementConsumer.JobManagementRequests; + +public class TerminateJobRequest : JobManagementRequest +{ + public static readonly string Discriminator = "TerminateJob"; + + public override string Task => Discriminator; +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementSerializer.cs b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementSerializer.cs new file mode 100644 index 00000000..ea3b6ce9 --- /dev/null +++ b/packages/backend/orchestrator/service/Orchestrator/Queue/JobManagementConsumer/JobManagementSerializer.cs @@ -0,0 +1,56 @@ +using Confluent.Kafka; +using System.Text.Json.Nodes; +using System.Text.Json; +using Orchestrator.Queue.JobsConsumer.JobRequests; +using Orchestrator.Queue.JobManagementConsumer.JobManagementRequests; + +namespace Orchestrator.Queue.JobManagementConsumer +{ + public class JobManagementSerializer : ISerializer, IDeserializer where T : JobManagementRequest + { + private JsonSerializerOptions Options => new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) + { + try + { + if (isNull) throw new InvalidOperationException(); + + var generic = JsonNode.Parse(data.ToArray()); + if (generic == null) throw new InvalidOperationException(); + + var type = generic["Task"]?.GetValue() ?? generic["task"]?.GetValue(); + return type switch + { + "TerminateJob" => JsonSerializer.Deserialize(data.ToArray(), Options) as T, + _ => default + } ?? throw new InvalidOperationException(); + } + catch (JsonException) + { + // This part of the code prevents that the orchestrator breaks in case of bad serialization + // It should be improved with better logging, possibly requeuing the messages with errors, etc. + // Documented here: https://github.com/red-kite-solutions/stalker/issues/218 + Console.WriteLine("Error while deserializing a job management message. Invalid message on queue. Ignoring..."); + return null; + } + } + + public byte[] Serialize(T data, SerializationContext context) + { + using var ms = new MemoryStream(); + + string jsonString = JsonSerializer.Serialize(data, Options); + var writer = new StreamWriter(ms); + + writer.Write(jsonString); + writer.Flush(); + ms.Position = 0; + + return ms.ToArray(); + } + } +} \ No newline at end of file diff --git a/packages/backend/orchestrator/service/orchestrator-service-account.yml b/packages/backend/orchestrator/service/orchestrator-service-account.yml index 3ee46c2c..495943df 100644 --- a/packages/backend/orchestrator/service/orchestrator-service-account.yml +++ b/packages/backend/orchestrator/service/orchestrator-service-account.yml @@ -12,6 +12,7 @@ rules: - "create" - "delete" - "deletecollection" + - "list" - apiGroups: - "" resources: diff --git a/packages/frontend/stalker-app/src/app/api/jobs/job-executions/job-executions.service.ts b/packages/frontend/stalker-app/src/app/api/jobs/job-executions/job-executions.service.ts index a126807e..9dbccdff 100644 --- a/packages/frontend/stalker-app/src/app/api/jobs/job-executions/job-executions.service.ts +++ b/packages/frontend/stalker-app/src/app/api/jobs/job-executions/job-executions.service.ts @@ -93,6 +93,13 @@ export class JobExecutionsService { projectId: projectId, }; - return await firstValueFrom(this.http.post(`${environment.fmUrl}/jobs/`, data)); + return await firstValueFrom(this.http.post(`${environment.fmUrl}/jobs/`, data)); + } + + public async stopJob(jobId: string) { + const data = { + task: 'TerminateJob', + }; + return await firstValueFrom(this.http.patch(`${environment.fmUrl}/jobs/${jobId}`, data)); } } diff --git a/packages/frontend/stalker-app/src/app/modules/domains/list-domains/list-domains.component.ts b/packages/frontend/stalker-app/src/app/modules/domains/list-domains/list-domains.component.ts index e3a5007d..52f21cd9 100644 --- a/packages/frontend/stalker-app/src/app/modules/domains/list-domains/list-domains.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/domains/list-domains/list-domains.component.ts @@ -26,10 +26,8 @@ import { HttpStatus } from '../../../shared/types/http-status.type'; import { Page } from '../../../shared/types/page.type'; import { ProjectSummary } from '../../../shared/types/project/project.summary'; import { Tag } from '../../../shared/types/tag.type'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, diff --git a/packages/frontend/stalker-app/src/app/modules/hosts/list-hosts/list-hosts.component.ts b/packages/frontend/stalker-app/src/app/modules/hosts/list-hosts/list-hosts.component.ts index e2c4ceca..59c2e214 100644 --- a/packages/frontend/stalker-app/src/app/modules/hosts/list-hosts/list-hosts.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/hosts/list-hosts/list-hosts.component.ts @@ -26,10 +26,8 @@ import { HttpStatus } from '../../../shared/types/http-status.type'; import { Page } from '../../../shared/types/page.type'; import { ProjectSummary } from '../../../shared/types/project/project.summary'; import { Tag } from '../../../shared/types/tag.type'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/custom-jobs/list-custom-jobs.component.ts b/packages/frontend/stalker-app/src/app/modules/jobs/custom-jobs/list-custom-jobs.component.ts index 75b6441d..b2bde14d 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/custom-jobs/list-custom-jobs.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/jobs/custom-jobs/list-custom-jobs.component.ts @@ -13,15 +13,13 @@ import { MatTooltipModule } from '@angular/material/tooltip'; import { Title } from '@angular/platform-browser'; import { Router, RouterModule } from '@angular/router'; import { BehaviorSubject, combineLatest, map, shareReplay, switchMap, tap } from 'rxjs'; +import { AuthService } from '../../../api/auth/auth.service'; +import { CustomJobTemplatesService } from '../../../api/jobs/custom-job-templates/custom-job-templates.service'; import { JobsService } from '../../../api/jobs/jobs/jobs.service'; import { AvatarComponent } from '../../../shared/components/avatar/avatar.component'; import { CustomJob } from '../../../shared/types/jobs/custom-job.type'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; -import { AuthService } from '../../../api/auth/auth.service'; -import { CustomJobTemplatesService } from '../../../api/jobs/custom-job-templates/custom-job-templates.service'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.html b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.html index 776efc2e..ac083413 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.html +++ b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.html @@ -1,7 +1,20 @@ - - +
+ + + + Stop +
@if (execution$ | async; as execution) { diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.ts b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.ts index 798280b4..700b2701 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-detail.component.ts @@ -1,17 +1,19 @@ -import { Component } from '@angular/core'; +import { AfterViewInit, Component, ViewChild } from '@angular/core'; import { ActivatedRoute } from '@angular/router'; -import { map, Observable, shareReplay, Subject, switchMap } from 'rxjs'; -import { ThemeService } from '../../../services/theme.service'; +import { BehaviorSubject, firstValueFrom, map, Observable, shareReplay, Subject, switchMap, tap } from 'rxjs'; import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; import { ProjectsService } from '../../../api/projects/projects.service'; +import { ThemeService } from '../../../services/theme.service'; +import { JobLogsComponent } from '../../../shared/components/job-logs/job-logs.component'; import { CodeEditorTheme } from '../../../shared/widget/code-editor/code-editor.component'; +import { JobExecutionInteractionsService } from './job-execution-interactions.service'; @Component({ selector: 'app-job-execution-detail', templateUrl: 'job-execution-detail.component.html', styleUrls: ['./job-execution-detail.component.scss'], }) -export class JobExecutionDetailComponent { +export class JobExecutionDetailComponent implements AfterViewInit { public executionId$ = this.route.paramMap.pipe(map((x) => x.get('id'))); public theme$: Observable = this.themeService.theme$.pipe( map((theme) => (theme === 'dark' ? 'vs-dark' : 'vs')) @@ -21,6 +23,10 @@ export class JobExecutionDetailComponent { shareReplay(1) ); + @ViewChild(JobLogsComponent) public logs!: JobLogsComponent; + public jobIsStopping$ = new BehaviorSubject(false); + public jobIsRunning$!: Observable; + public projects$ = this.projectsService .getAllSummaries() .pipe(map((projects: any[]) => projects.map((c) => ({ id: c._id, name: c.name })))); @@ -31,6 +37,28 @@ export class JobExecutionDetailComponent { private jobService: JobExecutionsService, private projectsService: ProjectsService, private route: ActivatedRoute, - private themeService: ThemeService + private themeService: ThemeService, + private jobExecutionInteractionsService: JobExecutionInteractionsService ) {} + + ngAfterViewInit(): void { + this.jobIsRunning$ = this.logs.isJobInProgress$.pipe( + tap((inProgress) => { + if (!inProgress) { + this.jobIsStopping$.next(false); + } + }) + ); + } + + public async stopJob() { + const id = await firstValueFrom(this.executionId$); + if (!id) return; + + const result = await this.jobExecutionInteractionsService.stopJob(id); + + if (result) { + this.jobIsStopping$.next(true); + } + } } diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-interactions.service.ts b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-interactions.service.ts new file mode 100644 index 00000000..17834b01 --- /dev/null +++ b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-execution-interactions.service.ts @@ -0,0 +1,45 @@ +import { Injectable } from '@angular/core'; +import { MatDialog } from '@angular/material/dialog'; +import { ToastrService } from 'ngx-toastr'; +import { firstValueFrom } from 'rxjs'; +import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; +import { + ConfirmDialogComponent, + ConfirmDialogData, +} from '../../../shared/widget/confirm-dialog/confirm-dialog.component'; + +@Injectable({ providedIn: 'root' }) +export class JobExecutionInteractionsService { + constructor( + private jobExecutionService: JobExecutionsService, + private dialog: MatDialog, + private toastr: ToastrService + ) {} + + public async stopJob(jobId: string) { + let data: ConfirmDialogData; + + data = { + text: $localize`:Stopping execution|Warning before stopping job execution:Do you really want to stop the current job execution?`, + title: $localize`:Stopping execution title|Stopping the current job execution:Stopping execution`, + dangerButtonText: $localize`:Stop job|Stop the current running job:Stop job`, + enableCancelButton: true, + onDangerButtonClick: async (close) => { + await this.jobExecutionService.stopJob(jobId); + this.toastr.success( + $localize`:Sent termination signal|Confirms that the termination signal is sent:Sent termination signal` + ); + close(true); + }, + }; + + return await firstValueFrom( + this.dialog + .open(ConfirmDialogComponent, { + data, + restoreFocus: false, + }) + .afterClosed() + ); + } +} diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.html b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.html index 9bea1f03..e451877d 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.html +++ b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.html @@ -18,6 +18,7 @@ [columns]="displayColumns" [routerLinkPrefix]="'/jobs/executions/'" [noDataMessage]="noDataMessage" + [menuFactory]="generateMenuItem" > diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.ts b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.ts index ff5479bd..ebce2a0d 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/jobs/job-executions/job-executions.component.ts @@ -7,13 +7,14 @@ import { Title } from '@angular/platform-browser'; import { RouterModule } from '@angular/router'; import { ToastrService } from 'ngx-toastr'; import { BehaviorSubject, combineLatest, map, shareReplay, switchMap, tap } from 'rxjs'; +import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; +import { ProjectsService } from '../../../api/projects/projects.service'; import { ProjectCellComponent } from '../../../shared/components/project-cell/project-cell.component'; import { SharedModule } from '../../../shared/shared.module'; +import { StartedJobViewModel } from '../../../shared/types/jobs/job.type'; import { Project } from '../../../shared/types/project/project.interface'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; -import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; -import { ProjectsService } from '../../../api/projects/projects.service'; -import { StartedJobViewModel } from '../../../shared/types/jobs/job.type'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, @@ -21,6 +22,7 @@ import { TableFiltersSourceBase, } from '../../../shared/widget/filtered-paginated-table/table-filters-source'; import { TableFormatComponent } from '../../../shared/widget/filtered-paginated-table/table-format/table-format.component'; +import { JobExecutionInteractionsService } from './job-execution-interactions.service'; import { JobLogsSummaryComponent } from './job-execution-logs-summary.component'; import { JobStateComponent } from './job-execution-state.component'; @@ -53,7 +55,7 @@ import { JobStateComponent } from './job-execution-state.component'; ], }) export class JobExecutionsComponent { - readonly displayColumns = ['name', 'project', 'time']; + readonly displayColumns = ['name', 'project', 'time', 'menu']; readonly filterOptions: string[] = ['project']; public readonly noDataMessage = $localize`:No job history|No jobs were run up to this point:No job history`; @@ -81,6 +83,7 @@ export class JobExecutionsComponent { private projectsService: ProjectsService, private titleService: Title, private toastrService: ToastrService, + private jobExecutionInteractionsService: JobExecutionInteractionsService, @Inject(TableFiltersSourceBase) private filtersSource: TableFiltersSource ) { this.titleService.setTitle($localize`:Job executions|:Job executions`); @@ -115,4 +118,27 @@ export class JobExecutionsComponent { } return filterObject; } + + stopSent = new Set(); + + public async stopJob(jobId: string) { + const result = await this.jobExecutionInteractionsService.stopJob(jobId); + if (result) { + this.stopSent.add(jobId); + } + } + + public generateMenuItem = (element: StartedJobViewModel): ElementMenuItems[] => { + if (!element) return []; + const menuItems: ElementMenuItems[] = []; + + menuItems.push({ + action: () => this.stopJob(element.id), + icon: 'stop_circle', + label: $localize`:Stop job|Stop the currently running job:Stop job`, + disabled: !!element.endTime || this.stopSent.has(element.id), + }); + + return menuItems; + }; } diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.html b/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.html index be9751d1..81911e69 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.html +++ b/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.html @@ -15,6 +15,16 @@ > + + Stop
diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.ts b/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.ts index 08e874f5..9497b6e1 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/jobs/launch-jobs/launch-jobs.component.ts @@ -1,5 +1,5 @@ import { CommonModule } from '@angular/common'; -import { Component, ViewChild } from '@angular/core'; +import { AfterViewInit, Component, ViewChild } from '@angular/core'; import { FormsModule, ReactiveFormsModule } from '@angular/forms'; import { MatButtonModule } from '@angular/material/button'; import { MatCardModule } from '@angular/material/card'; @@ -26,22 +26,24 @@ import { Title } from '@angular/platform-browser'; import { RouterModule } from '@angular/router'; import { NgxFileDropModule } from 'ngx-file-drop'; import { ToastrService } from 'ngx-toastr'; -import { BehaviorSubject, Observable, combineLatest, map } from 'rxjs'; +import { BehaviorSubject, Observable, combineLatest, map, tap } from 'rxjs'; +import { parse, parseDocument, stringify } from 'yaml'; +import { AuthService } from '../../../api/auth/auth.service'; +import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; +import { ProjectsService } from '../../../api/projects/projects.service'; import { ThemeService } from '../../../services/theme.service'; import { AvatarComponent } from '../../../shared/components/avatar/avatar.component'; import { JobLogsComponent } from '../../../shared/components/job-logs/job-logs.component'; import { AppHeaderComponent } from '../../../shared/components/page-header/page-header.component'; import { PanelSectionModule } from '../../../shared/components/panel-section/panel-section.module'; import { SharedModule } from '../../../shared/shared.module'; -import { parse, parseDocument, stringify } from 'yaml'; -import { AuthService } from '../../../api/auth/auth.service'; -import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; -import { ProjectsService } from '../../../api/projects/projects.service'; import { JobListEntry, JobParameterDefinition, StartedJob } from '../../../shared/types/jobs/job.type'; import { ProjectSummary } from '../../../shared/types/project/project.summary'; import { CodeEditorComponent, CodeEditorTheme } from '../../../shared/widget/code-editor/code-editor.component'; +import { SpinnerButtonComponent } from '../../../shared/widget/spinner-button/spinner-button.component'; import { normalizeSearchString } from '../../../utils/normalize-search-string'; import { FindingsModule } from '../../findings/findings.module'; +import { JobExecutionInteractionsService } from '../job-executions/job-execution-interactions.service'; @Component({ standalone: true, @@ -82,9 +84,10 @@ import { FindingsModule } from '../../findings/findings.module'; JobLogsComponent, CodeEditorComponent, AvatarComponent, + SpinnerButtonComponent, ], }) -export class LaunchJobsComponent { +export class LaunchJobsComponent implements AfterViewInit { public code = ''; public language = 'yaml'; public minimapEnabled = false; @@ -98,6 +101,19 @@ export class LaunchJobsComponent { @ViewChild(MatPaginator) paginator!: MatPaginator; dataSource = new MatTableDataSource(); + @ViewChild(JobLogsComponent) public logs!: JobLogsComponent; + public jobIsStopping$ = new BehaviorSubject(false); + public jobIsRunning$!: Observable; + + ngAfterViewInit(): void { + this.jobIsRunning$ = this.logs.isJobInProgress$.pipe( + tap((inProgress) => { + if (!inProgress) { + this.jobIsStopping$.next(false); + } + }) + ); + } public selectedRow: JobListEntry | undefined; public currentJobName = ''; @@ -124,7 +140,8 @@ export class LaunchJobsComponent { private projectsService: ProjectsService, private titleService: Title, public authService: AuthService, - private themeService: ThemeService + private themeService: ThemeService, + private jobExecutionInteractionsService: JobExecutionInteractionsService ) { this.titleService.setTitle($localize`:Launch jobs|:Launch jobs`); } @@ -205,6 +222,7 @@ export class LaunchJobsComponent { parameters, this.selectedProject ?? undefined ); + this.jobIsStopping$.next(false); } catch { this.toastr.error( $localize`:Error while starting job|There was an error while starting the job:Error while starting job` @@ -212,6 +230,14 @@ export class LaunchJobsComponent { } } + public async stopJob() { + const result = await this.jobExecutionInteractionsService.stopJob(this.currentStartedJob!.id); + + if (result) { + this.jobIsStopping$.next(true); + } + } + private filterJob(entry: JobListEntry, filter: string) { const parts = [entry.name]; return normalizeSearchString(parts.join(' ')).includes(filter); diff --git a/packages/frontend/stalker-app/src/app/modules/jobs/subscriptions/list-subscriptions.component.ts b/packages/frontend/stalker-app/src/app/modules/jobs/subscriptions/list-subscriptions.component.ts index db5439d2..7aba989f 100644 --- a/packages/frontend/stalker-app/src/app/modules/jobs/subscriptions/list-subscriptions.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/jobs/subscriptions/list-subscriptions.component.ts @@ -21,11 +21,8 @@ import { EventSubscription, SubscriptionData, } from '../../../shared/types/subscriptions/subscription.type'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; -import { DisabledPillTagComponent } from '../../../shared/widget/pill-tag/disabled-pill-tag.component'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, @@ -33,6 +30,7 @@ import { TableFiltersSourceBase, } from '../../../shared/widget/filtered-paginated-table/table-filters-source'; import { TableFormatComponent } from '../../../shared/widget/filtered-paginated-table/table-format/table-format.component'; +import { DisabledPillTagComponent } from '../../../shared/widget/pill-tag/disabled-pill-tag.component'; import { DataSourceComponent } from '../../data-source/data-source/data-source.component'; import { SubscriptionInteractionService } from './subscription-interaction.service'; import { subscriptionTypes } from './subscription-templates'; diff --git a/packages/frontend/stalker-app/src/app/modules/ports/list-ports/list-ports.component.ts b/packages/frontend/stalker-app/src/app/modules/ports/list-ports/list-ports.component.ts index 06e7f538..dbfa4a44 100644 --- a/packages/frontend/stalker-app/src/app/modules/ports/list-ports/list-ports.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/ports/list-ports/list-ports.component.ts @@ -25,10 +25,8 @@ import { DomainSummary } from '../../../shared/types/domain/domain.summary'; import { ExtendedPort, Port } from '../../../shared/types/ports/port.interface'; import { ProjectSummary } from '../../../shared/types/project/project.summary'; import { Tag } from '../../../shared/types/tag.type'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, TableFilters, diff --git a/packages/frontend/stalker-app/src/app/modules/user/api-key/api-key.component.ts b/packages/frontend/stalker-app/src/app/modules/user/api-key/api-key.component.ts index 81622f08..057341b5 100644 --- a/packages/frontend/stalker-app/src/app/modules/user/api-key/api-key.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/user/api-key/api-key.component.ts @@ -20,10 +20,8 @@ import { ConfirmDialogComponent, ConfirmDialogData, } from '../../../shared/widget/confirm-dialog/confirm-dialog.component'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { TableFiltersSource, TableFiltersSourceBase, diff --git a/packages/frontend/stalker-app/src/app/modules/websites/list-websites/list-websites.component.ts b/packages/frontend/stalker-app/src/app/modules/websites/list-websites/list-websites.component.ts index 57ee5360..41dedb48 100644 --- a/packages/frontend/stalker-app/src/app/modules/websites/list-websites/list-websites.component.ts +++ b/packages/frontend/stalker-app/src/app/modules/websites/list-websites/list-websites.component.ts @@ -29,11 +29,9 @@ import { CustomFinding, CustomFindingField } from '../../../shared/types/finding import { ProjectSummary } from '../../../shared/types/project/project.summary'; import { Tag } from '../../../shared/types/tag.type'; import { Website } from '../../../shared/types/websites/website.type'; +import { ElementMenuItems } from '../../../shared/widget/dynamic-icons/menu-icon.component'; import { SecureIconComponent } from '../../../shared/widget/dynamic-icons/secure-icon.component'; -import { - ElementMenuItems, - FilteredPaginatedTableComponent, -} from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; +import { FilteredPaginatedTableComponent } from '../../../shared/widget/filtered-paginated-table/filtered-paginated-table.component'; import { GridFormatComponent } from '../../../shared/widget/filtered-paginated-table/grid-format/grid-format.component'; import { TABLE_FILTERS_SOURCE_INITAL_FILTERS, diff --git a/packages/frontend/stalker-app/src/app/shared/components/job-logs/job-logs.component.ts b/packages/frontend/stalker-app/src/app/shared/components/job-logs/job-logs.component.ts index e0948dfd..4883229a 100644 --- a/packages/frontend/stalker-app/src/app/shared/components/job-logs/job-logs.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/components/job-logs/job-logs.component.ts @@ -1,7 +1,7 @@ import { CommonModule } from '@angular/common'; import { ChangeDetectionStrategy, Component, Input, OnChanges } from '@angular/core'; import { MatProgressBarModule } from '@angular/material/progress-bar'; -import { Observable, concatMap, debounceTime, map, scan, shareReplay, startWith } from 'rxjs'; +import { BehaviorSubject, Observable, concatMap, debounceTime, map, scan, shareReplay, startWith } from 'rxjs'; import { AuthService } from '../../../api/auth/auth.service'; import { JobExecutionsService } from '../../../api/jobs/job-executions/job-executions.service'; import { JobExecutionsSocketioClient } from '../../../api/jobs/job-executions/job-executions.socketio-client'; @@ -17,7 +17,7 @@ import { CodeEditorComponent, CodeEditorTheme } from '../../widget/code-editor/c template: ` | null = null; - public isJobInProgress$: Observable | null = null; + public _isJobInProgress$: Observable | null = null; + public isJobInProgress$ = new BehaviorSubject(false); constructor( public jobsService: JobExecutionsService, @@ -71,15 +72,17 @@ export class JobLogsComponent implements OnChanges { shareReplay(1) ); - this.isJobInProgress$ = this.socket.jobStatus.pipe( + this._isJobInProgress$ = this.socket.jobStatus.pipe( map((update) => { switch (update.status) { case 'success': + this.isJobInProgress$.next(false); this.timeout = setTimeout(() => this.socket?.disconnect(), 5000); return false; case 'started': default: + this.isJobInProgress$.next(true); return true; } }) diff --git a/packages/frontend/stalker-app/src/app/shared/widget/confirm-dialog/confirm-dialog.component.html b/packages/frontend/stalker-app/src/app/shared/widget/confirm-dialog/confirm-dialog.component.html index c8ff4634..6958a9b7 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/confirm-dialog/confirm-dialog.component.html +++ b/packages/frontend/stalker-app/src/app/shared/widget/confirm-dialog/confirm-dialog.component.html @@ -22,7 +22,7 @@

{{ data.title }}

} }
- @if (data.primaryButtonText != '') { + @if (data.primaryButtonText && data.primaryButtonText != '') { {{ data.primaryButtonText }} diff --git a/packages/frontend/stalker-app/src/app/shared/widget/dynamic-icons/menu-icon.component.ts b/packages/frontend/stalker-app/src/app/shared/widget/dynamic-icons/menu-icon.component.ts index 1873bc5a..8f8f030f 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/dynamic-icons/menu-icon.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/widget/dynamic-icons/menu-icon.component.ts @@ -2,7 +2,15 @@ import { Component, Input } from '@angular/core'; import { MatIconModule } from '@angular/material/icon'; import { MatMenuModule } from '@angular/material/menu'; import { MatTooltipModule } from '@angular/material/tooltip'; -import { ElementMenuItems } from '../filtered-paginated-table/filtered-paginated-table.component'; + +export interface ElementMenuItems { + label: string; + icon: string; + action: () => Promise | void; + hidden?: boolean; + disabled?: boolean; + tooltip?: string; +} @Component({ standalone: true, diff --git a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/filtered-paginated-table.component.ts b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/filtered-paginated-table.component.ts index 11b07b1a..210b37ff 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/filtered-paginated-table.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/filtered-paginated-table.component.ts @@ -47,15 +47,6 @@ import { Observable, debounceTime, distinctUntilChanged, filter, map, startWith import { IdentifiedElement } from '../../types/identified-element.type'; import { TableFiltersSourceBase } from './table-filters-source'; -export interface ElementMenuItems { - label: string; - icon: string; - action: () => Promise | void; - hidden?: boolean; - disabled?: boolean; - tooltip?: string; -} - @Component({ standalone: true, selector: 'app-filtered-paginated-table', diff --git a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/grid-format/grid-format.component.ts b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/grid-format/grid-format.component.ts index 7c5861ef..ef0010f8 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/grid-format/grid-format.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/grid-format/grid-format.component.ts @@ -14,14 +14,7 @@ import { MatTooltipModule } from '@angular/material/tooltip'; import { RouterModule } from '@angular/router'; import { AvatarComponent } from '../../../components/avatar/avatar.component'; import { IdentifiedElement } from '../../../types/identified-element.type'; -import { MenuIconComponent } from '../../dynamic-icons/menu-icon.component'; - -export interface ElementMenuItems { - label: string; - icon: string; - action: () => Promise | void; - hidden?: boolean; -} +import { ElementMenuItems, MenuIconComponent } from '../../dynamic-icons/menu-icon.component'; @Component({ standalone: true, diff --git a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/table-format/table-format.component.ts b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/table-format/table-format.component.ts index cef19876..00aab0a2 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/table-format/table-format.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/widget/filtered-paginated-table/table-format/table-format.component.ts @@ -32,14 +32,7 @@ import { MatTooltipModule } from '@angular/material/tooltip'; import { RouterModule } from '@angular/router'; import { AvatarComponent } from '../../../components/avatar/avatar.component'; import { IdentifiedElement } from '../../../types/identified-element.type'; -import { MenuIconComponent } from '../../dynamic-icons/menu-icon.component'; - -export interface ElementMenuItems { - label: string; - icon: string; - action: () => Promise | void; - hidden?: boolean; -} +import { ElementMenuItems, MenuIconComponent } from '../../dynamic-icons/menu-icon.component'; @Component({ standalone: true, diff --git a/packages/frontend/stalker-app/src/app/shared/widget/spinner-button/spinner-button.component.ts b/packages/frontend/stalker-app/src/app/shared/widget/spinner-button/spinner-button.component.ts index e38436b2..e850ea43 100644 --- a/packages/frontend/stalker-app/src/app/shared/widget/spinner-button/spinner-button.component.ts +++ b/packages/frontend/stalker-app/src/app/shared/widget/spinner-button/spinner-button.component.ts @@ -23,11 +23,24 @@ import { MatProgressSpinnerModule } from '@angular/material/progress-spinner'; `, imports: [CommonModule, MatButtonModule, MatProgressSpinnerModule], + styles: ` + :host { + pointer-events: none; + } + + :host button span { + pointer-events: none; + } + + button { + pointer-events: auto; + } + `, }) export class SpinnerButtonComponent { @Input() label = ''; @Input() buttonColor: 'primary' | 'accent' | 'warn' | undefined = undefined; @Input() spinnerColor = 'primary'; - @Input() loadingState = false; + @Input() loadingState: boolean | null = false; @Input() disabled = false; }