Skip to content

Commit 93ff562

Browse files
authored
perf: use last_scheduled_task_id for dueSchedules query (NangoHQ#2383)
scanning tasks table to determine if a schedule has a running task or had a recently run task doesn't scale we are now using the last_scheduled_task_id column in schedules to avoid heavy subqueries ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is:
1 parent 0a00097 commit 93ff562

File tree

1 file changed

+20
-34
lines changed

1 file changed

+20
-34
lines changed

packages/scheduler/lib/workers/scheduling/scheduling.ts

+20-34
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,26 @@ import { TASKS_TABLE } from '../../models/tasks.js';
77

88
export async function dueSchedules(db: knex.Knex): Promise<Result<Schedule[]>> {
99
try {
10-
const query = db
11-
.with(
12-
'due_dates',
13-
// calculate the most recent due date for each schedule that is started/not deleted
14-
db
15-
.select(
16-
's.id',
17-
db.raw(`
18-
s.starts_at + (FLOOR(EXTRACT(EPOCH FROM (NOW() - s.starts_at)) / EXTRACT(EPOCH FROM s.frequency)) * s.frequency) AS dueAt
19-
`)
20-
)
21-
.from({ s: SCHEDULES_TABLE })
22-
.where({ state: 'STARTED' })
23-
.whereRaw('s.starts_at <= NOW()')
24-
// Locking schedules to prevent any concurrent update or concurrent scheduling of tasks
25-
.forUpdate()
26-
.skipLocked()
27-
)
28-
.select('*')
29-
.from<DbSchedule>({ s: SCHEDULES_TABLE })
30-
.joinRaw('JOIN due_dates lrt ON s.id = lrt.id')
31-
// filter out schedules that have a running task
32-
.whereNotExists(
33-
db
34-
.select('id')
35-
.from({ t: TASKS_TABLE })
36-
.whereRaw('t.schedule_id = s.id')
37-
.where(function () {
38-
this.where({ state: 'CREATED' }).orWhere({ state: 'STARTED' });
39-
})
40-
)
41-
// filter out schedules that have tasks started after the due date
42-
.whereNotExists(db.select('id').from({ t: TASKS_TABLE }).whereRaw('t.schedule_id = s.id').andWhere('t.starts_after', '>=', db.raw('lrt.dueAt')));
43-
const schedules = await query;
10+
const schedules: DbSchedule[] = await db
11+
.select('s.*')
12+
.from({ s: SCHEDULES_TABLE })
13+
.leftJoin(`${TASKS_TABLE} AS t`, 's.last_scheduled_task_id', 't.id')
14+
.where('s.state', 'STARTED')
15+
.where('s.starts_at', '<=', db.fn.now())
16+
.where(function () {
17+
// schedule has never been run
18+
this.where('s.last_scheduled_task_id', 'IS', null)
19+
// schedule with last task not running and was started before the last due time
20+
.orWhere(function () {
21+
this.whereNotIn('t.state', ['CREATED', 'STARTED']).andWhere(
22+
't.starts_after',
23+
'<',
24+
db.raw(`s.starts_at + (floor(extract(EPOCH FROM (now() - s.starts_at)) / extract(EPOCH FROM s.frequency)) * s.frequency)`)
25+
);
26+
});
27+
})
28+
.forUpdate('s')
29+
.skipLocked();
4430
return Ok(schedules.map(DbSchedule.from));
4531
} catch (err: unknown) {
4632
console.log(err);

0 commit comments

Comments
 (0)