Skip to content

Commit a262800

Browse files
[One workflow][Scout] Refactor scheduled workflow tests to reflect actual TaskManager execution timing (elastic#258512)
closes: elastic/security-team#16272
1 parent 5286e34 commit a262800

2 files changed

Lines changed: 142 additions & 94 deletions

File tree

src/platform/plugins/shared/workflows_management/test/scout_workflows_ui/api/tests/workflow_execution/scheduled_workflow.spec.ts

Lines changed: 74 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,31 @@
1010
import { tags } from '@kbn/scout';
1111
import { expect } from '@kbn/scout/api';
1212
import { ExecutionStatus } from '@kbn/workflows/types/latest';
13-
import type { WorkflowDetailDto, WorkflowsApiService } from '../../../common/apis/workflows';
13+
import type { WorkflowsApiService } from '../../../common/apis/workflows';
1414
import { waitForConditionOrThrow } from '../../../common/utils/wait_for_condition';
1515
import { spaceTest } from '../../fixtures';
1616

1717
const SCHEDULED_WORKFLOW_INTERVAL_SECONDS = 5;
1818

19+
// Task Manager executes scheduled tasks via a fixed-interval polling loop (default: 3 s).
20+
// A task can only start when a poll cycle fires AND the task's `runAt` has passed —
21+
// there is no "wake up at exactly runAt" mechanism.
22+
//
23+
// When the workflow interval is not a multiple of the poll interval, the Least Common
24+
// Multiple (LCM) determines the repeating execution pattern over time:
25+
//
26+
// LCM(interval=5s, pollInterval=3s) = 15s
27+
// → 3 executions per 15-second window, spaced ~6s / ~6s / ~3s (never exactly 5s)
28+
//
29+
// Theoretical bounds on the gap between consecutive Task Manager claims (the test asserts only the max on `startedAt` gaps):
30+
// min = TASK_MANAGER_POLL_INTERVAL_MS (poll cannot fire more often than this)
31+
// max = interval + TASK_MANAGER_POLL_INTERVAL_MS (at most one poll miss per cycle)
32+
const TASK_MANAGER_POLL_INTERVAL_MS = 3_000;
33+
1934
const SHORT_RUNNING_SCHEDULED_WORKFLOW_YAML = `
2035
name: Scout Scheduled Workflow Test
2136
enabled: false
22-
description: Scheduled workflow that runs every 10s for testing
37+
description: Scheduled workflow that runs every ${SCHEDULED_WORKFLOW_INTERVAL_SECONDS}s for testing
2338
triggers:
2439
- type: scheduled
2540
with:
@@ -34,7 +49,7 @@ steps:
3449
const LONG_RUNNING_SCHEDULED_WORKFLOW_YAML = `
3550
name: Scout Scheduled Workflow Test
3651
enabled: true
37-
description: Scheduled workflow that runs every 10s for testing
52+
description: Scheduled workflow that runs every ${SCHEDULED_WORKFLOW_INTERVAL_SECONDS}s for testing
3853
triggers:
3954
- type: scheduled
4055
with:
@@ -48,73 +63,16 @@ steps:
4863
- name: wait
4964
type: wait
5065
with:
51-
duration: 11s
52-
`;
53-
54-
const getScheduledWorkflowYaml = (interval: string) => `
55-
name: Scout Scheduled Save/Update Test
56-
enabled: true
57-
description: Scheduled workflow for save/update interval change test
58-
triggers:
59-
- type: scheduled
60-
with:
61-
every: ${interval}
62-
steps:
63-
- name: log_step
64-
type: console
65-
with:
66-
message: "Scheduled execution"
66+
duration: ${SCHEDULED_WORKFLOW_INTERVAL_SECONDS + 1}s
6767
`;
6868

69-
spaceTest.describe('Scheduled workflow save and update', { tag: tags.deploymentAgnostic }, () => {
70-
let workflowsApi: WorkflowsApiService;
71-
let workflowId: string;
72-
let workflowBeforeUpdate: WorkflowDetailDto;
73-
74-
spaceTest.beforeAll(async ({ apiServices }) => {
75-
workflowsApi = apiServices.workflowsApi;
76-
const created = await workflowsApi.create(getScheduledWorkflowYaml('5s'));
77-
workflowId = created.id;
78-
});
79-
80-
spaceTest.afterAll(async () => {
81-
await workflowsApi.deleteAll();
82-
});
83-
84-
spaceTest('updating a scheduled workflow with a changed interval succeeds', async () => {
85-
await waitForConditionOrThrow({
86-
action: () => workflowsApi.getExecutions(workflowId),
87-
condition: ({ results: r }) => r.length >= 1,
88-
interval: 1000,
89-
timeout: 10000,
90-
errorMessage: 'No executions appeared after enabling the workflow',
91-
});
92-
const getBefore = await workflowsApi.rawGetWorkflow(workflowId);
93-
expect(getBefore.status).toBe(200);
94-
workflowBeforeUpdate = getBefore.data;
95-
expect(workflowBeforeUpdate.yaml).toContain('every: 5s');
96-
97-
const updateResponse = await workflowsApi.rawUpdate(workflowId, {
98-
yaml: getScheduledWorkflowYaml('1m'),
99-
});
100-
expect(updateResponse.status).toBe(200);
101-
expect(updateResponse.data.id).toBe(workflowId);
102-
103-
const getAfter = await workflowsApi.rawGetWorkflow(workflowId);
104-
expect(getAfter.status).toBe(200);
105-
expect(getAfter.data.yaml).toContain('every: 1m');
106-
});
107-
});
108-
109-
// FLAKY: https://github.com/elastic/security-team/issues/16272
110-
spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAgnostic }, () => {
69+
spaceTest.describe('Scheduled workflow execution', { tag: tags.deploymentAgnostic }, () => {
11170
let workflowsApi: WorkflowsApiService;
11271
let workflowId: string;
11372

11473
spaceTest.beforeAll(async ({ apiServices }) => {
11574
spaceTest.setTimeout(60_000);
11675
workflowsApi = apiServices.workflowsApi;
117-
11876
const created = await workflowsApi.create(SHORT_RUNNING_SCHEDULED_WORKFLOW_YAML);
11977
workflowId = created.id;
12078
});
@@ -127,33 +85,49 @@ spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAg
12785
await workflowsApi.update(workflowId, { enabled: true });
12886
const expectedExecutions = 3;
12987

88+
// LCM(5s, 3s) = 15s per 3-execution cycle. Add a generous buffer for startup
89+
// jitter (adaptive polling, stale-execution cleanup) that can delay the first run.
13090
const { results } = await waitForConditionOrThrow({
13191
action: () => workflowsApi.getExecutions(workflowId),
13292
condition: ({ results: r }) => r.length >= expectedExecutions,
13393
interval: 1000,
134-
timeout: SCHEDULED_WORKFLOW_INTERVAL_SECONDS * 1000 * expectedExecutions,
135-
errorMessage: ({ results: r }) => `Expected > 2 executions, got ${r.length}`,
94+
timeout: 25_000,
95+
errorMessage: ({ results: r }) =>
96+
`Expected >= ${expectedExecutions} executions, got ${r.length}`,
13697
});
13798

138-
expect(results.length).toBeLessThan(5);
139-
14099
const completedExecutions = await Promise.all(
141100
results.map((e) => workflowsApi.waitForTermination({ workflowExecutionId: e.id }))
142101
);
143-
const completedExecutionsSorted = completedExecutions.toSorted((a, b) =>
144-
(a?.startedAt ?? '').localeCompare(b?.startedAt ?? '')
145-
);
146-
102+
// Filter out any executions that lack a startedAt timestamp before sorting.
103+
// Executions without startedAt produce NaN when used in Date arithmetic.
104+
const completedExecutionsSorted = completedExecutions
105+
.filter((e): e is NonNullable<typeof e> & { startedAt: string } => e?.startedAt != null)
106+
.toSorted((a, b) => a.startedAt.localeCompare(b.startedAt));
107+
108+
// Task Manager's poll-based scheduler produces uneven gaps when interval is not a
109+
// multiple of pollInterval. For interval=5s and pollInterval=3s the repeating
110+
// pattern is ~6s / ~6s / ~3s rather than ~5s / ~5s / ~5s.
111+
//
112+
// We assert only an upper bound on the gap between consecutive executions:
113+
// ≤ interval + TASK_MANAGER_POLL_INTERVAL_MS — at most one poll miss per interval
114+
//
115+
// No lower bound is asserted on `startedAt` gaps. `startedAt` is set by the
116+
// workflow engine after Task Manager claims the task, so cold-start overhead on
117+
// the first run (ES document creation, warm-up) shifts its timestamp later than
118+
// the actual claim time, making the first→second gap appear shorter than the
119+
// scheduling interval — even though the Task Manager timing was correct.
120+
const maxGapMs = SCHEDULED_WORKFLOW_INTERVAL_SECONDS * 1000 + TASK_MANAGER_POLL_INTERVAL_MS;
147121
for (let index = 1; index < completedExecutionsSorted.length; index++) {
148122
const currentExecution = completedExecutionsSorted[index];
149-
const currentStart = new Date(completedExecutionsSorted[index]?.startedAt ?? '').getTime();
150-
const previousStart = new Date(
151-
completedExecutionsSorted[index - 1]?.startedAt ?? ''
152-
).getTime();
153-
expect(currentStart - previousStart).toBeGreaterThan(
154-
SCHEDULED_WORKFLOW_INTERVAL_SECONDS * 1000 * 0.85 // 85% of the interval to reduce the risk of flakiness
155-
);
156-
expect(currentExecution?.status).toBe(ExecutionStatus.COMPLETED);
123+
const currentStart = new Date(currentExecution.startedAt).getTime();
124+
const previousStart = new Date(completedExecutionsSorted[index - 1].startedAt).getTime();
125+
const gap = currentStart - previousStart;
126+
expect(
127+
gap,
128+
`gap between run ${index} and run ${index + 1} should be ≤ interval + pollInterval`
129+
).toBeLessThan(maxGapMs + 1);
130+
expect(currentExecution.status).toBe(ExecutionStatus.COMPLETED);
157131
}
158132
});
159133

@@ -173,7 +147,8 @@ spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAg
173147
const { results: beforeDisable } = await workflowsApi.getExecutions(workflowId);
174148
const countBeforeDisable = beforeDisable.length;
175149

176-
// Wait another interval to confirm no new executions appear
150+
// 10 s > max single gap (interval + pollInterval = 8 s for interval=5s / pollInterval=3s), so any
151+
// execution already in-flight will have completed and no new ones should start.
177152
await new Promise((resolve) => setTimeout(resolve, 10_000));
178153

179154
const { results: afterDisable } = await workflowsApi.getExecutions(workflowId);
@@ -187,9 +162,10 @@ spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAg
187162
spaceTest(
188163
'scheduled executions do not overlap when a previous run is still in progress',
189164
async () => {
190-
// Each execution takes ~7s (two 3s waits + console step), scheduled every 5s.
191-
// If the scheduler is reentrant, it must wait for the previous run to finish
192-
// before starting the next one, so consecutive starts should be >5s apart.
165+
// Workflow is scheduled every 5s with a 6s wait step (interval + 1s, so execution > interval).
166+
// The scheduler must wait for the active run to finish before starting the next.
167+
// We verify this by checking that no two consecutive runs overlap in time using
168+
// the finishedAt / startedAt fields from WorkflowExecutionDto.
193169
const createdLongRunningWorkflow = await workflowsApi.create(
194170
LONG_RUNNING_SCHEDULED_WORKFLOW_YAML
195171
);
@@ -199,7 +175,7 @@ spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAg
199175
condition: ({ results: r }) =>
200176
r.filter((e) => e.status === ExecutionStatus.COMPLETED).length >= 2,
201177
interval: 2000,
202-
timeout: SCHEDULED_WORKFLOW_INTERVAL_SECONDS * 1000 * 4,
178+
timeout: 40_000,
203179
errorMessage: ({ results: r }) =>
204180
`Expected >= 2 completed executions, got ${
205181
r.filter((e) => e.status === ExecutionStatus.COMPLETED).length
@@ -213,22 +189,26 @@ spaceTest.describe.skip('Scheduled workflow execution', { tag: tags.deploymentAg
213189
results.map((e) => workflowsApi.waitForTermination({ workflowExecutionId: e.id }))
214190
);
215191

216-
// Keep only completed executions, sorted chronologically by start time
192+
// Keep only completed executions with both timestamps, sorted by startedAt
217193
const completedExecutions = terminalExecutions
218-
.filter((e) => e?.status === ExecutionStatus.COMPLETED)
219-
.toSorted((a, b) => (a?.startedAt ?? '').localeCompare(b?.startedAt ?? ''));
194+
.filter(
195+
(e): e is NonNullable<typeof e> & { startedAt: string; finishedAt: string } =>
196+
e?.status === ExecutionStatus.COMPLETED && e.startedAt != null && e.finishedAt != null
197+
)
198+
.toSorted((a, b) => a.startedAt.localeCompare(b.startedAt));
220199

221-
// At least 2 completed executions are expected, but we can have more if the scheduler was reentrant
222200
expect(completedExecutions.length).toBeGreaterThan(1);
223201

224-
// Compare each consecutive pair: the gap between start times must exceed the 5s interval,
225-
// proving the scheduler waited for the prior run to finish rather than overlapping.
202+
// Non-overlap invariant: each run must finish before the next one starts.
203+
// This is a direct structural check that does not depend on the wait step
204+
// duration or on the Task Manager poll interval.
226205
for (let index = 1; index < completedExecutions.length; index++) {
227-
const currentStart = new Date(completedExecutions[index]?.startedAt ?? '').getTime();
228-
const previousStart = new Date(completedExecutions[index - 1]?.startedAt ?? '').getTime();
229-
expect(currentStart - previousStart).toBeGreaterThan(
230-
SCHEDULED_WORKFLOW_INTERVAL_SECONDS * 1000 * 2 * 0.85 // multiply by 2 because the wait step takes 6s that will be added to the interval
231-
);
206+
const previousFinished = new Date(completedExecutions[index - 1].finishedAt).getTime();
207+
const currentStarted = new Date(completedExecutions[index].startedAt).getTime();
208+
expect(
209+
currentStarted,
210+
`run ${index + 1} started before run ${index} finished (overlap detected)`
211+
).toBeGreaterThan(previousFinished);
232212
}
233213
}
234214
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import { tags } from '@kbn/scout';
11+
import { expect } from '@kbn/scout/api';
12+
import type { WorkflowsApiService } from '../../../common/apis/workflows';
13+
import { waitForConditionOrThrow } from '../../../common/utils/wait_for_condition';
14+
import { spaceTest } from '../../fixtures';
15+
16+
const getScheduledWorkflowYaml = (interval: string) => `
17+
name: Scout Scheduled Save/Update Test
18+
enabled: true
19+
description: Scheduled workflow for save/update interval change test
20+
triggers:
21+
- type: scheduled
22+
with:
23+
every: ${interval}
24+
steps:
25+
- name: log_step
26+
type: console
27+
with:
28+
message: "Scheduled execution"
29+
`;
30+
31+
spaceTest.describe('Scheduled workflow save and update', { tag: tags.deploymentAgnostic }, () => {
32+
let workflowsApi: WorkflowsApiService;
33+
let workflowId: string;
34+
35+
spaceTest.beforeAll(async ({ apiServices }) => {
36+
workflowsApi = apiServices.workflowsApi;
37+
const created = await workflowsApi.create(getScheduledWorkflowYaml('5s'));
38+
workflowId = created.id;
39+
});
40+
41+
spaceTest.afterAll(async () => {
42+
await workflowsApi.deleteAll();
43+
});
44+
45+
spaceTest('updating a scheduled workflow with a changed interval succeeds', async () => {
46+
await waitForConditionOrThrow({
47+
action: () => workflowsApi.getExecutions(workflowId),
48+
condition: ({ results: r }) => r.length >= 1,
49+
interval: 1000,
50+
timeout: 10000,
51+
errorMessage: 'No executions appeared after enabling the workflow',
52+
});
53+
const getBefore = await workflowsApi.rawGetWorkflow(workflowId);
54+
expect(getBefore.status).toBe(200);
55+
const workflowBeforeUpdate = getBefore.data;
56+
expect(workflowBeforeUpdate.yaml).toContain('every: 5s');
57+
58+
const updateResponse = await workflowsApi.rawUpdate(workflowId, {
59+
yaml: getScheduledWorkflowYaml('1m'),
60+
});
61+
expect(updateResponse.status).toBe(200);
62+
expect(updateResponse.data.id).toBe(workflowId);
63+
64+
const getAfter = await workflowsApi.rawGetWorkflow(workflowId);
65+
expect(getAfter.status).toBe(200);
66+
expect(getAfter.data.yaml).toContain('every: 1m');
67+
});
68+
});

0 commit comments

Comments
 (0)