Skip to content

Cancel running job #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions devspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -587,15 +587,23 @@ deployments:
config:
max.message.bytes: 64000
flush.messages: 1
- name: stalker.jobs.management
partitions: 1
replicationFactor: 1
config:
max.message.bytes: 64000
flush.messages: 1
extraProvisioningCommands: # Setting the ACLs
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.findings"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation write --operation describe --topic stalker.jobs.requests"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --topic stalker.jobs.logs"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.models"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation write --operation describe --topic stalker.jobs.management"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.findings"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.requests"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation write --operation describe --topic stalker.jobs.logs"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.models"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --topic stalker.jobs.management"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:jobs-manager --operation read --operation describe --group jobs-manager-job-logs"
- "/opt/bitnami/kafka/bin/kafka-acls.sh --bootstrap-server $KAFKA_SERVICE --command-config $CLIENT_CONF --add --allow-principal User:orchestrator --operation read --operation describe --group stalker"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const orchestratorConstants = {
findings: 'stalker.jobs.findings',
jobLogs: 'stalker.jobs.logs',
jobModels: 'stalker.jobs.models',
jobManagement: 'stalker.jobs.management',
},
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { DataSourcesModule } from '../../datasources/data-sources.module';
import { QueueModule } from '../../job-queue/queue.module';
import { QueueModule } from '../../queues/queue.module';
import { DatalayerModule } from '../datalayer.module';
import { CustomJobTemplatesController } from './custom-job-templates.controller';
import { jobTemplatesInitProvider } from './custom-job-templates.provider';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { DataSourcesModule } from '../../datasources/data-sources.module';
import { QueueModule } from '../../job-queue/queue.module';
import { QueueModule } from '../../queues/queue.module';
import { ConfigService } from '../admin/config/config.service';
import { DatalayerModule } from '../datalayer.module';
import { CustomJobsController } from './custom-jobs.controller';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { DeleteResult } from 'mongodb';
import { Model, Types } from 'mongoose';
import { HttpNotFoundException } from '../../../exceptions/http.exceptions';
import { JobSummary } from '../../../types/job-summary.type';
import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue';
import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue';
import {
JobContainer,
JobContainerDocument,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { UpdateFilter } from 'mongodb';
import { Model } from 'mongoose';
import { isConsumerMode } from '../../app.constants';
import { DataSources } from '../../datasources/data-sources';
import { JobModelUpdateQueue } from '../../job-queue/job-model-update-queue';
import { JobModelUpdateQueue } from '../../queues/job-model-update-queue/job-model-update-queue';
import { DATABASE_INIT } from '../admin/config/config.provider';
import { JobPodConfiguration } from '../admin/config/job-pod-config/job-pod-config.model';
import { JobContainer } from '../container/job-container.model';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import {
} from '../../../types/timestamped-string.type';
import { ProjectUnassigned } from '../../../validators/is-project-id.validator';
import { isTest } from '../../app.constants';
import { JobQueue } from '../../job-queue/job-queue';
import { JobManagementQueue } from '../../queues/job-management-queue/job-management-queue';
import { KafkaJobManagementTask } from '../../queues/job-management-queue/kafka-job-management-queue';
import { JobQueue } from '../../queues/job-queue/job-queue';
import { ConfigService } from '../admin/config/config.service';
import { Project } from '../reporting/project.model';
import { JobExecutionsDto } from './jobs.dto';
Expand All @@ -25,6 +27,7 @@ export class JobExecutionsService {
private jobQueue: JobQueue,
@InjectModel('job') private readonly jobModel: Model<Job & Document>,
@InjectModel('project') private readonly projectModel: Model<Project>,
private jobManagementQueue: JobManagementQueue,
) {}

public async getAll(dto: JobExecutionsDto): Promise<Page<JobDocument>> {
Expand Down Expand Up @@ -141,6 +144,19 @@ export class JobExecutionsService {
};
}

public async terminate(jobId: string) {
const j = await this.jobModel.findById(jobId);

if (j.endTime) return;

const task: KafkaJobManagementTask = {
jobId: j._id.toString(),
task: 'TerminateJob',
};

this.jobManagementQueue.publish(task);
}

public async addJobOutputLine(
jobId: string,
timestamp: number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Delete,
Get,
Param,
Patch,
Post,
Query,
UseGuards,
Expand All @@ -13,6 +14,7 @@ import { isNotEmpty, isString } from 'class-validator';
import {
HttpBadRequestException,
HttpNotFoundException,
HttpNotImplementedException,
} from '../../../exceptions/http.exceptions';
import { MongoIdDto } from '../../../types/dto/mongo-id.dto';
import { JobLog } from '../../../types/job-log.model';
Expand All @@ -32,7 +34,7 @@ import { SecretsService } from '../secrets/secrets.service';
import { JobParameter } from '../subscriptions/subscriptions.type';
import { JobExecutionsService } from './job-executions.service';
import { JobDefinitions } from './job-model.module';
import { JobExecutionsDto, StartJobDto } from './jobs.dto';
import { JobExecutionsDto, JobManagementDto, StartJobDto } from './jobs.dto';
import { JobFactory, JobFactoryUtils } from './jobs.factory';
import { CustomJob } from './models/custom-job.model';
import { JobDocument } from './models/jobs.model';
Expand Down Expand Up @@ -131,6 +133,19 @@ export class JobsController {
return await this.jobsService.publish(job);
}

@UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard)
@Roles(Role.User)
@Patch(':id')
async stopJob(@Param() id: MongoIdDto, @Body() dto: JobManagementDto) {
switch (dto.task) {
case 'TerminateJob':
await this.jobsService.terminate(id.id);
break;
default:
throw new HttpNotImplementedException();
}
}

@UseGuards(AuthGuard([JwtStrategy.name, ApiKeyStrategy.name]), RolesGuard)
@Roles(Role.User)
@Delete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { IntersectionType } from '@nestjs/swagger';
import { Type } from 'class-transformer';
import {
IsArray,
IsIn,
IsMongoId,
IsNotEmpty,
IsOptional,
Expand Down Expand Up @@ -40,3 +41,12 @@ export class StartJobDto {
@IsOptional()
projectId?: string;
}

export const jobManagementTasks = ['TerminateJob'] as const;
export type JobManagementTask = (typeof jobManagementTasks)[number];

export class JobManagementDto {
@IsNotEmpty()
@IsIn(jobManagementTasks)
task: JobManagementTask;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { JwtModule } from '@nestjs/jwt';
import { QueueModule } from '../../job-queue/queue.module';
import { QueueModule } from '../../queues/queue.module';
import { ConfigModule } from '../admin/config/config.module';
import { CustomJobsModule } from '../custom-jobs/custom-jobs.module';
import { DatalayerModule } from '../datalayer.module';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { forwardRef, Module } from '@nestjs/common';
import { QueueModule } from '../../../job-queue/queue.module';
import { QueueModule } from '../../../queues/queue.module';
import { ConfigModule } from '../../admin/config/config.module';
import { DatalayerModule } from '../../datalayer.module';
import { JobsModule } from '../../jobs/jobs.module';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from '../../../../exceptions/http.exceptions';
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
import { HostnameFinding } from '../../../findings/findings.service';
import { FindingsQueue } from '../../../job-queue/findings-queue';
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
import { ConfigService } from '../../admin/config/config.service';
import { MONGO_DUPLICATE_ERROR } from '../../database.constants';
import { JobExecutionsService } from '../../jobs/job-executions.service';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { forwardRef, Module } from '@nestjs/common';
import { QueueModule } from '../../../job-queue/queue.module';
import { QueueModule } from '../../../queues/queue.module';
import { ConfigModule } from '../../admin/config/config.module';
import { DatalayerModule } from '../../datalayer.module';
import { JobsModule } from '../../jobs/jobs.module';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { FilterQuery, Model, Types } from 'mongoose';
import { HttpNotFoundException } from '../../../../exceptions/http.exceptions';
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
import { IpFinding } from '../../../findings/findings.service';
import { FindingsQueue } from '../../../job-queue/findings-queue';
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
import { ConfigService } from '../../admin/config/config.service';
import { TagsService } from '../../tags/tag.service';
import { CorrelationKeyUtils } from '../correlation.utils';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Module } from '@nestjs/common';
import { QueueModule } from '../../../job-queue/queue.module';
import { QueueModule } from '../../../queues/queue.module';
import { DatalayerModule } from '../../datalayer.module';
import { TagsModule } from '../../tags/tag.module';
import { WebsiteController } from './website.controller';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from '../../../../exceptions/http.exceptions';
import escapeStringRegexp from '../../../../utils/escape-string-regexp';
import { WebsiteFinding } from '../../../findings/findings.service';
import { FindingsQueue } from '../../../job-queue/findings-queue';
import { FindingsQueue } from '../../../queues/finding-queue/findings-queue';
import { TagsService } from '../../tags/tag.service';
import { CorrelationKeyUtils } from '../correlation.utils';
import { Domain } from '../domain/domain.model';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { SecretsModule } from '../database/secrets/secrets.module';
import { EventSubscriptionsModule } from '../database/subscriptions/event-subscriptions/event-subscriptions.module';
import { SubscriptionTriggersModule } from '../database/subscriptions/subscription-triggers/subscription-triggers.module';
import { TagsModule } from '../database/tags/tag.module';
import { kafkaConfig } from '../job-queue/queue.module';
import { kafkaConfig } from '../queues/queue.module';
import { FindingsHandlers } from './commands/findings-commands';
import { FindingsConsumer } from './findings.consumer';
import { FindingsController } from './findings.controller';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Finding } from '../findings/findings.service';
import { Finding } from '../../findings/findings.service';

export abstract class FindingsQueue {
public abstract publish(...findings: Finding[]): Promise<void>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Injectable, Logger } from '@nestjs/common';
import { Message, Producer } from 'kafkajs';
import { orchestratorConstants } from '../auth/constants';
import { Finding } from '../findings/findings.service';
import { orchestratorConstants } from '../../auth/constants';
import { Finding } from '../../findings/findings.service';
import { FindingsQueue } from './findings-queue';

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { KafkaJobManagementTask } from './kafka-job-management-queue';

export abstract class JobManagementQueue {
public abstract publish(
...jobManagementTasks: KafkaJobManagementTask[]
): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Injectable, Logger } from '@nestjs/common';
import { Message, Producer } from 'kafkajs';
import { orchestratorConstants } from '../../auth/constants';
import { JobManagementTask } from '../../database/jobs/jobs.dto';
import { JobManagementQueue } from './job-management-queue';

export interface KafkaJobManagementTask {
jobId: string;
task: JobManagementTask;
}

@Injectable()
export class KafkaJobManagementQueue implements JobManagementQueue {
private logger = new Logger(KafkaJobManagementQueue.name);

constructor(private producer: Producer) {}

public async publish(...jobManagementTasks: KafkaJobManagementTask[]) {
this.logger.debug(
`Publishing ${
jobManagementTasks.length
} tasks to the job management queue on topic ${
orchestratorConstants.topics.jobManagement
}: ${JSON.stringify(jobManagementTasks)}.`,
);

for (const task of jobManagementTasks) {
const serializedTasks: Message[] = [
{
value: JSON.stringify(task),
},
];

await this.producer.send({
topic: orchestratorConstants.topics.jobManagement,
messages: serializedTasks,
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Injectable, Logger } from '@nestjs/common';
import { JobManagementQueue } from './job-management-queue';
import { KafkaJobManagementTask } from './kafka-job-management-queue';

@Injectable()
export class NullJobManagementQueue implements JobManagementQueue {
private logger = new Logger(NullJobManagementQueue.name);

public async publish(...jobManagementTasks: KafkaJobManagementTask[]) {
this.logger.debug('Job management not posted to queue');
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CustomJobsDocument } from '../database/custom-jobs/custom-jobs.model';
import { CustomJobsDocument } from '../../database/custom-jobs/custom-jobs.model';

export abstract class JobModelUpdateQueue {
public abstract publish(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Injectable, Logger } from '@nestjs/common';
import { Message, Producer } from 'kafkajs';
import { orchestratorConstants } from '../auth/constants';
import { orchestratorConstants } from '../../auth/constants';
import {
CustomJobEntry,
CustomJobsDocument,
} from '../database/custom-jobs/custom-jobs.model';
} from '../../database/custom-jobs/custom-jobs.model';
import { JobModelUpdateQueue } from './job-model-update-queue';

export interface KafkaJobModelUpdate extends CustomJobEntry {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable, Logger } from '@nestjs/common';
import { CustomJobEntry } from '../database/custom-jobs/custom-jobs.model';
import { CustomJobEntry } from '../../database/custom-jobs/custom-jobs.model';
import { JobModelUpdateQueue } from './job-model-update-queue';

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable, Logger } from '@nestjs/common';
import { Producer } from 'kafkajs';
import { orchestratorConstants } from '../auth/constants';
import { orchestratorConstants } from '../../auth/constants';
import { JobQueue } from './job-queue';

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ import { Kafka, KafkaConfig } from 'kafkajs';
import { readFileSync } from 'node:fs';
import { isTest } from '../app.constants';
import { orchestratorConstants } from '../auth/constants';
import { FindingsQueue } from './findings-queue';
import { JobModelUpdateQueue } from './job-model-update-queue';
import { JobQueue } from './job-queue';
import { KafkaFindingsQueue } from './kafka-findings-queue';
import { KafkaJobModelUpdateQueue } from './kafka-job-model-update-queue';
import { KafkaJobQueue } from './kafka-job-queue';
import { NullFindingsQueue } from './null-findings-queue';
import { NullJobModelUpdateQueue } from './null-job-model-update-queue';
import { NullJobQueue } from './null-job-queue';
import { FindingsQueue } from './finding-queue/findings-queue';
import { KafkaFindingsQueue } from './finding-queue/kafka-findings-queue';
import { NullFindingsQueue } from './finding-queue/null-findings-queue';
import { JobManagementQueue } from './job-management-queue/job-management-queue';
import { KafkaJobManagementQueue } from './job-management-queue/kafka-job-management-queue';
import { NullJobManagementQueue } from './job-management-queue/null-job-management-queue';
import { JobModelUpdateQueue } from './job-model-update-queue/job-model-update-queue';
import { KafkaJobModelUpdateQueue } from './job-model-update-queue/kafka-job-model-update-queue';
import { NullJobModelUpdateQueue } from './job-model-update-queue/null-job-model-update-queue';
import { JobQueue } from './job-queue/job-queue';
import { KafkaJobQueue } from './job-queue/kafka-job-queue';
import { NullJobQueue } from './job-queue/null-job-queue';

const certFolder =
isTest() && process.env.TEST_TYPE === 'unit' ? './' : '/certs';
Expand Down Expand Up @@ -82,8 +85,21 @@ export const kafkaConfig: KafkaConfig = {
return new KafkaJobModelUpdateQueue(producer);
},
},
{
provide: JobManagementQueue,
useFactory: async () => {
if (isTest()) return new NullJobManagementQueue();

const kafka = new Kafka(kafkaConfig);

const producer = kafka.producer();
await producer.connect();

return new KafkaJobManagementQueue(producer);
},
},
],
exports: [JobQueue, FindingsQueue, JobModelUpdateQueue],
exports: [JobQueue, FindingsQueue, JobModelUpdateQueue, JobManagementQueue],
})
export class QueueModule {
public constructor() {}
Expand Down
Loading
Loading