Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 111 additions & 68 deletions app/support/DbAdapter/jobs.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { difference, uniq } from 'lodash';

import { Job } from '../../models';

import { prepareModelPayload, initObject } from './utils';
Expand Down Expand Up @@ -48,80 +50,121 @@ export default function jobsTrait(superClass) {
* @returns {Promise<Job[]>}
*/
async fetchJobs(count, lockTime, limitedJobs = {}) {
const limData = Object.entries(limitedJobs).map(([name, lim]) => ({ name, lim }));
let rows;
const hasLimits = Object.keys(limitedJobs).length > 0;

if (limData.length === 0) {
if (!hasLimits) {
// Simple case, no limited jobs
rows = await this.database.getAll(
`update jobs set
unlock_at = now() + :lockTime * '1 second'::interval,
attempts = attempts + 1
where id = any(
select id from jobs
where unlock_at <= now()
order by unlock_at
for update skip locked
limit :count
)
returning *`,
{ count, lockTime },
);
} else {
// Case with limited jobs
rows = await this.database.getAll(
`with
-- Parse job limits from input JSON
limits as (
select name, lim
from jsonb_to_recordset(:limData) as x(name text, lim int)
),
-- Count currently locked jobs for each job type
locked_counts as (
select name, count(*) as locked_count
from jobs
where unlock_at > now()
group by name
),
-- Get available jobs and lock them immediately
lockable_jobs as (
select id, name, unlock_at
from jobs
where unlock_at <= now()
for update skip locked
),
-- Number locked jobs within each job type
numbered_jobs as (
select
id,
name,
unlock_at,
row_number() over (partition by name) as row_num
from lockable_jobs
const rows = await this._justFetchJobs(this.database, count, lockTime);
return rows.map(initJobObject);
}

const rows1 = await this.database.transaction(async (trx) => {
// Lock jobs table
await trx.raw('lock table jobs in share update exclusive mode');
const allRows = [];
const allReturning = [];
let maxLoops = 10; // To prevent infinite loop

while (maxLoops--) {
// eslint-disable-next-line no-await-in-loop
const { rows, toReturn } = await this._fetchJobsWithLimits(
trx,
count,
lockTime,
limitedJobs,
);
allRows.push(...rows);
allReturning.push(...toReturn);

if (toReturn.length === 0 || rows.length + toReturn.length < count) {
break;
}

count -= rows.length;
}

// Unlock all returned jobs
await Promise.all(
allReturning.map(({ id, old_unlock_at }) =>
trx.raw(`update jobs set unlock_at = :old_unlock_at where id = :id`, {
id,
old_unlock_at,
}),
),
-- Select jobs respecting limits and max count
selected_jobs as (
select j.id
from numbered_jobs j
left join limits l on j.name = l.name
left join locked_counts lc on j.name = lc.name
where
l.name is null -- no limits for this job type
or j.row_num + coalesce(lc.locked_count, 0) <= l.lim -- within limits
order by j.unlock_at
limit :count
)
-- Update selected jobs with new unlock time
update jobs
set unlock_at = now() + :lockTime * '1 second'::interval,
attempts = attempts + 1
where id in (select id from selected_jobs)
returning *`,
{ count, lockTime, limData: JSON.stringify(limData) },
);

return allRows;
});

return rows1.map(initJobObject);
}

async _justFetchJobs(db, count, lockTime) {
return await db.getAll(
`with selected as (
select id, unlock_at as old_unlock_at from jobs
where unlock_at <= now()
order by unlock_at
for update skip locked
limit :count
)
update jobs set
unlock_at = now() + :lockTime * '1 second'::interval,
attempts = attempts + 1
from selected
where jobs.id = selected.id
returning jobs.*, selected.old_unlock_at`,
{ count, lockTime },
);
}

async _fetchJobsWithLimits(db, count, lockTime, limitedJobs = {}) {
const rows = await this._justFetchJobs(db, count, lockTime);
const toReturn = [];

// Are there any limited jobs in results?
const limRows = rows.filter((row) => limitedJobs[row.name]).reverse(); // Add .reverse() for the further filtering

if (limRows.length === 0) {
// No limited jobs, just return
return { rows, toReturn };
}

return rows.map(initJobObject);
// How many jobs are already taken for each job name?
const takenCounts = await db.getAll(
`select name, count(*)::int from jobs where unlock_at > now() and name = any(:limNames) group by name`,
{ limNames: uniq(limRows.map((row) => row.name)) },
);

// How many jobs are over the limit for each job name?
const overLimits = takenCounts.reduce((acc, row) => {
acc[row.name] = row.count - limitedJobs[row.name];
return acc;
}, {});

// Put back jobs that are over the limit

for (const row of limRows) {
if (overLimits[row.name] > 0) {
toReturn.push(row);
overLimits[row.name]--;
}
}

if (toReturn.length === 0) {
return { rows, toReturn };
}

// Put the job back to the nearest future
await db.raw(
`update jobs set unlock_at = now() + '1 minute'::interval, attempts = attempts - 1 where id = any(:ids)`,
{ ids: toReturn.map((r) => r.id) },
);

return {
rows: difference(rows, toReturn),
toReturn,
};
}

// For testing purposes only
Expand Down
16 changes: 7 additions & 9 deletions test/integration/models/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import unexpected from 'unexpected';
import unexpectedDate from 'unexpected-date';
import unexpectedSinon from 'unexpected-sinon';
import { spy } from 'sinon';
import { sortBy } from 'lodash';
import { difference, sortBy } from 'lodash';

import cleanDB from '../../dbCleaner';
import { Job, dbAdapter, JobManager } from '../../../app/models';
Expand Down Expand Up @@ -369,20 +369,18 @@ describe('Jobs', () => {
await Job.create('foo'),
];

const allIds = jobs.map((j) => j.id);

let [pj] = await Promise.all([jm.fetchAndProcess(), setTimeout(50).then(resolve)]);
const fetchedIds = pj.map((j) => j.id);

expect(
pj.map((j) => j.id),
'when sorted',
'to equal',
[jobs[0].id, jobs[1].id].sort(),
);
expect(fetchedIds, 'to have length', 2);

pj = await jm.fetchAndProcess();
expect(
pj.map((j) => j.id),
'to equal',
[jobs[2].id],
difference(allIds, fetchedIds),
);
});

Expand Down Expand Up @@ -413,7 +411,7 @@ describe('Jobs', () => {
it('should respect unlock_at ordering', async () => {
const jm = new JobManager({ limitedJobs: { foo: 1 } });
const foo1 = await Job.create('foo');
await setTimeout(10);
await setTimeout(100);
await Job.create('foo');

const jobs = await jm.fetch();
Expand Down