Skip to content

Commit 9365dc1

Browse files
jcger and cnasikas authored
[Alerting v2] Flag rule ESQL query errors as user errors (#270643)
## Summary Implements part of elastic/rna-program#430 This PR introduces `isEsqlUserError`: a small predicate that returns `true` for `ResponseError` with status 400, 401, 403 or 404, and applies it in two places: - **Main rule query** (`ExecuteRuleQueryStep`, around the `queryService.executeQueryStream` call): on a user error, wraps the thrown error with `createTaskRunError(..., TaskErrorSource.USER)` before rethrowing. Non-user errors (5xx, network, cancellation, Arrow parse errors) are rethrown unchanged. - **Recovery query** (`CreateRecoveryEventsStep`): same wrapping applied to the `recovery_policy.type === 'query'` execution path. --------- Co-authored-by: Christos Nasikas <xristosnasikas@gmail.com>
1 parent 253a918 commit 9365dc1

7 files changed

Lines changed: 224 additions & 34 deletions

File tree

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import type { DiagnosticResult } from '@elastic/elasticsearch';
9+
import { errors } from '@elastic/elasticsearch';
10+
import { isEsqlUserError } from './esql_user_error';
11+
12+
// Test helper: builds a ResponseError that carries only the given statusCode. The partial
// object is cast to DiagnosticResult because no other field is populated here.
const makeResponseError = (statusCode: number) =>
13+
new errors.ResponseError({ statusCode } as DiagnosticResult);
14+
15+
// Unit tests for isEsqlUserError: the four accepted status codes, one 5xx response,
// and an error that is not a ResponseError at all.
describe('isEsqlUserError', () => {
16+
// Every status code listed here must be classified as a user error.
it.each([400, 401, 403, 404])(
17+
'returns true for ResponseError with statusCode %i',
18+
(statusCode) => {
19+
expect(isEsqlUserError(makeResponseError(statusCode))).toBe(true);
20+
}
21+
);
22+
23+
// A ResponseError whose status code is outside the accepted list is not a user error.
it('returns false for ResponseError with statusCode 503', () => {
24+
expect(isEsqlUserError(makeResponseError(503))).toBe(false);
25+
});
26+
27+
// A plain Error is rejected regardless of its message.
it('returns false for a plain Error', () => {
28+
expect(isEsqlUserError(new Error('something went wrong'))).toBe(false);
29+
});
30+
});
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
import { isResponseError } from '@kbn/es-errors';
9+
10+
// HTTP status codes of Elasticsearch responses that are classified as user errors.
// 400: syntax/semantic ES|QL query errors (verification_exception, parsing_exception)
11+
// 404: unknown index referenced in the query
// 401/403: also part of the set. NOTE(review): presumably authentication/authorization
// failures (e.g. missing credentials or index privileges) — confirm this is intended.
// The element type includes `undefined`, presumably so a possibly-undefined statusCode can be
// passed to `has` without narrowing first.
12+
const USER_ERROR_STATUS_CODES = new Set<number | undefined>([400, 401, 403, 404]);
13+
14+
/**
 * Reports whether a caught value should be treated as a user error for an ES|QL query.
 *
 * @param error - any caught value; it is narrowed with `isResponseError` before use.
 * @returns `true` only when `isResponseError(error)` holds and `error.statusCode` is a member
 *   of `USER_ERROR_STATUS_CODES`; `false` for every other value.
 */
export const isEsqlUserError = (error: unknown): boolean =>
15+
isResponseError(error) && USER_ERROR_STATUS_CODES.has(error.statusCode);

x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
* 2.0.
66
*/
77

8+
import type { DiagnosticResult } from '@elastic/elasticsearch';
9+
import { errors } from '@elastic/elasticsearch';
810
import { CreateRecoveryEventsStep } from './create_recovery_events_step';
11+
import { TaskErrorSource } from '@kbn/task-manager-plugin/server';
12+
import { getErrorSource } from '@kbn/task-manager-plugin/server/task_running';
913
import {
1014
collectStreamResults,
1115
createPipelineStream,
@@ -14,6 +18,7 @@ import {
1418
createAlertEvent,
1519
createRuleResponse,
1620
createEsqlResponse,
21+
getStepError,
1722
} from '../test_utils';
1823
import { createLoggerService } from '../../services/logger_service/logger_service.mock';
1924
import { createQueryService } from '../../services/query_service/query_service.mock';
@@ -331,6 +336,80 @@ describe('CreateRecoveryEventsStep', () => {
331336
expect(result.state.alertEventsBatch![0].status).toBe('breached');
332337
expect(result.state.alertEventsBatch![0].group_hash).toBe('hash-new');
333338
});
339+
340+
// The scoped ES|QL query rejects with a 400 ResponseError while the rule uses a query-type
// recovery policy; the error surfaced by the step must carry TaskErrorSource.USER.
it('marks ResponseError(400) recovery query errors as TaskErrorSource.USER', async () => {
341+
const { step, internalEsClient, scopedEsClient } = createStep();
342+
343+
// The internal client resolves with one group hash; only the scoped client is made to fail.
internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse(['hash-1']));
344+
scopedEsClient.esql.query.mockRejectedValue(
345+
// @ts-expect-error: Not all params are needed for the test.
346+
new errors.ResponseError({ statusCode: 400 })
347+
);
348+
349+
const state = createRulePipelineState({
350+
rule: createRuleResponse({
351+
kind: 'alert',
352+
recovery_policy: {
353+
type: 'query',
354+
query: { base: 'FROM logs-* | WHERE invalid syntax' },
355+
},
356+
}),
357+
alertEventsBatch: [],
358+
});
359+
360+
// getStepError returns the thrown value instead of rejecting, so it can be inspected.
const error = await getStepError(step, state);
361+
362+
expect(error).toBeInstanceOf(Error);
363+
expect(getErrorSource(error!)).toBe(TaskErrorSource.USER);
364+
});
365+
366+
// Same setup as the 400 case but the scoped query rejects with a 503 ResponseError; the
// surfaced error must have no error source attached.
it('does not mark ResponseError(503) recovery query errors as TaskErrorSource.USER', async () => {
367+
const { step, internalEsClient, scopedEsClient } = createStep();
368+
369+
internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse(['hash-1']));
370+
scopedEsClient.esql.query.mockRejectedValue(
371+
new errors.ResponseError({ statusCode: 503 } as DiagnosticResult)
372+
);
373+
374+
const state = createRulePipelineState({
375+
rule: createRuleResponse({
376+
kind: 'alert',
377+
recovery_policy: {
378+
type: 'query',
379+
query: { base: 'FROM logs-*' },
380+
},
381+
}),
382+
alertEventsBatch: [],
383+
});
384+
385+
const error = await getStepError(step, state);
386+
387+
expect(error).toBeInstanceOf(Error);
388+
// getErrorSource yields undefined here, i.e. the error was rethrown without a source tag.
expect(getErrorSource(error!)).toBeUndefined();
389+
});
390+
391+
// The scoped query rejects with a plain Error (not a ResponseError); the surfaced error must
// have no error source attached.
it('does not mark plain recovery query errors as TaskErrorSource.USER', async () => {
392+
const { step, internalEsClient, scopedEsClient } = createStep();
393+
394+
internalEsClient.esql.query.mockResolvedValue(createActiveGroupHashesResponse(['hash-1']));
395+
scopedEsClient.esql.query.mockRejectedValue(new Error('connection reset'));
396+
397+
const state = createRulePipelineState({
398+
rule: createRuleResponse({
399+
kind: 'alert',
400+
recovery_policy: {
401+
type: 'query',
402+
query: { base: 'FROM logs-*' },
403+
},
404+
}),
405+
alertEventsBatch: [],
406+
});
407+
408+
const error = await getStepError(step, state);
409+
410+
expect(error).toBeInstanceOf(Error);
411+
expect(getErrorSource(error!)).toBeUndefined();
412+
});
334413
});
335414

336415
describe('abort signal', () => {

x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/create_recovery_events_step.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
*/
77

88
import { inject, injectable } from 'inversify';
9+
import { createTaskRunError, TaskErrorSource } from '@kbn/task-manager-plugin/server';
910
import { stableStringify } from '@kbn/std';
1011
import { recoveryPolicyType } from '@kbn/alerting-v2-schemas';
12+
import { isEsqlUserError } from '../../errors/esql_user_error';
1113
import type { PipelineStateStream, RuleExecutionStep, RulePipelineState } from '../types';
1214
import { buildRecoveryAlertEvents, buildQueryRecoveryAlertEvents } from '../build_alert_events';
1315
import { getQueryPayload } from '../get_query_payload';
@@ -126,22 +128,29 @@ export class CreateRecoveryEventsStep implements RuleExecutionStep {
126128
})}`,
127129
});
128130

129-
const esqlResponse = await this.scopedQueryService.executeQuery({
130-
query: effectiveQuery,
131-
filter: queryPayload.filter,
132-
params: queryPayload.params,
133-
abortSignal: input.executionContext.signal,
134-
});
131+
try {
132+
const esqlResponse = await this.scopedQueryService.executeQuery({
133+
query: effectiveQuery,
134+
filter: queryPayload.filter,
135+
params: queryPayload.params,
136+
abortSignal: input.executionContext.signal,
137+
});
135138

136-
return buildQueryRecoveryAlertEvents({
137-
ruleId: rule.id,
138-
ruleVersion: 1,
139-
spaceId: input.spaceId,
140-
ruleAttributes: rule,
141-
activeGroupHashes,
142-
esqlResponse,
143-
scheduledTimestamp: input.scheduledAt,
144-
});
139+
return buildQueryRecoveryAlertEvents({
140+
ruleId: rule.id,
141+
ruleVersion: 1,
142+
spaceId: input.spaceId,
143+
ruleAttributes: rule,
144+
activeGroupHashes,
145+
esqlResponse,
146+
scheduledTimestamp: input.scheduledAt,
147+
});
148+
} catch (error) {
149+
if (isEsqlUserError(error)) {
150+
throw createTaskRunError(error as Error, TaskErrorSource.USER);
151+
}
152+
throw error;
153+
}
145154
}
146155

147156
private async fetchActiveAlertGroupHashes(

x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/execute_rule_query_step.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,18 @@
55
* 2.0.
66
*/
77

8+
import type { DiagnosticResult } from '@elastic/elasticsearch';
9+
import { errors } from '@elastic/elasticsearch';
10+
import { TaskErrorSource } from '@kbn/task-manager-plugin/server';
11+
import { getErrorSource } from '@kbn/task-manager-plugin/server/task_running';
812
import { ExecuteRuleQueryStep } from './execute_rule_query_step';
913
import {
1014
collectStreamResults,
1115
createPipelineStream,
1216
createRuleExecutionInput,
1317
createRuleResponse,
1418
createRulePipelineState,
19+
getStepError,
1520
mockHelpersEsqlArrowBatches,
1621
mockHelpersEsqlToArrowReader,
1722
} from '../test_utils';
@@ -91,6 +96,34 @@ describe('ExecuteRuleQueryStep', () => {
9196
).rejects.toThrow('Query execution failed');
9297
});
9398

99+
// The mocked Arrow reader helper rejects with a 400 ResponseError; the error surfaced by the
// step must carry TaskErrorSource.USER.
it('marks ResponseError(400) ES|QL errors as TaskErrorSource.USER', async () => {
100+
mockHelpersEsqlToArrowReader(
101+
mockEsClient,
102+
jest.fn().mockRejectedValue(new errors.ResponseError({ statusCode: 400 } as DiagnosticResult))
103+
);
104+
105+
const state = createRulePipelineState({ rule: createRuleResponse() });
106+
107+
// getStepError returns the thrown value instead of rejecting, so it can be inspected.
const error = await getStepError(step, state);
108+
109+
expect(error).toBeInstanceOf(Error);
110+
expect(getErrorSource(error!)).toBe(TaskErrorSource.USER);
111+
});
112+
113+
// The mocked Arrow reader helper rejects with a plain Error; the surfaced error must have no
// error source attached.
it('does not mark plain ES|QL errors as TaskErrorSource.USER', async () => {
114+
mockHelpersEsqlToArrowReader(
115+
mockEsClient,
116+
jest.fn().mockRejectedValue(new Error('ES query failed'))
117+
);
118+
119+
const state = createRulePipelineState({ rule: createRuleResponse() });
120+
121+
const error = await getStepError(step, state);
122+
123+
expect(error).toBeInstanceOf(Error);
124+
expect(getErrorSource(error!)).toBeUndefined();
125+
});
126+
94127
it('yields rows from query results', async () => {
95128
mockHelpersEsqlArrowBatches(mockEsClient, [
96129
{

x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/steps/execute_rule_query_step.ts

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
*/
77

88
import { inject, injectable } from 'inversify';
9+
import { createTaskRunError, TaskErrorSource } from '@kbn/task-manager-plugin/server';
10+
import { isEsqlUserError } from '../../errors/esql_user_error';
911
import type { PipelineStateStream, RuleExecutionStep } from '../types';
1012
import { getQueryPayload } from '../get_query_payload';
1113
import {
@@ -60,28 +62,35 @@ export class ExecuteRuleQueryStep implements RuleExecutionStep {
6062
})}`,
6163
});
6264

63-
const esqlRowBatchStream = step.queryService.executeQueryStream({
64-
query: effectiveQuery,
65-
filter: queryPayload.filter,
66-
params: queryPayload.params,
67-
abortSignal: input.executionContext.signal,
68-
});
65+
try {
66+
const esqlRowBatchStream = step.queryService.executeQueryStream({
67+
query: effectiveQuery,
68+
filter: queryPayload.filter,
69+
params: queryPayload.params,
70+
abortSignal: input.executionContext.signal,
71+
});
6972

70-
let yielded = false;
73+
let yielded = false;
7174

72-
for await (const batch of esqlRowBatchStream) {
73-
yielded = true;
74-
yield {
75-
type: 'continue',
76-
state: { ...state, queryPayload, esqlRowBatch: batch },
77-
};
78-
}
75+
for await (const batch of esqlRowBatchStream) {
76+
yielded = true;
77+
yield {
78+
type: 'continue',
79+
state: { ...state, queryPayload, esqlRowBatch: batch },
80+
};
81+
}
7982

80-
if (!yielded) {
81-
yield {
82-
type: 'continue',
83-
state: { ...state, queryPayload, esqlRowBatch: [] },
84-
};
83+
if (!yielded) {
84+
yield {
85+
type: 'continue',
86+
state: { ...state, queryPayload, esqlRowBatch: [] },
87+
};
88+
}
89+
} catch (error) {
90+
if (isEsqlUserError(error)) {
91+
throw createTaskRunError(error as Error, TaskErrorSource.USER);
92+
}
93+
throw error;
8594
}
8695
});
8796
}

x-pack/platform/plugins/shared/alerting_v2/server/lib/rule_executor/test_utils.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,19 @@
55
* 2.0.
66
*/
77

8+
import { collectStreamResults, createPipelineStream } from '../test_utils';
9+
import type { RuleExecutionStep, RulePipelineState } from './types';
10+
11+
/**
 * Test helper: runs `step.executeStream` over a pipeline stream containing only `state`,
 * drains it with `collectStreamResults`, and captures whatever is thrown instead of letting
 * it propagate.
 *
 * @param step - the rule execution step under test.
 * @param state - the single pipeline state fed into the step.
 * @returns the caught value, or `undefined` when the stream completes without throwing.
 *   The caught value is cast to `Error` without a runtime check, so callers should still
 *   assert its type.
 */
export async function getStepError(
12+
step: RuleExecutionStep,
13+
state: RulePipelineState
14+
): Promise<Error | undefined> {
15+
try {
16+
await collectStreamResults(step.executeStream(createPipelineStream([state])));
17+
return undefined;
18+
} catch (error) {
19+
// Unchecked cast: anything thrown (Error or not) is handed back as-is.
return error as Error;
20+
}
21+
}
22+
823
export * from '../test_utils';

0 commit comments

Comments
 (0)