Skip to content

Commit 0a00097

Browse files
authored
chore: add last_scheduled_task_id column to schedules (NangoHQ#2382)
DueSchedule query is too slow because for each schedules we are going through the tasks table to find out if there is any running tasks or if any task has been running recently. In order to make the query faster we are adding a last_scheduled_task_id column to schedules which is populated when scheduling a task. This column will allow us to join schedules with tasks based on this column and look at the state and created_at of the tasks to find due schedules ## Describe your changes ## Issue ticket number and link ## Checklist before requesting a review (skip if just adding/editing APIs & templates) - [ ] I added tests, otherwise the reason is: - [ ] I added observability, otherwise the reason is: - [ ] I added analytics, otherwise the reason is:
1 parent 4053c8a commit 0a00097

File tree

9 files changed

+57
-16
lines changed

9 files changed

+57
-16
lines changed

packages/orchestrator/lib/clients/validate.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ export function validateSchedule(schedule: Schedule): Result<OrchestratorSchedul
152152
heartbeatTimeoutSecs: z.number().int(),
153153
createdAt: z.coerce.date(),
154154
updatedAt: z.coerce.date(),
155-
deletedAt: z.coerce.date().nullable()
155+
deletedAt: z.coerce.date().nullable(),
156+
lastScheduledTaskId: z.string().uuid().nullable()
156157
})
157158
.strict();
158159
const getNextDueDate = (startsAt: Date, frequencyMs: number) => {

packages/orchestrator/lib/routes/v1/postRecurring.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ const handler = (scheduler: Scheduler) => {
6969
retryMax: req.body.retry.max,
7070
createdToStartedTimeoutSecs: req.body.timeoutSettingsInSecs.createdToStarted,
7171
startedToCompletedTimeoutSecs: req.body.timeoutSettingsInSecs.startedToCompleted,
72-
heartbeatTimeoutSecs: req.body.timeoutSettingsInSecs.heartbeat
72+
heartbeatTimeoutSecs: req.body.timeoutSettingsInSecs.heartbeat,
73+
lastScheduledTaskId: null
7374
});
7475
if (schedule.isErr()) {
7576
return res.status(500).json({ error: { code: 'recurring_failed', message: schedule.error.message } });
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import type { Knex } from 'knex';
2+
import { SCHEDULES_TABLE } from '../../models/schedules.js';
3+
4+
export async function up(knex: Knex): Promise<void> {
5+
await knex.raw(`
6+
ALTER TABLE ${SCHEDULES_TABLE}
7+
ADD COLUMN IF NOT EXISTS last_scheduled_task_id uuid NULL;
8+
`);
9+
}
10+
11+
export async function down(knex: Knex): Promise<void> {
12+
await knex.raw(`
13+
ALTER TABLE ${SCHEDULES_TABLE}
14+
DROP COLUMN IF EXISTS last_scheduled_task_id;
15+
16+
`);
17+
}

packages/scheduler/lib/models/schedules.integration.test.ts

+6-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as schedules from './schedules.js';
33
import { getTestDbClient } from '../db/helpers.test.js';
44
import type { Schedule } from '../types.js';
55
import type knex from 'knex';
6+
import { uuidv7 } from 'uuidv7';
67

78
describe('Schedules', () => {
89
const dbClient = getTestDbClient();
@@ -25,7 +26,8 @@ describe('Schedules', () => {
2526
frequencyMs: 300_000,
2627
createdAt: expect.toBeIsoDateTimezone(),
2728
updatedAt: expect.toBeIsoDateTimezone(),
28-
deletedAt: null
29+
deletedAt: null,
30+
lastScheduledTaskId: null
2931
});
3032
});
3133
it('should be successfully retrieved', async () => {
@@ -60,7 +62,7 @@ describe('Schedules', () => {
6062
});
6163
it('should be successfully updated', async () => {
6264
const schedule = await createSchedule(db);
63-
const updated = (await schedules.update(db, { id: schedule.id, frequencyMs: 600_000, payload: { i: 2 } })).unwrap();
65+
const updated = (await schedules.update(db, { id: schedule.id, frequencyMs: 600_000, payload: { i: 2 }, lastScheduledTaskId: uuidv7() })).unwrap();
6466
expect(updated.frequencyMs).toBe(600_000);
6567
expect(updated.payload).toMatchObject({ i: 2 });
6668
expect(updated.updatedAt.getTime()).toBeGreaterThan(schedule.updatedAt.getTime());
@@ -90,7 +92,8 @@ async function createSchedule(db: knex.Knex): Promise<Schedule> {
9092
retryMax: 1,
9193
createdToStartedTimeoutSecs: 1,
9294
startedToCompletedTimeoutSecs: 1,
93-
heartbeatTimeoutSecs: 1
95+
heartbeatTimeoutSecs: 1,
96+
lastScheduledTaskId: null
9497
})
9598
).unwrap();
9699
}

packages/scheduler/lib/models/schedules.ts

+12-4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export interface DbSchedule {
4646
readonly created_at: Date;
4747
updated_at: Date;
4848
deleted_at: Date | null;
49+
last_scheduled_task_id: string | null;
4950
}
5051

5152
// knex uses https://github.com/bendrucker/postgres-interval
@@ -84,7 +85,8 @@ export const DbSchedule = {
8485
heartbeat_timeout_secs: schedule.heartbeatTimeoutSecs,
8586
created_at: schedule.createdAt,
8687
updated_at: schedule.updatedAt,
87-
deleted_at: schedule.deletedAt
88+
deleted_at: schedule.deletedAt,
89+
last_scheduled_task_id: schedule.lastScheduledTaskId
8890
}),
8991
from: (dbSchedule: DbSchedule): Schedule => ({
9092
id: dbSchedule.id,
@@ -100,7 +102,8 @@ export const DbSchedule = {
100102
heartbeatTimeoutSecs: dbSchedule.heartbeat_timeout_secs,
101103
createdAt: dbSchedule.created_at,
102104
updatedAt: dbSchedule.updated_at,
103-
deletedAt: dbSchedule.deleted_at
105+
deletedAt: dbSchedule.deleted_at,
106+
lastScheduledTaskId: dbSchedule.last_scheduled_task_id
104107
})
105108
};
106109

@@ -115,7 +118,8 @@ export async function create(db: knex.Knex, props: ScheduleProps): Promise<Resul
115118
frequencyMs: props.frequencyMs,
116119
createdAt: now,
117120
updatedAt: now,
118-
deletedAt: null
121+
deletedAt: null,
122+
lastScheduledTaskId: null
119123
};
120124
try {
121125
const inserted = await db.from<DbSchedule>(SCHEDULES_TABLE).insert(DbSchedule.to(newSchedule)).returning('*');
@@ -166,11 +170,15 @@ export async function transitionState(db: knex.Knex, scheduleId: string, to: Sch
166170
}
167171
}
168172

169-
export async function update(db: knex.Knex, props: Partial<Pick<ScheduleProps, 'frequencyMs' | 'payload'>> & { id: string }): Promise<Result<Schedule>> {
173+
export async function update(
174+
db: knex.Knex,
175+
props: Partial<Pick<ScheduleProps, 'frequencyMs' | 'payload' | 'lastScheduledTaskId'>> & { id: string }
176+
): Promise<Result<Schedule>> {
170177
try {
171178
const newValues = {
172179
...(props.frequencyMs ? { frequency: `${props.frequencyMs} milliseconds` } : {}),
173180
...(props.payload ? { payload: props.payload } : {}),
181+
...(props.lastScheduledTaskId ? { last_scheduled_task_id: props.lastScheduledTaskId } : {}),
174182
updated_at: new Date()
175183
};
176184
const updated = await db.from<DbSchedule>(SCHEDULES_TABLE).where('id', props.id).update(newValues).returning('*');

packages/scheduler/lib/scheduler.integration.test.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,8 @@ async function recurring({ scheduler, state = 'PAUSED' }: { scheduler: Scheduler
168168
retryCount: 0,
169169
createdToStartedTimeoutSecs: 3600,
170170
startedToCompletedTimeoutSecs: 3600,
171-
heartbeatTimeoutSecs: 600
171+
heartbeatTimeoutSecs: 600,
172+
lastScheduledTaskId: null
172173
};
173174
return (await scheduler.recurring(recurringProps)).unwrap();
174175
}

packages/scheduler/lib/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,5 @@ export interface Schedule {
4848
readonly createdAt: Date;
4949
readonly updatedAt: Date;
5050
readonly deletedAt: Date | null;
51+
readonly lastScheduledTaskId: string | null;
5152
}

packages/scheduler/lib/workers/scheduling/scheduling.integration.test.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ async function addSchedule(db: knex.Knex, params?: { state?: ScheduleState; star
108108
heartbeat_timeout_secs: 1,
109109
created_at: new Date(),
110110
updated_at: new Date(),
111-
deleted_at: params?.state === 'DELETED' ? new Date() : null
111+
deleted_at: params?.state === 'DELETED' ? new Date() : null,
112+
last_scheduled_task_id: null
112113
};
113114
const res = await db.from<DbSchedule>(SCHEDULES_TABLE).insert(schedule).returning('*');
114115
const inserted = res[0];
@@ -151,5 +152,8 @@ async function addTask(
151152
if (!inserted) {
152153
throw new Error('Failed to insert task');
153154
}
155+
if (params?.scheduleId) {
156+
await db.from<DbSchedule>(SCHEDULES_TABLE).where('id', params.scheduleId).update({ last_scheduled_task_id: inserted.id });
157+
}
154158
return DbTask.from(inserted);
155159
}

packages/scheduler/lib/workers/scheduling/scheduling.worker.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type knex from 'knex';
88
import { logger } from '../../utils/logger.js';
99
import { dueSchedules } from './scheduling.js';
1010
import * as tasks from '../../models/tasks.js';
11+
import * as schedules from '../../models/schedules.js';
1112
import tracer from 'dd-trace';
1213

1314
interface CreatedTasksMessage {
@@ -109,17 +110,17 @@ export class SchedulingChild {
109110

110111
if (lockGranted) {
111112
const dueSchedulesSpan = tracer.startSpan('scheduler.scheduling.due_schedules', { childOf: span });
112-
const schedules = await tracer.scope().activate(dueSchedulesSpan, async () => {
113+
const getDueSchedules = await tracer.scope().activate(dueSchedulesSpan, async () => {
113114
return dueSchedules(trx);
114115
});
115116
dueSchedulesSpan.finish();
116117

117-
if (schedules.isErr()) {
118-
return Err(`Failed to get due schedules: ${stringifyError(schedules.error)}`);
118+
if (getDueSchedules.isErr()) {
119+
return Err(`Failed to get due schedules: ${stringifyError(getDueSchedules.error)}`);
119120
} else {
120121
const tasksCreationSpan = tracer.startSpan('scheduler.scheduling.tasks_creation', { childOf: span });
121122
await tracer.scope().activate(tasksCreationSpan, async () => {
122-
const createTasks = schedules.value.map((schedule) =>
123+
const createTasks = getDueSchedules.value.map((schedule) =>
123124
tasks.create(trx, {
124125
scheduleId: schedule.id,
125126
startsAfter: new Date(),
@@ -140,7 +141,11 @@ export class SchedulingChild {
140141
} else if (taskRes.value.isErr()) {
141142
logger.error(`Failed to schedule task: ${stringifyError(taskRes.value.error)}`);
142143
} else {
143-
taskIds.push(taskRes.value.value.id);
144+
const task = taskRes.value.value;
145+
if (task.scheduleId) {
146+
schedules.update(trx, { id: task.scheduleId, lastScheduledTaskId: task.id });
147+
}
148+
taskIds.push(task.id);
144149
}
145150
}
146151
});

0 commit comments

Comments
 (0)