diff --git a/app/support/DbAdapter/jobs.js b/app/support/DbAdapter/jobs.js index 4b6f56f80..3e4502e3b 100644 --- a/app/support/DbAdapter/jobs.js +++ b/app/support/DbAdapter/jobs.js @@ -1,3 +1,5 @@ +import { difference, uniq } from 'lodash'; + import { Job } from '../../models'; import { prepareModelPayload, initObject } from './utils'; @@ -48,80 +50,121 @@ export default function jobsTrait(superClass) { * @returns {Promise} */ async fetchJobs(count, lockTime, limitedJobs = {}) { - const limData = Object.entries(limitedJobs).map(([name, lim]) => ({ name, lim })); - let rows; + const hasLimits = Object.keys(limitedJobs).length > 0; - if (limData.length === 0) { + if (!hasLimits) { // Simple case, no limited jobs - rows = await this.database.getAll( - `update jobs set - unlock_at = now() + :lockTime * '1 second'::interval, - attempts = attempts + 1 - where id = any( - select id from jobs - where unlock_at <= now() - order by unlock_at - for update skip locked - limit :count - ) - returning *`, - { count, lockTime }, - ); - } else { - // Case with limited jobs - rows = await this.database.getAll( - `with - -- Parse job limits from input JSON - limits as ( - select name, lim - from jsonb_to_recordset(:limData) as x(name text, lim int) - ), - -- Count currently locked jobs for each job type - locked_counts as ( - select name, count(*) as locked_count - from jobs - where unlock_at > now() - group by name - ), - -- Get available jobs and lock them immediately - lockable_jobs as ( - select id, name, unlock_at - from jobs - where unlock_at <= now() - for update skip locked - ), - -- Number locked jobs within each job type - numbered_jobs as ( - select - id, - name, - unlock_at, - row_number() over (partition by name) as row_num - from lockable_jobs + const rows = await this._justFetchJobs(this.database, count, lockTime); + return rows.map(initJobObject); + } + + const rows1 = await this.database.transaction(async (trx) => { + // Lock jobs table + await trx.raw('lock table jobs in share update exclusive mode'); + const allRows = []; + const allReturning = []; + let maxLoops = 10; // To prevent infinite loop + + while (maxLoops--) { + // eslint-disable-next-line no-await-in-loop + const { rows, toReturn } = await this._fetchJobsWithLimits( + trx, + count, + lockTime, + limitedJobs, + ); + allRows.push(...rows); + allReturning.push(...toReturn); + + if (toReturn.length === 0 || rows.length + toReturn.length < count) { + break; + } + + count -= rows.length; + } + + // Unlock all returned jobs + await Promise.all( + allReturning.map(({ id, old_unlock_at }) => + trx.raw(`update jobs set unlock_at = :old_unlock_at where id = :id`, { + id, + old_unlock_at, + }), ), - -- Select jobs respecting limits and max count - selected_jobs as ( - select j.id - from numbered_jobs j - left join limits l on j.name = l.name - left join locked_counts lc on j.name = lc.name - where - l.name is null -- no limits for this job type - or j.row_num + coalesce(lc.locked_count, 0) <= l.lim -- within limits - order by j.unlock_at - limit :count - ) - -- Update selected jobs with new unlock time - update jobs - set unlock_at = now() + :lockTime * '1 second'::interval, - attempts = attempts + 1 - where id in (select id from selected_jobs) - returning *`, - { count, lockTime, limData: JSON.stringify(limData) }, ); + + return allRows; + }); + + return rows1.map(initJobObject); + } + + async _justFetchJobs(db, count, lockTime) { + return await db.getAll( + `with selected as ( + select id, unlock_at as old_unlock_at from jobs + where unlock_at <= now() + order by unlock_at + for update skip locked + limit :count + ) + update jobs set + unlock_at = now() + :lockTime * '1 second'::interval, + attempts = attempts + 1 + from selected + where jobs.id = selected.id + returning jobs.*, selected.old_unlock_at`, + { count, lockTime }, + ); + } + + async _fetchJobsWithLimits(db, count, lockTime, limitedJobs = {}) { + const rows = await this._justFetchJobs(db, count, lockTime); + const toReturn = []; + + // Are there any limited jobs in results? + const limRows = rows.filter((row) => limitedJobs[row.name]).reverse(); // Add .reverse() for the further filtering + + if (limRows.length === 0) { + // No limited jobs, just return + return { rows, toReturn }; } - return rows.map(initJobObject); + // How many jobs are already taken for each job name? + const takenCounts = await db.getAll( + `select name, count(*)::int from jobs where unlock_at > now() and name = any(:limNames) group by name`, + { limNames: uniq(limRows.map((row) => row.name)) }, + ); + + // How many jobs are over the limit for each job name? + const overLimits = takenCounts.reduce((acc, row) => { + acc[row.name] = row.count - limitedJobs[row.name]; + return acc; + }, {}); + + // Put back jobs that are over the limit + + for (const row of limRows) { + if (overLimits[row.name] > 0) { + toReturn.push(row); + overLimits[row.name]--; + } + } + + if (toReturn.length === 0) { + return { rows, toReturn }; + } + + // Put the job back to the nearest future + await db.raw( + `update jobs set unlock_at = now() + '1 minute'::interval, attempts = attempts - 1 where id = any(:ids)`, + { ids: toReturn.map((r) => r.id) }, + ); + + return { + rows: difference(rows, toReturn), + toReturn, + }; } // For testing purposes only diff --git a/test/integration/models/jobs.js b/test/integration/models/jobs.js index 433418e63..2be6e7dd1 100644 --- a/test/integration/models/jobs.js +++ b/test/integration/models/jobs.js @@ -6,7 +6,7 @@ import unexpected from 'unexpected'; import unexpectedDate from 'unexpected-date'; import unexpectedSinon from 'unexpected-sinon'; import { spy } from 'sinon'; -import { sortBy } from 'lodash'; +import { difference, sortBy } from 'lodash'; import cleanDB from '../../dbCleaner'; import { Job, dbAdapter, JobManager } from '../../../app/models'; @@ -369,20 +369,18 @@ describe('Jobs', () => { await Job.create('foo'), ]; + const allIds = jobs.map((j) => j.id); + let [pj] = await Promise.all([jm.fetchAndProcess(), setTimeout(50).then(resolve)]); + const fetchedIds = pj.map((j) => j.id); - expect( - pj.map((j) => j.id), - 'when sorted', - 'to equal', - [jobs[0].id, jobs[1].id].sort(), - ); + expect(fetchedIds, 'to have length', 2); pj = await jm.fetchAndProcess(); expect( pj.map((j) => j.id), 'to equal', - [jobs[2].id], + difference(allIds, fetchedIds), ); }); @@ -413,7 +411,7 @@ describe('Jobs', () => { it('should respect unlock_at ordering', async () => { const jm = new JobManager({ limitedJobs: { foo: 1 } }); const foo1 = await Job.create('foo'); - await setTimeout(10); + await setTimeout(100); await Job.create('foo'); const jobs = await jm.fetch();