Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,120 @@ describe('TaskScheduling', () => {
);
});

test('leaves the first task untouched and jitters subsequent recurring tasks within min(interval, 5m)', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task0 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const task1 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const task2 = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1h' },
};
await taskScheduling.bulkSchedule([task0, task1, task2]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload.length).toBe(3);

expect(bulkSchedulePayload[0]).toEqual({
...task0,
id: undefined,
traceparent: 'parent',
enabled: true,
});

expect(omit(bulkSchedulePayload[1], 'runAt', 'scheduledAt')).toEqual({
...task1,
id: undefined,
traceparent: 'parent',
enabled: true,
});
expect(omit(bulkSchedulePayload[2], 'runAt', 'scheduledAt')).toEqual({
...task2,
id: undefined,
traceparent: 'parent',
enabled: true,
});

const t1RunAt = bulkSchedulePayload[1].runAt!.getTime();
expect(bulkSchedulePayload[1].scheduledAt!.getTime()).toBe(t1RunAt);
expect(t1RunAt).toBeGreaterThanOrEqual(1);
expect(t1RunAt).toBeLessThanOrEqual(60 * 1000);

const t2RunAt = bulkSchedulePayload[2].runAt!.getTime();
expect(bulkSchedulePayload[2].scheduledAt!.getTime()).toBe(t2RunAt);
expect(t2RunAt).toBeGreaterThanOrEqual(1);
expect(t2RunAt).toBeLessThanOrEqual(5 * 60 * 1000);
});

test('does not jitter ad-hoc tasks at i > 0', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const recurringTask = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1m' },
};
const adHocTask = {
taskType: 'foo',
params: {},
state: {},
};
await taskScheduling.bulkSchedule([recurringTask, adHocTask]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload).toEqual([
{
...recurringTask,
id: undefined,
traceparent: 'parent',
enabled: true,
},
{
...adHocTask,
id: undefined,
schedule: undefined,
traceparent: 'parent',
enabled: true,
},
]);
});

test('does not jitter disabled tasks even if they have an interval schedule', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const task = {
taskType: 'foo',
params: {},
state: {},
schedule: { interval: '1h' },
enabled: false,
};
await taskScheduling.bulkSchedule([task]);

const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];

expect(bulkSchedulePayload).toEqual([
{
...task,
id: undefined,
traceparent: 'parent',
enabled: false,
},
]);
});

test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
mockTaskStore.bulkSchedule.mockRejectedValueOnce({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,20 @@ export class TaskScheduling {
? agent.currentTraceparent
: '';
const modifiedTasks = await Promise.all(
taskInstances.map(async (taskInstance) => {
taskInstances.map(async (taskInstance, i) => {
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
...omit(options, 'apiKey', 'request'),
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
});
const enabled = modifiedTask.enabled ?? true;
// Run the first task now. Run all other tasks a random number of ms in the future,
// with a maximum of 5 minutes or the task interval, whichever is smaller.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I guess this comment was carried over from bulkEnable, but bulkSchedule doesn't actually set runAt: new Date() for i == 0, it just spreads {} and lets the store default to now.
Could we either adjsut the wording to match what's happening, or mirror bulkEnable and set runAt: new Date(), scheduledAt: new Date() explicitly for i == 0?
I think it'd be easier to have consistent logic for both methods

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @darnautov, addressed in 08379a0

Logic is the same now across bulkEnable and bulkSchedule. You're right the comment was not clear that the magic was happening inside the store. Seeing the dates set explicitly makes this clear on this end.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added this change to make sure that adhoc tasks are set with a date explicitly (to run now).

const runAt = enabled && i > 0 ? addJitter(modifiedTask.schedule?.interval) ?? {} : {};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since the variable contain an object with more fields than runAt - { runAt: Date, scheduledAt: Date } shall we name it jitterSchedule or something similar?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @darnautov, addressed in 08379a0

I went with var name scheduling to cover all situations, jittering and no jittering.

return {
...modifiedTask,
traceparent: traceparent || '',
enabled: modifiedTask.enabled ?? true,
enabled,
...runAt,
};
})
);
Expand Down Expand Up @@ -200,11 +205,9 @@ export class TaskScheduling {
if (runSoon) {
// Run the first task now. Run all other tasks a random number of ms in the future,
// with a maximum of 5 minutes or the task interval, whichever is smaller.
const taskToRun =
i === 0
? { ...task, runAt: new Date(), scheduledAt: new Date() }
: randomlyOffsetRunTimestamp(task);
return { ...taskToRun, enabled: true };
return i === 0
? { ...task, enabled: true, runAt: new Date(), scheduledAt: new Date() }
: { ...task, enabled: true, ...addJitter(task.schedule?.interval ?? '0s') };
}
return { ...task, enabled: true };
},
Expand Down Expand Up @@ -375,17 +378,15 @@ export class TaskScheduling {
}
}

const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => {
const addJitter = (interval?: string): { runAt: Date; scheduledAt: Date } | undefined => {
if (!interval) return undefined;

const now = Date.now();
const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes
const taskIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');
const taskIntervalInMs = parseIntervalAsMillisecond(interval);
const maximumRunAt = Math.min(now + taskIntervalInMs, maximumOffsetTimestamp);

// Offset between 1 and maximumRunAt ms
const runAt = new Date(now + Math.floor(Math.random() * (maximumRunAt - now) + 1));
return {
...task,
runAt,
scheduledAt: runAt,
};
return { runAt, scheduledAt: runAt };
};
Loading