@@ -196,6 +196,22 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
196196 return { failedTasks, succeededTasks } ;
197197 }
198198
199+ private async getLevelDbForTheJob ( jobId : string ) : Promise < Level > {
200+ const levelDbPath = `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` ;
201+ let jobLevelDb : Level ;
202+ if ( this . levelDbInstances [ jobId ] ) {
203+ jobLevelDb = this . levelDbInstances [ jobId ] ;
204+ } else {
205+ try {
206+ jobLevelDb = new Level ( levelDbPath , { valueEncoding : 'json' } ) ;
207+ this . levelDbInstances [ jobId ] = jobLevelDb ;
208+ } catch ( error ) {
209+ throw new Error ( `Failed to access task storage for job with id ${ jobId } .` ) ;
210+ }
211+ }
212+ return jobLevelDb ;
213+ }
214+
199215 private publishJobStateField ( jobId : string , fieldName : string , value : any ) {
200216 this . adminforth . websocket . publish ( `/background-jobs-state-update/${ jobId } /${ encodeStateFieldName ( fieldName ) } ` , {
201217 jobId,
@@ -301,8 +317,7 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
301317 } ) ;
302318
303319 //create a level db instance for the job with name as jobId
304- const jobLevelDb = new Level ( `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` , { valueEncoding : 'json' } ) ;
305- this . levelDbInstances [ jobId ] = jobLevelDb ;
320+ const jobLevelDb = await this . getLevelDbForTheJob ( jobId ) ;
306321 await jobLevelDb . put ( '_meta:count' , `${ tasks . length } ` ) ;
307322 const limit2 = pLimit ( parrallelLimit ) ;
308323 const createTaskRecordsPromises = tasks . map ( ( task , index ) => {
@@ -315,6 +330,66 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
315330 return jobId ;
316331 }
317332
333+ public async addNewTasksToExistingJob (
334+ jobId : string ,
335+ tasks : taskType [ ] ,
336+ ) {
337+ const jobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
338+ if ( ! jobRecord ) {
339+ throw new Error ( `Job with id ${ jobId } not found.` ) ;
340+ }
341+ const jobStatus = jobRecord [ this . options . statusField ] ;
342+ if ( jobStatus !== 'IN_PROGRESS' ) {
343+ throw new Error ( `Cannot add tasks to a job with status ${ jobStatus } . Only jobs with status IN_PROGRESS can be added new tasks.` ) ;
344+ }
345+ const jobLevelDb = await this . getLevelDbForTheJob ( jobId ) ;
346+ const currentTotalTasks = await this . getTotalTasksInLevelDb ( jobLevelDb ) ;
347+ const newTotalTasks = currentTotalTasks + tasks . length ;
348+ await jobLevelDb . put ( '_meta:count' , `${ newTotalTasks } ` ) ;
349+ const createTaskRecordsPromises = tasks . map ( ( task , index ) => {
350+ return this . createLevelDbTaskRecord ( jobLevelDb , ( currentTotalTasks + index ) . toString ( ) , task . state ) ;
351+ } ) ;
352+
353+ await Promise . all ( createTaskRecordsPromises ) ;
354+ }
355+
356+ public async deleteTasksFromExistingJob (
357+ jobId : string ,
358+ taskIndex : number ,
359+ ) : Promise < void > {
360+ if ( taskIndex < 0 ) {
361+ throw new Error ( `Invalid task index ${ taskIndex } .` ) ;
362+ }
363+ const jobRecord = await this . adminforth . resource ( this . getResourceId ( ) ) . get ( Filters . EQ ( this . getResourcePk ( ) , jobId ) ) ;
364+ if ( ! jobRecord ) {
365+ throw new Error ( `Job with id ${ jobId } not found.` ) ;
366+ }
367+ const jobStatus = jobRecord [ this . options . statusField ] ;
368+ if ( jobStatus !== 'IN_PROGRESS' ) {
369+ throw new Error ( `Cannot delete tasks from a job with status ${ jobStatus } . Only jobs with status IN_PROGRESS can have tasks deleted.` ) ;
370+ }
371+ const jobLevelDb = await this . getLevelDbForTheJob ( jobId ) ;
372+ const currentTotalTasks = await this . getTotalTasksInLevelDb ( jobLevelDb ) ;
373+ if ( taskIndex >= currentTotalTasks ) {
374+ throw new Error ( `Invalid task index ${ taskIndex } .` ) ;
375+ }
376+ await jobLevelDb . del ( taskIndex . toString ( ) ) ;
377+ await jobLevelDb . put ( '_meta:count' , `${ currentTotalTasks - 1 } ` ) ;
378+ }
379+
380+ private async getUnfinishedTasksFromLevelDb ( levelDb : Level ) : Promise < { state : Record < string , any > } [ ] > {
381+ const totalTasks = await this . getTotalTasksInLevelDb ( levelDb ) ;
382+ const unfinishedTasks : { state : Record < string , any > } [ ] = [ ] ;
383+ for ( let taskIndex = 0 ; taskIndex < totalTasks ; taskIndex ++ ) {
384+ const status = await this . getLevelDbTaskStatusField ( levelDb , taskIndex . toString ( ) ) ;
385+ if ( status === 'IN_PROGRESS' || status === 'SCHEDULED' ) {
386+ const state = await this . getLevelDbTaskStateField ( levelDb , taskIndex . toString ( ) ) ;
387+ unfinishedTasks . push ( { state } ) ;
388+ }
389+ }
390+ return unfinishedTasks ;
391+ }
392+
318393 private async runProcessingTasks (
319394 tasks : taskType [ ] ,
320395 jobLevelDb : Level ,
@@ -323,12 +398,13 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
323398 parrallelLimit : number ,
324399 onAllTasksDone ?: onAllTasksDoneType ,
325400 ) {
326- const totalTasks = tasks . length ;
401+ let totalTasks = tasks . length ;
327402 let completedTasks = 0 ;
328403 let failedTasks = 0 ;
329404 let lastJobStatus = 'IN_PROGRESS' ;
330405
331- const taskHandler = async ( taskIndex : number , task ) => {
406+ const taskHandler = async ( taskIndex : number , task : taskType ) => {
407+ totalTasks = await this . getTotalTasksInLevelDb ( jobLevelDb ) ;
332408 if ( task . skip ) {
333409 completedTasks = await this . handleFinishTask ( completedTasks , totalTasks , jobId , true ) ;
334410 return ;
@@ -344,7 +420,12 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
344420 afLogger . info ( `Job ${ jobId } was cancelled. Skipping task ${ taskIndex } .` ) ;
345421 return ;
346422 }
347-
423+ // check if task is still exists in level db, because it can be deleted while processing
424+ const taskStatus = await this . getLevelDbTaskStatusField ( jobLevelDb , taskIndex . toString ( ) ) ;
425+ if ( ! taskStatus ) {
426+ afLogger . info ( `Task ${ taskIndex } of job ${ jobId } was deleted. Skipping processing.` ) ;
427+ return ;
428+ }
348429 const getState = async ( ) => {
349430 return await this . getLevelDbTaskStateField ( jobLevelDb , taskIndex . toString ( ) ) ;
350431 }
@@ -417,22 +498,29 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
417498 } ) ;
418499
419500 await Promise . all ( tasksToExecute ) ;
420- if ( lastJobStatus !== 'CANCELLED' && failedTasks === 0 ) {
421- await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
422- [ this . options . statusField ] : 'DONE' ,
423- [ this . options . finishedAtField ] : ( new Date ( ) ) . toISOString ( ) ,
424- } )
425- this . adminforth . websocket . publish ( '/background-jobs-job-update' , { jobId, status : 'DONE' , finishedAt : ( new Date ( ) ) . toISOString ( ) } ) ;
426- this . cleanupJobMutexIfTerminalStatus ( jobId , 'DONE' ) ;
427- await this . triggerOnAllTasksDone ( onAllTasksDone , jobLevelDb , jobId ) ;
428- } else if ( failedTasks > 0 ) {
429- await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
430- [ this . options . statusField ] : 'DONE_WITH_ERRORS' ,
431- [ this . options . finishedAtField ] : ( new Date ( ) ) . toISOString ( ) ,
432- } )
433- this . adminforth . websocket . publish ( '/background-jobs-job-update' , { jobId, status : 'DONE_WITH_ERRORS' } ) ;
434- this . cleanupJobMutexIfTerminalStatus ( jobId , 'DONE_WITH_ERRORS' ) ;
435- await this . triggerOnAllTasksDone ( onAllTasksDone , jobLevelDb , jobId ) ;
501+ const unfinishedTasks = await this . getUnfinishedTasksFromLevelDb ( jobLevelDb ) ;
502+ if ( unfinishedTasks . length > 0 ) {
503+ const tasksToReprocess = tasks . map ( ( t ) => { t . skip = true ; t . state = t . state || { } ; return t ; } ) ;
504+ tasksToReprocess . push ( ...unfinishedTasks ) ;
505+ await this . runProcessingTasks ( tasksToReprocess , jobLevelDb , jobId , handleTask , parrallelLimit , onAllTasksDone ) ;
506+ } else {
507+ if ( lastJobStatus !== 'CANCELLED' && failedTasks === 0 ) {
508+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
509+ [ this . options . statusField ] : 'DONE' ,
510+ [ this . options . finishedAtField ] : ( new Date ( ) ) . toISOString ( ) ,
511+ } )
512+ this . adminforth . websocket . publish ( '/background-jobs-job-update' , { jobId, status : 'DONE' , finishedAt : ( new Date ( ) ) . toISOString ( ) } ) ;
513+ this . cleanupJobMutexIfTerminalStatus ( jobId , 'DONE' ) ;
514+ await this . triggerOnAllTasksDone ( onAllTasksDone , jobLevelDb , jobId ) ;
515+ } else if ( failedTasks > 0 ) {
516+ await this . adminforth . resource ( this . getResourceId ( ) ) . update ( jobId , {
517+ [ this . options . statusField ] : 'DONE_WITH_ERRORS' ,
518+ [ this . options . finishedAtField ] : ( new Date ( ) ) . toISOString ( ) ,
519+ } )
520+ this . adminforth . websocket . publish ( '/background-jobs-job-update' , { jobId, status : 'DONE_WITH_ERRORS' , finishedAt : ( new Date ( ) ) . toISOString ( ) } ) ;
521+ this . cleanupJobMutexIfTerminalStatus ( jobId , 'DONE_WITH_ERRORS' ) ;
522+ await this . triggerOnAllTasksDone ( onAllTasksDone , jobLevelDb , jobId ) ;
523+ }
436524 }
437525 }
438526
@@ -668,17 +756,9 @@ export default class BackgroundJobsPlugin extends AdminForthPlugin {
668756 path : `/plugin/${ this . pluginInstanceId } /get-tasks` ,
669757 handler : async ( { body } ) => {
670758 const { jobId, limit, offset } = body ;
671- const levelDbPath = `${ this . options . levelDbPath || './background-jobs-dbs/' } job_${ jobId } ` ;
672- let jobLevelDb : Level ;
673- if ( this . levelDbInstances [ jobId ] ) {
674- jobLevelDb = this . levelDbInstances [ jobId ] ;
675- } else {
676- try {
677- jobLevelDb = new Level ( levelDbPath , { valueEncoding : 'json' } ) ;
678- this . levelDbInstances [ jobId ] = jobLevelDb ;
679- } catch ( error ) {
680- return { ok : false , message : `Failed to access tasks for job with id ${ jobId } .` } ;
681- }
759+ const jobLevelDb : Level = await this . getLevelDbForTheJob ( jobId ) ;
760+ if ( ! jobLevelDb ) {
761+ return { ok : false , message : `Job with id ${ jobId } not found.` } ;
682762 }
683763 const tasks = [ ] ;
684764 let taskIndex = 0 + offset ;
0 commit comments