diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index 56c20e78ab10b..4f340a88915aa 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -278,7 +278,7 @@ export class TestWebhooks implements IWebhookManager { runData?: IRunData; pushRef?: string; destinationNode?: IDestinationNode; - triggerToStartFrom?: WorkflowRequest.ManualRunPayload['triggerToStartFrom']; + triggerToStartFrom?: WorkflowRequest.FullManualExecutionFromKnownTriggerPayload['triggerToStartFrom']; }) { const { userId, diff --git a/packages/cli/src/workflow-runner.ts b/packages/cli/src/workflow-runner.ts index 38b1a6672a2d2..44377eebf0237 100644 --- a/packages/cli/src/workflow-runner.ts +++ b/packages/cli/src/workflow-runner.ts @@ -139,6 +139,46 @@ export class WorkflowRunner { restartExecutionId?: string, responsePromise?: IDeferredPromise, ): Promise { + const offloadingManualExecutionsInQueueMode = + this.executionsConfig.mode === 'queue' && + process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'; + + /** + * Historically, manual executions in scaling mode ran in the main process, + * so some execution details were never persisted in the database. + * + * Currently, manual executions in scaling mode are offloaded to workers, + * so we persist all details to give workers full access to them. + */ + if (data.executionMode === 'manual' && offloadingManualExecutionsInQueueMode) { + console.log('offloadingManualExecutionsInQueueMode'); + data.executionData = createRunExecutionData({ + startData: { + startNodes: data.startNodes, + destinationNode: data.destinationNode, + }, + resultData: { + pinData: data.pinData, + // If `runData` is initialized to an empty object the execution will + // be treated like a partial manual execution instead of a full + // manual execution. + // So we have to set this to null to instruct + // `createRunExecutionData` to not initialize it. + runData: data.runData ?? null, + }, + manualData: { + userId: data.userId, + dirtyNodeNames: data.dirtyNodeNames, + triggerToStartFrom: data.triggerToStartFrom, + }, + // If `executionData` is initialized the execution will be treated like + // a resumed execution after waiting, instead of a manual execution. + // So we have to set this to null to instruct `createRunExecutionData` + // to not initialize it. + executionData: null, + }); + } + // Register a new execution const executionId = await this.activeExecutions.add(data, restartExecutionId); diff --git a/packages/cli/src/workflows/__tests__/workflow-execution.service.test.ts b/packages/cli/src/workflows/__tests__/workflow-execution.service.test.ts index dabf4c77d8d55..209877158464e 100644 --- a/packages/cli/src/workflows/__tests__/workflow-execution.service.test.ts +++ b/packages/cli/src/workflows/__tests__/workflow-execution.service.test.ts @@ -415,13 +415,13 @@ describe('WorkflowExecutionService', () => { }); it('should return `null` if no pindata', () => { - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, []); + const node = workflowExecutionService.selectPinnedTrigger(workflow, []); expect(node).toBeNull(); }); it('should return `null` if no starter nodes', () => { - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow); + const node = workflowExecutionService.selectPinnedTrigger(workflow); expect(node).toBeNull(); }); @@ -429,7 +429,7 @@ describe('WorkflowExecutionService', () => { it('should select webhook node if only choice', () => { workflow.nodes.push(webhookNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toEqual(webhookNode); }); @@ -437,7 +437,7 @@ describe('WorkflowExecutionService', () => { it('should return `null` if no choice', () => { workflow.nodes.push(hackerNewsNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toBeNull(); }); @@ -445,7 +445,7 @@ describe('WorkflowExecutionService', () => { it('should return ignore Respond to Webhook', () => { workflow.nodes.push(respondToWebhookNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toBeNull(); }); @@ -453,7 +453,7 @@ describe('WorkflowExecutionService', () => { it('should select execute workflow trigger if only choice', () => { workflow.nodes.push(executeWorkflowTriggerNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toEqual(executeWorkflowTriggerNode); }); @@ -461,7 +461,7 @@ describe('WorkflowExecutionService', () => { it('should favor webhook node over execute workflow trigger', () => { workflow.nodes.push(webhookNode, executeWorkflowTriggerNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toEqual(webhookNode); }); @@ -469,7 +469,7 @@ describe('WorkflowExecutionService', () => { it('should favor first webhook node over second webhook node', () => { workflow.nodes.push(webhookNode, secondWebhookNode); - const node = workflowExecutionService.selectPinnedActivatorStarter(workflow, [], pinData); + const node = workflowExecutionService.selectPinnedTrigger(workflow, [], pinData); expect(node).toEqual(webhookNode); }); @@ -481,7 +481,7 @@ describe('WorkflowExecutionService', () => { ...createMainConnection(secondWebhookNode.name, secondHackerNewsNode.name), }; - const node = workflowExecutionService.selectPinnedActivatorStarter( + const node = workflowExecutionService.selectPinnedTrigger( workflow, [], { ...pinData, [secondWebhookNode.name]: [{ json: { key: 'value' } }] }, diff --git a/packages/cli/src/workflows/workflow-execution.service.ts b/packages/cli/src/workflows/workflow-execution.service.ts index 8adde50ffdf6e..bcdb096849688 100644 --- a/packages/cli/src/workflows/workflow-execution.service.ts +++ b/packages/cli/src/workflows/workflow-execution.service.ts @@ -4,7 +4,7 @@ import type { Project, User, CreateExecutionPayload } from '@n8n/db'; import { ExecutionRepository, WorkflowRepository } from '@n8n/db'; import { Service } from '@n8n/di'; import type { Response } from 'express'; -import { ErrorReporter } from 'n8n-core'; +import { DirectedGraph, ErrorReporter, anyReachableRootHaveRunData } from 'n8n-core'; import type { IDeferredPromise, IExecuteData, @@ -17,8 +17,10 @@ import type { WorkflowExecuteMode, IWorkflowExecutionDataProcess, IWorkflowBase, + IRunData, } from 'n8n-workflow'; import { SubworkflowOperationError, Workflow, createRunExecutionData } from 'n8n-workflow'; +import * as a from 'node:assert/strict'; import { ExecutionDataService } from '@/executions/execution-data.service'; import { SubworkflowPolicyChecker } from '@/executions/pre-execution-checks'; @@ -91,6 +93,18 @@ export class WorkflowExecutionService { return nodeType.description.group.includes('trigger'); } + private doesTriggerHaveRunData( + destinationNode: string, + workflowData: IWorkflowBase, + runData: IRunData, + ) { + return anyReachableRootHaveRunData( + DirectedGraph.fromNodesAndConnections(workflowData.nodes, workflowData.connections), + destinationNode, + runData, + ); + } + async executeManually( payload: WorkflowRequest.ManualRunPayload, user: User, @@ -98,139 +112,218 @@ export class WorkflowExecutionService { streamingEnabled?: boolean, httpResponse?: Response, ) { - const { workflowData, startNodes, dirtyNodeNames, triggerToStartFrom, agentRequest } = payload; - let { runData } = payload; - const destinationNode = payload.destinationNode - ? ({ nodeName: payload.destinationNode, mode: 'inclusive' } as const) - : undefined; - const pinData = workflowData.pinData; - let pinnedTrigger = this.selectPinnedActivatorStarter( - workflowData, - startNodes?.map((nodeData) => nodeData.name), - pinData, - destinationNode?.nodeName, - ); + console.log('version 1'); + function isFullManualExecutionFromKnownTriggerPayload( + payload: WorkflowRequest.ManualRunPayload, + ): payload is WorkflowRequest.FullManualExecutionFromKnownTriggerPayload { + if ('triggerToStartFrom' in payload && !('runData' in payload)) { + return true; + } + return false; + } - // TODO: Reverse the order of events, first find out if the execution is - // partial or full, if it's partial create the execution and run, if it's - // full get the data first and only then create the execution. - // - // If the destination node is a trigger, then per definition this - // is not a partial execution and thus we can ignore the run data. - // If we don't do this we'll end up creating an execution, calling the - // partial execution flow, finding out that we don't have run data to - // create the execution stack and have to cancel the execution, come back - // here and either create the runData (e.g. scheduler trigger) or wait for - // a webhook or event. - if (destinationNode) { - if (this.isDestinationNodeATrigger(destinationNode.nodeName, workflowData)) { - runData = undefined; + function isFullManualExecutionFromUnknownTriggerPayload( + payload: WorkflowRequest.ManualRunPayload, + ): payload is WorkflowRequest.FullManualExecutionFromUnknownTriggerPayload { + if (!('triggerToStartFrom' in payload) && !('runData' in payload)) { + return true; } + return false; } - // if we have a trigger to start from and it's not the pinned trigger - // ignore the pinned trigger - if (pinnedTrigger && triggerToStartFrom && pinnedTrigger.name !== triggerToStartFrom.name) { - pinnedTrigger = null; + function isPartialManualExecutionToDestination( + payload: WorkflowRequest.ManualRunPayload, + ): payload is WorkflowRequest.PartialManualExecutionToDestination { + if ('destinationNode' in payload && 'runData' in payload) { + return true; + } + return false; } - // If webhooks nodes exist and are active we have to wait for till we receive a call - if ( - pinnedTrigger === null && - (runData === undefined || - startNodes === undefined || - startNodes.length === 0 || - destinationNode === undefined) - ) { - const additionalData = await WorkflowExecuteAdditionalData.getBase({ - userId: user.id, - workflowId: workflowData.id, - }); + // For manual testing always set to not active + payload.workflowData.active = false; + payload.workflowData.activeVersionId = null; - const needsWebhook = await this.testWebhooks.needsWebhook({ - userId: user.id, - workflowEntity: workflowData, - additionalData, - runData, + // TODO: fix this on the FE + if ('triggerToStartFrom' in payload) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-member-access + delete (payload as any).runData; + } + + console.log('payload', payload); + + if (isPartialManualExecutionToDestination(payload)) { + console.log('isPartialManualExecutionToDestination'); + + const destinationNode = payload.destinationNode + ? ({ nodeName: payload.destinationNode, mode: 'inclusive' } as const) + : undefined; + + if ( + // If the trigger has no runData we have to upgrade to a + // FullManualExecutionFromUnknownTriggerPayload + // + // TODO: This function should be happy if at least one root trigger has + // runData, if there are multiple root triggers this will always return + // false, because it's impossible for more than one trigger to have + // runData. + !this.doesTriggerHaveRunData( + payload.destinationNode, + payload.workflowData, + payload.runData, + ) || + // If the destination node is a trigger, then per definition this + // is not a partial execution and thus we can ignore the run data. + // If we don't do this we'll end up creating an execution, calling the + // partial execution flow, finding out that we don't have run data to + // create the execution stack and have to cancel the execution, come back + // here and either create the runData (e.g. scheduler trigger) or wait for + // a webhook or event. + this.isDestinationNodeATrigger(payload.destinationNode, payload.workflowData) + ) { + console.log('upgrade to FullManualExecutionFromUnknownTriggerPayload'); + payload = { + workflowData: payload.workflowData, + destinationNode: payload.destinationNode, + agentRequest: payload.agentRequest, + } satisfies WorkflowRequest.FullManualExecutionFromUnknownTriggerPayload; + } else { + console.log('prerequisitesAreGiven'); + const executionId = await this.workflowRunner.run({ + destinationNode, + executionMode: 'manual', + runData: payload.runData, + pinData: payload.workflowData.pinData, + pushRef, + workflowData: payload.workflowData, + userId: user.id, + dirtyNodeNames: payload.dirtyNodeNames, + agentRequest: payload.agentRequest, + streamingEnabled, + httpResponse, + // startNodes, + // triggerToStartFrom: payload.triggerToStartFrom, + }); + return { executionId }; + } + } + + if (isFullManualExecutionFromKnownTriggerPayload(payload)) { + console.log('isFullManualExecutionFromKnownTriggerPayload'); + + const destinationNode = payload.destinationNode + ? ({ nodeName: payload.destinationNode, mode: 'inclusive' } as const) + : undefined; + + const pinnedTrigger = this.selectPinnedTrigger( + payload.workflowData, + payload.destinationNode, + payload.workflowData.pinData ?? {}, + ); + + if (pinnedTrigger === null) { + console.log('check webhooks'); + const additionalData = await WorkflowExecuteAdditionalData.getBase({ + userId: user.id, + workflowId: payload.workflowData.id, + }); + + const needsWebhook = await this.testWebhooks.needsWebhook({ + userId: user.id, + workflowEntity: payload.workflowData, + additionalData, + pushRef, + triggerToStartFrom: payload.triggerToStartFrom, + destinationNode, + // runData, + }); + + console.log('needsWebhook', needsWebhook); + if (needsWebhook) { + return { waitingForWebhook: true }; + } + } + + const executionId = await this.workflowRunner.run({ + executionMode: 'manual', + pinData: payload.workflowData.pinData, pushRef, + workflowData: payload.workflowData, + userId: user.id, + triggerToStartFrom: payload.triggerToStartFrom, + agentRequest: payload.agentRequest, + streamingEnabled, + httpResponse, destinationNode, - triggerToStartFrom, + // runData, + // startNodes, + // dirtyNodeNames, }); - if (needsWebhook) return { waitingForWebhook: true }; + return { executionId }; } - // For manual testing always set to not active - workflowData.active = false; - workflowData.activeVersionId = null; + if (isFullManualExecutionFromUnknownTriggerPayload(payload)) { + console.log('isFullManualExecutionFromUnknownTriggerPayload'); - // Start the workflow - const data: IWorkflowExecutionDataProcess = { - destinationNode, - executionMode: 'manual', - runData, - pinData, - pushRef, - startNodes, - workflowData, - userId: user.id, - dirtyNodeNames, - triggerToStartFrom, - agentRequest, - streamingEnabled, - httpResponse, - }; + const destinationNode = payload.destinationNode + ? ({ nodeName: payload.destinationNode, mode: 'inclusive' } as const) + : undefined; - const hasRunData = (node: INode) => runData !== undefined && !!runData[node.name]; + const pinnedTrigger = this.selectPinnedTrigger( + payload.workflowData, + payload.destinationNode, + payload.workflowData.pinData ?? {}, + ); - if (pinnedTrigger && !hasRunData(pinnedTrigger)) { - data.startNodes = [{ name: pinnedTrigger.name, sourceData: null }]; - } + if (pinnedTrigger === null) { + console.log('check webhooks'); + const additionalData = await WorkflowExecuteAdditionalData.getBase({ + userId: user.id, + workflowId: payload.workflowData.id, + }); - const offloadingManualExecutionsInQueueMode = - this.globalConfig.executions.mode === 'queue' && - process.env.OFFLOAD_MANUAL_EXECUTIONS_TO_WORKERS === 'true'; - - /** - * Historically, manual executions in scaling mode ran in the main process, - * so some execution details were never persisted in the database. - * - * Currently, manual executions in scaling mode are offloaded to workers, - * so we persist all details to give workers full access to them. - */ - if (offloadingManualExecutionsInQueueMode) { - data.executionData = createRunExecutionData({ - startData: { - startNodes: data.startNodes, + const needsWebhook = await this.testWebhooks.needsWebhook({ + userId: user.id, + workflowEntity: payload.workflowData, + additionalData, + pushRef, destinationNode, - }, - resultData: { - pinData, - // If `runData` is initialized to an empty object the execution will - // be treated like a partial manual execution instead of a full - // manual execution. - // So we have to set this to null to instruct - // `createRunExecutionData` to not initialize it. - runData: runData ?? null, - }, - manualData: { - userId: data.userId, - dirtyNodeNames, - triggerToStartFrom, - }, - // If `executionData` is initialized the execution will be treated like - // a resumed execution after waiting, instead of a manual execution. - // So we have to set this to null to instruct `createRunExecutionData` - // to not initialize it. - executionData: null, + // triggerToStartFrom: payload.triggerToStartFrom, + // runData, + }); + + console.log('needsWebhook', needsWebhook); + if (needsWebhook) { + return { waitingForWebhook: true }; + } + } + + if (pinnedTrigger) { + console.log('rewrite startNodes'); + console.log('pinnedTrigger', pinnedTrigger); + } + + const executionId = this.workflowRunner.run({ + executionMode: 'manual', + pinData: payload.workflowData.pinData, + pushRef, + workflowData: payload.workflowData, + userId: user.id, + agentRequest: payload.agentRequest, + streamingEnabled, + httpResponse, + destinationNode, + triggerToStartFrom: pinnedTrigger ? { name: pinnedTrigger.name } : undefined, + // startNodes, + // runData, + // dirtyNodeNames, }); - } - const executionId = await this.workflowRunner.run(data); + return { executionId }; + } - return { - executionId, - }; + a.fail('should never happen'); } async executeChatWorkflow( @@ -402,6 +495,8 @@ export class WorkflowExecutionService { } } + // TODO: update the docstring + // /** * Select the pinned activator node to use as starter for a manual execution. * @@ -413,71 +508,28 @@ export class WorkflowExecutionService { * prioritizing `n8n-nodes-base.webhook` over other activators. If the executed node * has no upstream nodes and is itself is a pinned activator, select it. */ - selectPinnedActivatorStarter( - workflow: IWorkflowBase, - startNodes?: string[], - pinData?: IPinData, - destinationNode?: string, - ) { - if (!pinData || !startNodes) return null; - - const allPinnedActivators = this.findAllPinnedActivators(workflow, pinData); - - if (allPinnedActivators.length === 0) return null; - - const [firstPinnedActivator] = allPinnedActivators; - - // full manual execution - - if (startNodes?.length === 0) { - // If there is a destination node, find the pinned activator that is a parent of the destination node - if (destinationNode) { - const destinationParents = new Set( - new Workflow({ - nodes: workflow.nodes, - connections: workflow.connections, - active: workflow.activeVersionId !== null, - nodeTypes: this.nodeTypes, - }).getParentNodes(destinationNode), - ); + selectPinnedTrigger(workflow: IWorkflowBase, destinationNode: string, pinData: IPinData) { + const allPinnedTriggers = this.findAllPinnedTriggers(workflow, pinData); - const activator = allPinnedActivators.find((a) => destinationParents.has(a.name)); + if (allPinnedTriggers.length === 0) return null; - if (activator) { - return activator; - } - } - - return firstPinnedActivator ?? null; - } - - // partial manual execution - - /** - * If the partial manual execution has 2+ start nodes, we search only the zeroth - * start node's parents for a pinned activator. If we had 2+ start nodes without - * a common ancestor and so if we end up finding multiple pinned activators, we - * would still need to return one to comply with existing usage. - */ - const [firstStartNodeName] = startNodes; - - const parentNodeNames = new Workflow({ - nodes: workflow.nodes, - connections: workflow.connections, - active: workflow.activeVersionId !== null, - nodeTypes: this.nodeTypes, - }).getParentNodes(firstStartNodeName); + const destinationParents = new Set( + new Workflow({ + nodes: workflow.nodes, + connections: workflow.connections, + active: workflow.activeVersionId !== null, + nodeTypes: this.nodeTypes, + }).getParentNodes(destinationNode), + ); - if (parentNodeNames.length > 0) { - const parentNodeName = parentNodeNames.find((p) => p === firstPinnedActivator.name); + const trigger = allPinnedTriggers.find((a) => destinationParents.has(a.name)); - return allPinnedActivators.find((pa) => pa.name === parentNodeName) ?? null; - } + console.log('trigger', trigger); - return allPinnedActivators.find((pa) => pa.name === firstStartNodeName) ?? null; + return trigger; } - private findAllPinnedActivators(workflow: IWorkflowBase, pinData?: IPinData) { + private findAllPinnedTriggers(workflow: IWorkflowBase, pinData?: IPinData) { return workflow.nodes .filter( (node) => diff --git a/packages/cli/src/workflows/workflow.request.ts b/packages/cli/src/workflows/workflow.request.ts index ad1a4302240bc..5ba7c9c701c27 100644 --- a/packages/cli/src/workflows/workflow.request.ts +++ b/packages/cli/src/workflows/workflow.request.ts @@ -4,7 +4,6 @@ import type { IConnections, IWorkflowSettings, IRunData, - StartNodeData, ITaskData, IWorkflowBase, AiAgentRequest, @@ -29,19 +28,37 @@ export declare namespace WorkflowRequest { uiContext?: string; }>; - type ManualRunPayload = { + // 1. Full Manual Execution from Known Trigger + type FullManualExecutionFromKnownTriggerPayload = { workflowData: IWorkflowBase; - runData?: IRunData; - startNodes?: StartNodeData[]; - destinationNode?: string; - dirtyNodeNames?: string[]; - triggerToStartFrom?: { - name: string; - data?: ITaskData; - }; agentRequest?: AiAgentRequest; + + destinationNode: string; + triggerToStartFrom: { name: string; data?: ITaskData }; + }; + // 2. Full Manual Execution from Unknown Trigger + type FullManualExecutionFromUnknownTriggerPayload = { + workflowData: IWorkflowBase; + agentRequest?: AiAgentRequest; + + destinationNode: string; }; + // 3. Partial Manual Execution to Destination + type PartialManualExecutionToDestination = { + workflowData: IWorkflowBase; + agentRequest?: AiAgentRequest; + + runData: IRunData; + destinationNode: string; + dirtyNodeNames: string[]; + }; + + type ManualRunPayload = + | FullManualExecutionFromKnownTriggerPayload + | FullManualExecutionFromUnknownTriggerPayload + | PartialManualExecutionToDestination; + type Create = AuthenticatedRequest<{}, {}, CreateUpdatePayload>; type Get = AuthenticatedRequest<{ workflowId: string }>; diff --git a/packages/core/src/execution-engine/partial-execution-utils/directed-graph.ts b/packages/core/src/execution-engine/partial-execution-utils/directed-graph.ts index 48477cc32dbd6..b29efcea7b32f 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/directed-graph.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/directed-graph.ts @@ -493,6 +493,44 @@ export class DirectedGraph { return graph; } + static fromNodesAndConnections(nodes: INode[], connections: IConnections): DirectedGraph { + const graph = new DirectedGraph(); + + graph.addNodes(...nodes); + + // Create a map for quick node lookup + const nodeMap = new Map(); + for (const node of nodes) { + nodeMap.set(node.name, node); + } + + for (const [fromNodeName, iConnection] of Object.entries(connections)) { + const from = nodeMap.get(fromNodeName); + a.ok(from); + + for (const [outputType, outputs] of Object.entries(iConnection)) { + for (const [outputIndex, conns] of outputs.entries()) { + for (const conn of conns ?? []) { + const { node: toNodeName, type: _inputType, index: inputIndex } = conn; + const to = nodeMap.get(toNodeName); + a.ok(to); + + graph.addConnection({ + from, + to, + // TODO: parse outputType instead of casting it + type: outputType as NodeConnectionType, + outputIndex, + inputIndex, + }); + } + } + } + } + + return graph; + } + clone() { return new DirectedGraph() .addNodes(...this.getNodes().values()) diff --git a/packages/core/src/execution-engine/partial-execution-utils/find-trigger-for-partial-execution.ts b/packages/core/src/execution-engine/partial-execution-utils/find-trigger-for-partial-execution.ts index f753ddbae0e1f..0ca665333decf 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/find-trigger-for-partial-execution.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/find-trigger-for-partial-execution.ts @@ -1,6 +1,8 @@ import * as assert from 'assert/strict'; import type { INode, INodeType, IRunData, Workflow } from 'n8n-workflow'; +import type { DirectedGraph } from './directed-graph'; + const isTriggerNode = (nodeType: INodeType) => nodeType.description.group.includes('trigger'); function findAllParentTriggers(workflow: Workflow, destinationNodeName: string) { @@ -25,6 +27,42 @@ function findAllParentTriggers(workflow: Workflow, destinationNodeName: string) return parentNodes; } +export function anyReachableRootHaveRunData( + workflow: DirectedGraph, + destinationNodeName: string, + runData: IRunData, +): boolean { + const destinationNode = workflow.getNodes().get(destinationNodeName); + if (!destinationNode) return false; + + // Get all parent connections recursively + const parentConnections = workflow.getParentConnections(destinationNode); + + // Extract unique parent nodes from connections + const parentNodes = new Set(); + for (const connection of parentConnections) { + parentNodes.add(connection.from); + } + + // Find all root nodes (nodes with no incoming connections) + const rootNodes = new Set(); + for (const parentNode of parentNodes) { + const hasParents = workflow.getDirectParentConnections(parentNode).length > 0; + if (!hasParents) { + rootNodes.add(parentNode); + } + } + + // Check if at least one root node has run data + for (const rootNode of rootNodes) { + if (runData[rootNode.name]) { + return true; + } + } + + return false; +} + // TODO: rewrite this using DirectedGraph instead of workflow. export function findTriggerForPartialExecution( workflow: Workflow, diff --git a/packages/core/src/execution-engine/partial-execution-utils/index.ts b/packages/core/src/execution-engine/partial-execution-utils/index.ts index bd01d10fcc46c..939ad608fa1cc 100644 --- a/packages/core/src/execution-engine/partial-execution-utils/index.ts +++ b/packages/core/src/execution-engine/partial-execution-utils/index.ts @@ -1,5 +1,8 @@ export { DirectedGraph } from './directed-graph'; -export { findTriggerForPartialExecution } from './find-trigger-for-partial-execution'; +export { + findTriggerForPartialExecution, + anyReachableRootHaveRunData, +} from './find-trigger-for-partial-execution'; export { findStartNodes } from './find-start-nodes'; export { findSubgraph } from './find-subgraph'; export { recreateNodeExecutionStack } from './recreate-node-execution-stack'; diff --git a/packages/core/src/execution-engine/workflow-execute.ts b/packages/core/src/execution-engine/workflow-execute.ts index 09b499d349b32..43c1babf8a8a5 100644 --- a/packages/core/src/execution-engine/workflow-execute.ts +++ b/packages/core/src/execution-engine/workflow-execute.ts @@ -209,37 +209,39 @@ export class WorkflowExecute { } else { // Edge Case 1: // Support executing a single node that is not connected to a trigger + // TODO: remove this, once it's proven to be unnecessary const destinationHasNoParents = graph.getDirectParentConnections(destination).length === 0; if (destinationHasNoParents) { - // short cut here, only create a subgraph and the stacks - graph = findSubgraph({ - graph: filterDisabledNodes(graph), - destination, - trigger: destination, - }); - const filteredNodes = graph.getNodes(); - runData = cleanRunData(runData, graph, new Set([destination])); - const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = - recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {}); - - this.status = 'running'; - this.runExecutionData = createRunExecutionData({ - startData: { - destinationNode, - runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), - }, - resultData: { - runData, - pinData, - }, - executionData: { - nodeExecutionStack, - waitingExecution, - waitingExecutionSource, - }, - }); - - return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); + assert.fail('this code should be dead'); + // // short cut here, only create a subgraph and the stacks + // graph = findSubgraph({ + // graph: filterDisabledNodes(graph), + // destination, + // trigger: destination, + // }); + // const filteredNodes = graph.getNodes(); + // runData = cleanRunData(runData, graph, new Set([destination])); + // const { nodeExecutionStack, waitingExecution, waitingExecutionSource } = + // recreateNodeExecutionStack(graph, new Set([destination]), runData, pinData ?? {}); + // + // this.status = 'running'; + // this.runExecutionData = createRunExecutionData({ + // startData: { + // destinationNode, + // runNodeFilter: Array.from(filteredNodes.values()).map((node) => node.name), + // }, + // resultData: { + // runData, + // pinData, + // }, + // executionData: { + // nodeExecutionStack, + // waitingExecution, + // waitingExecutionSource, + // }, + // }); + // + // return this.processRunExecutionData(graph.toWorkflow({ ...workflow })); } }