Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,8 @@ describe('TaskScheduling', () => {
schedule: undefined,
traceparent: 'parent',
enabled: true,
runAt: new Date(),
scheduledAt: new Date(),
},
],
undefined
Expand Down Expand Up @@ -1366,6 +1368,8 @@ describe('TaskScheduling', () => {
schedule: undefined,
traceparent: 'parent',
enabled: true,
runAt: new Date(),
scheduledAt: new Date(),
},
{
...task2,
Expand All @@ -1379,6 +1383,126 @@ describe('TaskScheduling', () => {
);
});

test('leaves the first task untouched and jitters subsequent recurring tasks within min(interval, 5m)', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task0 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const task1 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const task2 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1h' },
};
await taskScheduling.bulkSchedule([task0, task1, task2]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload.length).toBe(3);

expect(bulkSchedulePayload[0]).toEqual({
...task0,
id: undefined,
traceparent: 'parent',
enabled: true,
runAt: new Date(),
scheduledAt: new Date(),
});

expect(omit(bulkSchedulePayload[1], 'runAt', 'scheduledAt')).toEqual({
...task1,
id: undefined,
traceparent: 'parent',
enabled: true,
});
expect(omit(bulkSchedulePayload[2], 'runAt', 'scheduledAt')).toEqual({
...task2,
id: undefined,
traceparent: 'parent',
enabled: true,
});

const t1RunAt = bulkSchedulePayload[1].runAt!.getTime();
expect(bulkSchedulePayload[1].scheduledAt!.getTime()).toBe(t1RunAt);
expect(t1RunAt).toBeGreaterThanOrEqual(1);
expect(t1RunAt).toBeLessThanOrEqual(60 * 1000);

const t2RunAt = bulkSchedulePayload[2].runAt!.getTime();
expect(bulkSchedulePayload[2].scheduledAt!.getTime()).toBe(t2RunAt);
expect(t2RunAt).toBeGreaterThanOrEqual(1);
expect(t2RunAt).toBeLessThanOrEqual(5 * 60 * 1000);
});

test('runs ad-hoc tasks immediately without jitter, even at i > 0', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const recurringTask = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const adHocTask = {
taskType: 'foo',
params: {},
state: {},
};
await taskScheduling.bulkSchedule([recurringTask, adHocTask]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload).toEqual([
{
...recurringTask,
id: undefined,
traceparent: 'parent',
enabled: true,
runAt: new Date(),
scheduledAt: new Date(),
},
{
...adHocTask,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
runAt: new Date(),
scheduledAt: new Date(),
},
]);
});

test('does not jitter disabled tasks even if they have an interval schedule', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1h' },
enabled: false,
};
await taskScheduling.bulkSchedule([task]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload).toEqual([
{
...task,
id: undefined,
traceparent: 'parent',
enabled: false,
},
]);
});

test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.bulkSchedule.mockRejectedValueOnce({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,26 @@ export class TaskScheduling {
? agent.currentTraceparent
: '';
const modifiedTasks = await Promise.all(
taskInstances.map(async (taskInstance) => {
taskInstances.map(async (taskInstance, i) => {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...omit(options, 'apiKey', 'request'),
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
const enabled = modifiedTask.enabled ?? true;
let scheduling: Partial<{ runAt: Date; scheduledAt: Date }> = {};
if (enabled) {
// Run the first task now. Run all other tasks a random number of ms in the future,
// with a maximum of 5 minutes or the task interval, whichever is smaller.
scheduling =
i === 0
? { runAt: new Date(), scheduledAt: new Date() }
: addJitter(modifiedTask.schedule?.interval) ?? {};
}
return {
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
enabled,
...scheduling,
};
})
);
Expand Down Expand Up @@ -200,11 +211,9 @@ export class TaskScheduling {
if (runSoon) {
// Run the first task now. Run all other tasks a random number of ms in the future,
// with a maximum of 5 minutes or the task interval, whichever is smaller.
const taskToRun =
i === 0
? { ...task, runAt: new Date(), scheduledAt: new Date() }
: randomlyOffsetRunTimestamp(task);
return { ...taskToRun, enabled: true };
return i === 0
? { ...task, enabled: true, runAt: new Date(), scheduledAt: new Date() }
: { ...task, enabled: true, ...addJitter(task.schedule?.interval ?? '0s') };
}
return { ...task, enabled: true };
},
Expand Down Expand Up @@ -375,17 +384,16 @@ export class TaskScheduling {
}
}

const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => {
const addJitter = (interval?: string): { runAt: Date; scheduledAt: Date } | undefined => {
// Adhoc tasks run immediately.
if (!interval) return { runAt: new Date(), scheduledAt: new Date() };

const now = Date.now();
const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes
const taskIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');
const taskIntervalInMs = parseIntervalAsMillisecond(interval);
const maximumRunAt = Math.min(now + taskIntervalInMs, maximumOffsetTimestamp);

// Offset between 1 and maximumRunAt ms
const runAt = new Date(now + Math.floor(Math.random() * (maximumRunAt - now) + 1));
return {
...task,
runAt,
scheduledAt: runAt,
};
return { runAt, scheduledAt: runAt };
};
Loading