Skip to content

Commit 0d1a507

Browse files
semdelasticmachine
andauthored
[Actions] Add signal to the action execution (elastic#249931)
## Summary closes: elastic#249928 Adds abort signal support to connector executions in the Actions framework. - Actions executor now receives an optional `signal` parameter - The signal is integrated in the axios config for all v2 connectors. - HTTP connector (v1) updated to pass the signal down to the axios request. - The rest of V1 connectors can now pass the abort signal in their axios requests (optional) - Workflows now always pass workflow execution signal down to connector executions (needed for timeouts, cancellations...) --------- Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent 18f0c06 commit 0d1a507

20 files changed

Lines changed: 510 additions & 30 deletions

File tree

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
import type { ActionTypeExecutorResult } from '@kbn/actions-plugin/common';
11+
import type { ActionsClient } from '@kbn/actions-plugin/server';
12+
import type { ConnectorWithExtraFindData } from '@kbn/actions-plugin/server/application/connector/types';
13+
import { ConnectorExecutor } from './connector_executor';
14+
15+
describe('ConnectorExecutor', () => {
16+
let mockActionsClient: jest.Mocked<ActionsClient>;
17+
let connectorExecutor: ConnectorExecutor;
18+
19+
beforeEach(() => {
20+
mockActionsClient = {
21+
execute: jest.fn(),
22+
getAll: jest.fn(() =>
23+
Promise.resolve([
24+
{
25+
id: '123e4567-e89b-12d3-a456-426614174000',
26+
name: 'test-connector',
27+
actionTypeId: 'http',
28+
},
29+
] as ConnectorWithExtraFindData[])
30+
),
31+
} as unknown as jest.Mocked<ActionsClient>;
32+
33+
connectorExecutor = new ConnectorExecutor(mockActionsClient);
34+
});
35+
36+
afterEach(() => {
37+
jest.clearAllMocks();
38+
});
39+
40+
describe('execute', () => {
41+
const connectorType = 'http';
42+
const connectorName = 'test-connector';
43+
const input = { url: 'https://example.com', method: 'GET' };
44+
const abortController = new AbortController();
45+
46+
it('should throw error if connector type is missing', async () => {
47+
await expect(
48+
connectorExecutor.execute({
49+
connectorType: '',
50+
connectorNameOrId: connectorName,
51+
input,
52+
abortController,
53+
})
54+
).rejects.toThrow('Connector type is required');
55+
});
56+
57+
it('should execute connector with UUID connector ID', async () => {
58+
const connectorId = '123e4567-e89b-12d3-a456-426614174000';
59+
const expectedResult: ActionTypeExecutorResult<unknown> = {
60+
status: 'ok',
61+
actionId: connectorId,
62+
data: { status: 200 },
63+
};
64+
65+
mockActionsClient.execute.mockResolvedValue(expectedResult);
66+
67+
const result = await connectorExecutor.execute({
68+
connectorType,
69+
connectorNameOrId: connectorId,
70+
input,
71+
abortController,
72+
});
73+
74+
expect(mockActionsClient.execute).toHaveBeenCalledWith({
75+
actionId: connectorId,
76+
params: input,
77+
signal: abortController.signal,
78+
});
79+
expect(result).toEqual(expectedResult);
80+
});
81+
82+
it('should resolve connector ID by name and execute', async () => {
83+
const connectorId = 'resolved-connector-id';
84+
const name = 'my-connector';
85+
const expectedResult: ActionTypeExecutorResult<unknown> = {
86+
status: 'ok',
87+
actionId: connectorId,
88+
data: { status: 200 },
89+
};
90+
91+
const mockConnectors: ConnectorWithExtraFindData[] = [
92+
{
93+
id: connectorId,
94+
name,
95+
actionTypeId: connectorType,
96+
} as ConnectorWithExtraFindData,
97+
];
98+
99+
mockActionsClient.getAll.mockResolvedValue(mockConnectors);
100+
mockActionsClient.execute.mockResolvedValue(expectedResult);
101+
102+
const result = await connectorExecutor.execute({
103+
connectorType,
104+
connectorNameOrId: name,
105+
input,
106+
abortController,
107+
});
108+
109+
expect(mockActionsClient.getAll).toHaveBeenCalled();
110+
expect(mockActionsClient.execute).toHaveBeenCalledWith({
111+
actionId: connectorId,
112+
params: input,
113+
signal: abortController.signal,
114+
});
115+
expect(result).toEqual(expectedResult);
116+
});
117+
118+
it('should throw error if connector not found by name', async () => {
119+
mockActionsClient.getAll.mockResolvedValue([]);
120+
121+
await expect(
122+
connectorExecutor.execute({
123+
connectorType,
124+
connectorNameOrId: 'non-existent',
125+
input,
126+
abortController,
127+
})
128+
).rejects.toThrow('Connector non-existent not found');
129+
});
130+
131+
it('should pass abort signal to actions client', async () => {
132+
const connectorId = '123e4567-e89b-12d3-a456-426614174000';
133+
const expectedResult: ActionTypeExecutorResult<unknown> = {
134+
status: 'ok',
135+
actionId: connectorId,
136+
data: { status: 200 },
137+
};
138+
139+
mockActionsClient.execute.mockResolvedValue(expectedResult);
140+
141+
const testAbortController = new AbortController();
142+
await connectorExecutor.execute({
143+
connectorType,
144+
connectorNameOrId: connectorId,
145+
input,
146+
abortController: testAbortController,
147+
});
148+
149+
expect(mockActionsClient.execute).toHaveBeenCalledWith({
150+
actionId: connectorId,
151+
params: input,
152+
signal: testAbortController.signal,
153+
});
154+
});
155+
156+
it('should handle abort signal during execution', async () => {
157+
const connectorId = '123e4567-e89b-12d3-a456-426614174000';
158+
const testAbortController = new AbortController();
159+
160+
// Make execute take some time, and abort mid-flight once the listener is registered
161+
mockActionsClient.execute.mockImplementation(
162+
() =>
163+
new Promise<ActionTypeExecutorResult<unknown>>((resolve) => {
164+
// Abort after a short delay so the abort event listener in runConnector
165+
// is already registered when the signal fires
166+
setTimeout(() => testAbortController.abort(), 10);
167+
setTimeout(() => {
168+
resolve({
169+
status: 'ok',
170+
actionId: connectorId,
171+
data: { status: 200 },
172+
});
173+
}, 100);
174+
})
175+
);
176+
177+
const executePromise = connectorExecutor.execute({
178+
connectorType,
179+
connectorNameOrId: connectorId,
180+
input,
181+
abortController: testAbortController,
182+
});
183+
184+
await expect(executePromise).rejects.toThrow(
185+
`Action type "${connectorType}" with ID "${connectorId}" execution was aborted`
186+
);
187+
});
188+
});
189+
});

src/platform/plugins/shared/workflows_execution_engine/server/connector_executor.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ export class ConnectorExecutor {
2222
abortController: AbortController;
2323
}): Promise<ActionTypeExecutorResult<unknown>> {
2424
const { connectorType, connectorNameOrId, input, abortController } = params;
25+
if (!connectorType) {
26+
throw new Error('Connector type is required');
27+
}
28+
2529
const actionId = await this.resolveConnectorId(connectorNameOrId);
2630

2731
return this.runConnector({ actionTypeId: connectorType, actionId, input, abortController });
@@ -34,6 +38,9 @@ export class ConnectorExecutor {
3438
abortController: AbortController;
3539
}): Promise<ActionTypeExecutorResult<unknown>> {
3640
const { connectorType, input, abortController } = params;
41+
if (!connectorType) {
42+
throw new Error('Connector type is required');
43+
}
3744
// The InMemoryConnector with prefixed "system-connector-" is created by the actions framework
3845
const actionId = `system-connector-${connectorType}`;
3946

@@ -48,24 +55,30 @@ export class ConnectorExecutor {
4855
abortController: AbortController;
4956
}): Promise<ActionTypeExecutorResult<unknown>> {
5057
const { actionTypeId, actionId, input, abortController } = params;
51-
// Execute the connector via the actions client
52-
const executeActionPromise = this.actionsClient.execute({ actionId, params: input });
53-
54-
const abortPromise = new Promise<void>((_resolve, reject) => {
55-
abortController.signal.addEventListener('abort', () =>
56-
reject(
57-
new Error(`Action type "${actionTypeId}" with ID "${actionId}" execution was aborted`)
58-
)
58+
59+
const executeActionPromise = this.actionsClient.execute({
60+
actionId,
61+
params: input,
62+
signal: abortController.signal,
63+
});
64+
65+
const abortPromise = new Promise<ActionTypeExecutorResult<unknown>>((_resolve, reject) => {
66+
abortController.signal.addEventListener(
67+
'abort',
68+
() => reject(this.createAbortError(actionTypeId, actionId)),
69+
{ once: true }
5970
);
6071
});
6172

6273
// If the abort signal is triggered, the abortPromise will reject first
6374
// Otherwise, the executeActionPromise will resolve first
6475
// This ensures that we handle cancellation properly.
65-
// This is a workaround for the fact that connectors do not natively support cancellation.
66-
// In the future, if connectors support cancellation, we can remove this logic.
67-
await Promise.race([abortPromise, executeActionPromise]);
68-
return executeActionPromise;
76+
// In the future, if all connectors support cancellation, we can remove this logic.
77+
return Promise.race([abortPromise, executeActionPromise]);
78+
}
79+
80+
private createAbortError(actionTypeId: string, actionId: string): Error {
81+
return new Error(`Action type "${actionTypeId}" with ID "${actionId}" execution was aborted`);
6982
}
7083

7184
private async resolveConnectorId(connectorName: string): Promise<string> {

src/platform/plugins/shared/workflows_execution_engine/server/step/node_implementation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ export abstract class BaseAtomicNodeImplementation<TStep extends BaseStep>
124124
}
125125

126126
public async run(): Promise<void> {
127+
// If the execution is already aborted, do not start the step, navigate to make the execution finish properly
128+
if (this.stepExecutionRuntime.abortController.signal.aborted) {
129+
this.workflowExecutionRuntime.navigateToNextNode();
130+
return;
131+
}
132+
127133
let input: any;
128134
this.stepExecutionRuntime.startStep();
129135
// flush event logs after start step

x-pack/platform/plugins/shared/actions/server/application/connector/methods/execute/execute.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export async function execute(
1919
connectorExecuteParams: ConnectorExecuteParams
2020
): Promise<ActionTypeExecutorResult<unknown>> {
2121
const log = context.logger;
22-
const { actionId, params, source, relatedSavedObjects } = connectorExecuteParams;
22+
const { actionId, params, source, relatedSavedObjects, signal } = connectorExecuteParams;
2323
let actionTypeId: string | undefined;
2424

2525
try {
@@ -62,5 +62,6 @@ export async function execute(
6262
relatedSavedObjects,
6363
actionExecutionId: uuidv4(),
6464
connectorTokenClient: context.connectorTokenClient,
65+
signal,
6566
});
6667
}

x-pack/platform/plugins/shared/actions/server/lib/action_executor.test.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ describe('Action Executor', () => {
355355
logger: loggerMock,
356356
connectorUsageCollector: expect.any(ConnectorUsageCollector),
357357
...(executeUnsecure ? {} : { source: SOURCE }),
358+
signal: undefined,
358359
});
359360

360361
expect(loggerMock.debug).toBeCalledWith('executing action test:1: 1');
@@ -399,9 +400,46 @@ describe('Action Executor', () => {
399400
'x-custom-header': 'custom-header-value',
400401
},
401402
...(executeUnsecure ? {} : { source: SOURCE }),
403+
signal: undefined,
402404
});
403405
});
404406

407+
if (!executeUnsecure) {
408+
test(`successfully ${label} with abort signal`, async () => {
409+
mockGetRequestBodyByte.mockReturnValue(300);
410+
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce(
411+
connectorSavedObject
412+
);
413+
connectorTypeRegistry.get.mockReturnValueOnce(connectorType);
414+
415+
const abortController = new AbortController();
416+
const executeParamsWithSignal = {
417+
...executeParams,
418+
signal: abortController.signal,
419+
};
420+
421+
await actionExecutor.execute(executeParamsWithSignal);
422+
423+
expect(connectorType.executor).toHaveBeenCalledWith(
424+
expect.objectContaining({
425+
actionId: CONNECTOR_ID,
426+
services: expect.anything(),
427+
config: {
428+
bar: true,
429+
},
430+
secrets: {
431+
baz: true,
432+
},
433+
params: { foo: true },
434+
logger: loggerMock,
435+
connectorUsageCollector: expect.any(ConnectorUsageCollector),
436+
source: SOURCE,
437+
signal: abortController.signal,
438+
})
439+
);
440+
});
441+
}
442+
405443
for (const executionSource of [
406444
{
407445
name: `http`,

x-pack/platform/plugins/shared/actions/server/lib/action_executor.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ export interface ExecuteOptions<Source = unknown> {
9595
source?: ActionExecutionSource<Source>;
9696
taskInfo?: TaskInfo;
9797
connectorTokenClient?: ConnectorTokenClientContract;
98+
signal?: AbortSignal;
9899
}
99100

100101
type ExecuteHelperOptions<Source = unknown> = Omit<ExecuteOptions<Source>, 'request'> & {
@@ -155,6 +156,7 @@ export class ActionExecutor {
155156
spaceId: spaceIdOverride,
156157
source,
157158
taskInfo,
159+
signal,
158160
}: ExecuteOptions): Promise<ActionTypeExecutorResult<unknown>> {
159161
const {
160162
actionTypeRegistry,
@@ -201,6 +203,7 @@ export class ActionExecutor {
201203
source,
202204
spaceId,
203205
taskInfo,
206+
signal,
204207
});
205208
}
206209

@@ -395,6 +398,7 @@ export class ActionExecutor {
395398
source,
396399
spaceId,
397400
taskInfo,
401+
signal,
398402
}: ExecuteHelperOptions): Promise<ActionTypeExecutorResult<unknown>> {
399403
if (!this.isInitialized) {
400404
throw new Error('ActionExecutor not initialized');
@@ -561,6 +565,7 @@ export class ActionExecutor {
561565
...(actionType.isSystemActionType ? { request } : {}),
562566
connectorUsageCollector,
563567
connectorTokenClient,
568+
signal,
564569
});
565570

566571
if (rawResult && rawResult.status === 'error') {

0 commit comments

Comments
 (0)