@@ -4,12 +4,14 @@ import HttpError from '@/errors/http'
44import PartialStepError from '@/errors/partial-error'
55import StepError from '@/errors/step'
66import computeParameters from '@/helpers/compute-parameters'
7+ import { DEFAULT_JOB_OPTIONS } from '@/helpers/default-job-configuration'
78import globalVariable from '@/helpers/global-variable'
89import logger from '@/helpers/logger'
910import Execution from '@/models/execution'
1011import ExecutionStep from '@/models/execution-step'
1112import Flow from '@/models/flow'
1213import Step from '@/models/step'
14+ import { enqueueActionJob } from '@/queues/action'
1315
1416type ProcessActionOptions = {
1517 flowId : string
@@ -34,10 +36,21 @@ export const processAction = async (options: ProcessActionOptions) => {
3436 const flow = await Flow . query ( )
3537 . findById ( flowId )
3638 . withGraphJoined ( 'user' )
39+ . withGraphFetched ( 'steps' )
3740 . throwIfNotFound ( )
3841 const execution = await Execution . query ( )
3942 . findById ( executionId )
4043 . throwIfNotFound ( )
44+ const hasLoop = flow . steps . filter ( ( step ) => step . key === 'forEach' )
45+
46+ let isInsideLoop = false
47+ let loopIndex = - 1
48+ if ( hasLoop . length > 0 ) {
49+ loopIndex = hasLoop [ 0 ] . position
50+ if ( step . position > hasLoop [ 0 ] . position ) {
51+ isInsideLoop = true
52+ }
53+ }
4154
4255 const $ = await globalVariable ( {
4356 flow,
@@ -60,6 +73,16 @@ export const processAction = async (options: ProcessActionOptions) => {
6073 $ . step . parameters ,
6174 priorExecutionSteps ,
6275 actionCommand . preprocessVariable ,
76+ testRun ,
77+ {
78+ isInsideLoop,
79+ loopIndex,
80+ stepPositions : flow . steps . reduce ( ( acc , step ) => {
81+ acc [ step . id ] = step . position
82+ return acc
83+ } , { } as Record < string , number > ) ,
84+ } ,
85+ metadata ,
6386 )
6487
6588 $ . step . parameters = computedParameters
@@ -116,7 +139,9 @@ export const processAction = async (options: ProcessActionOptions) => {
116139 dataOut : $ . actionOutput . data ?. raw ?? null ,
117140 errorDetails : $ . actionOutput . error ?? null ,
118141 appKey : $ . app . key ,
142+ key : step . key ,
119143 jobId,
144+ metadata,
120145 } )
121146
122147 let nextStep = null
@@ -127,6 +152,48 @@ export const processAction = async (options: ProcessActionOptions) => {
127152 . findById ( runResult . nextStep . stepId )
128153 . throwIfNotFound ( )
129154 break
155+
156+ case 'start-loop' : {
157+ nextStep = null // nothing to do, we enqueue the next step here
158+ const dataOut = $ . actionOutput . data ?. raw ?? null
159+ const iterations = Math . min ( 500 , Number ( dataOut . iterations ?? 0 ) )
160+
161+ if ( ! iterations || iterations === 0 ) {
162+ break
163+ }
164+
165+ const firstStepInLoop = await step . getNextStep ( )
166+
167+ // does nothing for test run, do not enqueue any jobs
168+ if ( ! testRun ) {
169+ if ( Number . isInteger ( iterations ) ) {
170+ const enqueuePromises = [ ]
171+ for ( let i = 0 ; i < Number ( iterations ) ; i ++ ) {
172+ const enqueuePromise = enqueueActionJob ( {
173+ appKey : firstStepInLoop . appKey ,
174+ jobName : `${ executionId } -${ firstStepInLoop . id } -${ i + 1 } ` ,
175+ jobData : {
176+ flowId,
177+ executionId,
178+ stepId : firstStepInLoop . id ,
179+ metadata : {
180+ ...metadata ,
181+ loopIteration : i + 1 ,
182+ } ,
183+ } ,
184+ jobOptions : {
185+ ...DEFAULT_JOB_OPTIONS ,
186+ delay : i * 100 ,
187+ } ,
188+ } )
189+ enqueuePromises . push ( enqueuePromise )
190+ }
191+ await Promise . all ( enqueuePromises )
192+ }
193+ }
194+
195+ break
196+ }
130197 case 'stop-execution' :
131198 // Nothing to do, nextStep is already null.
132199 break
@@ -141,7 +208,10 @@ export const processAction = async (options: ProcessActionOptions) => {
141208 executionStep,
142209 computedParameters,
143210 nextStep,
144- nextStepMetadata : runResult . nextStepMetadata ,
211+ nextStepMetadata : {
212+ ...runResult . nextStepMetadata ,
213+ ...metadata ,
214+ } ,
145215 executionError,
146216 }
147217}
0 commit comments