Skip to content

Commit 1f9c11e

Browse files
authored
[One Workflow] Implement cancel-in-progress collision strategy (elastic#247217)
1 parent 3b40980 commit 1f9c11e

15 files changed

Lines changed: 1045 additions & 146 deletions

File tree

src/platform/packages/shared/kbn-workflows/spec/lib/generate_yaml_schema_from_connectors.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,13 @@ export function generateYamlSchemaFromConnectors(
4141

4242
if (loose) {
4343
return WorkflowSchema.partial().extend({
44-
settings: WorkflowSettingsSchema.omit({ concurrency: true }).optional(),
44+
settings: WorkflowSettingsSchema.optional(),
4545
steps: z.array(recursiveStepSchema).optional(),
4646
});
4747
}
4848

4949
return WorkflowSchema.extend({
50-
settings: getWorkflowSettingsSchema(recursiveStepSchema, loose)
51-
.omit({ concurrency: true })
52-
.optional(),
50+
settings: getWorkflowSettingsSchema(recursiveStepSchema, loose).optional(),
5351
steps: z.array(recursiveStepSchema),
5452
});
5553
}

src/platform/packages/shared/kbn-workflows/spec/schema.test.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ describe('ConcurrencySettingsSchema', () => {
180180

181181
describe('strategy', () => {
182182
it('should accept valid strategy values', () => {
183-
const strategies = ['queue', 'drop', 'cancel-in-progress'] as const;
183+
// Only 'cancel-in-progress' is currently implemented; 'queue' and 'drop' are TBD
184+
const strategies = ['cancel-in-progress'] as const;
184185
strategies.forEach((strategy) => {
185186
const result = ConcurrencySettingsSchema.safeParse({
186187
strategy,
@@ -263,7 +264,7 @@ describe('ConcurrencySettingsSchema', () => {
263264
// Verify the type can be used and matches the schema inference
264265
const testSettings: ConcurrencySettings = {
265266
key: '{{ event.host.name }}',
266-
strategy: 'queue',
267+
strategy: 'cancel-in-progress',
267268
max: 3,
268269
};
269270
const result = ConcurrencySettingsSchema.safeParse(testSettings);
@@ -282,14 +283,14 @@ describe('WorkflowSettingsSchema', () => {
282283
const result = WorkflowSettingsSchema.safeParse({
283284
concurrency: {
284285
key: '{{ event.host.name }}',
285-
strategy: 'queue',
286+
strategy: 'cancel-in-progress',
286287
max: 3,
287288
},
288289
});
289290
expect(result.success).toBe(true);
290291
if (result.success) {
291292
expect(result.data.concurrency?.key).toBe('{{ event.host.name }}');
292-
expect(result.data.concurrency?.strategy).toBe('queue');
293+
expect(result.data.concurrency?.strategy).toBe('cancel-in-progress');
293294
expect(result.data.concurrency?.max).toBe(3);
294295
}
295296
});
@@ -328,9 +329,10 @@ describe('WorkflowSettingsSchema', () => {
328329

329330
describe('CollisionStrategySchema', () => {
330331
it('should accept all valid strategy values', () => {
331-
expect(CollisionStrategySchema.safeParse('queue').success).toBe(true);
332-
expect(CollisionStrategySchema.safeParse('drop').success).toBe(true);
332+
// Only 'cancel-in-progress' is currently implemented; 'queue' and 'drop' are TBD
333333
expect(CollisionStrategySchema.safeParse('cancel-in-progress').success).toBe(true);
334+
expect(CollisionStrategySchema.safeParse('queue').success).toBe(false);
335+
expect(CollisionStrategySchema.safeParse('drop').success).toBe(false);
334336
});
335337

336338
it('should reject invalid strategy values', () => {
@@ -341,7 +343,7 @@ describe('WorkflowSettingsSchema', () => {
341343

342344
it('should export CollisionStrategy type that matches valid values', () => {
343345
// Verify the type can be used and matches the schema values
344-
const validStrategies: CollisionStrategy[] = ['queue', 'drop', 'cancel-in-progress'];
346+
const validStrategies: CollisionStrategy[] = ['cancel-in-progress']; // 'queue', 'drop', TBD
345347
validStrategies.forEach((strategy) => {
346348
const result = CollisionStrategySchema.safeParse(strategy);
347349
expect(result.success).toBe(true);

src/platform/packages/shared/kbn-workflows/spec/schema.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export function getOnFailureStepSchema(stepSchema: z.ZodType, loose: boolean = f
5656
return schema;
5757
}
5858

59-
export const CollisionStrategySchema = z.enum(['queue', 'drop', 'cancel-in-progress']);
59+
export const CollisionStrategySchema = z.enum(['cancel-in-progress']); // 'queue', 'drop' TBD
6060
export type CollisionStrategy = z.infer<typeof CollisionStrategySchema>;
6161

6262
export const ConcurrencySettingsSchema = z.object({

src/platform/packages/shared/kbn-workflows/types/latest.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export {
6363
ExecutionType,
6464
ExecutionStatusValues,
6565
ExecutionTypeValues,
66+
TerminalExecutionStatuses,
6667
SearchWorkflowCommandSchema,
6768
UpdateWorkflowCommandSchema,
6869
} from './v1';

src/platform/packages/shared/kbn-workflows/types/utils.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import type {
1414
HttpMethod,
1515
InternalConnectorContract,
1616
} from './v1';
17-
import { ExecutionStatus, KNOWN_HTTP_METHODS } from './v1';
17+
import { ExecutionStatus, KNOWN_HTTP_METHODS, TerminalExecutionStatuses } from './v1';
1818
import type {
1919
BuiltInStepType,
2020
ElasticsearchStep,
@@ -60,14 +60,7 @@ export function isDangerousStatus(status: ExecutionStatus) {
6060
}
6161

6262
export function isTerminalStatus(status: ExecutionStatus) {
63-
const TerminalStatus: readonly ExecutionStatus[] = [
64-
ExecutionStatus.COMPLETED,
65-
ExecutionStatus.FAILED,
66-
ExecutionStatus.CANCELLED,
67-
ExecutionStatus.SKIPPED,
68-
ExecutionStatus.TIMED_OUT,
69-
];
70-
return TerminalStatus.includes(status);
63+
return TerminalExecutionStatuses.includes(status);
7164
}
7265

7366
export function isCancelableStatus(status: ExecutionStatus) {

src/platform/packages/shared/kbn-workflows/types/v1.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ export enum ExecutionStatus {
2929
export type ExecutionStatusUnion = `${ExecutionStatus}`;
3030
export const ExecutionStatusValues = Object.values(ExecutionStatus);
3131

32+
export const TerminalExecutionStatuses: readonly ExecutionStatus[] = [
33+
ExecutionStatus.COMPLETED,
34+
ExecutionStatus.FAILED,
35+
ExecutionStatus.CANCELLED,
36+
ExecutionStatus.SKIPPED,
37+
ExecutionStatus.TIMED_OUT,
38+
] as const;
39+
3240
export enum ExecutionType {
3341
TEST = 'test',
3442
PRODUCTION = 'production',

src/platform/plugins/shared/workflows_execution_engine/integration_tests/mocks/workflow_execution_repository.mock.ts

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
*/
99

1010
import type { EsWorkflowExecution } from '@kbn/workflows';
11-
import { ExecutionStatus } from '@kbn/workflows';
11+
import { TerminalExecutionStatuses } from '@kbn/workflows';
1212
import type { WorkflowExecutionRepository as WorkflowExecutionRepositoryType } from '../../server/repositories/workflow_execution_repository';
1313

1414
export class WorkflowExecutionRepositoryMock implements Required<WorkflowExecutionRepositoryType> {
@@ -102,19 +102,11 @@ export class WorkflowExecutionRepositoryMock implements Required<WorkflowExecuti
102102
spaceId: string,
103103
triggeredBy?: string
104104
): Promise<Array<{ _source: EsWorkflowExecution; _id: string; _index: string }>> {
105-
const terminalStatuses = [
106-
ExecutionStatus.COMPLETED,
107-
ExecutionStatus.FAILED,
108-
ExecutionStatus.CANCELLED,
109-
ExecutionStatus.SKIPPED,
110-
ExecutionStatus.TIMED_OUT,
111-
];
112-
113105
let results = Array.from(this.workflowExecutions.values()).filter(
114106
(exec) =>
115107
exec.workflowId === workflowId &&
116108
exec.spaceId === spaceId &&
117-
!terminalStatuses.includes(exec.status)
109+
!TerminalExecutionStatuses.includes(exec.status)
118110
);
119111

120112
if (triggeredBy) {
@@ -128,4 +120,73 @@ export class WorkflowExecutionRepositoryMock implements Required<WorkflowExecuti
128120
_index: 'workflows-executions',
129121
}));
130122
}
123+
124+
public async getRunningExecutionsByConcurrencyGroup(
125+
concurrencyGroupKey: string,
126+
spaceId: string,
127+
excludeExecutionId?: string,
128+
size: number = 5000
129+
): Promise<string[]> {
130+
const results = Array.from(this.workflowExecutions.values())
131+
.filter(
132+
(exec) =>
133+
exec.concurrencyGroupKey === concurrencyGroupKey &&
134+
exec.spaceId === spaceId &&
135+
!TerminalExecutionStatuses.includes(exec.status) &&
136+
(!excludeExecutionId || exec.id !== excludeExecutionId)
137+
)
138+
.sort((a, b) => {
139+
const aTime = new Date(a.createdAt).getTime();
140+
const bTime = new Date(b.createdAt).getTime();
141+
return aTime - bTime; // Oldest first
142+
})
143+
.map((exec) => exec.id)
144+
.slice(0, Math.min(size, 10000)); // Cap at ES default max_result_window
145+
146+
return results;
147+
}
148+
149+
public async bulkUpdateWorkflowExecutions(
150+
updates: Array<Partial<EsWorkflowExecution>>
151+
): Promise<void> {
152+
if (updates.length === 0) {
153+
return;
154+
}
155+
156+
// Validate all IDs are present
157+
for (const update of updates) {
158+
if (!update.id) {
159+
throw new Error('Workflow execution ID is required for bulk update');
160+
}
161+
}
162+
163+
// Validate all executions exist (matching Elasticsearch document_missing_exception behavior)
164+
const missingIds: string[] = [];
165+
for (const update of updates) {
166+
if (!this.workflowExecutions.has(update.id!)) {
167+
missingIds.push(update.id!);
168+
}
169+
}
170+
171+
if (missingIds.length > 0) {
172+
throw new Error(
173+
`Failed to update ${missingIds.length} workflow executions: ${JSON.stringify(
174+
missingIds.map((id) => ({
175+
id,
176+
error: { type: 'document_missing_exception', reason: 'document missing' },
177+
status: 404,
178+
}))
179+
)}`
180+
);
181+
}
182+
183+
// Perform updates
184+
for (const update of updates) {
185+
const existing = this.workflowExecutions.get(update.id!);
186+
this.workflowExecutions.set(update.id!, {
187+
...existing!,
188+
...update,
189+
} as EsWorkflowExecution);
190+
}
191+
}
131192
}

0 commit comments

Comments
 (0)