Skip to content

Commit 5a97e4f

Browse files
sdesalascursoragent
authored andcommitted
[Security Solution] Randomize task schedules when bulk scheduling (elastic#269991)
**Resolves: elastic#195136** **Related to: elastic#264893** **Related to: elastic#269340** ## Summary `TaskScheduling.bulkSchedule` previously sent every task to the store with no `runAt`, which caused the store to default them all to "now". When a caller bulk-scheduled many recurring tasks at once, the polling queue was flooded with [simultaneous claims](https://en.wikipedia.org/wiki/Thundering_herd_problem). This PR brings `bulkSchedule` in line with `bulkEnable` (see elastic#172742): the first task in the batch still runs immediately, but subsequent enabled recurring tasks are scheduled with a randomized `runAt`, evenly distributed up to 5 minutes in the future. Ad-hoc tasks (no `schedule.interval`) and disabled tasks are left untouched and run immediately as before. This also helps unblock upcoming work on `RulesClient.bulkCreate` (elastic#264893), where a single API call may schedule a large number of detection-rule tasks at once. ## Whats included - The existing `randomlyOffsetRunTimestamp()` helper is replaced by a smaller pure helper `addJitter()` that returns `{ runAt, scheduledAt }` - or `undefined` when no interval is supplied - so callers control the spread. - `bulkSchedule` map callback now receives the index `i` and uses `addJitter()` when `enabled && i > 0`. - `bulkEnable`'s behavior continues exactly the same with the `i > 0` branch using the shared `addJitter()` helper. ## How to test > [!IMPORTANT] > There is no easy way to test TM `bulkSchedule()` randomizing without `v2` alerting. Because of this, the test below only covers task randomizing under `bulkEnable()`. If you apply debugging here, you will notice that enabling detection rules in bulk uses `bulkSchedule()` under the hood, but it does so in `enabled: false` state. In other words, no jitter will get applied until the following pass. This is expected, what the test below does primarily is to verify that existing behavior is unaffected. The changes to behavior in `bulkSchedule(enabled:true)` will become more meaningful with upcoming work on alerting `v2` and `RulesClient.bulkCreate`. 1. Start ES + Kibana from this branch. Make sure you have a clean ES with no rules. 2. In Kibana, navigate to **Security → Rules → Detection rules (SIEM)** and click **Add Elastic Rules** to install the prebuilt detection rule set (~1850 rules). Leave them disabled. > Note: This is a good time to place some breakpoints if you're debugging locally. 3. Go back to the rules management screen. Under "Installed Rules" click the checkbox to select first 20 rules then `Bulk actions` > `Enable`. You should see a message saying "Successfully enabled 20 rules" 4. Verify the `runAt` / `scheduledAt` distribution using [`check-task-runtime.sh`](https://github.com/sdesalas/kibana-knowledge/blob/main/scripts/check-task-runtime.sh): ```bash $ ./check-task-runtime.sh ``` Or if you are using ports different to the standard `5601` and `9200` ```bash $ KIBANA_DEV_PORT=5606 ES_DEV_PORT=9205 ./check-task-runtime.sh ``` 5. Expected output: counts match, and the first-20 task timestamps are **spread across several minutes** rather than all stamped with the same "now": ``` starting.. KIBANA_URL=http://localhost:5601/kbn ES_URL=http://localhost:9200 1. 2. 3. 4. 5. 6. rules: 1850 rules_enabled: 20 tasks: 20 tasks_enabled: 20 api_key_owner: 20 apiKey present: 20 first 20 tasks: taskType status enabled runAt scheduledAt alerting:siem.queryRule idle true 2026-05-19T16:20:08.323Z 2026-05-19T16:20:08.323Z alerting:siem.queryRule idle true 2026-05-19T16:21:28.689Z 2026-05-19T16:21:28.689Z alerting:siem.eqlRule idle true 2026-05-19T16:21:05.927Z 2026-05-19T16:21:05.927Z alerting:siem.queryRule idle true 2026-05-19T16:20:53.163Z 2026-05-19T16:20:53.163Z alerting:siem.queryRule idle true 2026-05-19T16:23:30.562Z 2026-05-19T16:23:30.562Z alerting:siem.esqlRule idle true 2026-05-19T16:23:45.295Z 2026-05-19T16:23:45.295Z ... ``` For every task, `runAt` should equal its matching `scheduledAt`. The timestamps should be distributed across the configured jitter window (`min(rule interval, 5m)`) - confirming jitter is applied per-task. On `main` without this PR, every task's `runAt` collapses to the same value. ## Callers of `bulkSchedule` For reference, the production callers of `TaskScheduling.bulkSchedule` and whether they exercise the new jitter: | Caller | What it schedules | Hits the new jitter? | |---|---|---| | [`alerting_v2/.../rules_client.bulkEnableRules`](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/alerting_v2/server/lib/rules_client/rules_client.ts) | enabled, recurring (`schedule.interval`, `enabled: true`) | **Yes** — primary path exercised by the manual test above | | [`alerting/.../bulk_enable_rules.ts`](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/alerting/server/application/rule/methods/bulk_enable/bulk_enable_rules.ts) (legacy) | recurring but `enabled: false` — the comment in that file explicitly says "we create the task as disabled, taskManager.bulkEnable will enable them by randomising their schedule datetime" | No (jitter applied later by `bulkEnable`) | | [`workflows_execution_engine/server/plugin.ts`](https://github.com/elastic/kibana/blob/main/src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts) | enabled but ad-hoc (no `schedule`) | No (ad-hoc — runs immediately, correct) | | [`actions/create_execute_function.ts`](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/actions/server/create_execute_function.ts) | ad-hoc action tasks | No | | [`actions/create_unsecured_execute_function.ts`](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/actions/server/create_unsecured_execute_function.ts) | ad-hoc action tasks | No | | [`alerting/.../backfill_client.ts`](https://github.com/elastic/kibana/blob/main/x-pack/platform/plugins/shared/alerting/server/backfill_client/backfill_client.ts) | variable is literally `adHocTasksToSchedule` | No | Of these, only the alerting_v2 `bulkEnableRules` path schedules enabled recurring tasks in bulk, so it is the only caller whose runtime behavior changes with this PR. ## Release note skip ## Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios ### Identify risks - Behavior change is scoped to recurring tasks at `i > 0`; single-task `bulkSchedule` calls and ad-hoc tasks retain the existing "run now" semantics. - The `bulkEnable` path is unchanged in semantics; only the helper signature changed. --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 1b5ece0 commit 5a97e4f

2 files changed

Lines changed: 146 additions & 14 deletions

File tree

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.test.ts

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,8 @@ describe('TaskScheduling', () => {
13371337
schedule: undefined,
13381338
traceparent: 'parent',
13391339
enabled: true,
1340+
runAt: new Date(),
1341+
scheduledAt: new Date(),
13401342
},
13411343
],
13421344
undefined
@@ -1366,6 +1368,8 @@ describe('TaskScheduling', () => {
13661368
schedule: undefined,
13671369
traceparent: 'parent',
13681370
enabled: true,
1371+
runAt: new Date(),
1372+
scheduledAt: new Date(),
13691373
},
13701374
{
13711375
...task2,
@@ -1379,6 +1383,126 @@ describe('TaskScheduling', () => {
13791383
);
13801384
});
13811385

1386+
test('leaves the first task untouched and jitters subsequent recurring tasks within min(interval, 5m)', async () => {
1387+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1388+
const task0 = {
1389+
taskType: 'foo',
1390+
params: {},
1391+
state: {},
1392+
schedule: { interval: '1m' },
1393+
};
1394+
const task1 = {
1395+
taskType: 'foo',
1396+
params: {},
1397+
state: {},
1398+
schedule: { interval: '1m' },
1399+
};
1400+
const task2 = {
1401+
taskType: 'foo',
1402+
params: {},
1403+
state: {},
1404+
schedule: { interval: '1h' },
1405+
};
1406+
await taskScheduling.bulkSchedule([task0, task1, task2]);
1407+
1408+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1409+
1410+
expect(bulkSchedulePayload.length).toBe(3);
1411+
1412+
expect(bulkSchedulePayload[0]).toEqual({
1413+
...task0,
1414+
id: undefined,
1415+
traceparent: 'parent',
1416+
enabled: true,
1417+
runAt: new Date(),
1418+
scheduledAt: new Date(),
1419+
});
1420+
1421+
expect(omit(bulkSchedulePayload[1], 'runAt', 'scheduledAt')).toEqual({
1422+
...task1,
1423+
id: undefined,
1424+
traceparent: 'parent',
1425+
enabled: true,
1426+
});
1427+
expect(omit(bulkSchedulePayload[2], 'runAt', 'scheduledAt')).toEqual({
1428+
...task2,
1429+
id: undefined,
1430+
traceparent: 'parent',
1431+
enabled: true,
1432+
});
1433+
1434+
const t1RunAt = bulkSchedulePayload[1].runAt!.getTime();
1435+
expect(bulkSchedulePayload[1].scheduledAt!.getTime()).toBe(t1RunAt);
1436+
expect(t1RunAt).toBeGreaterThanOrEqual(1);
1437+
expect(t1RunAt).toBeLessThanOrEqual(60 * 1000);
1438+
1439+
const t2RunAt = bulkSchedulePayload[2].runAt!.getTime();
1440+
expect(bulkSchedulePayload[2].scheduledAt!.getTime()).toBe(t2RunAt);
1441+
expect(t2RunAt).toBeGreaterThanOrEqual(1);
1442+
expect(t2RunAt).toBeLessThanOrEqual(5 * 60 * 1000);
1443+
});
1444+
1445+
test('runs ad-hoc tasks immediately without jitter, even at i > 0', async () => {
1446+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1447+
const recurringTask = {
1448+
taskType: 'foo',
1449+
params: {},
1450+
state: {},
1451+
schedule: { interval: '1m' },
1452+
};
1453+
const adHocTask = {
1454+
taskType: 'foo',
1455+
params: {},
1456+
state: {},
1457+
};
1458+
await taskScheduling.bulkSchedule([recurringTask, adHocTask]);
1459+
1460+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1461+
1462+
expect(bulkSchedulePayload).toEqual([
1463+
{
1464+
...recurringTask,
1465+
id: undefined,
1466+
traceparent: 'parent',
1467+
enabled: true,
1468+
runAt: new Date(),
1469+
scheduledAt: new Date(),
1470+
},
1471+
{
1472+
...adHocTask,
1473+
id: undefined,
1474+
schedule: undefined,
1475+
traceparent: 'parent',
1476+
enabled: true,
1477+
runAt: new Date(),
1478+
scheduledAt: new Date(),
1479+
},
1480+
]);
1481+
});
1482+
1483+
test('does not jitter disabled tasks even if they have an interval schedule', async () => {
1484+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
1485+
const task = {
1486+
taskType: 'foo',
1487+
params: {},
1488+
state: {},
1489+
schedule: { interval: '1h' },
1490+
enabled: false,
1491+
};
1492+
await taskScheduling.bulkSchedule([task]);
1493+
1494+
const bulkSchedulePayload = mockTaskStore.bulkSchedule.mock.calls[0][0];
1495+
1496+
expect(bulkSchedulePayload).toEqual([
1497+
{
1498+
...task,
1499+
id: undefined,
1500+
traceparent: 'parent',
1501+
enabled: false,
1502+
},
1503+
]);
1504+
});
1505+
13821506
test('doesnt allow naively rescheduling existing tasks that have already been scheduled', async () => {
13831507
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
13841508
mockTaskStore.bulkSchedule.mockRejectedValueOnce({

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.ts

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,15 +148,26 @@ export class TaskScheduling {
148148
? agent.currentTraceparent
149149
: '';
150150
const modifiedTasks = await Promise.all(
151-
taskInstances.map(async (taskInstance) => {
151+
taskInstances.map(async (taskInstance, i) => {
152152
const { taskInstance: modifiedTask } = await this.middleware.beforeSave({
153153
...omit(options, 'apiKey', 'request'),
154154
taskInstance: ensureDeprecatedFieldsAreCorrected(taskInstance, this.logger),
155155
});
156+
const enabled = modifiedTask.enabled ?? true;
157+
let scheduling: Partial<{ runAt: Date; scheduledAt: Date }> = {};
158+
if (enabled) {
159+
// Run the first task now. Run all other tasks a random number of ms in the future,
160+
// with a maximum of 5 minutes or the task interval, whichever is smaller.
161+
scheduling =
162+
i === 0
163+
? { runAt: new Date(), scheduledAt: new Date() }
164+
: addJitter(modifiedTask.schedule?.interval) ?? {};
165+
}
156166
return {
157167
...modifiedTask,
158168
traceparent: traceparent || '',
159-
enabled: modifiedTask.enabled ?? true,
169+
enabled,
170+
...scheduling,
160171
};
161172
})
162173
);
@@ -200,11 +211,9 @@ export class TaskScheduling {
200211
if (runSoon) {
201212
// Run the first task now. Run all other tasks a random number of ms in the future,
202213
// with a maximum of 5 minutes or the task interval, whichever is smaller.
203-
const taskToRun =
204-
i === 0
205-
? { ...task, runAt: new Date(), scheduledAt: new Date() }
206-
: randomlyOffsetRunTimestamp(task);
207-
return { ...taskToRun, enabled: true };
214+
return i === 0
215+
? { ...task, enabled: true, runAt: new Date(), scheduledAt: new Date() }
216+
: { ...task, enabled: true, ...addJitter(task.schedule?.interval ?? '0s') };
208217
}
209218
return { ...task, enabled: true };
210219
},
@@ -375,17 +384,16 @@ export class TaskScheduling {
375384
}
376385
}
377386

378-
const randomlyOffsetRunTimestamp: (task: ConcreteTaskInstance) => ConcreteTaskInstance = (task) => {
387+
const addJitter = (interval?: string): { runAt: Date; scheduledAt: Date } | undefined => {
388+
// Adhoc tasks run immediately.
389+
if (!interval) return { runAt: new Date(), scheduledAt: new Date() };
390+
379391
const now = Date.now();
380392
const maximumOffsetTimestamp = now + 1000 * 60 * 5; // now + 5 minutes
381-
const taskIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');
393+
const taskIntervalInMs = parseIntervalAsMillisecond(interval);
382394
const maximumRunAt = Math.min(now + taskIntervalInMs, maximumOffsetTimestamp);
383395

384396
// Offset between 1 and maximumRunAt ms
385397
const runAt = new Date(now + Math.floor(Math.random() * (maximumRunAt - now) + 1));
386-
return {
387-
...task,
388-
runAt,
389-
scheduledAt: runAt,
390-
};
398+
return { runAt, scheduledAt: runAt };
391399
};

0 commit comments

Comments
 (0)