Skip to content

Commit fba596b

Browse files
sdesalascursoragent
andcommitted
[Security Solution] Randomize task schedules when bulk scheduling
Resolves: #195136 Bring `TaskScheduling.bulkSchedule` in line with `bulkEnable`: the first task in a batch still runs immediately, but subsequent enabled recurring tasks are scheduled with a randomized `runAt` between 1ms and `min(interval, 5m)` from "now". Ad-hoc and disabled tasks are left untouched and run immediately as before. This prevents the polling queue from being flooded with simultaneous claims when many recurring tasks are bulk-scheduled in one call. `randomlyOffsetRunTimestamp` is replaced by a smaller pure helper `addJitter(interval?: string)` that returns just the timing fields, and the `bulkEnable` `i > 0` branch uses it too. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent ae6c10b commit fba596b

2 files changed

Lines changed: 129 additions & 14 deletions

File tree

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,6 +1379,120 @@ describe('TaskScheduling', () => {
13791379
);
13801380
});
13811381

1382+
test('leaves the first task untouched and jitters subsequent recurring tasks within min(interval, 5m)', async () => {
1383+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1384+
const task0 = {
1385+
taskType: 'foo',
1386+
params: {},
1387+
state: {},
1388+
schedule: { interval: '1m' },
1389+
};
1390+
const task1 = {
1391+
taskType: 'foo',
1392+
params: {},
1393+
state: {},
1394+
schedule: { interval: '1m' },
1395+
};
1396+
const task2 = {
1397+
taskType: 'foo',
1398+
params: {},
1399+
state: {},
1400+
schedule: { interval: '1h' },
1401+
};
1402+
await taskScheduling.bulkSchedule([task0, task1, task2]);
1403+
1404+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1405+
1406+
expect(bulkSchedulePayload.length).toBe(3);
1407+
1408+
expect(bulkSchedulePayload[0]).toEqual({
1409+
...task0,
1410+
id: undefined,
1411+
traceparent: 'parent',
1412+
enabled: true,
1413+
});
1414+
1415+
expect(omit(bulkSchedulePayload[1], 'runAt', 'scheduledAt')).toEqual({
1416+
...task1,
1417+
id: undefined,
1418+
traceparent: 'parent',
1419+
enabled: true,
1420+
});
1421+
expect(omit(bulkSchedulePayload[2], 'runAt', 'scheduledAt')).toEqual({
1422+
...task2,
1423+
id: undefined,
1424+
traceparent: 'parent',
1425+
enabled: true,
1426+
});
1427+
1428+
const t1RunAt = bulkSchedulePayload[1].runAt!.getTime();
1429+
expect(bulkSchedulePayload[1].scheduledAt!.getTime()).toBe(t1RunAt);
1430+
expect(t1RunAt).toBeGreaterThanOrEqual(1);
1431+
expect(t1RunAt).toBeLessThanOrEqual(60 * 1000);
1432+
1433+
const t2RunAt = bulkSchedulePayload[2].runAt!.getTime();
1434+
expect(bulkSchedulePayload[2].scheduledAt!.getTime()).toBe(t2RunAt);
1435+
expect(t2RunAt).toBeGreaterThanOrEqual(1);
1436+
expect(t2RunAt).toBeLessThanOrEqual(5 * 60 * 1000);
1437+
});
1438+
1439+
test('does not jitter ad-hoc tasks at i > 0', async () => {
1440+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1441+
const recurringTask = {
1442+
taskType: 'foo',
1443+
params: {},
1444+
state: {},
1445+
schedule: { interval: '1m' },
1446+
};
1447+
const adHocTask = {
1448+
taskType: 'foo',
1449+
params: {},
1450+
state: {},
1451+
};
1452+
await taskScheduling.bulkSchedule([recurringTask, adHocTask]);
1453+
1454+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1455+
1456+
expect(bulkSchedulePayload).toEqual([
1457+
{
1458+
...recurringTask,
1459+
id: undefined,
1460+
traceparent: 'parent',
1461+
enabled: true,
1462+
},
1463+
{
1464+
...adHocTask,
1465+
id: undefined,
1466+
schedule: undefined,
1467+
traceparent: 'parent',
1468+
enabled: true,
1469+
},
1470+
]);
1471+
});
1472+
1473+
test('does not jitter disabled tasks even if they have an interval schedule', async () => {
1474+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1475+
const task = {
1476+
taskType: 'foo',
1477+
params: {},
1478+
state: {},
1479+
schedule: { interval: '1h' },
1480+
enabled: false,
1481+
};
1482+
await taskScheduling.bulkSchedule([task]);
1483+
1484+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1485+
1486+
expect(bulkSchedulePayload).toEqual([
1487+
{
1488+
...task,
1489+
id: undefined,
1490+
traceparent: 'parent',
1491+
enabled: false,
1492+
},
1493+
]);
1494+
});
1495+
13821496
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
13831497
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
13841498
mockTaskStore.bulkSchedule.mockRejectedValueOnce({

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.ts

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,20 @@ export class TaskScheduling {
148148
? agent.currentTraceparent
149149
: '';
150150
const modifiedTasks = await Promise.all(
151-
taskInstances.map(async (taskInstance) => {
151+
taskInstances.map(async (taskInstance, i) => {
152152
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
153153
...omit(options, 'apiKey', 'request'),
154154
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
155155
});
156+
const enabled = modifiedTask.enabled ?? true;
157+
// Run the first task now. Run all other tasks a random number of ms in the future,
158+
// with a maximum of 5 minutes or the task interval, whichever is smaller.
159+
const runAt = enabled && i > 0 ? addJitter(modifiedTask.schedule?.interval) ?? {} : {};
156160
return {
157161
...modifiedTask,
158162
traceparent: traceparent || '',
159-
enabled: modifiedTask.enabled ?? true,
163+
enabled,
164+
...runAt,
160165
};
161166
})
162167
);
@@ -200,11 +205,9 @@ export class TaskScheduling {
200205
if (runSoon) {
201206
// Run the first task now. Run all other tasks a random number of ms in the future,
202207
// with a maximum of 5 minutes or the task interval, whichever is smaller.
203-
const taskToRun =
204-
i === 0
205-
? { ...task, runAt: new Date(), scheduledAt: new Date() }
206-
: randomlyOffsetRunTimestamp(task);
207-
return { ...taskToRun, enabled: true };
208+
return i === 0
209+
? { ...task, enabled: true, runAt: new Date(), scheduledAt: new Date() }
210+
: { ...task, enabled: true, ...addJitter(task.schedule?.interval ?? '0s') };
208211
}
209212
return { ...task, enabled: true };
210213
},
@@ -375,17 +378,15 @@ export class TaskScheduling {
375378
}
376379
}
377380

378-
const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => {
381+
const addJitter = (interval?: string): { runAt: Date; scheduledAt: Date } | undefined => {
382+
if (!interval) return undefined;
383+
379384
const now = Date.now();
380385
const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes
381-
const taskIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');
386+
const taskIntervalInMs = parseIntervalAsMillisecond(interval);
382387
const maximumRunAt = Math.min(now + taskIntervalInMs, maximumOffsetTimestamp);
383388

384389
// Offset between 1 and maximumRunAt ms
385390
const runAt = new Date(now + Math.floor(Math.random() * (maximumRunAt - now) + 1));
386-
return {
387-
...task,
388-
runAt,
389-
scheduledAt: runAt,
390-
};
391+
return { runAt, scheduledAt: runAt };
391392
};

0 commit comments

Comments
 (0)