Skip to content

Commit 8fd2beb

Browse files
feat: add bulk workflow creation API endpoint (elastic#252214)
## Summary - Add `POST /api/workflows/_bulk_create` endpoint for creating multiple workflows in a single request - Uses ES bulk API under the hood for efficient batch indexing instead of sequential individual creates - Each workflow is validated independently — partial success is supported with per-item error reporting - Reduces Data Source onboarding latency (3-11 workflows created at once) from seconds to ~50ms ### Architecture ![Bulk Workflow Creation API — Before/After](https://raw.githubusercontent.com/shahargl/kibana/diagrams/pr-252214-architecture.jpg) [View interactive diagram on Excalidraw](https://excalidraw.com/#json=R7TUvsrOt28lBUJXRRDYu,3zDSTBDU8IblkjEx-6Ivmg) **Before (left)**: Each workflow created sequentially via individual `POST /api/workflows` calls. Each call: validate, ES index with `wait_for` refresh (~1s), schedule triggers. For 15 workflows = ~15,000ms total. **After (right)**: - Single `POST /api/workflows/_bulk_create` endpoint (`post_bulk_create_workflows.ts`) - `bulkCreateWorkflows()` in `workflows_management_service.ts` runs 3 phases: - **Phase 1**: Validate all workflows + prepare documents (Zod schema via `BulkCreateWorkflowsCommandSchema`) - **Phase 2**: Single `ES bulk()` call with `refresh: true` — indexes all valid workflows at once - **Phase 3**: Schedule triggers via `Promise.allSettled` (parallel, non-blocking) - Partial failure handling: returns `{ created: [...], failed: [...] }` - Result: **344x faster** (~44ms vs ~15,000ms for 15 workflows) ### API **Request:** ```json POST /api/workflows/_bulk_create { "workflows": [ { "yaml": "name: workflow-1 triggers: - type: manual steps: ..." }, { "yaml": "name: workflow-2 ...", "id": "workflow-custom-id" } ] } ``` **Response:** ```json { "created": [ { "id": "...", "name": "workflow-1", ... } ], "failed": [ { "index": 1, "error": "reason" } ] } ``` ### Benchmark (15 workflows) | Method | Total time | Per workflow | |--------|-----------|-------------| | Sequential (`POST /api/workflows` x15) | 15,163ms | 1,011ms | | Bulk (`POST /api/workflows/_bulk_create`) | 44ms | 3ms | | **Speedup** | **344x** | | ### Comparison with other Kibana bulk APIs | Plugin | Endpoint | Request Body | Response | |---|---|---|---| | Saved Objects | `/_bulk_create` | `[ { type, id?, attributes } ]` | `{ saved_objects: [...] }` | | Cases | `/cases/_bulk_create` | `{ cases: [ ... ] }` | `{ cases: [...] }` | | Fleet | `/package_policies/_bulk_create` | `{ policies: [...] }` | `{ created: [...], failed: [...] }` | | **Workflows** | `/workflows/_bulk_create` | `{ workflows: [ { yaml, id? } ] }` | `{ created: [...], failed: [...] }` | Follows the Fleet convention for response format (`{ created, failed }`) with partial success support. ### Files changed - `kbn-workflows/types/v1.ts` — `BulkCreateWorkflowsCommandSchema` schema - `kbn-workflows/types/latest.ts` — export new schema and type - `workflows_management_service.ts` — `bulkCreateWorkflows()` + shared helpers `prepareWorkflowDocument()`, `scheduleWorkflowTriggers()` - `workflows_management_api.ts` — API passthrough - `routes/post_bulk_create_workflows.ts` — route handler - `routes/index.ts` — register new route - Tests: 7 service tests + 5 route tests (216 total pass) ## References Closes elastic/security-team#15730 ### Architecture --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 2b0b6f1 commit 8fd2beb

8 files changed

Lines changed: 697 additions & 26 deletions

File tree

src/platform/packages/shared/kbn-workflows/types/latest.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
export type {
1111
// commands
12+
BulkCreateWorkflowsCommand,
1213
CreateWorkflowCommand,
1314
// elasticsearch documents types
1415
EsWorkflow,
@@ -65,6 +66,7 @@ export type {
6566
// exported full to use enum as values
6667
export {
6768
// command schemas
69+
BulkCreateWorkflowsCommandSchema,
6870
CreateWorkflowCommandSchema,
6971
ExecutionStatus,
7072
ExecutionType,

src/platform/packages/shared/kbn-workflows/types/v1.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,12 @@ export const CreateWorkflowCommandSchema = z.object({
247247
id: z.string().optional(),
248248
});
249249

250+
export const BulkCreateWorkflowsCommandSchema = z.object({
251+
workflows: z.array(CreateWorkflowCommandSchema),
252+
});
253+
254+
export type BulkCreateWorkflowsCommand = z.infer<typeof BulkCreateWorkflowsCommandSchema>;
255+
250256
export const UpdateWorkflowCommandSchema = z.object({
251257
name: z.string(),
252258
description: z.string().optional(),

src/platform/plugins/shared/workflows_management/server/workflows_management/routes/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { registerGetWorkflowExecutionLogsRoute } from './get_workflow_execution_
2222
import { registerGetWorkflowExecutionsRoute } from './get_workflow_executions';
2323
import { registerGetWorkflowJsonSchemaRoute } from './get_workflow_json_schema';
2424
import { registerGetWorkflowStatsRoute } from './get_workflow_stats';
25+
import { registerPostBulkCreateWorkflowsRoute } from './post_bulk_create_workflows';
2526
import { registerPostCancelWorkflowExecutionRoute } from './post_cancel_workflow_execution';
2627
import { registerPostCloneWorkflowRoute } from './post_clone_workflow';
2728
import { registerPostCreateWorkflowRoute } from './post_create_workflow';
@@ -49,6 +50,7 @@ export function defineRoutes(
4950
registerGetConnectorsRoute(deps);
5051
registerPostSearchWorkflowsRoute(deps);
5152
registerPostCreateWorkflowRoute(deps);
53+
registerPostBulkCreateWorkflowsRoute(deps);
5254
registerPutUpdateWorkflowRoute(deps);
5355
registerDeleteWorkflowByIdRoute(deps);
5456
registerDeleteWorkflowsBulkRoute(deps);
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import { registerPostBulkCreateWorkflowsRoute } from './post_bulk_create_workflows';
11+
import {
12+
createMockResponse,
13+
createMockRouterInstance,
14+
createMockWorkflowsApi,
15+
createSpacesMock,
16+
mockLogger,
17+
} from './test_utils';
18+
import type { WorkflowsManagementApi } from '../workflows_management_api';
19+
20+
jest.mock('../lib/with_license_check');
21+
22+
describe('POST /api/workflows/_bulk_create', () => {
23+
let workflowsApi: WorkflowsManagementApi;
24+
let mockRouter: any;
25+
let mockSpaces: any;
26+
27+
beforeEach(() => {
28+
mockRouter = createMockRouterInstance();
29+
workflowsApi = createMockWorkflowsApi();
30+
mockSpaces = createSpacesMock();
31+
jest.clearAllMocks();
32+
});
33+
34+
describe('handler logic', () => {
35+
let routeHandler: any;
36+
37+
beforeEach(() => {
38+
registerPostBulkCreateWorkflowsRoute({
39+
router: mockRouter,
40+
api: workflowsApi,
41+
logger: mockLogger,
42+
spaces: mockSpaces,
43+
});
44+
const postCall = (mockRouter.post as jest.Mock).mock.calls.find(
45+
(call) => call[0].path === '/api/workflows/_bulk_create'
46+
);
47+
routeHandler = postCall?.[1];
48+
});
49+
50+
it('should bulk create workflows successfully', async () => {
51+
const mockResult = {
52+
created: [
53+
{ id: 'workflow-1', name: 'Workflow 1', enabled: true, valid: true },
54+
{ id: 'workflow-2', name: 'Workflow 2', enabled: true, valid: true },
55+
],
56+
failed: [],
57+
};
58+
59+
workflowsApi.bulkCreateWorkflows = jest.fn().mockResolvedValue(mockResult);
60+
61+
const mockContext = {};
62+
const mockRequest = {
63+
body: {
64+
workflows: [
65+
{ yaml: 'name: Workflow 1\ntriggers:\n - type: manual\nsteps: []' },
66+
{ yaml: 'name: Workflow 2\ntriggers:\n - type: manual\nsteps: []' },
67+
],
68+
},
69+
headers: {},
70+
url: { pathname: '/api/workflows/_bulk_create' },
71+
};
72+
const mockResponse = createMockResponse();
73+
74+
await routeHandler(mockContext, mockRequest, mockResponse);
75+
76+
expect(workflowsApi.bulkCreateWorkflows).toHaveBeenCalledWith(
77+
mockRequest.body.workflows,
78+
'default',
79+
mockRequest
80+
);
81+
expect(mockResponse.ok).toHaveBeenCalledWith({ body: mockResult });
82+
});
83+
84+
it('should handle partial failures', async () => {
85+
const mockResult = {
86+
created: [{ id: 'workflow-1', name: 'Workflow 1', enabled: true, valid: true }],
87+
failed: [{ index: 1, error: 'Invalid YAML syntax' }],
88+
};
89+
90+
workflowsApi.bulkCreateWorkflows = jest.fn().mockResolvedValue(mockResult);
91+
92+
const mockContext = {};
93+
const mockRequest = {
94+
body: {
95+
workflows: [
96+
{ yaml: 'name: Workflow 1\ntriggers:\n - type: manual\nsteps: []' },
97+
{ yaml: 'invalid: yaml: [' },
98+
],
99+
},
100+
headers: {},
101+
url: { pathname: '/api/workflows/_bulk_create' },
102+
};
103+
const mockResponse = createMockResponse();
104+
105+
await routeHandler(mockContext, mockRequest, mockResponse);
106+
107+
expect(mockResponse.ok).toHaveBeenCalledWith({ body: mockResult });
108+
});
109+
110+
it('should handle API errors gracefully', async () => {
111+
const errorMessage = 'Elasticsearch connection failed';
112+
workflowsApi.bulkCreateWorkflows = jest.fn().mockRejectedValue(new Error(errorMessage));
113+
114+
const mockContext = {};
115+
const mockRequest = {
116+
body: {
117+
workflows: [{ yaml: 'name: Test Workflow' }],
118+
},
119+
headers: {},
120+
url: { pathname: '/api/workflows/_bulk_create' },
121+
};
122+
const mockResponse = createMockResponse();
123+
124+
await routeHandler(mockContext, mockRequest, mockResponse);
125+
126+
expect(mockResponse.customError).toHaveBeenCalledWith({
127+
statusCode: 500,
128+
body: {
129+
message: `Internal server error: Error: ${errorMessage}`,
130+
},
131+
});
132+
});
133+
134+
it('should work with different space contexts', async () => {
135+
const mockResult = {
136+
created: [{ id: 'workflow-1', name: 'Workflow 1' }],
137+
failed: [],
138+
};
139+
140+
workflowsApi.bulkCreateWorkflows = jest.fn().mockResolvedValue(mockResult);
141+
mockSpaces.getSpaceId = jest.fn().mockReturnValue('custom-space');
142+
143+
const mockContext = {};
144+
const mockRequest = {
145+
body: {
146+
workflows: [{ yaml: 'name: Workflow 1' }],
147+
},
148+
headers: {},
149+
url: { pathname: '/s/custom-space/api/workflows/_bulk_create' },
150+
};
151+
const mockResponse = createMockResponse();
152+
153+
await routeHandler(mockContext, mockRequest, mockResponse);
154+
155+
expect(workflowsApi.bulkCreateWorkflows).toHaveBeenCalledWith(
156+
mockRequest.body.workflows,
157+
'custom-space',
158+
mockRequest
159+
);
160+
expect(mockResponse.ok).toHaveBeenCalledWith({ body: mockResult });
161+
});
162+
163+
it('should support custom IDs in bulk create', async () => {
164+
const mockResult = {
165+
created: [
166+
{ id: 'workflow-custom-1', name: 'Workflow 1' },
167+
{ id: 'workflow-custom-2', name: 'Workflow 2' },
168+
],
169+
failed: [],
170+
};
171+
172+
workflowsApi.bulkCreateWorkflows = jest.fn().mockResolvedValue(mockResult);
173+
174+
const mockContext = {};
175+
const mockRequest = {
176+
body: {
177+
workflows: [
178+
{ yaml: 'name: Workflow 1', id: 'workflow-custom-1' },
179+
{ yaml: 'name: Workflow 2', id: 'workflow-custom-2' },
180+
],
181+
},
182+
headers: {},
183+
url: { pathname: '/api/workflows/_bulk_create' },
184+
};
185+
const mockResponse = createMockResponse();
186+
187+
await routeHandler(mockContext, mockRequest, mockResponse);
188+
189+
expect(workflowsApi.bulkCreateWorkflows).toHaveBeenCalledWith(
190+
mockRequest.body.workflows,
191+
'default',
192+
mockRequest
193+
);
194+
expect(mockResponse.ok).toHaveBeenCalledWith({ body: mockResult });
195+
});
196+
});
197+
});
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import { BulkCreateWorkflowsCommandSchema } from '@kbn/workflows';
11+
import { WORKFLOW_ROUTE_OPTIONS } from './route_constants';
12+
import { handleRouteError } from './route_error_handlers';
13+
import { WORKFLOW_CREATE_SECURITY } from './route_security';
14+
import type { RouteDependencies } from './types';
15+
import { withLicenseCheck } from '../lib/with_license_check';
16+
17+
export function registerPostBulkCreateWorkflowsRoute({
18+
router,
19+
api,
20+
logger,
21+
spaces,
22+
}: RouteDependencies) {
23+
router.post(
24+
{
25+
path: '/api/workflows/_bulk_create',
26+
options: WORKFLOW_ROUTE_OPTIONS,
27+
security: WORKFLOW_CREATE_SECURITY,
28+
validate: {
29+
body: BulkCreateWorkflowsCommandSchema,
30+
},
31+
},
32+
withLicenseCheck(async (context, request, response) => {
33+
try {
34+
const spaceId = spaces.getSpaceId(request);
35+
const result = await api.bulkCreateWorkflows(request.body.workflows, spaceId, request);
36+
return response.ok({ body: result });
37+
} catch (error) {
38+
return handleRouteError(response, error);
39+
}
40+
})
41+
);
42+
}

src/platform/plugins/shared/workflows_management/server/workflows_management/workflows_management_api.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,17 @@ export class WorkflowsManagementApi {
135135
return this.workflowsService.createWorkflow(workflow, spaceId, request);
136136
}
137137

138+
public async bulkCreateWorkflows(
139+
workflows: CreateWorkflowCommand[],
140+
spaceId: string,
141+
request: KibanaRequest
142+
): Promise<{
143+
created: WorkflowDetailDto[];
144+
failed: Array<{ index: number; error: string }>;
145+
}> {
146+
return this.workflowsService.bulkCreateWorkflows(workflows, spaceId, request);
147+
}
148+
138149
public async cloneWorkflow(
139150
workflow: WorkflowDetailDto,
140151
spaceId: string,

0 commit comments

Comments
 (0)