55 * 2.0.
66 */
77
8+ import Boom from '@hapi/boom' ;
89import pMap from 'p-map' ;
910import { withSpan } from '@kbn/apm-utils' ;
10- import type { SavedObject , SavedObjectsBulkCreateObject } from '@kbn/core/server' ;
11+ import type { SavedObjectsBulkCreateObject } from '@kbn/core/server' ;
1112import { SavedObjectsUtils } from '@kbn/core/server' ;
1213import { RULE_SAVED_OBJECT_TYPE } from '../../../../saved_objects' ;
1314import { getRuleCircuitBreakerErrorMessage } from '../../../../../common' ;
1415import { updateMeta } from '../../../../rules_client/lib' ;
15- import { API_KEY_GENERATE_CONCURRENCY } from '../../../../rules_client/common/constants' ;
16+ import {
17+ API_KEY_GENERATE_CONCURRENCY ,
18+ DEFAULT_BULK_CREATE_BATCH_SIZE ,
19+ MAX_BULK_CREATE_BATCH_SIZE ,
20+ MAX_RULES_NUMBER_FOR_BULK_OPERATION ,
21+ } from '../../../../rules_client/common/constants' ;
1622import { ruleAuditEvent , RuleAuditAction } from '../../../../rules_client/common/audit_events' ;
17- import { bulkEnableTasks } from '../bulk_enable_tasks' ;
1823import { bulkCreateRulesSo } from '../../../../data/rule' ;
19- import type { RawRule , SanitizedRule } from '../../../../types' ;
20- import type { BulkOperationError , RulesClientContext } from '../../../../rules_client/types' ;
24+ import type { RawRule } from '../../../../types' ;
25+ import type { RulesClientContext } from '../../../../rules_client/types' ;
2126import type { RuleParams } from '../../types' ;
2227import { validateScheduleLimit } from '../get_schedule_frequency' ;
2328import type {
2429 ApiKeyEntry ,
25- PreparedRule ,
30+ BatchResult ,
31+ BulkCreateOperationError ,
32+ BulkCreateRulesItem ,
2633 BulkCreateRulesParams ,
2734 BulkCreateRulesResult ,
35+ PreparedRule ,
2836} from './types' ;
2937import {
3038 buildTaskInstance ,
3139 collectNewKeysToInvalidate ,
3240 demotePreparedRules ,
3341 flushKeysToInvalidate ,
3442 prepareRule ,
35- toSanitizedRule ,
3643} from './utils' ;
3744
3845export async function bulkCreateRules < Params extends RuleParams = never > (
3946 context : RulesClientContext ,
4047 params : BulkCreateRulesParams < Params >
41- ) : Promise < BulkCreateRulesResult < Params > > {
42- const { rules } = params ;
48+ ) : Promise < BulkCreateRulesResult > {
49+ const { rules, exitEarlyOnError = false } = params ;
4350 const { logger } = context ;
4451 const total = rules . length ;
4552
4653 if ( total === 0 ) {
47- return { rules : [ ] , errors : [ ] , total : 0 , taskIdsFailedToBeEnabled : [ ] } ;
54+ return { successfulIds : [ ] , errors : [ ] , total : 0 } ;
55+ }
56+
57+ if ( total > MAX_RULES_NUMBER_FOR_BULK_OPERATION ) {
58+ throw Boom . badRequest (
59+ `bulkCreateRules: ${ total } rules exceeds the hard limit of ${ MAX_RULES_NUMBER_FOR_BULK_OPERATION } . ` +
60+ `Callers should enforce request-level limits before invoking this method.`
61+ ) ;
62+ }
63+
64+ const requestedBatchSize = params . batchSize ?? DEFAULT_BULK_CREATE_BATCH_SIZE ;
65+ const batchSize = Math . max ( 1 , Math . min ( MAX_BULK_CREATE_BATCH_SIZE , requestedBatchSize ) ) ;
66+
67+ if ( requestedBatchSize !== batchSize ) {
68+ logger . warn (
69+ `bulkCreateRules: batchSize ${ requestedBatchSize } clamped to ${ batchSize } (hard cap ${ MAX_BULK_CREATE_BATCH_SIZE } ).`
70+ ) ;
4871 }
4972
5073 const username = await context . getUserName ( ) ;
5174 const actionsClient = await context . getActionsClient ( ) ;
5275
53- const inputsWithIds = rules . map ( ( rule ) => ( {
76+ const successfulIds : string [ ] = [ ] ;
77+ const errors : BulkCreateOperationError [ ] = [ ] ;
78+
79+ const totalBatches = Math . ceil ( total / batchSize ) ;
80+ logger . debug ( `bulkCreateRules: ${ total } rules, ${ totalBatches } x batches of ${ batchSize } rules.` ) ;
81+
82+ for ( let batchIndex = 0 ; batchIndex < totalBatches ; batchIndex ++ ) {
83+ const start = batchIndex * batchSize ;
84+ const batch = rules . slice ( start , start + batchSize ) ;
85+
86+ const result = await runBatch < Params > ( {
87+ context,
88+ username,
89+ actionsClient,
90+ batch,
91+ } ) ;
92+
93+ successfulIds . push ( ...result . successfulIds ) ;
94+ errors . push ( ...result . errors ) ;
95+
96+ if ( exitEarlyOnError && result . soFailureOccurred ) {
97+ logger . warn (
98+ `bulkCreateRules: exiting early on SO failure at batch ${
99+ batchIndex + 1
100+ } /${ totalBatches } . ` +
101+ `${ successfulIds . length } rule(s) created, ${
102+ total - start - batch . length
103+ } rule(s) skipped.`
104+ ) ;
105+ break ;
106+ }
107+ }
108+
109+ return { successfulIds, errors, total } ;
110+ }
111+
112+ interface RunBatchArgs < Params extends RuleParams > {
113+ context : RulesClientContext ;
114+ username : string | null ;
115+ actionsClient : Awaited < ReturnType < RulesClientContext [ 'getActionsClient' ] > > ;
116+ batch : Array < BulkCreateRulesItem < Params > > ;
117+ }
118+
119+ async function runBatch < Params extends RuleParams > ( {
120+ context,
121+ username,
122+ actionsClient,
123+ batch,
124+ } : RunBatchArgs < Params > ) : Promise < BatchResult > {
125+ const { logger } = context ;
126+
127+ const inputsWithIds = batch . map ( ( rule ) => ( {
54128 id : rule . options ?. id ?? SavedObjectsUtils . generateId ( ) ,
55129 rule,
56130 } ) ) ;
57131
58- logger . debug ( `Bulk creating batch of ${ total } rules` ) ;
59-
60132 // Phase 1: per-rule prepare (validation + api key generation).
61133 // NOTE: in order to minimise external calls, the values below get mutated
62134 // at different stages in the process (ie if we fail schedule creation),
63135 // so that we invalidate keys and create rule SOs in a single batch at the end.
64136 const preparedRules = new Map < string , PreparedRule > ( ) ;
65137 const keysToInvalidate = new Set < string > ( ) ;
66138 const apiKeysMap = new Map < string , ApiKeyEntry > ( ) ;
67- const errors : BulkOperationError [ ] = [ ] ;
139+ const errors : BulkCreateOperationError [ ] = [ ] ;
68140 const authzCache = new Map < string , Promise < void > > ( ) ;
69141
70142 await pMap (
@@ -86,10 +158,10 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
86158 { concurrency : API_KEY_GENERATE_CONCURRENCY }
87159 ) ;
88160
89- // No survivors? Flush out any keys created and return
161+ // No survivors? Flush any keys created and return.
90162 if ( preparedRules . size === 0 ) {
91163 await flushKeysToInvalidate ( keysToInvalidate , context ) ;
92- return { rules : [ ] , errors, total , taskIdsFailedToBeEnabled : [ ] } ;
164+ return { successfulIds : [ ] , errors, soFailureOccurred : false } ;
93165 }
94166
95167 // Phase 2: validate schedule-limits, enabled subset only.
@@ -211,10 +283,9 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
211283 } )
212284 ) ;
213285 } catch ( error ) {
214- // Whole-call SO failure (auth, timeout, etc) : invalidate every newly-minted
215- // key, best-effort orphan-task cleanup, then rethrow .
286+ // Whole-call SO failure: invalidate keys, best-effort task cleanup.
287+ // Surface as batch-wide SO failure so exitEarlyOnError can honour it .
216288 for ( const k of collectNewKeysToInvalidate ( apiKeysMap . values ( ) ) ) keysToInvalidate . add ( k ) ;
217- await flushKeysToInvalidate ( keysToInvalidate , context ) ;
218289 if ( newlyScheduledTaskIds . size > 0 ) {
219290 try {
220291 await context . taskManager . bulkRemove ( [ ...newlyScheduledTaskIds ] ) ;
@@ -226,16 +297,23 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
226297 ) ;
227298 }
228299 }
229- throw error ;
300+ await flushKeysToInvalidate ( keysToInvalidate , context ) ;
301+ errors . push ( {
302+ message : `Failed to bulk create rule saved objects: ${ error . message } ` ,
303+ status : error . output ?. statusCode ,
304+ rule : { id : 'n/a' , name : 'n/a' } ,
305+ } ) ;
306+ return { successfulIds : [ ] , errors, soFailureOccurred : true } ;
230307 }
231308
232309 // Phase 4 per-row outcomes.
233- const successfulSos : Array < SavedObject < RawRule > > = [ ] ;
234- const taskIdsToEnable : string [ ] = [ ] ;
310+ const batchSuccessfulIds : string [ ] = [ ] ;
235311 const taskIdsToCleanUp : string [ ] = [ ] ;
312+ let perRowFailureOccurred = false ;
236313
237314 for ( const so of bulkResponse . saved_objects ) {
238315 if ( so . error ) {
316+ perRowFailureOccurred = true ;
239317 errors . push ( {
240318 message : so . error . message ?? 'n/a' ,
241319 status : so . error . statusCode ,
@@ -253,9 +331,8 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
253331 taskIdsToCleanUp . push ( so . id ) ;
254332 }
255333 } else {
256- successfulSos . push ( so as SavedObject < RawRule > ) ;
334+ batchSuccessfulIds . push ( so . id ) ;
257335 if ( newlyScheduledTaskIds . has ( so . id ) ) {
258- taskIdsToEnable . push ( so . id ) ;
259336 // Audit per-rule ENABLE for the enabled subset (mirrors single-rule semantics).
260337 context . auditLogger ?. log (
261338 ruleAuditEvent ( {
@@ -286,23 +363,12 @@ export async function bulkCreateRules<Params extends RuleParams = never>(
286363 }
287364 }
288365
289- // Phase 5: enable scheduled tasks. If skipTaskEnabling, caller enables them later.
290- const taskIdsFailedToBeEnabled = params . skipTaskEnabling
291- ? [ ...taskIdsToEnable ]
292- : ( await bulkEnableTasks ( context , { taskIds : taskIdsToEnable } ) ) . taskIdsFailedToBeEnabled ;
293-
294- // Single end-of-function flush for all collected key invalidations.
366+ // Single per-batch flush for all collected key invalidations.
295367 await flushKeysToInvalidate ( keysToInvalidate , context ) ;
296368
297- // Phase 6: domain transform + return.
298- const sanitizedRules : Array < SanitizedRule < Params > > = successfulSos . map ( ( so ) =>
299- toSanitizedRule < Params > ( context , so , context . ruleTypeRegistry )
300- ) ;
301-
302369 return {
303- rules : sanitizedRules ,
370+ successfulIds : batchSuccessfulIds ,
304371 errors,
305- total,
306- taskIdsFailedToBeEnabled,
372+ soFailureOccurred : perRowFailureOccurred ,
307373 } ;
308374}
0 commit comments