Skip to content

Commit 183d47c

Browse files
committed
Incorporate feedback from alerting framework team
1 parent 7419c13 commit 183d47c

12 files changed

Lines changed: 436 additions & 330 deletions

File tree

x-pack/platform/plugins/shared/alerting/server/application/rule/methods/bulk_create/bulk_create_rules.test.ts

Lines changed: 238 additions & 134 deletions
Large diffs are not rendered by default.

x-pack/platform/plugins/shared/alerting/server/application/rule/methods/bulk_create/bulk_create_rules.ts

Lines changed: 104 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,66 +5,138 @@
55
* 2.0.
66
*/
77

8+
import Boom from '@hapi/boom';
89
import pMap from 'p-map';
910
import { withSpan } from '@kbn/apm-utils';
10-
import type { SavedObject, SavedObjectsBulkCreateObject } from '@kbn/core/server';
11+
import type { SavedObjectsBulkCreateObject } from '@kbn/core/server';
1112
import { SavedObjectsUtils } from '@kbn/core/server';
1213
import { RULE_SAVED_OBJECT_TYPE } from '../../../../saved_objects';
1314
import { getRuleCircuitBreakerErrorMessage } from '../../../../../common';
1415
import { updateMeta } from '../../../../rules_client/lib';
15-
import { API_KEY_GENERATE_CONCURRENCY } from '../../../../rules_client/common/constants';
16+
import {
17+
API_KEY_GENERATE_CONCURRENCY,
18+
DEFAULT_BULK_CREATE_BATCH_SIZE,
19+
MAX_BULK_CREATE_BATCH_SIZE,
20+
MAX_RULES_NUMBER_FOR_BULK_OPERATION,
21+
} from '../../../../rules_client/common/constants';
1622
import { ruleAuditEvent, RuleAuditAction } from '../../../../rules_client/common/audit_events';
17-
import { bulkEnableTasks } from '../bulk_enable_tasks';
1823
import { bulkCreateRulesSo } from '../../../../data/rule';
19-
import type { RawRule, SanitizedRule } from '../../../../types';
20-
import type { BulkOperationError, RulesClientContext } from '../../../../rules_client/types';
24+
import type { RawRule } from '../../../../types';
25+
import type { RulesClientContext } from '../../../../rules_client/types';
2126
import type { RuleParams } from '../../types';
2227
import { validateScheduleLimit } from '../get_schedule_frequency';
2328
import type {
2429
ApiKeyEntry,
25-
PreparedRule,
30+
BatchResult,
31+
BulkCreateOperationError,
32+
BulkCreateRulesItem,
2633
BulkCreateRulesParams,
2734
BulkCreateRulesResult,
35+
PreparedRule,
2836
} from './types';
2937
import {
3038
buildTaskInstance,
3139
collectNewKeysToInvalidate,
3240
demotePreparedRules,
3341
flushKeysToInvalidate,
3442
prepareRule,
35-
toSanitizedRule,
3643
} from './utils';
3744

3845
export async function bulkCreateRules<Params extends RuleParams = never>(
3946
context: RulesClientContext,
4047
params: BulkCreateRulesParams<Params>
41-
): Promise<BulkCreateRulesResult<Params>> {
42-
const { rules } = params;
48+
): Promise<BulkCreateRulesResult> {
49+
const { rules, exitEarlyOnError = false } = params;
4350
const { logger } = context;
4451
const total = rules.length;
4552

4653
if (total === 0) {
47-
return { rules: [], errors: [], total: 0, taskIdsFailedToBeEnabled: [] };
54+
return { successfulIds: [], errors: [], total: 0 };
55+
}
56+
57+
if (total > MAX_RULES_NUMBER_FOR_BULK_OPERATION) {
58+
throw Boom.badRequest(
59+
`bulkCreateRules: ${total} rules exceeds the hard limit of ${MAX_RULES_NUMBER_FOR_BULK_OPERATION}. ` +
60+
`Callers should enforce request-level limits before invoking this method.`
61+
);
62+
}
63+
64+
const requestedBatchSize = params.batchSize ?? DEFAULT_BULK_CREATE_BATCH_SIZE;
65+
const batchSize = Math.max(1, Math.min(MAX_BULK_CREATE_BATCH_SIZE, requestedBatchSize));
66+
67+
if (requestedBatchSize !== batchSize) {
68+
logger.warn(
69+
`bulkCreateRules: batchSize ${requestedBatchSize} clamped to ${batchSize} (hard cap ${MAX_BULK_CREATE_BATCH_SIZE}).`
70+
);
4871
}
4972

5073
const username = await context.getUserName();
5174
const actionsClient = await context.getActionsClient();
5275

53-
const inputsWithIds = rules.map((rule) => ({
76+
const successfulIds: string[] = [];
77+
const errors: BulkCreateOperationError[] = [];
78+
79+
const totalBatches = Math.ceil(total / batchSize);
80+
logger.debug(`bulkCreateRules: ${total} rules, ${totalBatches}x batches of ${batchSize} rules.`);
81+
82+
for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
83+
const start = batchIndex * batchSize;
84+
const batch = rules.slice(start, start + batchSize);
85+
86+
const result = await runBatch<Params>({
87+
context,
88+
username,
89+
actionsClient,
90+
batch,
91+
});
92+
93+
successfulIds.push(...result.successfulIds);
94+
errors.push(...result.errors);
95+
96+
if (exitEarlyOnError && result.soFailureOccurred) {
97+
logger.warn(
98+
`bulkCreateRules: exiting early on SO failure at batch ${
99+
batchIndex + 1
100+
}/${totalBatches}. ` +
101+
`${successfulIds.length} rule(s) created, ${
102+
total - start - batch.length
103+
} rule(s) skipped.`
104+
);
105+
break;
106+
}
107+
}
108+
109+
return { successfulIds, errors, total };
110+
}
111+
112+
interface RunBatchArgs<Params extends RuleParams> {
113+
context: RulesClientContext;
114+
username: string | null;
115+
actionsClient: Awaited<ReturnType<RulesClientContext['getActionsClient']>>;
116+
batch: Array<BulkCreateRulesItem<Params>>;
117+
}
118+
119+
async function runBatch<Params extends RuleParams>({
120+
context,
121+
username,
122+
actionsClient,
123+
batch,
124+
}: RunBatchArgs<Params>): Promise<BatchResult> {
125+
const { logger } = context;
126+
127+
const inputsWithIds = batch.map((rule) => ({
54128
id: rule.options?.id ?? SavedObjectsUtils.generateId(),
55129
rule,
56130
}));
57131

58-
logger.debug(`Bulk creating batch of ${total} rules`);
59-
60132
// Phase 1: per-rule prepare (validation + api key generation).
61133
// NOTE: in order to minimise external calls, the values below get mutated
62134
// at different stages in the process (i.e. if we fail schedule creation),
63135
// so that we invalidate keys and create rule SOs in a single batch at the end.
64136
const preparedRules = new Map<string, PreparedRule>();
65137
const keysToInvalidate = new Set<string>();
66138
const apiKeysMap = new Map<string, ApiKeyEntry>();
67-
const errors: BulkOperationError[] = [];
139+
const errors: BulkCreateOperationError[] = [];
68140
const authzCache = new Map<string, Promise<void>>();
69141

70142
await pMap(
@@ -86,10 +158,10 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
86158
{ concurrency: API_KEY_GENERATE_CONCURRENCY }
87159
);
88160

89-
// No survivors? Flush out any keys created and return
161+
// No survivors? Flush any keys created and return.
90162
if (preparedRules.size === 0) {
91163
await flushKeysToInvalidate(keysToInvalidate, context);
92-
return { rules: [], errors, total, taskIdsFailedToBeEnabled: [] };
164+
return { successfulIds: [], errors, soFailureOccurred: false };
93165
}
94166

95167
// Phase 2: validate schedule-limits, enabled subset only.
@@ -211,10 +283,9 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
211283
})
212284
);
213285
} catch (error) {
214-
// Whole-call SO failure (auth, timeout, etc): invalidate every newly-minted
215-
// key, best-effort orphan-task cleanup, then rethrow.
286+
// Whole-call SO failure: invalidate keys, best-effort task cleanup.
287+
// Surface as batch-wide SO failure so exitEarlyOnError can honour it.
216288
for (const k of collectNewKeysToInvalidate(apiKeysMap.values())) keysToInvalidate.add(k);
217-
await flushKeysToInvalidate(keysToInvalidate, context);
218289
if (newlyScheduledTaskIds.size > 0) {
219290
try {
220291
await context.taskManager.bulkRemove([...newlyScheduledTaskIds]);
@@ -226,16 +297,23 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
226297
);
227298
}
228299
}
229-
throw error;
300+
await flushKeysToInvalidate(keysToInvalidate, context);
301+
errors.push({
302+
message: `Failed to bulk create rule saved objects: ${error.message}`,
303+
status: error.output?.statusCode,
304+
rule: { id: 'n/a', name: 'n/a' },
305+
});
306+
return { successfulIds: [], errors, soFailureOccurred: true };
230307
}
231308

232309
// Phase 4: per-row outcomes.
233-
const successfulSos: Array<SavedObject<RawRule>> = [];
234-
const taskIdsToEnable: string[] = [];
310+
const batchSuccessfulIds: string[] = [];
235311
const taskIdsToCleanUp: string[] = [];
312+
let perRowFailureOccurred = false;
236313

237314
for (const so of bulkResponse.saved_objects) {
238315
if (so.error) {
316+
perRowFailureOccurred = true;
239317
errors.push({
240318
message: so.error.message ?? 'n/a',
241319
status: so.error.statusCode,
@@ -253,9 +331,8 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
253331
taskIdsToCleanUp.push(so.id);
254332
}
255333
} else {
256-
successfulSos.push(so as SavedObject<RawRule>);
334+
batchSuccessfulIds.push(so.id);
257335
if (newlyScheduledTaskIds.has(so.id)) {
258-
taskIdsToEnable.push(so.id);
259336
// Audit per-rule ENABLE for the enabled subset (mirrors single-rule semantics).
260337
context.auditLogger?.log(
261338
ruleAuditEvent({
@@ -286,23 +363,12 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
286363
}
287364
}
288365

289-
// Phase 5: enable scheduled tasks. If skipTaskEnabling, caller enables them later.
290-
const taskIdsFailedToBeEnabled = params.skipTaskEnabling
291-
? [...taskIdsToEnable]
292-
: (await bulkEnableTasks(context, { taskIds: taskIdsToEnable })).taskIdsFailedToBeEnabled;
293-
294-
// Single end-of-function flush for all collected key invalidations.
366+
// Single per-batch flush for all collected key invalidations.
295367
await flushKeysToInvalidate(keysToInvalidate, context);
296368

297-
// Phase 6: domain transform + return.
298-
const sanitizedRules: Array<SanitizedRule<Params>> = successfulSos.map((so) =>
299-
toSanitizedRule<Params>(context, so, context.ruleTypeRegistry)
300-
);
301-
302369
return {
303-
rules: sanitizedRules,
370+
successfulIds: batchSuccessfulIds,
304371
errors,
305-
total,
306-
taskIdsFailedToBeEnabled,
372+
soFailureOccurred: perRowFailureOccurred,
307373
};
308374
}

x-pack/platform/plugins/shared/alerting/server/application/rule/methods/bulk_create/types.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import type { RuleParams } from '../../types';
1111
import type { CreateRuleData } from '../create/types';
1212
import type { CreateRuleOptions } from '../create/create_rule';
1313
import type { BulkOperationError, RulesClientContext } from '../../../../rules_client/types';
14-
import type { RawRule, SanitizedRule } from '../../../../types';
14+
import type { RawRule } from '../../../../types';
1515

1616
export interface PreparedRule {
1717
id: string;
@@ -49,8 +49,10 @@ export interface BulkCreateRulesItem<Params extends RuleParams = never> {
4949

5050
export interface BulkCreateRulesParams<Params extends RuleParams = never> {
5151
rules: Array<BulkCreateRulesItem<Params>>;
52-
// If true, skip Phase 5 (taskManager.bulkEnable);
53-
skipTaskEnabling?: boolean;
52+
/** Per-batch size; clamped to [1, MAX_BULK_CREATE_BATCH_SIZE], defaults to DEFAULT_BULK_CREATE_BATCH_SIZE. Total is bounded by MAX_RULES_NUMBER_FOR_BULK_OPERATION (callers should enforce request-level limits). */
53+
batchSize?: number;
54+
/** If true, stop further batches on SO-level failure (whole-call throw or any per-row SO error). Phase 1/2/3 demotions never halt the loop. Defaults to false. */
55+
exitEarlyOnError?: boolean;
5456
}
5557

5658
export type BulkCreateDisabledReason =
@@ -63,9 +65,16 @@ export interface BulkCreateOperationError extends BulkOperationError {
6365
disabledReason?: BulkCreateDisabledReason;
6466
}
6567

66-
export interface BulkCreateRulesResult<Params extends RuleParams = never> {
67-
rules: Array<SanitizedRule<Params>>;
68+
export interface BulkCreateRulesResult {
69+
/** IDs of rules whose SO was successfully persisted. */
70+
successfulIds: string[];
6871
errors: BulkCreateOperationError[];
6972
total: number;
70-
taskIdsFailedToBeEnabled: string[];
73+
}
74+
75+
export interface BatchResult {
76+
successfulIds: string[];
77+
errors: BulkCreateOperationError[];
78+
/** True if SO bulkCreate threw or any per-row SO error was returned; drives the exitEarlyOnError short-circuit. */
79+
soFailureOccurred: boolean;
7180
}

x-pack/platform/plugins/shared/alerting/server/application/rule/methods/bulk_create/utils.ts

Lines changed: 22 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import Boom from '@hapi/boom';
99
import Semver from 'semver';
1010
import { i18n } from '@kbn/i18n';
11-
import type { SavedObject } from '@kbn/core/server';
1211
import type { TaskInstanceWithDeprecatedFields } from '@kbn/task-manager-plugin/server/task';
1312

1413
import { validateAndAuthorizeSystemActions } from '../../../../lib/validate_authorize_system_actions';
@@ -32,17 +31,12 @@ import {
3231
apiKeyAsAlertAttributes,
3332
apiKeyAsRuleDomainProperties,
3433
} from '../../../../rules_client/common';
34+
// import { BULK_TM_SCHEDULE_DELAY } from '../../../../rules_client/common/constants';
3535
import { ruleAuditEvent, RuleAuditAction } from '../../../../rules_client/common/audit_events';
3636
import { bulkMarkApiKeysForInvalidation } from '../../../../invalidate_pending_api_keys/bulk_mark_api_keys_for_invalidation';
37-
import type { RawRule, RuleTypeRegistry, SanitizedRule } from '../../../../types';
3837
import type { BulkOperationError, RulesClientContext } from '../../../../rules_client/types';
3938
import type { RuleDomain, RuleParams } from '../../types';
40-
import {
41-
transformRuleAttributesToRuleDomain,
42-
transformRuleDomainToRule,
43-
transformRuleDomainToRuleAttributes,
44-
} from '../../transforms';
45-
import { ruleDomainSchema } from '../../schemas';
39+
import { transformRuleDomainToRuleAttributes } from '../../transforms';
4640
import { createRuleDataSchema } from '../create/schemas';
4741
import type {
4842
PreparedRule,
@@ -67,55 +61,30 @@ export const collectNewKeysToInvalidate = (entries: Iterable<ApiKeyEntry>): stri
6761
return keys;
6862
};
6963

64+
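// Task is scheduled with `enabled: true`. NOTE(review): `runAt` and `scheduledAt` are set to `new Date()` (the current time), not a future time; confirm whether a schedule delay is intended.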
7065
export const buildTaskInstance = (
7166
context: RulesClientContext,
7267
prepared: PreparedRule
73-
): TaskInstanceWithDeprecatedFields => ({
74-
id: prepared.id,
75-
taskType: `alerting:${prepared.ruleTypeId}`,
76-
schedule: prepared.schedule,
77-
params: {
78-
alertId: prepared.id,
79-
spaceId: context.spaceId,
80-
consumer: prepared.consumer,
81-
},
82-
state: {
83-
previousStartedAt: null,
84-
alertTypeState: {},
85-
alertInstances: {},
86-
},
87-
scope: ['alerting'],
88-
// Tasks are scheduled disabled. Phase 5 enables them via taskManager.bulkEnable
89-
// so per-task validation drops never produce a running task without a rule SO,
90-
// and the activation can randomise schedule datetimes across the batch.
91-
enabled: false,
92-
});
93-
94-
export const toSanitizedRule = <Params extends RuleParams = never>(
95-
context: RulesClientContext,
96-
so: SavedObject<RawRule>,
97-
ruleTypeRegistry: RuleTypeRegistry
98-
): SanitizedRule<Params> => {
99-
const ruleType = ruleTypeRegistry.get(so.attributes.alertTypeId);
100-
const ruleDomain: RuleDomain<Params> = transformRuleAttributesToRuleDomain<Params>(
101-
so.attributes,
102-
{
103-
id: so.id,
104-
logger: context.logger,
105-
ruleType,
106-
references: so.references,
107-
omitGeneratedValues: false,
68+
): TaskInstanceWithDeprecatedFields => {
69+
return {
70+
id: prepared.id,
71+
taskType: `alerting:${prepared.ruleTypeId}`,
72+
schedule: prepared.schedule,
73+
params: {
74+
alertId: prepared.id,
75+
spaceId: context.spaceId,
76+
consumer: prepared.consumer,
10877
},
109-
context.isSystemAction
110-
);
111-
112-
try {
113-
ruleDomainSchema.validate(ruleDomain);
114-
} catch (e) {
115-
context.logger.warn(`Error validating bulk-created rule domain object for id: ${so.id}, ${e}`);
116-
}
117-
118-
return transformRuleDomainToRule<Params>(ruleDomain, { isPublic: true }) as SanitizedRule<Params>;
78+
state: {
79+
previousStartedAt: null,
80+
alertTypeState: {},
81+
alertInstances: {},
82+
},
83+
scope: ['alerting'],
84+
enabled: true,
85+
runAt: new Date(),
86+
scheduledAt: new Date(),
87+
};
11988
};
12089

12190
export const prepareRule = async <Params extends RuleParams>({

0 commit comments

Comments
 (0)