-
Notifications
You must be signed in to change notification settings - Fork 4.3k
Enqueue a new job every 20 step executions #15068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR implements a workflow execution optimization that prevents large workflows from blocking workers by enqueueing new jobs every 20 step executions. The core change introduces job chunking where workflows are split into smaller execution units when they exceed 20 steps, allowing other jobs to be processed and maintaining system responsiveness.
The implementation adds an executedStepsCount
parameter to track workflow progress, integrates message queue functionality for job management, and introduces a MAX_EXECUTED_STEPS_COUNT
constant set to 20. When this limit is exceeded, the system enqueues a new job with the lastExecutedStepId
, enabling seamless workflow resumption while preventing worker starvation.
Additionally, the PR includes code cleanup for failure handling by extracting the logic to mark running steps as failed when workflows fail into a dedicated markRunningStepsAsFailed
method. This refactoring improves code organization and makes the failure handling more explicit and targeted.
The changes also extract workflow step execution logic into reusable utility functions (shouldExecuteChildStep
), following DRY principles and improving maintainability. Comprehensive tests validate the step execution logic with various parent-child step scenarios.
Important Files Changed
Changed Files
Filename | Score | Overview |
---|---|---|
packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts |
4/5 | Implements core job chunking logic with MAX_EXECUTED_STEPS_COUNT limit and new job enqueueing |
packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts |
4/5 | Refactors workflow resumption logic to use service method for determining next steps to execute |
packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts |
4/5 | Extracts step failure marking logic into dedicated method and optimizes updates for failed workflows only |
packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts |
4/5 | Adds comprehensive tests for job queueing behavior when step execution limit is exceeded |
packages/twenty-server/src/modules/workflow/workflow-executor/utils/should-execute-child-step.util.ts |
4/5 | New utility function for determining child step execution based on parent step completion status |
packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/should-execute-child-step.util.spec.ts |
5/5 | Comprehensive test suite covering all workflow step execution scenarios including edge cases |
packages/twenty-server/src/modules/workflow/workflow-executor/utils/should-execute-step.util.ts |
5/5 | Refactors to use extracted shouldExecuteChildStep utility, reducing code duplication |
packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/iterator/utils/should-execute-iterator-step.util.ts |
4/5 | Consolidates duplicate logic by delegating to shouldExecuteChildStep utility function |
packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts |
5/5 | Changes enqueueWorkflowRun method visibility from private to public for external access |
packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts |
5/5 | Adds WorkflowRunQueueModule import to enable queue functionality for job management |
packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts |
5/5 | Adds executedStepsCount optional field to track workflow execution progress |
packages/twenty-server/src/modules/workflow/workflow-runner/types/run-workflow-job-data.type.ts |
5/5 | New type definition for workflow job data structure supporting job chunking |
packages/twenty-server/src/modules/workflow/workflow-runner/constants/run-workflow-job-name.ts |
5/5 | New constant for centralizing workflow job name across the codebase |
Confidence score: 4/5
- This PR implements a well-designed solution to prevent large workflows from blocking workers with reasonable chunking logic
- Score reflects the complexity of workflow execution changes and potential edge cases in distributed job processing that may not be fully covered by tests
- Pay close attention to the workflow executor service changes and job resumption logic, especially around step counting accuracy and job enqueueing timing
Sequence Diagram
sequenceDiagram
participant User
participant WorkflowRunner as WorkflowRunnerWorkspaceService
participant Queue as MessageQueue
participant Job as RunWorkflowJob
participant Executor as WorkflowExecutorWorkspaceService
participant WorkflowRun as WorkflowRunWorkspaceService
User->>WorkflowRunner: "run workflow"
WorkflowRunner->>Queue: "add RunWorkflowJob"
WorkflowRunner->>WorkflowRun: "createWorkflowRun"
Queue->>Job: "handle job"
Job->>Job: "startWorkflowExecution"
Job->>WorkflowRun: "startWorkflowRun"
Job->>Executor: "executeFromSteps"
loop For each step (max 20)
Executor->>Executor: "executeFromStep"
Executor->>WorkflowRun: "updateWorkflowRunStepInfo (RUNNING)"
Executor->>Executor: "executeStep"
Executor->>WorkflowRun: "updateWorkflowRunStepInfo (SUCCESS/FAILED)"
alt executedStepsCount > 20
Executor->>Queue: "add new RunWorkflowJob with lastExecutedStepId"
Executor->>Executor: "return (stop current job)"
else continue in same job
Executor->>Executor: "getNextStepIdsToExecute"
Executor->>Executor: "executeFromSteps (recursive)"
end
end
alt New job queued
Queue->>Job: "handle resumed job"
Job->>Job: "resumeWorkflowExecution"
Job->>Executor: "getNextStepIdsToExecute"
Job->>Executor: "executeFromSteps"
else Workflow completed
Executor->>WorkflowRun: "endWorkflowRun (COMPLETED)"
end
alt Workflow failed
WorkflowRun->>WorkflowRun: "markRunningStepsAsFailed"
WorkflowRun->>WorkflowRun: "endWorkflowRun (FAILED)"
end
Context used (4)
- Rule from
dashboard
- Extract utility functions like 'includesExpectedScopes' to utils modules instead of keeping them in ... (source) - Context from
dashboard
- Always consider adding tests for new functionality, especially for edge cases like empty responses. (source) - Context from
dashboard
- Follow the convention of having only one utility function per exported file in the project. (source) - Context from
dashboard
- Check that mocked functions in tests are called exactly once with the expected arguments. (source)
13 files reviewed, 4 comments
...twenty-server/src/modules/workflow/workflow-executor/utils/should-execute-child-step.util.ts
Show resolved
Hide resolved
...w/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts
Show resolved
Hide resolved
...modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts
Show resolved
Hide resolved
packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts
Show resolved
Hide resolved
const lastExecutedStepResult = | ||
workflowRun.state?.stepInfos[lastExecutedStepId]?.result; | ||
|
||
const nextStepIdsToExecute = | ||
await this.workflowExecutorWorkspaceService.getNextStepIdsToExecute({ | ||
executedStep: lastExecutedStep, | ||
executedStepResult: lastExecutedStepResult, | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential bug: When resuming a workflow, lastExecutedStepResult
is passed to getNextStepIdsToExecute
with an incorrect shape, causing a type error for iterator steps.
-
Description: When a workflow run is resumed after 20 steps, the code retrieves
lastExecutedStepResult
fromworkflowRun.state.stepInfos[lastExecutedStepId].result
. This variable contains only the raw result data, not the expectedWorkflowActionOutput
object. This incorrect object is then passed togetNextStepIdsToExecute
. If the last executed step was an iterator, this function attempts to accessexecutedStepResult.result.hasProcessedAllItems
. SinceexecutedStepResult
is the result data itself,executedStepResult.result
isundefined
, leading to aTypeError
. This causes the resumed workflow job to fail. -
Suggested fix: In
run-workflow.job.ts
, when callinggetNextStepIdsToExecute
for a resumed workflow, wraplastExecutedStepResult
in an object to match the expectedWorkflowActionOutput
type. The call should bethis.workflowExecutorWorkspaceService.getNextStepIdsToExecute({ executedStep: lastExecutedStep, executedStepResult: { result: lastExecutedStepResult } })
.
severity: 0.85, confidence: 1.0
Did we get this right? 👍 / 👎 to inform future reviews.
🚀 Preview Environment Ready! Your preview environment is available at: http://bore.pub:46128 This environment will automatically shut down when the PR is closed or after 5 hours. |
39de292
to
7905881
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking about the issue, but in fact, I don't see the point of stopping a workflow that runs fast even if it has more than 20 steps. In the contrary, a 3 steps workflow can lasts a lot of time and black the workers.
I feel that a workflow execution timeout would provide a better control on worker ressource allocation
What do you think?
) {} | ||
|
||
@Process(RunWorkflowJob.name) | ||
@Process(RUN_WORKFLOW_JOB_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? we do everywhere else @Process(<JobClass>.name)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Circular import?
To avoid huge workflows to block the worker, we will enqueue a new job every 20 steps.
This could be more than 20 if there are branches but I think this is fine, the goal is only to have a limit set.
Also cleaning a bit the code to mark running steps as failed when workflow fails.
I tested it on a huge workflow:
Enregistrement.de.l.ecran.2025-10-13.a.16.43.35.mov