Skip to content

Commit c13c081

Browse files
committed
feat: allow to continue job by adding tasks from finshed jobs handler
1 parent 4aa4096 commit c13c081

1 file changed

Lines changed: 60 additions & 72 deletions

File tree

index.ts

Lines changed: 60 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ type allTasksDoneStatusType = {
2222
jobId: string;
2323
failedTasks: number;
2424
succeededTasks: number;
25+
finishAttemptNumber: number;
2526
};
2627
type onAllTasksDoneType = (status: allTasksDoneStatusType) => Promise<void> | void;
2728
type taskType = {
28-
skip?: boolean;
2929
state: Record<string, any>;
3030
}
31+
type taskToProcessType = {
32+
taskIndex: number;
33+
task: taskType;
34+
};
3135

3236
function encodeStateFieldName(fieldName: string): string {
3337
return encodeURIComponent(fieldName);
@@ -179,7 +183,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
179183
return count ? parseInt(count, 10) : 0;
180184
}
181185

182-
private async getAllTasksDoneStatus(levelDb: Level): Promise<Omit<allTasksDoneStatusType, 'jobId'>> {
186+
private async getAllTasksDoneStatus(levelDb: Level): Promise<Omit<allTasksDoneStatusType, 'jobId' | 'finishAttemptNumber'>> {
183187
const totalTasks = await this.getTotalTasksInLevelDb(levelDb);
184188
let failedTasks = 0;
185189
let succeededTasks = 0;
@@ -240,14 +244,14 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
240244
afLogger.warn(message);
241245
}
242246

243-
private async triggerOnAllTasksDone(onAllTasksDone: onAllTasksDoneType | undefined, levelDb: Level, jobId: string) {
247+
private async triggerOnAllTasksDone(onAllTasksDone: onAllTasksDoneType | undefined, levelDb: Level, jobId: string, finishAttemptNumber: number) {
244248
if (!onAllTasksDone) {
245249
return;
246250
}
247251

248252
try {
249253
const status = await this.getAllTasksDoneStatus(levelDb);
250-
await onAllTasksDone({ jobId, ...status });
254+
await onAllTasksDone({ jobId, ...status, finishAttemptNumber });
251255
} catch (error) {
252256
const errorMessage = error instanceof Error ? error.message : String(error);
253257
afLogger.error(`Error in onAllTasksDone callback for job ${jobId}: ${errorMessage}`);
@@ -326,7 +330,8 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
326330

327331
await Promise.all(createTaskRecordsPromises);
328332

329-
this.runProcessingTasks(tasks, jobLevelDb, jobId, handleTask, parrallelLimit, onAllTasksDone);
333+
const tasksToProcess = tasks.map((task, taskIndex) => ({ taskIndex, task }));
334+
this.runProcessingTasks(tasksToProcess, jobLevelDb, jobId, handleTask, parrallelLimit, onAllTasksDone);
330335
return jobId;
331336
}
332337

@@ -351,6 +356,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
351356
});
352357

353358
await Promise.all(createTaskRecordsPromises);
359+
await this.updateJobProgressFromLevelDb(jobLevelDb, jobId);
354360
}
355361

356362
public async deleteTasksFromExistingJob(
@@ -377,38 +383,32 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
377383
await jobLevelDb.put('_meta:count', `${currentTotalTasks - 1}`);
378384
}
379385

380-
private async getUnfinishedTasksFromLevelDb(levelDb: Level): Promise<{ state: Record<string, any> }[]> {
386+
private async getUnfinishedTasksFromLevelDb(levelDb: Level): Promise<taskToProcessType[]> {
381387
const totalTasks = await this.getTotalTasksInLevelDb(levelDb);
382-
const unfinishedTasks: { state: Record<string, any> }[] = [];
388+
const unfinishedTasks: taskToProcessType[] = [];
383389
for (let taskIndex = 0; taskIndex < totalTasks; taskIndex++) {
384390
const status = await this.getLevelDbTaskStatusField(levelDb, taskIndex.toString());
385391
if (status === 'IN_PROGRESS' || status === 'SCHEDULED') {
386392
const state = await this.getLevelDbTaskStateField(levelDb, taskIndex.toString());
387-
unfinishedTasks.push({ state });
393+
unfinishedTasks.push({ taskIndex, task: { state } });
388394
}
389395
}
390396
return unfinishedTasks;
391397
}
392398

393399
private async runProcessingTasks(
394-
tasks: taskType[],
400+
tasks: taskToProcessType[],
395401
jobLevelDb: Level,
396402
jobId: string,
397403
handleTask: taskHandlerType,
398404
parrallelLimit: number,
399405
onAllTasksDone?: onAllTasksDoneType,
406+
finishAttemptNumber = 1,
400407
) {
401-
let totalTasks = tasks.length;
402-
let completedTasks = 0;
403-
let failedTasks = 0;
404408
let lastJobStatus = 'IN_PROGRESS';
405409

406-
const taskHandler = async ( taskIndex: number, task: taskType ) => {
407-
totalTasks = await this.getTotalTasksInLevelDb(jobLevelDb);
408-
if (task.skip) {
409-
completedTasks = await this.handleFinishTask(completedTasks, totalTasks, jobId, true);
410-
return;
411-
}
410+
const taskHandler = async ( taskToProcess: taskToProcessType ) => {
411+
const { taskIndex, task } = taskToProcess;
412412
if (lastJobStatus === 'CANCELLED') {
413413
afLogger.info(`Job ${jobId} was cancelled. Skipping task ${taskIndex}.`);
414414
return;
@@ -477,7 +477,6 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
477477
await this.setJobStateField(jobId, 'error', errorMessage);
478478
await this.setLevelDbTaskStatusField(jobLevelDb, taskIndex.toString(), 'FAILED');
479479
this.adminforth.websocket.publish(`/background-jobs-task-update/${jobId}`, { taskIndex, status: "FAILED" });
480-
failedTasks++;
481480
return;
482481
} finally {
483482
//Update progress
@@ -488,40 +487,49 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
488487
return;
489488
}
490489

491-
completedTasks = await this.handleFinishTask(completedTasks, totalTasks, jobId);
490+
await this.handleFinishTask(jobLevelDb, jobId);
492491
}
493492
}
494493

495494
const limit = pLimit(parrallelLimit);
496-
const tasksToExecute = tasks.map((task, taskIndex) => {
497-
return limit(() => taskHandler(taskIndex, task));
495+
const tasksToExecute = tasks.map((task) => {
496+
return limit(() => taskHandler(task));
498497
});
499498

500499
await Promise.all(tasksToExecute);
501500
const unfinishedTasks = await this.getUnfinishedTasksFromLevelDb(jobLevelDb);
502501
if (unfinishedTasks.length > 0) {
503-
const tasksToReprocess = tasks.map((t) => {t.skip = true; t.state = t.state || {}; return t;});
504-
tasksToReprocess.push(...unfinishedTasks);
505-
await this.runProcessingTasks(tasksToReprocess, jobLevelDb, jobId, handleTask, parrallelLimit, onAllTasksDone);
506-
} else {
507-
if (lastJobStatus !== 'CANCELLED' && failedTasks === 0) {
508-
await this.adminforth.resource(this.getResourceId()).update(jobId, {
509-
[this.options.statusField]: 'DONE',
510-
[this.options.finishedAtField]: (new Date()).toISOString(),
511-
})
512-
this.adminforth.websocket.publish('/background-jobs-job-update', { jobId, status: 'DONE', finishedAt: (new Date()).toISOString() });
513-
this.cleanupJobMutexIfTerminalStatus(jobId, 'DONE');
514-
await this.triggerOnAllTasksDone(onAllTasksDone, jobLevelDb, jobId);
515-
} else if (failedTasks > 0) {
516-
await this.adminforth.resource(this.getResourceId()).update(jobId, {
517-
[this.options.statusField]: 'DONE_WITH_ERRORS',
518-
[this.options.finishedAtField]: (new Date()).toISOString(),
519-
})
520-
this.adminforth.websocket.publish('/background-jobs-job-update', { jobId, status: 'DONE_WITH_ERRORS', finishedAt: (new Date()).toISOString() });
521-
this.cleanupJobMutexIfTerminalStatus(jobId, 'DONE_WITH_ERRORS');
522-
await this.triggerOnAllTasksDone(onAllTasksDone, jobLevelDb, jobId);
523-
}
502+
await this.runProcessingTasks(unfinishedTasks, jobLevelDb, jobId, handleTask, parrallelLimit, onAllTasksDone, finishAttemptNumber);
503+
return;
504+
}
505+
506+
if (lastJobStatus === 'CANCELLED') {
507+
return;
508+
}
509+
510+
await this.triggerOnAllTasksDone(onAllTasksDone, jobLevelDb, jobId, finishAttemptNumber);
511+
512+
const currentJobStatus = await this.getLastJobStatus(jobId);
513+
if (currentJobStatus === 'CANCELLED') {
514+
this.cleanupJobMutexIfTerminalStatus(jobId, currentJobStatus);
515+
return;
524516
}
517+
518+
const tasksAddedByCallback = await this.getUnfinishedTasksFromLevelDb(jobLevelDb);
519+
if (tasksAddedByCallback.length > 0) {
520+
await this.runProcessingTasks(tasksAddedByCallback, jobLevelDb, jobId, handleTask, parrallelLimit, onAllTasksDone, finishAttemptNumber + 1);
521+
return;
522+
}
523+
524+
const { failedTasks } = await this.getAllTasksDoneStatus(jobLevelDb);
525+
const finalStatus = failedTasks > 0 ? 'DONE_WITH_ERRORS' : 'DONE';
526+
const finishedAt = (new Date()).toISOString();
527+
await this.adminforth.resource(this.getResourceId()).update(jobId, {
528+
[this.options.statusField]: finalStatus,
529+
[this.options.finishedAtField]: finishedAt,
530+
})
531+
this.adminforth.websocket.publish('/background-jobs-job-update', { jobId, status: finalStatus, finishedAt });
532+
this.cleanupJobMutexIfTerminalStatus(jobId, finalStatus);
525533
}
526534

527535
private async getLastJobStatus(jobId: string): Promise<string> {
@@ -530,17 +538,18 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
530538
return currentJobStatus;
531539
}
532540

533-
private async handleFinishTask(completedTasks: number, totalTasks: number, jobId: string, wasTaskSkipped: boolean = false) {
534-
completedTasks++;
535-
if (wasTaskSkipped) {
536-
return completedTasks;
537-
}
538-
const progress = Math.round((completedTasks / totalTasks) * 100);
541+
private async updateJobProgressFromLevelDb(jobLevelDb: Level, jobId: string) {
542+
const totalTasks = await this.getTotalTasksInLevelDb(jobLevelDb);
543+
const { failedTasks, succeededTasks } = await this.getAllTasksDoneStatus(jobLevelDb);
544+
const progress = totalTasks === 0 ? 100 : Math.round(((failedTasks + succeededTasks) / totalTasks) * 100);
539545
await this.adminforth.resource(this.getResourceId()).update(jobId, {
540546
[this.options.progressField]: progress,
541547
})
542548
this.adminforth.websocket.publish('/background-jobs-job-update', { jobId, progress });
543-
return completedTasks;
549+
}
550+
551+
private async handleFinishTask(jobLevelDb: Level, jobId: string) {
552+
await this.updateJobProgressFromLevelDb(jobLevelDb, jobId);
544553
}
545554

546555

@@ -559,28 +568,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
559568
const parrallelLimit = this.jobParallelLimits[jobHandlerName] || 3;
560569
const onAllTasksDone = this.onAllTasksDoneHandlers[jobHandlerName];
561570

562-
const unfinishedTasks: taskType[] = [];
563-
let taskIndex = 0;
564-
while (true) {
565-
const taskData = await jobLevelDb.get(taskIndex.toString());
566-
if (!taskData) {
567-
break;
568-
}
569-
let parsedTaskData: { state: Record<string, any>, status: TaskStatus };
570-
try {
571-
parsedTaskData = JSON.parse(taskData);
572-
} catch (error) {
573-
afLogger.error(`Error parsing task data for task ${taskIndex} of job ${job[this.getResourcePk()]}: ${error}`);
574-
taskIndex++;
575-
continue;
576-
}
577-
if (parsedTaskData.status === 'IN_PROGRESS' || parsedTaskData.status === 'SCHEDULED') {
578-
unfinishedTasks.push({ state: parsedTaskData.state });
579-
} else {
580-
unfinishedTasks.push({ state: parsedTaskData.state, skip: true });
581-
}
582-
taskIndex++;
583-
}
571+
const unfinishedTasks = await this.getUnfinishedTasksFromLevelDb(jobLevelDb);
584572
await this.runProcessingTasks(unfinishedTasks, jobLevelDb, job[this.getResourcePk()], handleTask, parrallelLimit, onAllTasksDone);
585573

586574
}

0 commit comments

Comments
 (0)