From bcace8ef5b4e86a4a740e717d81d16b492aca180 Mon Sep 17 00:00:00 2001
From: Bertil Chapuis
Date: Tue, 7 Oct 2025 00:12:46 +0200
Subject: [PATCH 1/2] refactor: restructure runtime

---
 .../runtime/conditional-execution-handler.ts  |  122 ++
 apps/api/src/runtime/credit-manager.ts        |   47 +
 apps/api/src/runtime/execution-persistence.ts |  121 ++
 apps/api/src/runtime/execution-planner.ts     |  225 ++++
 apps/api/src/runtime/node-executor.ts         |  302 +++++
 apps/api/src/runtime/node-input-mapper.ts     |  186 +++
 apps/api/src/runtime/node-output-mapper.ts    |   46 +
 apps/api/src/runtime/object-store.ts          |  660 +++-------
 apps/api/src/runtime/runtime.ts               | 1087 +----------------
 apps/api/src/runtime/secret-manager.ts        |   64 +
 10 files changed, 1349 insertions(+), 1511 deletions(-)
 create mode 100644 apps/api/src/runtime/conditional-execution-handler.ts
 create mode 100644 apps/api/src/runtime/credit-manager.ts
 create mode 100644 apps/api/src/runtime/execution-persistence.ts
 create mode 100644 apps/api/src/runtime/execution-planner.ts
 create mode 100644 apps/api/src/runtime/node-executor.ts
 create mode 100644 apps/api/src/runtime/node-input-mapper.ts
 create mode 100644 apps/api/src/runtime/node-output-mapper.ts
 create mode 100644 apps/api/src/runtime/secret-manager.ts

diff --git a/apps/api/src/runtime/conditional-execution-handler.ts b/apps/api/src/runtime/conditional-execution-handler.ts
new file mode 100644
index 00000000..c91c8516
--- /dev/null
+++ b/apps/api/src/runtime/conditional-execution-handler.ts
@@ -0,0 +1,122 @@
+import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
+import type { NodeInputMapper } from "./node-input-mapper";
+import type { RuntimeState } from "./runtime";
+
+/**
+ * Handles conditional logic in workflow execution.
+ * Determines which nodes should be skipped based on inactive outputs and missing inputs.
+ */
+export class ConditionalExecutionHandler {
+  constructor(
+    private nodeRegistry: CloudflareNodeRegistry,
+    private inputMapper: NodeInputMapper
+  ) {}
+
+  /**
+   * Marks nodes connected to inactive outputs as skipped.
+   * This is crucial for conditional logic where only one branch should execute.
+   */
+  markInactiveOutputNodesAsSkipped(
+    runtimeState: RuntimeState,
+    nodeIdentifier: string,
+    nodeOutputs: Record<string, unknown>
+  ): RuntimeState {
+    const node = runtimeState.workflow.nodes.find(
+      (n) => n.id === nodeIdentifier
+    );
+    if (!node) return runtimeState;
+
+    // Find outputs that were NOT produced
+    const inactiveOutputs = node.outputs
+      .map((output) => output.name)
+      .filter((outputName) => !(outputName in nodeOutputs));
+
+    if (inactiveOutputs.length === 0) return runtimeState;
+
+    // Find all edges from this node's inactive outputs
+    const inactiveEdges = runtimeState.workflow.edges.filter(
+      (edge) =>
+        edge.source === nodeIdentifier &&
+        inactiveOutputs.includes(edge.sourceOutput)
+    );
+
+    // Process each target node of inactive edges
+    for (const edge of inactiveEdges) {
+      this.markNodeAsSkippedIfNoValidInputs(runtimeState, edge.target);
+    }
+
+    return runtimeState;
+  }
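+
+  // Sketch of the intended behaviour (hypothetical node IDs): a condition
+  // node declares outputs "true" and "false" but emits only { true: 42 } on
+  // a given run. "false" is then inactive, so every edge leaving it is
+  // followed, and each target is skipped unless its required inputs are
+  // still satisfied from elsewhere.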
+
+  /**
+   * Marks a node as skipped if it cannot execute due to missing required inputs.
+   * This is smarter than recursively skipping all dependents.
+   */
+  private markNodeAsSkippedIfNoValidInputs(
+    runtimeState: RuntimeState,
+    nodeId: string
+  ): void {
+    if (
+      runtimeState.skippedNodes.has(nodeId) ||
+      runtimeState.executedNodes.has(nodeId)
+    ) {
+      return; // Already processed
+    }
+
+    const node = runtimeState.workflow.nodes.find((n) => n.id === nodeId);
+    if (!node) return;
+
+    // Check if this node has all required inputs satisfied
+    const allRequiredInputsSatisfied = this.nodeHasAllRequiredInputsSatisfied(
+      runtimeState,
+      nodeId
+    );
+
+    // Only skip if the node cannot execute (missing required inputs)
+    if (!allRequiredInputsSatisfied) {
+      runtimeState.skippedNodes.add(nodeId);
+
+      // Recursively check dependents of this skipped node
+      const outgoingEdges = runtimeState.workflow.edges.filter(
+        (edge) => edge.source === nodeId
+      );
+
+      for (const edge of outgoingEdges) {
+        this.markNodeAsSkippedIfNoValidInputs(runtimeState, edge.target);
+      }
+    }
+  }
+
+  /**
+   * Checks if a node has all required inputs satisfied.
+   * A node can execute if all its required inputs are available.
+   */
+  private nodeHasAllRequiredInputsSatisfied(
+    runtimeState: RuntimeState,
+    nodeId: string
+  ): boolean {
+    const node = runtimeState.workflow.nodes.find((n) => n.id === nodeId);
+    if (!node) return false;
+
+    // Get the node type definition to check for required inputs
+    const executable = this.nodeRegistry.createExecutableNode(node);
+    if (!executable) return false;
+
+    const nodeTypeDefinition = (executable.constructor as any).nodeType;
+    if (!nodeTypeDefinition) return false;
+
+    const inputValues = this.inputMapper.collectNodeInputs(
+      runtimeState,
+      nodeId
+    );
+
+    // Check each required input based on the node type definition (not workflow node definition)
+    for (const input of nodeTypeDefinition.inputs) {
+      if (input.required && inputValues[input.name] === undefined) {
+        return false; // Found a required input that's missing
+      }
+    }
+
+    return true; // All required inputs are satisfied
+  }
+}

diff --git a/apps/api/src/runtime/credit-manager.ts b/apps/api/src/runtime/credit-manager.ts
new file mode 100644
index 00000000..ed452880
--- /dev/null
+++ b/apps/api/src/runtime/credit-manager.ts
@@ -0,0 +1,47 @@
+import type { Node } from "@dafthunk/types";
+
+import type { Bindings } from "../context";
+import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
+import { getOrganizationComputeUsage } from "../utils/credits";
+
+/**
+ * Manages compute credits for workflow execution.
+ * Handles credit checks and cost calculations.
+ */
+export class CreditManager {
+  constructor(
+    private env: Bindings,
+    private nodeRegistry: CloudflareNodeRegistry
+  ) {}
+
+  /**
+   * Checks if the organization has enough compute credits to execute a workflow.
+   * Credit limits are not enforced in development mode.
+   */
+  async hasEnoughComputeCredits(
+    organizationId: string,
+    computeCredits: number,
+    computeCost: number
+  ): Promise<boolean> {
+    // Skip credit limit enforcement in development mode
+    if (this.env.CLOUDFLARE_ENV === "development") {
+      return true;
+    }
+
+    const computeUsage = await getOrganizationComputeUsage(
+      this.env.KV,
+      organizationId
+    );
+    return computeUsage + computeCost <= computeCredits;
+  }
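+
+  // Worked example (illustrative numbers): with computeCredits = 100 and a
+  // current usage of 95, a workflow whose nodes cost 5 still runs
+  // (95 + 5 <= 100), while one costing 6 is rejected as exhausted.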
+
+  /**
+   * Returns the compute cost of a list of nodes.
+   */
+  getNodesComputeCost(nodes: Node[]): number {
+    return nodes.reduce((acc, node) => {
+      const nodeType = this.nodeRegistry.getNodeType(node.type);
+      return acc + (nodeType.computeCost ?? 1);
+    }, 0);
+  }
+}

diff --git a/apps/api/src/runtime/execution-persistence.ts b/apps/api/src/runtime/execution-persistence.ts
new file mode 100644
index 00000000..dfb93647
--- /dev/null
+++ b/apps/api/src/runtime/execution-persistence.ts
@@ -0,0 +1,121 @@
+import type { WorkflowExecution } from "@dafthunk/types";
+
+import type { Bindings } from "../context";
+import { createDatabase, type ExecutionStatusType, saveExecution } from "../db";
+import type { RuntimeState } from "./runtime";
+
+/**
+ * Handles persistence and updates for workflow executions.
+ * Manages database storage and WebSocket updates to sessions.
+ */
+export class ExecutionPersistence {
+  constructor(private env: Bindings) {}
+
+  /**
+   * Sends execution update to workflow session via WebSocket
+   */
+  async sendExecutionUpdateToSession(
+    workflowSessionId: string,
+    execution: WorkflowExecution
+  ): Promise<void> {
+    try {
+      const id = this.env.WORKFLOW_SESSION.idFromName(workflowSessionId);
+      const stub = this.env.WORKFLOW_SESSION.get(id);
+
+      await stub.fetch(`https://workflow-session/execution`, {
+        method: "POST",
+        headers: {
+          "Content-Type": "application/json",
+        },
+        body: JSON.stringify(execution),
+      });
+    } catch (error) {
+      console.error(
+        `Failed to send execution update to session ${workflowSessionId}:`,
+        error
+      );
+    }
+  }
+
+  /**
+   * Builds node execution list from runtime state
+   */
+  buildNodeExecutions(runtimeState: RuntimeState) {
+    return runtimeState.workflow.nodes.map((node) => {
+      if (runtimeState.executedNodes.has(node.id)) {
+        return {
+          nodeId: node.id,
+          status: "completed" as const,
+          outputs: runtimeState.nodeOutputs.get(node.id) || {},
+        };
+      }
+      if (runtimeState.nodeErrors.has(node.id)) {
+        return {
+          nodeId: node.id,
+          status: "error" as const,
+          error: runtimeState.nodeErrors.get(node.id),
+        };
+      }
+      if (runtimeState.skippedNodes.has(node.id)) {
+        return {
+          nodeId: node.id,
+          status: "skipped" as const,
+        };
+      }
+      return {
+        nodeId: node.id,
+        status: "executing" as const,
+      };
+    });
+  }
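+
+  // Example result (hypothetical IDs): a node in executedNodes yields
+  // { nodeId: "a", status: "completed", outputs: { ... } }, a failed node
+  // { nodeId: "b", status: "error", error: "..." }, a skipped node
+  // { nodeId: "c", status: "skipped" }, and anything not yet visited
+  // remains { nodeId: "d", status: "executing" }.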
+
+  /**
+   * Persists the workflow execution state to the database.
+   */
+  async saveExecutionState(
+    userId: string,
+    organizationId: string,
+    workflowId: string,
+    instanceId: string,
+    runtimeState: RuntimeState,
+    startedAt?: Date,
+    endedAt?: Date
+  ): Promise<WorkflowExecution> {
+    const nodeExecutionList = this.buildNodeExecutions(runtimeState);
+
+    const executionStatus = runtimeState.status;
+    const errorMsg =
+      runtimeState.nodeErrors.size > 0
+        ? Array.from(runtimeState.nodeErrors.values()).join(", ")
+        : undefined;
+
+    try {
+      const db = createDatabase(this.env.DB);
+      return await saveExecution(db, {
+        id: instanceId,
+        workflowId,
+        userId,
+        organizationId,
+        status: executionStatus as ExecutionStatusType,
+        nodeExecutions: nodeExecutionList,
+        error: errorMsg,
+        updatedAt: new Date(),
+        startedAt,
+        endedAt,
+      });
+    } catch (error) {
+      console.error("Failed to persist execution record:", error);
+      // Continue without interrupting the workflow.
+    }
+
+    return {
+      id: instanceId,
+      workflowId,
+      status: executionStatus,
+      nodeExecutions: nodeExecutionList,
+      error: errorMsg,
+      startedAt,
+      endedAt,
+    };
+  }
+}

diff --git a/apps/api/src/runtime/execution-planner.ts b/apps/api/src/runtime/execution-planner.ts
new file mode 100644
index 00000000..ae81feed
--- /dev/null
+++ b/apps/api/src/runtime/execution-planner.ts
@@ -0,0 +1,225 @@
+import type { Workflow } from "@dafthunk/types";
+
+import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
+import type { ExecutionPlan } from "./runtime";
+
+/**
+ * Creates execution plans for workflows by analyzing the node graph.
+ * Handles topological ordering and groups consecutive inlinable nodes together.
+ */
+export class ExecutionPlanner {
+  constructor(private nodeRegistry: CloudflareNodeRegistry) {}
+
+  /**
+   * Creates an execution plan that groups consecutive inlinable nodes together.
+   * Enhanced version that can handle branching patterns within groups.
+   *
+   * Examples of patterns that can now be inlined:
+   *
+   * Fan-out pattern:
+   *   A → B
+   *   A → C      [A, B, C] can be grouped together
+   *
+   * Fan-in pattern:
+   *   A → C
+   *   B → C      [A, B, C] can be grouped together
+   *
+   * Tree pattern:
+   *   A → B → D
+   *   A → C → D  [A, B, C, D] can be grouped together
+   *
+   * The old linear approach would have executed these as separate steps,
+   * but now they execute in a single Cloudflare workflow step.
+   */
+  createExecutionPlan(
+    workflow: Workflow,
+    orderedNodes: string[]
+  ): ExecutionPlan {
+    const plan: ExecutionPlan = [];
+    const processedNodes = new Set<string>();
+    let totalInlineGroups = 0;
+    let totalInlinedNodes = 0;
+
+    for (let i = 0; i < orderedNodes.length; i++) {
+      const nodeId = orderedNodes[i];
+
+      if (processedNodes.has(nodeId)) {
+        continue; // Already processed in a group
+      }
+
+      const node = workflow.nodes.find((n) => n.id === nodeId);
+      if (!node) continue;
+
+      const nodeType = this.nodeRegistry.getNodeType(node.type);
+      const isInlinable = nodeType.inlinable ?? false;
+
+      if (isInlinable) {
+        // Look ahead to find a group of connected inlinable nodes
+        const inlineGroup = this.findConnectedInlinableGroup(
+          workflow,
+          nodeId,
+          orderedNodes,
+          i,
+          processedNodes
+        );
+
+        if (inlineGroup.length === 1) {
+          // Single node - add as individual
+          plan.push({ type: "individual", nodeId: inlineGroup[0] });
+        } else {
+          // Multiple nodes - add as inline group
+          plan.push({ type: "inline", nodeIds: [...inlineGroup] });
+          totalInlineGroups++;
+          totalInlinedNodes += inlineGroup.length;
+        }
+
+        // Mark all nodes in the group as processed
+        inlineGroup.forEach((id) => processedNodes.add(id));
+      } else {
+        // Non-inlinable node - add as individual
+        plan.push({ type: "individual", nodeId });
+        processedNodes.add(nodeId);
+      }
+    }
+
+    // Log metrics for performance analysis
+    if (totalInlineGroups > 0) {
+      const totalInlinableNodes = orderedNodes.filter((nodeId) => {
+        const node = workflow.nodes.find((n) => n.id === nodeId);
+        if (!node) return false;
+        const nodeType = this.nodeRegistry.getNodeType(node.type);
+        return nodeType.inlinable ?? false;
+      }).length;
+
+      const inliningEfficiency =
+        (totalInlinedNodes / totalInlinableNodes) * 100;
+      console.log(
+        `Execution plan optimized: ${totalInlineGroups} inline groups containing ${totalInlinedNodes}/${totalInlinableNodes} inlinable nodes (${inliningEfficiency.toFixed(1)}% efficiency)`
+      );
+
+      // Log individual group sizes for analysis
+      const groupSizes = plan
+        .filter((unit) => unit.type === "inline")
+        .map((unit) => (unit.type === "inline" ? unit.nodeIds.length : 0));
+
+      console.log(`Group sizes: [${groupSizes.join(", ")}]`);
+    }
+
+    return plan;
+  }
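+
+  // Example (illustrative IDs): for A → B → C where every node type is
+  // inlinable, the loop above emits a single { type: "inline", nodeIds:
+  // ["A", "B", "C"] } unit; if B were not inlinable, the plan falls back
+  // to three individual units executed as separate workflow steps.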
false; + }).length; + + const inliningEfficiency = + (totalInlinedNodes / totalInlinableNodes) * 100; + console.log( + `Execution plan optimized: ${totalInlineGroups} inline groups containing ${totalInlinedNodes}/${totalInlinableNodes} inlinable nodes (${inliningEfficiency.toFixed(1)}% efficiency)` + ); + + // Log individual group sizes for analysis + const groupSizes = plan + .filter((unit) => unit.type === "inline") + .map((unit) => (unit.type === "inline" ? unit.nodeIds.length : 0)); + + console.log(`Group sizes: [${groupSizes.join(", ")}]`); + } + + return plan; + } + + /** + * Finds a connected group of inlinable nodes starting from a given node. + * Uses a simple algorithm: expand the group as long as all dependencies are satisfied. + */ + private findConnectedInlinableGroup( + workflow: Workflow, + startNodeId: string, + orderedNodes: string[], + startIndex: number, + alreadyProcessed: Set + ): string[] { + const group = [startNodeId]; + const groupSet = new Set([startNodeId]); + + // Look ahead in the topological order for nodes that can be added to this group + for (let i = startIndex + 1; i < orderedNodes.length; i++) { + const candidateId = orderedNodes[i]; + + // Skip if already processed or not inlinable + if (alreadyProcessed.has(candidateId)) continue; + + const candidateNode = workflow.nodes.find((n) => n.id === candidateId); + if (!candidateNode) continue; + + const candidateNodeType = this.nodeRegistry.getNodeType( + candidateNode.type + ); + if (!(candidateNodeType.inlinable ?? false)) continue; + + // Check if this candidate can be safely added to the group + if ( + this.canSafelyAddToGroup( + workflow, + candidateId, + groupSet, + orderedNodes, + startIndex + ) + ) { + group.push(candidateId); + groupSet.add(candidateId); + } + } + + return group; + } + + /** + * Simplified check: a node can be added to a group if all its dependencies + * are either already executed or in the current group. + */ + private canSafelyAddToGroup( + workflow: Workflow, + nodeId: string, + currentGroupSet: Set, + orderedNodes: string[], + groupStartIndex: number + ): boolean { + // Get all dependencies of this node + const dependencies = workflow.edges + .filter((edge) => edge.target === nodeId) + .map((edge) => edge.source); + + // Check each dependency + for (const depId of dependencies) { + const isInGroup = currentGroupSet.has(depId); + const depIndex = orderedNodes.indexOf(depId); + const isAlreadyExecuted = depIndex < groupStartIndex; + + if (!isInGroup && !isAlreadyExecuted) { + return false; // Has unmet dependency + } + } + + return true; + } + + /** + * Calculates a topological ordering of nodes. Returns an empty array if a cycle is detected. + */ + createTopologicalOrder(workflow: Workflow): string[] { + const inDegree: Record = {}; + const adjacency: Record = {}; + + for (const node of workflow.nodes) { + inDegree[node.id] = 0; + adjacency[node.id] = []; + } + + for (const edge of workflow.edges) { + adjacency[edge.source].push(edge.target); + inDegree[edge.target] += 1; + } + + const queue: string[] = Object.keys(inDegree).filter( + (id) => inDegree[id] === 0 + ); + const ordered: string[] = []; + + while (queue.length > 0) { + const current = queue.shift()!; + ordered.push(current); + + for (const neighbour of adjacency[current]) { + inDegree[neighbour] -= 1; + if (inDegree[neighbour] === 0) { + queue.push(neighbour); + } + } + } + + // If ordering missed nodes, a cycle exists. + return ordered.length === workflow.nodes.length ? 
+}

diff --git a/apps/api/src/runtime/node-executor.ts b/apps/api/src/runtime/node-executor.ts
new file mode 100644
index 00000000..e8842b20
--- /dev/null
+++ b/apps/api/src/runtime/node-executor.ts
@@ -0,0 +1,302 @@
+import type { Bindings } from "../context";
+import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
+import type { CloudflareToolRegistry } from "../nodes/cloudflare-tool-registry";
+import type { HttpRequest, NodeContext } from "../nodes/types";
+import type { EmailMessage } from "../nodes/types";
+import type { ConditionalExecutionHandler } from "./conditional-execution-handler";
+import type { NodeInputMapper } from "./node-input-mapper";
+import type { NodeOutputMapper } from "./node-output-mapper";
+import { ObjectStore } from "./object-store";
+import type { NodeOutputs, RuntimeState } from "./runtime";
+
+/**
+ * Executes workflow nodes.
+ * Handles both individual nodes and groups of inlinable nodes.
+ */
+export class NodeExecutor {
+  constructor(
+    private env: Bindings,
+    private nodeRegistry: CloudflareNodeRegistry,
+    private toolRegistry: CloudflareToolRegistry,
+    private inputMapper: NodeInputMapper,
+    private outputMapper: NodeOutputMapper,
+    private conditionalHandler: ConditionalExecutionHandler
+  ) {}
+
+  /**
+   * Executes a group of inlinable nodes sequentially in a single step.
+   */
+  async executeInlineGroup(
+    runtimeState: RuntimeState,
+    workflowId: string,
+    nodeIds: string[],
+    organizationId: string,
+    executionId: string,
+    secrets: Record<string, string>,
+    httpRequest?: HttpRequest,
+    emailMessage?: EmailMessage
+  ): Promise<RuntimeState> {
+    let currentState = runtimeState;
+    const groupStartTime = Date.now();
+    const executedNodesInGroup: string[] = [];
+
+    console.log(`Starting inline group execution: [${nodeIds.join(", ")}]`);
+
+    // Execute each node in the group sequentially
+    for (const nodeId of nodeIds) {
+      // Skip nodes that were already marked as failed or skipped
+      if (
+        currentState.nodeErrors.has(nodeId) ||
+        currentState.skippedNodes.has(nodeId)
+      ) {
+        console.log(
+          `Skipping node ${nodeId} in inline group (already failed/skipped)`
+        );
+        continue;
+      }
+
+      try {
+        const nodeStartTime = Date.now();
+
+        currentState = await this.executeNode(
+          currentState,
+          workflowId,
+          nodeId,
+          organizationId,
+          executionId,
+          secrets,
+          httpRequest,
+          emailMessage
+        );
+
+        const nodeExecutionTime = Date.now() - nodeStartTime;
+
+        // If execution failed, break the inline group execution
+        if (currentState.nodeErrors.has(nodeId)) {
+          console.log(
+            `Node ${nodeId} failed in inline group after ${nodeExecutionTime}ms, stopping group execution`
+          );
+          break;
+        }
+
+        executedNodesInGroup.push(nodeId);
+        console.log(
+          `Node ${nodeId} completed in inline group (${nodeExecutionTime}ms)`
+        );
+      } catch (error) {
+        // Handle errors at the group level
+        const message = error instanceof Error ? error.message : String(error);
+        currentState.nodeErrors.set(nodeId, message);
+        currentState.status = "error";
+        console.log(
+          `Fatal error in node ${nodeId} within inline group: ${message}`
+        );
+        break;
+      }
+    }
+
+    const totalGroupTime = Date.now() - groupStartTime;
+    console.log(
+      `Inline group completed: executed ${executedNodesInGroup.length}/${nodeIds.length} nodes in ${totalGroupTime}ms`
+    );
+
+    return currentState;
+  }
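+
+  // Illustrative timeline for nodeIds ["a", "b", "c"]: "a" and "b" run back
+  // to back inside one Cloudflare workflow step; if "b" records an error,
+  // "c" is never attempted and the partial state is returned to the runner.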
+
+  /**
+   * Executes a single node and stores its outputs.
+   */
+  async executeNode(
+    runtimeState: RuntimeState,
+    workflowId: string,
+    nodeIdentifier: string,
+    organizationId: string,
+    executionId: string,
+    secrets: Record<string, string>,
+    httpRequest?: HttpRequest,
+    emailMessage?: EmailMessage
+  ): Promise<RuntimeState> {
+    const node = runtimeState.workflow.nodes.find(
+      (n): boolean => n.id === nodeIdentifier
+    );
+    if (!node) {
+      runtimeState.nodeErrors.set(
+        nodeIdentifier,
+        `Node not found: ${nodeIdentifier}`
+      );
+      return { ...runtimeState, status: "error" };
+    }
+
+    const nodeType = this.nodeRegistry.getNodeType(node.type);
+    this.env.COMPUTE.writeDataPoint({
+      indexes: [organizationId],
+      blobs: [organizationId, workflowId, node.id],
+      doubles: [nodeType.computeCost ?? 1],
+    });
+
+    // Resolve the runnable implementation.
+    const executable = this.nodeRegistry.createExecutableNode(node);
+    if (!executable) {
+      runtimeState.nodeErrors.set(
+        nodeIdentifier,
+        `Node type not implemented: ${node.type}`
+      );
+      return { ...runtimeState, status: "error" };
+    }
+
+    // Gather inputs by reading connections and default values.
+    const inputValues = this.inputMapper.collectNodeInputs(
+      runtimeState,
+      nodeIdentifier
+    );
+
+    try {
+      const objectStore = new ObjectStore(this.env.RESSOURCES);
+      const processedInputs = await this.inputMapper.mapRuntimeToNodeInputs(
+        runtimeState,
+        nodeIdentifier,
+        inputValues,
+        objectStore
+      );
+
+      // Configure AI Gateway options for all AI model requests.
+      // If CLOUDFLARE_AI_GATEWAY_ID is set, all AI requests will be routed through the gateway
+      // for analytics, caching, and rate limiting. If not set, requests go directly to the model.
+      const aiOptions: AiOptions = {};
+      const gatewayId = this.env.CLOUDFLARE_AI_GATEWAY_ID;
+      if (gatewayId) {
+        aiOptions.gateway = {
+          id: gatewayId,
+          skipCache: false, // Enable caching by default for better performance
+        };
+      }
+
+      const context: NodeContext = {
+        nodeId: nodeIdentifier,
+        workflowId: runtimeState.workflow.id,
+        organizationId,
+        inputs: processedInputs,
+        httpRequest,
+        emailMessage,
+        onProgress: () => {},
+        toolRegistry: this.toolRegistry,
+        secrets: secrets || {},
+        env: {
+          DB: this.env.DB,
+          AI: this.env.AI,
+          AI_OPTIONS: aiOptions,
+          RESSOURCES: this.env.RESSOURCES,
+          DATASETS: this.env.DATASETS,
+          DATASETS_AUTORAG: this.env.DATASETS_AUTORAG,
+          CLOUDFLARE_ACCOUNT_ID: this.env.CLOUDFLARE_ACCOUNT_ID,
+          CLOUDFLARE_API_TOKEN: this.env.CLOUDFLARE_API_TOKEN,
+          CLOUDFLARE_AI_GATEWAY_ID: this.env.CLOUDFLARE_AI_GATEWAY_ID,
+          TWILIO_ACCOUNT_SID: this.env.TWILIO_ACCOUNT_SID,
+          TWILIO_AUTH_TOKEN: this.env.TWILIO_AUTH_TOKEN,
+          TWILIO_PHONE_NUMBER: this.env.TWILIO_PHONE_NUMBER,
+          SENDGRID_API_KEY: this.env.SENDGRID_API_KEY,
+          SENDGRID_DEFAULT_FROM: this.env.SENDGRID_DEFAULT_FROM,
+          RESEND_API_KEY: this.env.RESEND_API_KEY,
+          RESEND_DEFAULT_FROM: this.env.RESEND_DEFAULT_FROM,
+          AWS_ACCESS_KEY_ID: this.env.AWS_ACCESS_KEY_ID,
+          AWS_SECRET_ACCESS_KEY: this.env.AWS_SECRET_ACCESS_KEY,
+          AWS_REGION: this.env.AWS_REGION,
+          SES_DEFAULT_FROM: this.env.SES_DEFAULT_FROM,
+          EMAIL_DOMAIN: this.env.EMAIL_DOMAIN,
+          OPENAI_API_KEY: this.env.OPENAI_API_KEY,
+          ANTHROPIC_API_KEY: this.env.ANTHROPIC_API_KEY,
+          GEMINI_API_KEY: this.env.GEMINI_API_KEY,
+          HUGGINGFACE_API_KEY: this.env.HUGGINGFACE_API_KEY,
+        },
+      };
+
+      const result = await executable.execute(context);
+
+      if (result.status === "completed") {
+        const outputsForRuntime =
+          await this.outputMapper.mapNodeToRuntimeOutputs(
+            runtimeState,
+            nodeIdentifier,
+            result.outputs ?? {},
+            objectStore,
+            organizationId,
+            executionId
+          );
+        runtimeState.nodeOutputs.set(
+          nodeIdentifier,
+          outputsForRuntime as NodeOutputs
+        );
+        runtimeState.executedNodes.add(nodeIdentifier);
+
+        // After successful execution, mark nodes connected to inactive outputs as skipped
+        runtimeState = this.conditionalHandler.markInactiveOutputNodesAsSkipped(
+          runtimeState,
+          nodeIdentifier,
+          result.outputs ?? {}
+        );
+      } else {
+        const failureMessage = result.error ?? "Unknown error";
+        runtimeState.nodeErrors.set(nodeIdentifier, failureMessage);
+        runtimeState.status = "error";
+      }
+
+      // Determine final workflow status.
+      if (runtimeState.status !== "error") {
+        const allNodesVisited = runtimeState.executionPlan.every((unit) =>
+          unit.type === "individual"
+            ? runtimeState.executedNodes.has(unit.nodeId) ||
+              runtimeState.skippedNodes.has(unit.nodeId) ||
+              runtimeState.nodeErrors.has(unit.nodeId)
+            : unit.type === "inline"
+              ? unit.nodeIds.every(
+                  (id: string) =>
+                    runtimeState.executedNodes.has(id) ||
+                    runtimeState.skippedNodes.has(id) ||
+                    runtimeState.nodeErrors.has(id)
+                )
+              : false
+        );
+        runtimeState.status =
+          allNodesVisited && runtimeState.nodeErrors.size === 0
+            ? "completed"
+            : "executing";
+      }
+
+      return runtimeState;
+    } catch (error) {
+      if (
+        error instanceof Error &&
+        error.message.startsWith("Required input")
+      ) {
+        runtimeState.skippedNodes.add(nodeIdentifier);
+
+        // Determine final workflow status.
+        if (runtimeState.status !== "error") {
+          const allNodesVisited = runtimeState.executionPlan.every((unit) =>
+            unit.type === "individual"
+              ? runtimeState.executedNodes.has(unit.nodeId) ||
+                runtimeState.skippedNodes.has(unit.nodeId) ||
+                runtimeState.nodeErrors.has(unit.nodeId)
+              : unit.type === "inline"
+                ? unit.nodeIds.every(
+                    (id: string) =>
+                      runtimeState.executedNodes.has(id) ||
+                      runtimeState.skippedNodes.has(id) ||
+                      runtimeState.nodeErrors.has(id)
+                  )
+                : false
+          );
+          runtimeState.status =
+            allNodesVisited && runtimeState.nodeErrors.size === 0
+              ? "completed"
+              : "executing";
+        }
+        return runtimeState;
+      }
+      const message = error instanceof Error ? error.message : String(error);
+      runtimeState.nodeErrors.set(nodeIdentifier, message);
+      runtimeState.status = "error";
+      return runtimeState;
+    }
+  }
+}

diff --git a/apps/api/src/runtime/node-input-mapper.ts b/apps/api/src/runtime/node-input-mapper.ts
new file mode 100644
index 00000000..388af2f7
--- /dev/null
+++ b/apps/api/src/runtime/node-input-mapper.ts
@@ -0,0 +1,186 @@
+import type { JsonArray, JsonObject, ObjectReference } from "@dafthunk/types";
+
+import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
+import { apiToNodeParameter } from "../nodes/parameter-mapper";
+import type { ObjectStore } from "./object-store";
+import type {
+  BasicNodeOutputValue,
+  NodeOutputs,
+  NodeOutputValue,
+  RuntimeState,
+} from "./runtime";
+
+/**
+ * Handles mapping and transformation of node inputs during workflow execution.
+ * Collects inputs from edges and default values, and transforms them to node format.
+ */
+export class NodeInputMapper {
+  constructor(private nodeRegistry: CloudflareNodeRegistry) {}
+
+  /**
+   * Returns inputs for a node by checking its default values and inbound edges.
+   */
+  collectNodeInputs(
+    runtimeState: RuntimeState,
+    nodeIdentifier: string
+  ): NodeOutputs {
+    const inputs: NodeOutputs = {};
+    const node = runtimeState.workflow.nodes.find(
+      (n): boolean => n.id === nodeIdentifier
+    );
+    if (!node) return inputs;
+
+    // Defaults declared directly on the node.
+    for (const input of node.inputs) {
+      if (input.value !== undefined) {
+        if (
+          typeof input.value === "string" ||
+          typeof input.value === "number" ||
+          typeof input.value === "boolean" ||
+          (typeof input.value === "object" && input.value !== null)
+        ) {
+          inputs[input.name] = input.value as NodeOutputValue;
+        }
+      }
+    }
+
+    // Values coming from connected nodes.
+    const inboundEdges = runtimeState.workflow.edges.filter(
+      (edge): boolean => edge.target === nodeIdentifier
+    );
+
+    // Group edges by target input to handle multiple connections
+    const edgesByInput = new Map();
+    for (const edge of inboundEdges) {
+      const inputName = edge.targetInput;
+      if (!edgesByInput.has(inputName)) {
+        edgesByInput.set(inputName, []);
+      }
+      edgesByInput.get(inputName)!.push(edge);
+    }
+
+    // Process each input's connections
+    for (const [inputName, edges] of edgesByInput) {
+      // Get the node type definition to check the repeated flag
+      const executable = this.nodeRegistry.createExecutableNode(node);
+      const nodeTypeDefinition = executable
+        ? (executable.constructor as any).nodeType
+        : null;
+      const nodeTypeInput = nodeTypeDefinition?.inputs?.find(
+        (input: any) => input.name === inputName
+      );
+
+      // Check the repeated flag on the node type definition (not the workflow node)
+      const acceptsMultiple = nodeTypeInput?.repeated || false;
+
+      const values: BasicNodeOutputValue[] = [];
+
+      for (const edge of edges) {
+        const sourceOutputs = runtimeState.nodeOutputs.get(edge.source);
+        if (sourceOutputs && sourceOutputs[edge.sourceOutput] !== undefined) {
+          const value = sourceOutputs[edge.sourceOutput];
+          if (
+            typeof value === "string" ||
+            typeof value === "number" ||
+            typeof value === "boolean" ||
+            (typeof value === "object" && value !== null)
+          ) {
+            values.push(value as BasicNodeOutputValue);
+          }
+        }
+      }
+
+      if (values.length > 0) {
+        if (acceptsMultiple) {
+          // For parameters that accept multiple connections, provide an array
+          inputs[inputName] = values;
+        } else {
+          // For single connection parameters, use the last value (current behavior)
+          inputs[inputName] = values[values.length - 1];
+        }
+      }
+    }
+
+    return inputs;
+  }
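+
+  // Example (hypothetical names): two edges feeding input "items" on a node
+  // whose type marks "items" as repeated yield inputs.items = [v1, v2];
+  // without the repeated flag, only the last connected value (v2) is kept.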
"string" : type; + + if (acceptsMultiple && Array.isArray(value)) { + // For parameters that accept multiple connections, process each value individually + const processedArray = []; + for (const singleValue of value) { + const validSingleValue = singleValue as + | string + | number + | boolean + | ObjectReference + | JsonArray + | JsonObject; + const processedSingleValue = await apiToNodeParameter( + parameterType, + validSingleValue, + objectStore + ); + processedArray.push(processedSingleValue); + } + processed[name] = processedArray; + } else { + // Single value processing (existing logic) + const validValue = value as + | string + | number + | boolean + | ObjectReference + | JsonArray + | JsonObject; + const processedValue = await apiToNodeParameter( + parameterType, + validValue, + objectStore + ); + processed[name] = processedValue; + } + } + + return processed; + } +} diff --git a/apps/api/src/runtime/node-output-mapper.ts b/apps/api/src/runtime/node-output-mapper.ts new file mode 100644 index 00000000..cf5e4a53 --- /dev/null +++ b/apps/api/src/runtime/node-output-mapper.ts @@ -0,0 +1,46 @@ +import { nodeToApiParameter } from "../nodes/parameter-mapper"; +import type { ObjectStore } from "./object-store"; +import type { RuntimeState } from "./runtime"; + +/** + * Handles transformation of node outputs to runtime format. + * Converts node outputs to serializable representations for storage. + */ +export class NodeOutputMapper { + /** + * Converts node outputs to a serialisable runtime representation. + */ + async mapNodeToRuntimeOutputs( + runtimeState: RuntimeState, + nodeIdentifier: string, + outputsFromNode: Record, + objectStore: ObjectStore, + organizationId: string, + executionId: string + ): Promise> { + const node = runtimeState.workflow.nodes.find( + (n) => n.id === nodeIdentifier + ); + if (!node) throw new Error(`Node ${nodeIdentifier} not found`); + + const processed: Record = {}; + + for (const definition of node.outputs) { + const { name, type } = definition; + const value = outputsFromNode[name]; + if (value === undefined || value === null) continue; + + // Handle secret parameters as strings since secrets are preloaded in context + const parameterType = type === "secret" ? "string" : type; + + processed[name] = await nodeToApiParameter( + parameterType, + value, + objectStore, + organizationId, + executionId + ); + } + return processed; + } +} diff --git a/apps/api/src/runtime/object-store.ts b/apps/api/src/runtime/object-store.ts index 235d89d8..ecb4693b 100644 --- a/apps/api/src/runtime/object-store.ts +++ b/apps/api/src/runtime/object-store.ts @@ -1,16 +1,13 @@ import { ObjectReference, Workflow, WorkflowExecution } from "@dafthunk/types"; import { v7 as uuid } from "uuid"; +/** + * Manages R2 storage for objects, workflows, and executions. + * Uses helper methods to eliminate duplication in logging and error handling. 
diff --git a/apps/api/src/runtime/object-store.ts b/apps/api/src/runtime/object-store.ts
index 235d89d8..ecb4693b 100644
--- a/apps/api/src/runtime/object-store.ts
+++ b/apps/api/src/runtime/object-store.ts
@@ -1,16 +1,13 @@
 import { ObjectReference, Workflow, WorkflowExecution } from "@dafthunk/types";
 import { v7 as uuid } from "uuid";
 
+/**
+ * Manages R2 storage for objects, workflows, and executions.
+ * Uses helper methods to eliminate duplication in logging and error handling.
+ */
 export class ObjectStore {
-  private bucket: R2Bucket;
+  constructor(private bucket: R2Bucket) {}
 
-  constructor(bucket: R2Bucket) {
-    this.bucket = bucket;
-  }
-
-  /**
-   * Write a binary object to storage and return a reference
-   */
   async writeObject(
     data: Uint8Array,
     mimeType: string,
@@ -27,9 +24,6 @@ export class ObjectStore {
     );
   }
 
-  /**
-   * Write a binary object to storage and return a reference
-   */
   async writeObjectWithId(
     id: string,
     data: Uint8Array,
@@ -37,141 +31,56 @@ export class ObjectStore {
     organizationId: string,
     executionId?: string
   ): Promise<ObjectReference> {
-    try {
-      console.log(
-        `ObjectStore.writeObjectWithId: Starting to write object with id ${id}`
-      );
-
-      if (!this.bucket) {
-        console.error(
-          "ObjectStore.writeObjectWithId: R2 bucket is not initialized"
-        );
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `objects/${id}/object.data`;
-      console.log(
-        `ObjectStore.writeObjectWithId: Attempting to store object with key ${key}`
-      );
-
-      const customMetadataForR2: { [key: string]: string } = {
-        id,
-        createdAt: new Date().toISOString(),
-        organizationId,
-      };
-
-      if (executionId) {
-        customMetadataForR2.executionId = executionId;
-      }
+    const customMetadata: Record<string, string> = {
+      id,
+      createdAt: new Date().toISOString(),
+      organizationId,
+    };
+    if (executionId) {
+      customMetadata.executionId = executionId;
+    }
 
-      const writeResult = await this.bucket.put(key, data, {
+    await this.writeToR2(
+      `objects/${id}/object.data`,
+      data,
+      {
        httpMetadata: {
          contentType: mimeType,
          cacheControl: "public, max-age=31536000",
        },
-        customMetadata: customMetadataForR2,
-      });
-
-      console.log(
-        `ObjectStore.writeObjectWithId: Successfully stored object ${id}, etag: ${writeResult?.etag || "unknown"}`
-      );
+        customMetadata,
+      },
+      "writeObjectWithId"
+    );
 
-      return {
-        id,
-        mimeType: mimeType,
-      };
-    } catch (error) {
-      console.error(
-        "ObjectStore.writeObjectWithId: Failed to write object to R2:",
-        error
-      );
-      throw error;
-    }
+    return { id, mimeType };
   }
 
-  /**
-   * Read an object from storage using its reference
-   */
   async readObject(reference: ObjectReference): Promise<{
     data: Uint8Array;
     metadata: R2Object["customMetadata"];
   } | null> {
-    try {
-      console.log(
-        `ObjectStore.read: Attempting to read object with id ${reference.id}`
-      );
-
-      if (!this.bucket) {
-        console.error("ObjectStore.read: R2 bucket is not initialized");
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `objects/${reference.id}/object.data`;
-      console.log(`ObjectStore.readObject: Getting object with key ${key}`);
-
-      const object = await this.bucket.get(key);
-
-      if (!object) {
-        console.log(`ObjectStore.readObject: Object not found with key ${key}`);
-        console.error(`ObjectStore.read: Object not found: ${reference.id}`);
-        return null;
-      }
+    const object = await this.readFromR2(
+      `objects/${reference.id}/object.data`,
+      "readObject"
+    );
 
-      console.log(
-        `ObjectStore.readObject: Retrieved object ${reference.id}, size: ${object.size} bytes`
-      );
+    if (!object) return null;
 
-      const data = await object.arrayBuffer();
-      console.log(
-        `ObjectStore.read: Successfully read object ${reference.id}, size: ${data.byteLength} bytes`
-      );
-      return {
-        data: new Uint8Array(data),
-        metadata: object.customMetadata,
-      };
-    } catch (error) {
-      console.error(
-        `ObjectStore.read: Failed to read object ${reference.id}:`,
-        error
-      );
-      throw error;
-    }
+    const arrayBuffer = await object.arrayBuffer();
+    return {
+      data: new Uint8Array(arrayBuffer),
+      metadata: object.customMetadata,
+    };
   }
 
-  /**
-   * Delete an object from storage using its reference
-   */
   async deleteObject(reference: ObjectReference): Promise<void> {
-    try {
-      console.log(
-        `ObjectStore.delete: Attempting to delete object with id ${reference.id}`
-      );
-
-      if (!this.bucket) {
-        console.error("ObjectStore.delete: R2 bucket is not initialized");
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `objects/${reference.id}/object.data`;
-      console.log(`ObjectStore.deleteObject: Deleting object with key ${key}`);
-
-      await this.bucket.delete(key);
-      console.log(`ObjectStore.deleteObject: Deleted object with key ${key}`);
-      console.log(
-        `ObjectStore.delete: Successfully deleted object ${reference.id}`
-      );
-    } catch (error) {
-      console.error(
-        `ObjectStore.delete: Failed to delete object ${reference.id}:`,
-        error
-      );
-      throw error;
-    }
+    await this.deleteFromR2(
+      `objects/${reference.id}/object.data`,
+      "deleteObject"
+    );
   }
 
-  /**
-   * List objects for an organization
-   */
   async listObjects(organizationId: string): Promise<
     {
       id: string;
@@ -182,81 +91,30 @@ export class ObjectStore {
      mimeType: string;
      size: number;
      createdAt: Date;
      organizationId: string;
      executionId?: string;
    }[]
  > {
-    try {
-      console.log(
-        `ObjectStore.listObjects: Listing objects for organization ${organizationId}`
-      );
-
-      if (!this.bucket) {
-        console.error("ObjectStore.listObjects: R2 bucket is not initialized");
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const prefix = "objects/";
-      const objects = await this.bucket.list({ prefix });
-
-      const filteredObjects = [];
-
-      // Filter objects by organizationId and parse metadata
-      for (const obj of objects.objects) {
-        if (obj.customMetadata?.organizationId === organizationId) {
-          // Extract object ID from the key path (objects/{id}/object.data)
-          const keyParts = obj.key.split("/");
-          const id = keyParts[1]; // The ID should be in the second position
-
-          // We don't know the MIME type from the listing, so we use the content-type from httpMetadata
-          const mimeType =
-            obj.httpMetadata?.contentType || "application/octet-stream";
-
-          filteredObjects.push({
-            id,
-            mimeType,
-            size: obj.size,
-            createdAt: obj.customMetadata?.createdAt
-              ? new Date(obj.customMetadata.createdAt)
-              : new Date(),
-            organizationId,
-            executionId: obj.customMetadata?.executionId,
-          });
-        }
-      }
-
-      console.log(
-        `ObjectStore.listObjects: Found ${filteredObjects.length} objects for organization ${organizationId}`
-      );
-
-      return filteredObjects;
-    } catch (error) {
-      console.error(
-        `ObjectStore.listObjects: Failed to list objects for organization ${organizationId}:`,
-        error
-      );
-      throw error;
-    }
+    const objects = await this.listFromR2("objects/", "listObjects");
+
+    return objects.objects
+      .filter((obj) => obj.customMetadata?.organizationId === organizationId)
+      .map((obj) => {
+        const id = obj.key.split("/")[1];
+        return {
+          id,
+          mimeType: obj.httpMetadata?.contentType || "application/octet-stream",
+          size: obj.size,
+          createdAt: obj.customMetadata?.createdAt
+            ? new Date(obj.customMetadata.createdAt)
+            : new Date(),
+          organizationId,
+          executionId: obj.customMetadata?.executionId,
+        };
+      });
   }
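+
+  // Example element returned by listObjects (illustrative values):
+  //   { id: "0192f3…", mimeType: "image/png", size: 2048,
+  //     createdAt: 2025-10-07T00:00:00.000Z, organizationId: "org-1" }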
 
-  /**
-   * Write a workflow to storage
-   */
   async writeWorkflow(workflow: Workflow): Promise<string> {
-    try {
-      console.log(
-        `ObjectStore.writeWorkflow: Starting to write workflow ${workflow.id}`
-      );
-
-      if (!this.bucket) {
-        console.error(
-          "ObjectStore.writeWorkflow: R2 bucket is not initialized"
-        );
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `workflows/${workflow.id}/workflow.json`;
-      console.log(
-        `ObjectStore.writeWorkflow: Attempting to store workflow with key ${key}`
-      );
-
-      const writeResult = await this.bucket.put(key, JSON.stringify(workflow), {
+    await this.writeToR2(
+      `workflows/${workflow.id}/workflow.json`,
+      JSON.stringify(workflow),
+      {
        httpMetadata: {
          contentType: "application/json",
          cacheControl: "no-cache",
@@ -264,133 +122,41 @@ export class ObjectStore {
        },
        customMetadata: {
          updatedAt: new Date().toISOString(),
        },
-      });
-
-      console.log(
-        `ObjectStore.writeWorkflow: Successfully stored workflow ${workflow.id}, etag: ${writeResult?.etag || "unknown"}`
-      );
-
-      return workflow.id;
-    } catch (error) {
-      console.error(
-        "ObjectStore.writeWorkflow: Failed to write workflow to R2:",
-        error
-      );
-      throw error;
-    }
+      },
+      "writeWorkflow"
+    );
+    return workflow.id;
   }
 
-  /**
-   * Read a workflow from storage using its id
-   */
   async readWorkflow(workflowId: string): Promise<Workflow> {
-    try {
-      console.log(
-        `ObjectStore.readWorkflow: Attempting to read workflow with id ${workflowId}`
-      );
-
-      if (!this.bucket) {
-        console.error("ObjectStore.readWorkflow: R2 bucket is not initialized");
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `workflows/${workflowId}/workflow.json`;
-      console.log(`ObjectStore.readWorkflow: Getting workflow with key ${key}`);
-
-      const object = await this.bucket.get(key);
-
-      if (!object) {
-        console.log(
-          `ObjectStore.readWorkflow: Workflow not found with key ${key}`
-        );
-        console.error(
-          `ObjectStore.readWorkflow: Workflow not found: ${workflowId}`
-        );
-        throw new Error(`Workflow not found: ${workflowId}`);
-      }
-
-      console.log(
-        `ObjectStore.readWorkflow: Retrieved workflow ${workflowId}, size: ${object.size} bytes`
-      );
-
-      const text = await object.text();
-      const workflow = JSON.parse(text) as Workflow;
-      console.log(
-        `ObjectStore.readWorkflow: Successfully read workflow ${workflowId}`
-      );
+    const object = await this.readFromR2(
+      `workflows/${workflowId}/workflow.json`,
+      "readWorkflow"
+    );
 
-      return workflow;
-    } catch (error) {
-      console.error(
-        `ObjectStore.readWorkflow: Failed to read workflow ${workflowId}:`,
-        error
-      );
-      throw error;
+    if (!object) {
+      throw new Error(`Workflow not found: ${workflowId}`);
    }
+
+    const text = await object.text();
+    return JSON.parse(text) as Workflow;
   }
 
-  /**
-   * Delete a workflow from storage using its id
-   */
   async deleteWorkflow(workflowId: string): Promise<void> {
-    try {
-      console.log(
-        `ObjectStore.deleteWorkflow: Attempting to delete workflow with id ${workflowId}`
-      );
-
-      if (!this.bucket) {
-        console.error(
-          "ObjectStore.deleteWorkflow: R2 bucket is not initialized"
-        );
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `workflows/${workflowId}/workflow.json`;
-      console.log(
-        `ObjectStore.deleteWorkflow: Deleting workflow with key ${key}`
-      );
-
-      await this.bucket.delete(key);
-      console.log(
-        `ObjectStore.deleteWorkflow: Deleted workflow with key ${key}`
-      );
-      console.log(
-        `ObjectStore.deleteWorkflow: Successfully deleted workflow ${workflowId}`
-      );
-    } catch (error) {
-      console.error(
-        `ObjectStore.deleteWorkflow: Failed to delete workflow ${workflowId}:`,
-        error
-      );
-      throw error;
-    }
+    await this.deleteFromR2(
+      `workflows/${workflowId}/workflow.json`,
+      "deleteWorkflow"
+    );
   }
 
-  /**
-   * Write a workflow to execution storage
-   */
   async writeExecutionWorkflow(
     executionId: string,
     workflow: Workflow
   ): Promise<string> {
-    try {
-      console.log(
-        `ObjectStore.writeExecutionWorkflow: Starting to write workflow for execution ${executionId}`
-      );
-
-      if (!this.bucket) {
-        console.error(
-          "ObjectStore.writeExecutionWorkflow: R2 bucket is not initialized"
-        );
-        throw new Error("R2 bucket is not initialized");
-      }
-
-      const key = `executions/${executionId}/workflow.json`;
-      console.log(
-        `ObjectStore.writeExecutionWorkflow: Attempting to store workflow with key ${key}`
-      );
-
-      const writeResult = await this.bucket.put(key, JSON.stringify(workflow), {
+    await this.writeToR2(
+      `executions/${executionId}/workflow.json`,
+      JSON.stringify(workflow),
+      {
        httpMetadata: {
          contentType: "application/json",
          cacheControl: "no-cache",
        },
@@ -400,247 +166,163 @@ export class ObjectStore {
        customMetadata: {
          workflowId: workflow.id,
          updatedAt: new Date().toISOString(),
        },
-      });
-
-      console.log(
-        `ObjectStore.writeExecutionWorkflow: Successfully stored workflow for execution ${executionId}, etag: ${writeResult?.etag || "unknown"}`
-      );
-
-      return executionId;
-    } catch (error) {
-      console.error(
-        `ObjectStore.writeExecutionWorkflow: Failed to write workflow to R2 for execution ${executionId}:`,
-        error
-      );
-      throw error;
-    }
+      },
+      "writeExecutionWorkflow"
+    );
+    return executionId;
   }
 
-  /**
-   * Read a workflow from execution storage using its execution id
-   */
   async readExecutionWorkflow(executionId: string): Promise<Workflow> {
-    try {
-      console.log(
-        `ObjectStore.readExecutionWorkflow: Attempting to read workflow for execution ${executionId}`
-      );
-
-      if (!this.bucket) {
-        console.error(
-          "ObjectStore.readExecutionWorkflow: R2 bucket is not initialized"
-        );
+    const object = await this.readFromR2(
+      `executions/${executionId}/workflow.json`,
+      "readExecutionWorkflow"
+    );
 
-      const key = `executions/${executionId}/workflow.json`;
-      console.log(
-        `ObjectStore.readExecutionWorkflow: Getting workflow with key ${key}`
-      );
+    if (!object) {
+      throw new Error(`Workflow not found for execution: ${executionId}`);
+    }
 
-      const object = await this.bucket.get(key);
+    const text = await object.text();
+    return JSON.parse(text) as Workflow;
+  }
 
-      if (!object) {
-        console.log(
-          `ObjectStore.readExecutionWorkflow: Workflow not found with key ${key}`
-        );
-        console.error(
-          `ObjectStore.readExecutionWorkflow: Workflow not found for execution: ${executionId}`
-        );
-        throw new Error(`Workflow not found for execution: ${executionId}`);
-      }
+  async deleteExecutionWorkflow(executionId: string): Promise<void> {
+    await this.deleteFromR2(
+      `executions/${executionId}/workflow.json`,
+      "deleteExecutionWorkflow"
+    );
+  }
 
-      console.log(
-        `ObjectStore.readExecutionWorkflow: Retrieved workflow for execution ${executionId}, size: ${object.size} bytes`
-      );
+  async writeExecution(execution: WorkflowExecution): Promise<string> {
+    await this.writeToR2(
+      `executions/${execution.id}/execution.json`,
+      JSON.stringify(execution),
+      {
+        httpMetadata: {
+          contentType: "application/json",
+          cacheControl: "no-cache",
+        },
+        customMetadata: {
+          workflowId: execution.workflowId,
+          status: execution.status,
+          updatedAt: new Date().toISOString(),
+        },
+      },
+      "writeExecution"
+    );
+    return execution.id;
+  }
 
-      const text = await object.text();
-      const workflow = JSON.parse(text) as Workflow;
-      console.log(
-        `ObjectStore.readExecutionWorkflow: Successfully read workflow for execution ${executionId}`
-      );
+  async readExecution(executionId: string): Promise<WorkflowExecution> {
+    const object = await this.readFromR2(
+      `executions/${executionId}/execution.json`,
+      "readExecution"
+    );
 
-      return workflow;
-    } catch (error) {
-      console.error(
-        `ObjectStore.readExecutionWorkflow: Failed to read workflow for execution ${executionId}:`,
-        error
-      );
-      throw error;
+    if (!object) {
+      throw new Error(`Execution not found: ${executionId}`);
    }
+
+    const text = await object.text();
+    return JSON.parse(text) as WorkflowExecution;
   }
 
-  /**
-   * Delete a workflow from execution storage using its execution id
-   */
-  async deleteExecutionWorkflow(executionId: string): Promise<void> {
+  async deleteExecution(executionId: string): Promise<void> {
+    await this.deleteFromR2(
+      `executions/${executionId}/execution.json`,
+      "deleteExecution"
+    );
+  }
+
+  private async writeToR2(
+    key: string,
+    data: string | ArrayBuffer | Uint8Array,
+    options: R2PutOptions,
+    operation: string
+  ): Promise<void> {
     try {
-      console.log(
-        `ObjectStore.deleteExecutionWorkflow: Attempting to delete workflow for execution ${executionId}`
-      );
+      console.log(`ObjectStore.${operation}: Writing to ${key}`);
 
      if (!this.bucket) {
-        console.error(
-          "ObjectStore.deleteExecutionWorkflow: R2 bucket is not initialized"
-        );
        throw new Error("R2 bucket is not initialized");
      }
 
-      const key = `executions/${executionId}/workflow.json`;
-      console.log(
-        `ObjectStore.deleteExecutionWorkflow: Deleting workflow with key ${key}`
-      );
-
-      await this.bucket.delete(key);
-      console.log(
-        `ObjectStore.deleteExecutionWorkflow: Deleted workflow with key ${key}`
-      );
+      const result = await this.bucket.put(key, data, options);
      console.log(
-        `ObjectStore.deleteExecutionWorkflow: Successfully deleted workflow for execution ${executionId}`
+        `ObjectStore.${operation}: Success, etag: ${result?.etag || "unknown"}`
      );
    } catch (error) {
-      console.error(
-        `ObjectStore.deleteExecutionWorkflow: Failed to delete workflow for execution ${executionId}:`,
-        error
-      );
+      console.error(`ObjectStore.${operation}: Failed to write ${key}:`, error);
      throw error;
    }
  }
 
-  /**
-   * Write an execution to storage
-   */
-  async writeExecution(execution: WorkflowExecution): Promise<string> {
+  private async readFromR2(
+    key: string,
+    operation: string
+  ): Promise<R2ObjectBody | null> {
    try {
-      console.log(
-        `ObjectStore.writeExecution: Starting to write execution ${execution.id}`
-      );
+      console.log(`ObjectStore.${operation}: Reading from ${key}`);
 
      if (!this.bucket) {
-        console.error(
-          "ObjectStore.writeExecution: R2 bucket is not initialized"
-        );
        throw new Error("R2 bucket is not initialized");
      }
 
-      const key = `executions/${execution.id}/execution.json`;
-      console.log(
-        `ObjectStore.writeExecution: Attempting to store execution with key ${key}`
-      );
+      const object = await this.bucket.get(key);
 
-      const writeResult = await this.bucket.put(
-        key,
-        JSON.stringify(execution),
-        {
-          httpMetadata: {
-            contentType: "application/json",
-            cacheControl: "no-cache",
-          },
-          customMetadata: {
-            workflowId: execution.workflowId,
-            status: execution.status,
-            updatedAt: new Date().toISOString(),
-          },
-        }
-      );
+      if (!object) {
+        console.log(`ObjectStore.${operation}: Not found at ${key}`);
+        return null;
+      }
 
      console.log(
-        `ObjectStore.writeExecution: Successfully stored execution ${execution.id}, etag: ${writeResult?.etag || "unknown"}`
+        `ObjectStore.${operation}: Success, size: ${object.size} bytes`
      );
-
-      return execution.id;
+      return object;
    } catch (error) {
-      console.error(
-        `ObjectStore.writeExecution: Failed to write execution to R2:`,
-        error
-      );
+      console.error(`ObjectStore.${operation}: Failed to read ${key}:`, error);
      throw error;
    }
  }
 
-  /**
-   * Read an execution from storage using its id
-   */
-  async readExecution(executionId: string): Promise<WorkflowExecution> {
+  private async deleteFromR2(key: string, operation: string): Promise<void> {
    try {
-      console.log(
-        `ObjectStore.readExecution: Attempting to read execution with id ${executionId}`
-      );
+      console.log(`ObjectStore.${operation}: Deleting ${key}`);
 
      if (!this.bucket) {
-        console.error(
-          "ObjectStore.readExecution: R2 bucket is not initialized"
-        );
        throw new Error("R2 bucket is not initialized");
      }
 
-      const key = `executions/${executionId}/execution.json`;
-      console.log(
-        `ObjectStore.readExecution: Getting execution with key ${key}`
-      );
-
-      const object = await this.bucket.get(key);
-
-      if (!object) {
-        console.log(
-          `ObjectStore.readExecution: Execution not found with key ${key}`
-        );
-        console.error(
-          `ObjectStore.readExecution: Execution not found: ${executionId}`
-        );
-        throw new Error(`Execution not found: ${executionId}`);
-      }
-
-      console.log(
-        `ObjectStore.readExecution: Retrieved execution ${executionId}, size: ${object.size} bytes`
-      );
-
-      const text = await object.text();
-      const execution = JSON.parse(text) as WorkflowExecution;
-      console.log(
-        `ObjectStore.readExecution: Successfully read execution ${executionId}`
-      );
-
-      return execution;
+      await this.bucket.delete(key);
+      console.log(`ObjectStore.${operation}: Successfully deleted ${key}`);
    } catch (error) {
      console.error(
-        `ObjectStore.readExecution: Failed to read execution ${executionId}:`,
+        `ObjectStore.${operation}: Failed to delete ${key}:`,
        error
      );
      throw error;
    }
  }
 
-  /**
-   * Delete an execution from storage using its id
-   */
-  async deleteExecution(executionId: string): Promise<void> {
+  private async listFromR2(
+    prefix: string,
+    operation: string
+  ): Promise<R2Objects> {
    try {
-      console.log(
-        `ObjectStore.deleteExecution: Attempting to delete execution with id ${executionId}`
-      );
+      console.log(`ObjectStore.${operation}: Listing with prefix ${prefix}`);
 
      if (!this.bucket) {
-        console.error(
-          "ObjectStore.deleteExecution: R2 bucket is not initialized"
-        );
        throw new Error("R2 bucket is not initialized");
      }
 
-      const key = `executions/${executionId}/execution.json`;
-      console.log(
-        `ObjectStore.deleteExecution: Deleting execution with key ${key}`
-      );
-
-      await this.bucket.delete(key);
-      console.log(
-        `ObjectStore.deleteExecution: Deleted execution with key ${key}`
-      );
+      const objects = await this.bucket.list({ prefix });
      console.log(
-        `ObjectStore.deleteExecution: Successfully deleted execution ${executionId}`
+        `ObjectStore.${operation}: Found ${objects.objects.length} objects`
      );
+      return objects;
    } catch (error) {
      console.error(
-        `ObjectStore.deleteExecution: Failed to delete execution ${executionId}:`,
+        `ObjectStore.${operation}: Failed to list with prefix ${prefix}:`,
        error
      );
      throw error;

diff --git a/apps/api/src/runtime/runtime.ts b/apps/api/src/runtime/runtime.ts
index 1219cf1b..83f79097 100644
--- a/apps/api/src/runtime/runtime.ts
+++ b/apps/api/src/runtime/runtime.ts
@@ -1,7 +1,6 @@
 import {
   JsonArray,
   JsonObject,
-  Node,
   ObjectReference,
   Workflow,
   WorkflowExecution,
@@ -16,26 +15,20 @@
 import { NonRetryableError } from "cloudflare:workflows";
 
 import { Bindings } from "../context";
-import {
-  createDatabase,
-  ExecutionStatusType,
-  getAllSecretsWithValues,
-  saveExecution,
-} from "../db";
 import { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry";
 import { CloudflareToolRegistry } from "../nodes/cloudflare-tool-registry";
-import {
-  apiToNodeParameter,
-  nodeToApiParameter,
-} from "../nodes/parameter-mapper";
 import { HttpRequest, NodeContext } from "../nodes/types";
 import { EmailMessage } from "../nodes/types";
-import {
-  getOrganizationComputeUsage,
-  updateOrganizationComputeUsage,
-} from "../utils/credits";
+import { updateOrganizationComputeUsage } from "../utils/credits";
 import { validateWorkflow } from "../utils/workflows";
-import { ObjectStore } from "./object-store";
+import { ConditionalExecutionHandler } from "./conditional-execution-handler";
+import { CreditManager } from "./credit-manager";
+import { ExecutionPersistence } from "./execution-persistence";
+import { ExecutionPlanner } from "./execution-planner";
+import { NodeExecutor } from "./node-executor";
+import { NodeInputMapper } from "./node-input-mapper";
+import { NodeOutputMapper } from "./node-output-mapper";
+import { SecretManager } from "./secret-manager";
 
 // Basic node output value types
 export type BasicNodeOutputValue =
@@ -107,6 +100,14 @@ export class Runtime extends WorkflowEntrypoint {
   private nodeRegistry: CloudflareNodeRegistry;
   private toolRegistry: CloudflareToolRegistry;
+  private planner: ExecutionPlanner;
+  private inputMapper: NodeInputMapper;
+  private outputMapper: NodeOutputMapper;
+  private secretManager: SecretManager;
+  private creditManager: CreditManager;
+  private conditionalHandler: ConditionalExecutionHandler;
+  private persistence: ExecutionPersistence;
+  private executor: NodeExecutor;
 
   constructor(ctx: ExecutionContext, env: Bindings) {
     super(ctx, env);
@@ -115,6 +116,26 @@ export class Runtime extends WorkflowEntrypoint {
       this.nodeRegistry,
       this.createNodeContextForTool.bind(this)
     );
+
+    // Initialize specialized components
+    this.planner = new ExecutionPlanner(this.nodeRegistry);
+    this.inputMapper = new NodeInputMapper(this.nodeRegistry);
+    this.outputMapper = new NodeOutputMapper();
+    this.secretManager = new SecretManager(env);
+    this.creditManager = new CreditManager(env, this.nodeRegistry);
+    this.conditionalHandler = new ConditionalExecutionHandler(
+      this.nodeRegistry,
+      this.inputMapper
+    );
+    this.persistence = new ExecutionPersistence(env);
+    this.executor = new NodeExecutor(
+      env,
+      this.nodeRegistry,
+      this.toolRegistry,
+      this.inputMapper,
+      this.outputMapper,
+      this.conditionalHandler
+    );
   }
 
   /**
@@ -208,10 +229,10 @@ export class Runtime extends WorkflowEntrypoint {
     } as WorkflowExecution;
 
     if (
-      !(await this.hasEnoughComputeCredits(
+      !(await this.creditManager.hasEnoughComputeCredits(
         organizationId,
         computeCredits,
-        this.getNodesComputeCost(workflow.nodes)
+        this.creditManager.getNodesComputeCost(workflow.nodes)
       ))
     ) {
       runtimeState = { ...runtimeState, status: "exhausted" };
@@ -219,7 +240,7 @@ export class Runtime extends WorkflowEntrypoint {
         "persist exhausted execution state",
         Runtime.defaultStepConfig,
         async () =>
-          this.saveExecutionState(
+          this.persistence.saveExecutionState(
            userId,
            organizationId,
            workflow.id,
@@ -236,7 +257,7 @@ export class Runtime extends WorkflowEntrypoint {
     const secrets = await step.do(
       "preload organization secrets",
       Runtime.defaultStepConfig,
-      async () => this.preloadAllSecrets(organizationId)
async () => this.secretManager.preloadAllSecrets(organizationId) ); // Prepare workflow (validation + ordering). @@ -254,7 +275,7 @@ export class Runtime extends WorkflowEntrypoint { "persist initial execution status", Runtime.defaultStepConfig, async () => - this.saveExecutionState( + this.persistence.saveExecutionState( userId, organizationId, workflow.id, @@ -280,7 +301,7 @@ export class Runtime extends WorkflowEntrypoint { `run node ${nodeIdentifier}`, Runtime.defaultStepConfig, async () => - this.executeNode( + this.executor.executeNode( runtimeState, workflow.id, nodeIdentifier, @@ -299,7 +320,7 @@ export class Runtime extends WorkflowEntrypoint { `run ${groupDescription}`, Runtime.defaultStepConfig, async () => - this.executeInlineGroup( + this.executor.executeInlineGroup( runtimeState, workflow.id, executionUnit.nodeIds, @@ -316,15 +337,17 @@ export class Runtime extends WorkflowEntrypoint { executionRecord = { ...executionRecord, status: runtimeState.status, - nodeExecutions: this.buildNodeExecutions(runtimeState), + nodeExecutions: this.persistence.buildNodeExecutions(runtimeState), }; - this.sendExecutionUpdateToSession( - workflowSessionId, - executionRecord - ).catch((error) => { - console.error("Failed to send execution update to session:", error); - }); + this.persistence + .sendExecutionUpdateToSession(workflowSessionId, executionRecord) + .catch((error) => { + console.error( + "Failed to send execution update to session:", + error + ); + }); } } } catch (error) { @@ -349,14 +372,14 @@ export class Runtime extends WorkflowEntrypoint { this.env.KV, organizationId, // Update organization compute credits for executed nodes - this.getNodesComputeCost( + this.creditManager.getNodesComputeCost( runtimeState.workflow.nodes.filter((node) => runtimeState.executedNodes.has(node.id) ) ) ); } - return this.saveExecutionState( + return this.persistence.saveExecutionState( userId, organizationId, workflow.id, @@ -369,126 +392,17 @@ export class Runtime extends WorkflowEntrypoint { ); if (workflowSessionId) { - this.sendExecutionUpdateToSession( - workflowSessionId, - executionRecord - ).catch((error) => { - console.error("Failed to send execution update to session:", error); - }); + this.persistence + .sendExecutionUpdateToSession(workflowSessionId, executionRecord) + .catch((error) => { + console.error("Failed to send execution update to session:", error); + }); } } return executionRecord; } - /** - * Checks if the organization has enough compute credits to execute a workflow. - * Credit limits are not enforced in development mode. - */ - private async hasEnoughComputeCredits( - organizationId: string, - computeCredits: number, - computeCost: number - ): Promise { - // Skip credit limit enforcement in development mode - if (this.env.CLOUDFLARE_ENV === "development") { - return true; - } - - const computeUsage = await getOrganizationComputeUsage( - this.env.KV, - organizationId - ); - return computeUsage + computeCost <= computeCredits; - } - - /** - * Returns the compute cost of a list of nodes. - */ - private getNodesComputeCost(nodes: Node[]): number { - return nodes.reduce((acc, node) => { - const nodeType = this.nodeRegistry.getNodeType(node.type); - return acc + (nodeType.computeCost ?? 
1); - }, 0); - } - - private async sendExecutionUpdateToSession( - workflowSessionId: string, - execution: WorkflowExecution - ): Promise { - try { - const id = this.env.WORKFLOW_SESSION.idFromName(workflowSessionId); - const stub = this.env.WORKFLOW_SESSION.get(id); - - await stub.fetch(`https://workflow-session/execution`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(execution), - }); - } catch (error) { - console.error( - `Failed to send execution update to session ${workflowSessionId}:`, - error - ); - } - } - - /** - * Preloads all organization secrets for synchronous access during workflow execution - */ - private async preloadAllSecrets( - organizationId: string - ): Promise> { - const secrets: Record = {}; - const db = createDatabase(this.env.DB); - - try { - // Get all secret records for the organization (including encrypted values) - const secretRecords = await getAllSecretsWithValues(db, organizationId); - - // Decrypt each secret and add to the secrets object - for (const secretRecord of secretRecords) { - try { - const secretValue = await this.decryptSecretValue( - secretRecord.encryptedValue, - organizationId - ); - secrets[secretRecord.name] = secretValue; - } catch (error) { - console.warn( - `Failed to decrypt secret '${secretRecord.name}':`, - error - ); - } - } - - console.log( - `Preloaded ${Object.keys(secrets).length} secrets for organization ${organizationId}` - ); - } catch (error) { - console.error( - `Failed to preload secrets for organization ${organizationId}:`, - error - ); - } - - return secrets; - } - - /** - * Decrypt a secret value using organization-specific key - */ - private async decryptSecretValue( - encryptedValue: string, - organizationId: string - ): Promise { - // Import decryptSecret here to avoid circular dependency issues - const { decryptSecret } = await import("../utils/encryption"); - return await decryptSecret(encryptedValue, this.env, organizationId); - } - /** * Validates the workflow and creates a sequential execution order with inline groups. */ @@ -502,7 +416,7 @@ export class Runtime extends WorkflowEntrypoint { ); } - const orderedNodes = this.createTopologicalOrder(workflow); + const orderedNodes = this.planner.createTopologicalOrder(workflow); if (orderedNodes.length === 0 && workflow.nodes.length > 0) { throw new NonRetryableError( "Unable to derive execution order. The graph may contain a cycle." @@ -510,7 +424,10 @@ export class Runtime extends WorkflowEntrypoint { } // Create execution plan with inline groups - const executionPlan = this.createExecutionPlan(workflow, orderedNodes); + const executionPlan = this.planner.createExecutionPlan( + workflow, + orderedNodes + ); return { workflow, @@ -522,878 +439,4 @@ export class Runtime extends WorkflowEntrypoint { status: "executing", }; } - - /** - * Creates an execution plan that groups consecutive inlinable nodes together. - * Enhanced version that can handle branching patterns within groups. - * - * Examples of patterns that can now be inlined: - * - * Fan-out pattern: - * A → B - * A → C [A, B, C] can be grouped together - * - * Fan-in pattern: - * A → C - * B → C [A, B, C] can be grouped together - * - * Tree pattern: - * A → B → D - * A → C → D [A, B, C, D] can be grouped together - * - * The old linear approach would have executed these as separate steps, - * but now they execute in a single Cloudflare workflow step. 
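
(Editorial aside: a concrete sketch of the plan shapes this method produces, using the fan-out pattern described above. The `ExecutionUnit` alias is introduced here only for illustration; it mirrors the `{ type: "individual", nodeId }` and `{ type: "inline", nodeIds }` units built by the code below.)

```ts
// Illustrative only: the plan emitted for A → B, A → C when A, B and C
// are all registered as inlinable: one group, one Cloudflare workflow step.
type ExecutionUnit =
  | { type: "individual"; nodeId: string }
  | { type: "inline"; nodeIds: string[] };

const fanOutPlan: ExecutionUnit[] = [
  { type: "inline", nodeIds: ["A", "B", "C"] },
];

// Per the grouping rule below, if B's type were not inlinable, A and C could
// still be grouped (C depends only on A), while B surfaces as its own step:
const mixedPlan: ExecutionUnit[] = [
  { type: "inline", nodeIds: ["A", "C"] },
  { type: "individual", nodeId: "B" },
];
```
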
- */ - private createExecutionPlan( - workflow: Workflow, - orderedNodes: string[] - ): ExecutionPlan { - const plan: ExecutionPlan = []; - const processedNodes = new Set(); - let totalInlineGroups = 0; - let totalInlinedNodes = 0; - - for (let i = 0; i < orderedNodes.length; i++) { - const nodeId = orderedNodes[i]; - - if (processedNodes.has(nodeId)) { - continue; // Already processed in a group - } - - const node = workflow.nodes.find((n) => n.id === nodeId); - if (!node) continue; - - const nodeType = this.nodeRegistry.getNodeType(node.type); - const isInlinable = nodeType.inlinable ?? false; - - if (isInlinable) { - // Look ahead to find a group of connected inlinable nodes - const inlineGroup = this.findConnectedInlinableGroup( - workflow, - nodeId, - orderedNodes, - i, - processedNodes - ); - - if (inlineGroup.length === 1) { - // Single node - add as individual - plan.push({ type: "individual", nodeId: inlineGroup[0] }); - } else { - // Multiple nodes - add as inline group - plan.push({ type: "inline", nodeIds: [...inlineGroup] }); - totalInlineGroups++; - totalInlinedNodes += inlineGroup.length; - } - - // Mark all nodes in the group as processed - inlineGroup.forEach((id) => processedNodes.add(id)); - } else { - // Non-inlinable node - add as individual - plan.push({ type: "individual", nodeId }); - processedNodes.add(nodeId); - } - } - - // Log metrics for performance analysis - if (totalInlineGroups > 0) { - const totalInlinableNodes = orderedNodes.filter((nodeId) => { - const node = workflow.nodes.find((n) => n.id === nodeId); - if (!node) return false; - const nodeType = this.nodeRegistry.getNodeType(node.type); - return nodeType.inlinable ?? false; - }).length; - - const inliningEfficiency = - (totalInlinedNodes / totalInlinableNodes) * 100; - console.log( - `Execution plan optimized: ${totalInlineGroups} inline groups containing ${totalInlinedNodes}/${totalInlinableNodes} inlinable nodes (${inliningEfficiency.toFixed(1)}% efficiency)` - ); - - // Log individual group sizes for analysis - const groupSizes = plan - .filter((unit) => unit.type === "inline") - .map((unit) => (unit.type === "inline" ? unit.nodeIds.length : 0)); - - console.log(`Group sizes: [${groupSizes.join(", ")}]`); - } - - return plan; - } - - /** - * Finds a connected group of inlinable nodes starting from a given node. - * Uses a simple algorithm: expand the group as long as all dependencies are satisfied. - */ - private findConnectedInlinableGroup( - workflow: Workflow, - startNodeId: string, - orderedNodes: string[], - startIndex: number, - alreadyProcessed: Set - ): string[] { - const group = [startNodeId]; - const groupSet = new Set([startNodeId]); - - // Look ahead in the topological order for nodes that can be added to this group - for (let i = startIndex + 1; i < orderedNodes.length; i++) { - const candidateId = orderedNodes[i]; - - // Skip if already processed or not inlinable - if (alreadyProcessed.has(candidateId)) continue; - - const candidateNode = workflow.nodes.find((n) => n.id === candidateId); - if (!candidateNode) continue; - - const candidateNodeType = this.nodeRegistry.getNodeType( - candidateNode.type - ); - if (!(candidateNodeType.inlinable ?? 
false)) continue; - - // Check if this candidate can be safely added to the group - if ( - this.canSafelyAddToGroup( - workflow, - candidateId, - groupSet, - orderedNodes, - startIndex - ) - ) { - group.push(candidateId); - groupSet.add(candidateId); - } - } - - return group; - } - - /** - * Simplified check: a node can be added to a group if all its dependencies - * are either already executed or in the current group. - */ - private canSafelyAddToGroup( - workflow: Workflow, - nodeId: string, - currentGroupSet: Set, - orderedNodes: string[], - groupStartIndex: number - ): boolean { - // Get all dependencies of this node - const dependencies = workflow.edges - .filter((edge) => edge.target === nodeId) - .map((edge) => edge.source); - - // Check each dependency - for (const depId of dependencies) { - const isInGroup = currentGroupSet.has(depId); - const depIndex = orderedNodes.indexOf(depId); - const isAlreadyExecuted = depIndex < groupStartIndex; - - if (!isInGroup && !isAlreadyExecuted) { - return false; // Has unmet dependency - } - } - - return true; - } - - /** - * Executes a group of inlinable nodes sequentially in a single step. - */ - private async executeInlineGroup( - runtimeState: RuntimeState, - workflowId: string, - nodeIds: string[], - organizationId: string, - executionId: string, - secrets: Record, - httpRequest?: HttpRequest, - emailMessage?: EmailMessage - ): Promise { - let currentState = runtimeState; - const groupStartTime = Date.now(); - const executedNodesInGroup: string[] = []; - - console.log(`Starting inline group execution: [${nodeIds.join(", ")}]`); - - // Execute each node in the group sequentially - for (const nodeId of nodeIds) { - // Skip nodes that were already marked as failed or skipped - if ( - currentState.nodeErrors.has(nodeId) || - currentState.skippedNodes.has(nodeId) - ) { - console.log( - `Skipping node ${nodeId} in inline group (already failed/skipped)` - ); - continue; - } - - try { - const nodeStartTime = Date.now(); - - currentState = await this.executeNode( - currentState, - workflowId, - nodeId, - organizationId, - executionId, - secrets, - httpRequest, - emailMessage - ); - - const nodeExecutionTime = Date.now() - nodeStartTime; - - // If execution failed, break the inline group execution - if (currentState.nodeErrors.has(nodeId)) { - console.log( - `Node ${nodeId} failed in inline group after ${nodeExecutionTime}ms, stopping group execution` - ); - break; - } - - executedNodesInGroup.push(nodeId); - console.log( - `Node ${nodeId} completed in inline group (${nodeExecutionTime}ms)` - ); - } catch (error) { - // Handle errors at the group level - const message = error instanceof Error ? error.message : String(error); - currentState.nodeErrors.set(nodeId, message); - currentState.status = "error"; - console.log( - `Fatal error in node ${nodeId} within inline group: ${message}` - ); - break; - } - } - - const totalGroupTime = Date.now() - groupStartTime; - console.log( - `Inline group completed: executed ${executedNodesInGroup.length}/${nodeIds.length} nodes in ${totalGroupTime}ms` - ); - - return currentState; - } - - /** - * Executes a single node and stores its outputs. 
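
(Editorial aside: after each node, the method below recomputes the overall workflow status with an `every()` pass over the execution plan, and the same check is repeated in the skipped-input path. A self-contained sketch of that rule, with names chosen only for the example:)

```ts
// Distilled from the allNodesVisited checks below: a unit is "visited" once
// every node in it has executed, been skipped, or errored; the workflow
// completes when all units are visited and no node recorded an error.
type ExecutionUnit =
  | { type: "individual"; nodeId: string }
  | { type: "inline"; nodeIds: string[] };

function allUnitsVisited(
  plan: ExecutionUnit[],
  executed: Set<string>,
  skipped: Set<string>,
  errors: Map<string, string>
): boolean {
  const visited = (id: string) =>
    executed.has(id) || skipped.has(id) || errors.has(id);
  return plan.every((unit) =>
    unit.type === "individual"
      ? visited(unit.nodeId)
      : unit.nodeIds.every(visited)
  );
}
```
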
- */ - private async executeNode( - runtimeState: RuntimeState, - workflowId: string, - nodeIdentifier: string, - organizationId: string, - executionId: string, - secrets: Record, - httpRequest?: HttpRequest, - emailMessage?: EmailMessage - ): Promise { - const node = runtimeState.workflow.nodes.find( - (n): boolean => n.id === nodeIdentifier - ); - if (!node) { - runtimeState.nodeErrors.set( - nodeIdentifier, - `Node not found: ${nodeIdentifier}` - ); - return { ...runtimeState, status: "error" }; - } - - const nodeType = this.nodeRegistry.getNodeType(node.type); - this.env.COMPUTE.writeDataPoint({ - indexes: [organizationId], - blobs: [organizationId, workflowId, node.id], - doubles: [nodeType.computeCost ?? 1], - }); - - // Resolve the runnable implementation. - const executable = this.nodeRegistry.createExecutableNode(node); - if (!executable) { - runtimeState.nodeErrors.set( - nodeIdentifier, - `Node type not implemented: ${node.type}` - ); - return { ...runtimeState, status: "error" }; - } - - // Gather inputs by reading connections and default values. - const inputValues = this.collectNodeInputs(runtimeState, nodeIdentifier); - - try { - const processedInputs = await this.mapRuntimeToNodeInputs( - runtimeState, - nodeIdentifier, - inputValues - ); - - // Configure AI Gateway options for all AI model requests - // If CLOUDFLARE_AI_GATEWAY_ID is set, all AI requests will be routed through the gateway - // for analytics, caching, and rate limiting. If not set, requests go directly to the model. - const aiOptions: AiOptions = {}; - const gatewayId = this.env.CLOUDFLARE_AI_GATEWAY_ID; - if (gatewayId) { - aiOptions.gateway = { - id: gatewayId, - skipCache: false, // Enable caching by default for better performance - }; - } - - const context: NodeContext = { - nodeId: nodeIdentifier, - workflowId: runtimeState.workflow.id, - organizationId, - inputs: processedInputs, - httpRequest, - emailMessage, - onProgress: () => {}, - toolRegistry: this.toolRegistry, - secrets: secrets || {}, - env: { - DB: this.env.DB, - AI: this.env.AI, - AI_OPTIONS: aiOptions, - RESSOURCES: this.env.RESSOURCES, - DATASETS: this.env.DATASETS, - DATASETS_AUTORAG: this.env.DATASETS_AUTORAG, - CLOUDFLARE_ACCOUNT_ID: this.env.CLOUDFLARE_ACCOUNT_ID, - CLOUDFLARE_API_TOKEN: this.env.CLOUDFLARE_API_TOKEN, - CLOUDFLARE_AI_GATEWAY_ID: this.env.CLOUDFLARE_AI_GATEWAY_ID, - TWILIO_ACCOUNT_SID: this.env.TWILIO_ACCOUNT_SID, - TWILIO_AUTH_TOKEN: this.env.TWILIO_AUTH_TOKEN, - TWILIO_PHONE_NUMBER: this.env.TWILIO_PHONE_NUMBER, - SENDGRID_API_KEY: this.env.SENDGRID_API_KEY, - SENDGRID_DEFAULT_FROM: this.env.SENDGRID_DEFAULT_FROM, - RESEND_API_KEY: this.env.RESEND_API_KEY, - RESEND_DEFAULT_FROM: this.env.RESEND_DEFAULT_FROM, - AWS_ACCESS_KEY_ID: this.env.AWS_ACCESS_KEY_ID, - AWS_SECRET_ACCESS_KEY: this.env.AWS_SECRET_ACCESS_KEY, - AWS_REGION: this.env.AWS_REGION, - SES_DEFAULT_FROM: this.env.SES_DEFAULT_FROM, - EMAIL_DOMAIN: this.env.EMAIL_DOMAIN, - OPENAI_API_KEY: this.env.OPENAI_API_KEY, - ANTHROPIC_API_KEY: this.env.ANTHROPIC_API_KEY, - GEMINI_API_KEY: this.env.GEMINI_API_KEY, - HUGGINGFACE_API_KEY: this.env.HUGGINGFACE_API_KEY, - }, - }; - - const result = await executable.execute(context); - - if (result.status === "completed") { - const outputsForRuntime = await this.mapNodeToRuntimeOutputs( - runtimeState, - nodeIdentifier, - result.outputs ?? 
{}, - organizationId, - executionId - ); - runtimeState.nodeOutputs.set( - nodeIdentifier, - outputsForRuntime as NodeOutputs - ); - runtimeState.executedNodes.add(nodeIdentifier); - - // After successful execution, mark nodes connected to inactive outputs as skipped - runtimeState = this.markInactiveOutputNodesAsSkipped( - runtimeState, - nodeIdentifier, - result.outputs ?? {} - ); - } else { - const failureMessage = result.error ?? "Unknown error"; - runtimeState.nodeErrors.set(nodeIdentifier, failureMessage); - runtimeState.status = "error"; - } - - // Determine final workflow status. - if (runtimeState.status !== "error") { - const allNodesVisited = runtimeState.executionPlan.every((unit) => - unit.type === "individual" - ? runtimeState.executedNodes.has(unit.nodeId) || - runtimeState.skippedNodes.has(unit.nodeId) || - runtimeState.nodeErrors.has(unit.nodeId) - : unit.type === "inline" - ? unit.nodeIds.every( - (id: string) => - runtimeState.executedNodes.has(id) || - runtimeState.skippedNodes.has(id) || - runtimeState.nodeErrors.has(id) - ) - : false - ); - runtimeState.status = - allNodesVisited && runtimeState.nodeErrors.size === 0 - ? "completed" - : "executing"; - } - - return runtimeState; - } catch (error) { - if ( - error instanceof Error && - error.message.startsWith("Required input") - ) { - runtimeState.skippedNodes.add(nodeIdentifier); - - // Determine final workflow status. - if (runtimeState.status !== "error") { - const allNodesVisited = runtimeState.executionPlan.every((unit) => - unit.type === "individual" - ? runtimeState.executedNodes.has(unit.nodeId) || - runtimeState.skippedNodes.has(unit.nodeId) || - runtimeState.nodeErrors.has(unit.nodeId) - : unit.type === "inline" - ? unit.nodeIds.every( - (id: string) => - runtimeState.executedNodes.has(id) || - runtimeState.skippedNodes.has(id) || - runtimeState.nodeErrors.has(id) - ) - : false - ); - runtimeState.status = - allNodesVisited && runtimeState.nodeErrors.size === 0 - ? "completed" - : "executing"; - } - return runtimeState; - } - const message = error instanceof Error ? error.message : String(error); - runtimeState.nodeErrors.set(nodeIdentifier, message); - runtimeState.status = "error"; - return runtimeState; - } - } - - /** - * Returns inputs for a node by checking its default values and inbound edges. - */ - private collectNodeInputs( - runtimeState: RuntimeState, - nodeIdentifier: string - ): NodeOutputs { - const inputs: NodeOutputs = {}; - const node = runtimeState.workflow.nodes.find( - (n): boolean => n.id === nodeIdentifier - ); - if (!node) return inputs; - - // Defaults declared directly on the node. - for (const input of node.inputs) { - if (input.value !== undefined) { - if ( - typeof input.value === "string" || - typeof input.value === "number" || - typeof input.value === "boolean" || - (typeof input.value === "object" && input.value !== null) - ) { - inputs[input.name] = input.value as NodeOutputValue; - } - } - } - - // Values coming from connected nodes. 
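
(Editorial aside: the fan-in rule implemented here is worth spelling out. A minimal sketch, assuming `values` arrives in edge order as collected by the loop below:)

```ts
// Inbound edges are grouped by target input. An input marked `repeated` in
// the node type definition receives every connected value as an array, while
// a non-repeated input keeps only the last connected value.
function resolveInput(values: unknown[], repeated: boolean): unknown {
  if (values.length === 0) return undefined;
  return repeated ? values : values[values.length - 1];
}

resolveInput(["a", "b"], true);  // ["a", "b"]
resolveInput(["a", "b"], false); // "b"
```
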
- const inboundEdges = runtimeState.workflow.edges.filter( - (edge): boolean => edge.target === nodeIdentifier - ); - - // Group edges by target input to handle multiple connections - const edgesByInput = new Map(); - for (const edge of inboundEdges) { - const inputName = edge.targetInput; - if (!edgesByInput.has(inputName)) { - edgesByInput.set(inputName, []); - } - edgesByInput.get(inputName)!.push(edge); - } - - // Process each input's connections - for (const [inputName, edges] of edgesByInput) { - // Get the node type definition to check repeated - const executable = this.nodeRegistry.createExecutableNode(node); - const nodeTypeDefinition = executable - ? (executable.constructor as any).nodeType - : null; - const nodeTypeInput = nodeTypeDefinition?.inputs?.find( - (input: any) => input.name === inputName - ); - - // Check repeated from node type definition (not workflow node) - const acceptsMultiple = nodeTypeInput?.repeated || false; - - const values: BasicNodeOutputValue[] = []; - - for (const edge of edges) { - const sourceOutputs = runtimeState.nodeOutputs.get(edge.source); - if (sourceOutputs && sourceOutputs[edge.sourceOutput] !== undefined) { - const value = sourceOutputs[edge.sourceOutput]; - if ( - typeof value === "string" || - typeof value === "number" || - typeof value === "boolean" || - (typeof value === "object" && value !== null) - ) { - values.push(value as BasicNodeOutputValue); - } - } - } - - if (values.length > 0) { - if (acceptsMultiple) { - // For parameters that accept multiple connections, provide an array - inputs[inputName] = values; - } else { - // For single connection parameters, use the last value (current behavior) - inputs[inputName] = values[values.length - 1]; - } - } - } - - return inputs; - } - - /** - * Converts raw runtime inputs to the representation expected by the node. - */ - private async mapRuntimeToNodeInputs( - runtimeState: RuntimeState, - nodeIdentifier: string, - inputValues: Record - ): Promise> { - const node = runtimeState.workflow.nodes.find( - (n) => n.id === nodeIdentifier - ); - if (!node) throw new Error(`Node ${nodeIdentifier} not found`); - - const processed: Record = {}; - const objectStore = new ObjectStore(this.env.RESSOURCES); - - for (const definition of node.inputs) { - const { name, type, required } = definition; - const value = inputValues[name]; - - if (required && value === undefined) { - throw new Error( - `Required input '${name}' missing for node ${nodeIdentifier}` - ); - } - if (value === undefined || value === null) continue; - - // Check if this parameter accepts multiple connections - const executable = this.nodeRegistry.createExecutableNode(node); - const nodeTypeDefinition = executable - ? (executable.constructor as any).nodeType - : null; - const nodeTypeInput = nodeTypeDefinition?.inputs?.find( - (input: any) => input.name === name - ); - const acceptsMultiple = nodeTypeInput?.repeated || false; - - // Handle secret parameters as strings since secrets are preloaded in context - const parameterType = type === "secret" ? 
"string" : type; - - if (acceptsMultiple && Array.isArray(value)) { - // For parameters that accept multiple connections, process each value individually - const processedArray = []; - for (const singleValue of value) { - const validSingleValue = singleValue as - | string - | number - | boolean - | ObjectReference - | JsonArray - | JsonObject; - const processedSingleValue = await apiToNodeParameter( - parameterType, - validSingleValue, - objectStore - ); - processedArray.push(processedSingleValue); - } - processed[name] = processedArray; - } else { - // Single value processing (existing logic) - const validValue = value as - | string - | number - | boolean - | ObjectReference - | JsonArray - | JsonObject; - const processedValue = await apiToNodeParameter( - parameterType, - validValue, - objectStore - ); - processed[name] = processedValue; - } - } - - return processed; - } - - /** - * Converts node outputs to a serialisable runtime representation. - */ - private async mapNodeToRuntimeOutputs( - runtimeState: RuntimeState, - nodeIdentifier: string, - outputsFromNode: Record, - organizationId: string, - executionId: string - ): Promise> { - const node = runtimeState.workflow.nodes.find( - (n) => n.id === nodeIdentifier - ); - if (!node) throw new Error(`Node ${nodeIdentifier} not found`); - - const processed: Record = {}; - const objectStore = new ObjectStore(this.env.RESSOURCES); - - for (const definition of node.outputs) { - const { name, type } = definition; - const value = outputsFromNode[name]; - if (value === undefined || value === null) continue; - - // Handle secret parameters as strings since secrets are preloaded in context - const parameterType = type === "secret" ? "string" : type; - - processed[name] = await nodeToApiParameter( - parameterType, - value, - objectStore, - organizationId, - executionId - ); - } - return processed; - } - - /** - * Calculates a topological ordering of nodes. Returns an empty array if a cycle is detected. - */ - private createTopologicalOrder(workflow: Workflow): string[] { - const inDegree: Record = {}; - const adjacency: Record = {}; - - for (const node of workflow.nodes) { - inDegree[node.id] = 0; - adjacency[node.id] = []; - } - - for (const edge of workflow.edges) { - adjacency[edge.source].push(edge.target); - inDegree[edge.target] += 1; - } - - const queue: string[] = Object.keys(inDegree).filter( - (id) => inDegree[id] === 0 - ); - const ordered: string[] = []; - - while (queue.length > 0) { - const current = queue.shift()!; - ordered.push(current); - - for (const neighbour of adjacency[current]) { - inDegree[neighbour] -= 1; - if (inDegree[neighbour] === 0) { - queue.push(neighbour); - } - } - } - - // If ordering missed nodes, a cycle exists. - return ordered.length === workflow.nodes.length ? 
ordered : []; - } - - /** - * Builds node execution list from runtime state - */ - private buildNodeExecutions(runtimeState: RuntimeState) { - return runtimeState.workflow.nodes.map((node) => { - if (runtimeState.executedNodes.has(node.id)) { - return { - nodeId: node.id, - status: "completed" as const, - outputs: runtimeState.nodeOutputs.get(node.id) || {}, - }; - } - if (runtimeState.nodeErrors.has(node.id)) { - return { - nodeId: node.id, - status: "error" as const, - error: runtimeState.nodeErrors.get(node.id), - }; - } - if (runtimeState.skippedNodes.has(node.id)) { - return { - nodeId: node.id, - status: "skipped" as const, - }; - } - return { - nodeId: node.id, - status: "executing" as const, - }; - }); - } - - /** - * Persists the workflow execution state to the database. - */ - private async saveExecutionState( - userId: string, - organizationId: string, - workflowId: string, - instanceId: string, - runtimeState: RuntimeState, - startedAt?: Date, - endedAt?: Date - ): Promise { - const nodeExecutionList = this.buildNodeExecutions(runtimeState); - - const executionStatus = runtimeState.status; - const errorMsg = - runtimeState.nodeErrors.size > 0 - ? Array.from(runtimeState.nodeErrors.values()).join(", ") - : undefined; - - try { - const db = createDatabase(this.env.DB); - return await saveExecution(db, { - id: instanceId, - workflowId, - userId, - organizationId, - status: executionStatus as ExecutionStatusType, - nodeExecutions: nodeExecutionList, - error: errorMsg, - updatedAt: new Date(), - startedAt, - endedAt, - }); - } catch (error) { - console.error("Failed to persist execution record:", error); - // Continue without interrupting the workflow. - } - - return { - id: instanceId, - workflowId, - status: executionStatus, - nodeExecutions: nodeExecutionList, - error: errorMsg, - startedAt, - endedAt, - }; - } - - /** - * Marks nodes connected to inactive outputs as skipped. - * This is crucial for conditional logic where only one branch should execute. - */ - private markInactiveOutputNodesAsSkipped( - runtimeState: RuntimeState, - nodeIdentifier: string, - nodeOutputs: Record - ): RuntimeState { - const node = runtimeState.workflow.nodes.find( - (n) => n.id === nodeIdentifier - ); - if (!node) return runtimeState; - - // Find outputs that were NOT produced - const inactiveOutputs = node.outputs - .map((output) => output.name) - .filter((outputName) => !(outputName in nodeOutputs)); - - if (inactiveOutputs.length === 0) return runtimeState; - - // Find all edges from this node's inactive outputs - const inactiveEdges = runtimeState.workflow.edges.filter( - (edge) => - edge.source === nodeIdentifier && - inactiveOutputs.includes(edge.sourceOutput) - ); - - // Process each target node of inactive edges - for (const edge of inactiveEdges) { - this.markNodeAsSkippedIfNoValidInputs(runtimeState, edge.target); - } - - return runtimeState; - } - - /** - * Marks a node as skipped if it cannot execute due to missing required inputs. - * This is smarter than recursively skipping all dependents. 
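
(Editorial aside: the skip decision below boils down to a required-input check against the node type definition. A self-contained sketch of the rule, with illustrative names:)

```ts
// Distilled from nodeHasAllRequiredInputsSatisfied below: a node is skipped
// only when a required input (per the node type definition, not the workflow
// node) would be undefined; skips then cascade recursively to dependents.
interface InputDef {
  name: string;
  required?: boolean;
}

function canExecute(
  inputs: InputDef[],
  available: Record<string, unknown>
): boolean {
  return inputs.every((i) => !i.required || available[i.name] !== undefined);
}

canExecute([{ name: "input", required: true }], {});           // false: skip
canExecute([{ name: "input", required: false }], {});          // true: keep
canExecute([{ name: "input", required: true }], { input: 1 }); // true: keep
```
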
- */ - private markNodeAsSkippedIfNoValidInputs( - runtimeState: RuntimeState, - nodeId: string - ): void { - if ( - runtimeState.skippedNodes.has(nodeId) || - runtimeState.executedNodes.has(nodeId) - ) { - return; // Already processed - } - - const node = runtimeState.workflow.nodes.find((n) => n.id === nodeId); - if (!node) return; - - // Check if this node has all required inputs satisfied - const allRequiredInputsSatisfied = this.nodeHasAllRequiredInputsSatisfied( - runtimeState, - nodeId - ); - - // Only skip if the node cannot execute (missing required inputs) - if (!allRequiredInputsSatisfied) { - runtimeState.skippedNodes.add(nodeId); - - // Recursively check dependents of this skipped node - const outgoingEdges = runtimeState.workflow.edges.filter( - (edge) => edge.source === nodeId - ); - - for (const edge of outgoingEdges) { - this.markNodeAsSkippedIfNoValidInputs(runtimeState, edge.target); - } - } - } - - /** - * Checks if a node has all required inputs satisfied. - * A node can execute if all its required inputs are available. - */ - private nodeHasAllRequiredInputsSatisfied( - runtimeState: RuntimeState, - nodeId: string - ): boolean { - const node = runtimeState.workflow.nodes.find((n) => n.id === nodeId); - if (!node) return false; - - // Get the node type definition to check for required inputs - const executable = this.nodeRegistry.createExecutableNode(node); - if (!executable) return false; - - const nodeTypeDefinition = (executable.constructor as any).nodeType; - if (!nodeTypeDefinition) return false; - - const inputValues = this.collectNodeInputs(runtimeState, nodeId); - - // Check each required input based on the node type definition (not workflow node definition) - for (const input of nodeTypeDefinition.inputs) { - if (input.required && inputValues[input.name] === undefined) { - return false; // Found a required input that's missing - } - } - - return true; // All required inputs are satisfied - } } diff --git a/apps/api/src/runtime/secret-manager.ts b/apps/api/src/runtime/secret-manager.ts new file mode 100644 index 00000000..7e369ee1 --- /dev/null +++ b/apps/api/src/runtime/secret-manager.ts @@ -0,0 +1,64 @@ +import type { Bindings } from "../context"; +import { createDatabase, getAllSecretsWithValues } from "../db"; + +/** + * Manages organization secrets for workflow execution. + * Handles preloading and decryption of secrets. 
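+ *
+ * A hypothetical call site (the secret name is illustrative):
+ * @example
+ * const secrets = await new SecretManager(env).preloadAllSecrets(orgId);
+ * const apiKey = secrets["MY_API_KEY"]; // undefined if missing or undecryptable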
+ */ +export class SecretManager { + constructor(private env: Bindings) {} + + /** + * Preloads all organization secrets for synchronous access during workflow execution + */ + async preloadAllSecrets( + organizationId: string + ): Promise> { + const secrets: Record = {}; + const db = createDatabase(this.env.DB); + + try { + // Get all secret records for the organization (including encrypted values) + const secretRecords = await getAllSecretsWithValues(db, organizationId); + + // Decrypt each secret and add to the secrets object + for (const secretRecord of secretRecords) { + try { + const secretValue = await this.decryptSecretValue( + secretRecord.encryptedValue, + organizationId + ); + secrets[secretRecord.name] = secretValue; + } catch (error) { + console.warn( + `Failed to decrypt secret '${secretRecord.name}':`, + error + ); + } + } + + console.log( + `Preloaded ${Object.keys(secrets).length} secrets for organization ${organizationId}` + ); + } catch (error) { + console.error( + `Failed to preload secrets for organization ${organizationId}:`, + error + ); + } + + return secrets; + } + + /** + * Decrypt a secret value using organization-specific key + */ + private async decryptSecretValue( + encryptedValue: string, + organizationId: string + ): Promise { + // Import decryptSecret here to avoid circular dependency issues + const { decryptSecret } = await import("../utils/encryption"); + return await decryptSecret(encryptedValue, this.env, organizationId); + } +} From 1e99194a819eacd6ca5383d72c0b49071ced4829 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Tue, 7 Oct 2025 08:53:18 +0200 Subject: [PATCH 2/2] Add unit tests for the components of the runtime --- .../conditional-execution-handler.test.ts | 459 ++++++++++++++++++ apps/api/src/runtime/credit-manager.test.ts | 205 ++++++++ .../src/runtime/execution-persistence.test.ts | 410 ++++++++++++++++ .../api/src/runtime/execution-planner.test.ts | 268 ++++++++++ .../api/src/runtime/node-input-mapper.test.ts | 414 ++++++++++++++++ .../src/runtime/node-output-mapper.test.ts | 284 +++++++++++ apps/api/src/runtime/object-store.test.ts | 455 +++++++++++++++++ 7 files changed, 2495 insertions(+) create mode 100644 apps/api/src/runtime/conditional-execution-handler.test.ts create mode 100644 apps/api/src/runtime/credit-manager.test.ts create mode 100644 apps/api/src/runtime/execution-persistence.test.ts create mode 100644 apps/api/src/runtime/execution-planner.test.ts create mode 100644 apps/api/src/runtime/node-input-mapper.test.ts create mode 100644 apps/api/src/runtime/node-output-mapper.test.ts create mode 100644 apps/api/src/runtime/object-store.test.ts diff --git a/apps/api/src/runtime/conditional-execution-handler.test.ts b/apps/api/src/runtime/conditional-execution-handler.test.ts new file mode 100644 index 00000000..e4e37f36 --- /dev/null +++ b/apps/api/src/runtime/conditional-execution-handler.test.ts @@ -0,0 +1,459 @@ +import type { Workflow } from "@dafthunk/types"; +import { describe, expect, it, vi } from "vitest"; + +import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry"; +import { ConditionalExecutionHandler } from "./conditional-execution-handler"; +import type { NodeInputMapper } from "./node-input-mapper"; +import type { RuntimeState, WorkflowOutputs } from "./runtime"; + +describe("ConditionalExecutionHandler", () => { + const createMockRegistry = ( + nodeTypes: Record< + string, + { inputs?: Array<{ name: string; required?: boolean }> } + > + ): CloudflareNodeRegistry => { + return { + 
createExecutableNode: vi.fn((node) => ({ + constructor: { + nodeType: nodeTypes[node.type], + }, + })), + } as any; + }; + + const createMockInputMapper = ( + inputResults: Record> + ): NodeInputMapper => { + return { + collectNodeInputs: vi.fn((_runtimeState, nodeId) => { + return inputResults[nodeId] || {}; + }), + } as any; + }; + + describe("markInactiveOutputNodesAsSkipped", () => { + it("should not mark any nodes when all outputs are active", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-1", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "conditional", + inputs: [], + outputs: [ + { name: "true", type: "string" }, + { name: "false", type: "string" }, + ], + }, + { + id: "B", + type: "text", + inputs: [{ name: "input", type: "string" }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "true", + target: "B", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([["A", { true: "yes", false: "no" }]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + conditional: { + inputs: [], + }, + text: { inputs: [{ name: "input", required: true }] }, + }); + const inputMapper = createMockInputMapper({ + B: { input: "yes" }, + }); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + { true: "yes", false: "no" } + ); + + expect(result.skippedNodes.size).toBe(0); + }); + + it("should mark nodes connected to inactive outputs as skipped", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-2", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "conditional", + inputs: [], + outputs: [ + { name: "true", type: "string" }, + { name: "false", type: "string" }, + ], + }, + { + id: "B", + type: "text", + inputs: [{ name: "input", type: "string", required: true }], + outputs: [], + }, + { + id: "C", + type: "text", + inputs: [{ name: "input", type: "string", required: true }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "true", + target: "B", + targetInput: "input", + }, + { + source: "A", + sourceOutput: "false", + target: "C", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([["A", { true: "yes" }]]), // only "true" output + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + conditional: { inputs: [] }, + text: { inputs: [{ name: "input", required: true }] }, + }); + const inputMapper = createMockInputMapper({ + B: { input: "yes" }, + C: {}, // No input from A's false output + }); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + { true: "yes" } // "false" output is inactive + ); + + expect(result.skippedNodes.has("C")).toBe(true); + expect(result.skippedNodes.has("B")).toBe(false); + }); + + it("should recursively skip dependent nodes", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-3", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "conditional", + inputs: [], + outputs: [ + { 
name: "true", type: "string" }, + { name: "false", type: "string" }, + ], + }, + { + id: "B", + type: "text", + inputs: [{ name: "input", type: "string", required: true }], + outputs: [{ name: "output", type: "string" }], + }, + { + id: "C", + type: "text", + inputs: [{ name: "input", type: "string", required: true }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "false", + target: "B", + targetInput: "input", + }, + { + source: "B", + sourceOutput: "output", + target: "C", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([["A", { true: "yes" }]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + conditional: { inputs: [] }, + text: { inputs: [{ name: "input", required: true }] }, + }); + const inputMapper = createMockInputMapper({ + B: {}, // No input + C: {}, // No input (because B will be skipped) + }); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + { true: "yes" } + ); + + // Both B and C should be skipped + expect(result.skippedNodes.has("B")).toBe(true); + expect(result.skippedNodes.has("C")).toBe(true); + }); + + it("should not skip nodes with optional inputs", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-4", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "conditional", + inputs: [], + outputs: [ + { name: "true", type: "string" }, + { name: "false", type: "string" }, + ], + }, + { + id: "B", + type: "text", + inputs: [{ name: "input", type: "string", required: false }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "false", + target: "B", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([["A", { true: "yes" }]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + conditional: { inputs: [] }, + text: { inputs: [{ name: "input", required: false }] }, // Optional! 
+ }); + const inputMapper = createMockInputMapper({ + B: {}, // No input, but it's optional + }); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + { true: "yes" } + ); + + // B should NOT be skipped because input is optional + expect(result.skippedNodes.has("B")).toBe(false); + }); + + it("should not skip nodes with alternative valid inputs", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-5", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "conditional", + inputs: [], + outputs: [ + { name: "true", type: "string" }, + { name: "false", type: "string" }, + ], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [{ name: "output", type: "string" }], + }, + { + id: "C", + type: "merge", + inputs: [ + { name: "input1", type: "string", required: true }, + { name: "input2", type: "string", required: false }, + ], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "false", + target: "C", + targetInput: "input2", + }, + { + source: "B", + sourceOutput: "output", + target: "C", + targetInput: "input1", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([ + ["A", { true: "yes" }], + ["B", { output: "from B" }], + ]) as unknown as WorkflowOutputs, + executedNodes: new Set(["A", "B"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + conditional: { inputs: [] }, + text: { inputs: [] }, + merge: { + inputs: [ + { name: "input1", required: true }, + { name: "input2", required: false }, + ], + }, + }); + const inputMapper = createMockInputMapper({ + C: { input1: "from B" }, // Has required input from B + }); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + { true: "yes" } + ); + + // C should NOT be skipped because it has input1 from B + expect(result.skippedNodes.has("C")).toBe(false); + }); + + it("should handle node not found", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-6", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({}); + const inputMapper = createMockInputMapper({}); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "NonExistent", + {} + ); + + expect(result).toBe(runtimeState); + }); + + it("should handle node with no outputs", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-7", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([["A", {}]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { inputs: [] }, + }); + const inputMapper = createMockInputMapper({}); + const handler = new ConditionalExecutionHandler(registry, inputMapper); + + 
const result = handler.markInactiveOutputNodesAsSkipped( + runtimeState, + "A", + {} + ); + + expect(result.skippedNodes.size).toBe(0); + }); + }); +}); diff --git a/apps/api/src/runtime/credit-manager.test.ts b/apps/api/src/runtime/credit-manager.test.ts new file mode 100644 index 00000000..cfae9ecf --- /dev/null +++ b/apps/api/src/runtime/credit-manager.test.ts @@ -0,0 +1,205 @@ +import type { Node } from "@dafthunk/types"; +import { describe, expect, it, vi } from "vitest"; + +import type { Bindings } from "../context"; +import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry"; +import { CreditManager } from "./credit-manager"; + +// Mock the credits utility +vi.mock("../utils/credits", () => ({ + getOrganizationComputeUsage: vi.fn(), +})); + +import { getOrganizationComputeUsage } from "../utils/credits"; + +describe("CreditManager", () => { + const createMockEnv = (cloudflareEnv?: string): Bindings => { + return { + CLOUDFLARE_ENV: cloudflareEnv, + KV: {} as any, + } as Bindings; + }; + + const createMockRegistry = ( + nodeTypes: Record + ): CloudflareNodeRegistry => { + return { + getNodeType: vi.fn((type: string) => nodeTypes[type] || {}), + } as any; + }; + + describe("hasEnoughComputeCredits", () => { + it("should always return true in development mode", async () => { + const env = createMockEnv("development"); + const registry = createMockRegistry({}); + const manager = new CreditManager(env, registry); + + const result = await manager.hasEnoughComputeCredits( + "org-123", + 100, // computeCredits + 200 // computeCost (exceeds credits) + ); + + expect(result).toBe(true); + expect(getOrganizationComputeUsage).not.toHaveBeenCalled(); + }); + + it("should return true when credits are sufficient", async () => { + const env = createMockEnv("production"); + const registry = createMockRegistry({}); + const manager = new CreditManager(env, registry); + + vi.mocked(getOrganizationComputeUsage).mockResolvedValue(50); // current usage + + const result = await manager.hasEnoughComputeCredits( + "org-123", + 100, // total credits + 30 // additional cost needed + ); + + expect(result).toBe(true); // 50 + 30 = 80 <= 100 + }); + + it("should return false when credits are insufficient", async () => { + const env = createMockEnv("production"); + const registry = createMockRegistry({}); + const manager = new CreditManager(env, registry); + + vi.mocked(getOrganizationComputeUsage).mockResolvedValue(80); // current usage + + const result = await manager.hasEnoughComputeCredits( + "org-123", + 100, // total credits + 30 // additional cost needed + ); + + expect(result).toBe(false); // 80 + 30 = 110 > 100 + }); + + it("should return true when exactly at credit limit", async () => { + const env = createMockEnv("production"); + const registry = createMockRegistry({}); + const manager = new CreditManager(env, registry); + + vi.mocked(getOrganizationComputeUsage).mockResolvedValue(70); // current usage + + const result = await manager.hasEnoughComputeCredits( + "org-123", + 100, // total credits + 30 // additional cost needed + ); + + expect(result).toBe(true); // 70 + 30 = 100 == 100 + }); + + it("should handle zero current usage", async () => { + const env = createMockEnv("production"); + const registry = createMockRegistry({}); + const manager = new CreditManager(env, registry); + + vi.mocked(getOrganizationComputeUsage).mockResolvedValue(0); + + const result = await manager.hasEnoughComputeCredits("org-123", 100, 50); + + expect(result).toBe(true); // 0 + 50 = 50 <= 100 + }); + 
}); + + describe("getNodesComputeCost", () => { + it("should calculate total cost for multiple nodes", () => { + const registry = createMockRegistry({ + text: { computeCost: 1 }, + ai: { computeCost: 10 }, + image: { computeCost: 5 }, + }); + const manager = new CreditManager({} as Bindings, registry); + + const nodes: Node[] = [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "ai", inputs: [], outputs: [] }, + { id: "C", type: "image", inputs: [], outputs: [] }, + ] as unknown as Node[]; + + const result = manager.getNodesComputeCost(nodes); + + expect(result).toBe(16); // 1 + 10 + 5 + }); + + it("should use default cost of 1 when computeCost not specified", () => { + const registry = createMockRegistry({ + text: {}, // no computeCost specified + unknown: {}, // no computeCost specified + }); + const manager = new CreditManager({} as Bindings, registry); + + const nodes: Node[] = [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "unknown", inputs: [], outputs: [] }, + ] as unknown as Node[]; + + const result = manager.getNodesComputeCost(nodes); + + expect(result).toBe(2); // 1 + 1 (defaults) + }); + + it("should handle empty node list", () => { + const registry = createMockRegistry({}); + const manager = new CreditManager({} as Bindings, registry); + + const result = manager.getNodesComputeCost([]); + + expect(result).toBe(0); + }); + + it("should handle nodes with zero cost", () => { + const registry = createMockRegistry({ + free: { computeCost: 0 }, + }); + const manager = new CreditManager({} as Bindings, registry); + + const nodes: Node[] = [ + { id: "A", type: "free", inputs: [], outputs: [] }, + { id: "B", type: "free", inputs: [], outputs: [] }, + ] as unknown as Node[]; + + const result = manager.getNodesComputeCost(nodes); + + expect(result).toBe(0); + }); + + it("should handle single node", () => { + const registry = createMockRegistry({ + expensive: { computeCost: 100 }, + }); + const manager = new CreditManager({} as Bindings, registry); + + const nodes: Node[] = [ + { id: "A", type: "expensive", inputs: [], outputs: [] }, + ] as unknown as Node[]; + + const result = manager.getNodesComputeCost(nodes); + + expect(result).toBe(100); + }); + + it("should sum costs correctly for many nodes", () => { + const registry = createMockRegistry({ + type1: { computeCost: 3 }, + type2: { computeCost: 7 }, + }); + const manager = new CreditManager({} as Bindings, registry); + + const nodes: Node[] = Array.from({ length: 10 }, (_, i) => ({ + id: `node-${i}`, + type: i % 2 === 0 ? 
"type1" : "type2", + inputs: [], + outputs: [], + })) as unknown as Node[]; + + const result = manager.getNodesComputeCost(nodes); + + // 5 nodes of type1 (3 each) + 5 nodes of type2 (7 each) = 15 + 35 = 50 + expect(result).toBe(50); + }); + }); +}); diff --git a/apps/api/src/runtime/execution-persistence.test.ts b/apps/api/src/runtime/execution-persistence.test.ts new file mode 100644 index 00000000..1add405a --- /dev/null +++ b/apps/api/src/runtime/execution-persistence.test.ts @@ -0,0 +1,410 @@ +import type { Workflow, WorkflowExecution } from "@dafthunk/types"; +import { describe, expect, it, vi } from "vitest"; + +import type { Bindings } from "../context"; +import { ExecutionPersistence } from "./execution-persistence"; +import type { RuntimeState } from "./runtime"; + +// Mock the db module +vi.mock("../db", () => ({ + createDatabase: vi.fn(() => ({})), + saveExecution: vi.fn(async (_db, execution) => execution), +})); + +import { saveExecution } from "../db"; + +describe("ExecutionPersistence", () => { + const createMockEnv = (): Bindings => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const mockGet = vi.fn().mockReturnValue({ fetch: mockFetch }); + const mockIdFromName = vi.fn().mockReturnValue("mock-id"); + + return { + DB: {} as any, + WORKFLOW_SESSION: { + idFromName: mockIdFromName, + get: mockGet, + } as any, + } as Bindings; + }; + + describe("buildNodeExecutions", () => { + it("should build execution list with completed nodes", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-1", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [{ name: "result", type: "string" }], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [{ name: "result", type: "string" }], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([ + ["A", { result: "output A" }], + ["B", { result: "output B" }], + ]), + executedNodes: new Set(["A", "B"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "completed", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + const result = persistence.buildNodeExecutions(runtimeState); + + expect(result).toEqual([ + { + nodeId: "A", + status: "completed", + outputs: { result: "output A" }, + }, + { + nodeId: "B", + status: "completed", + outputs: { result: "output B" }, + }, + ]); + }); + + it("should build execution list with error nodes", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-2", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([["A", {}]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map([["B", "Something went wrong"]]), + executionPlan: [], + status: "error", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + const result = persistence.buildNodeExecutions(runtimeState); + + expect(result).toEqual([ + { + nodeId: "A", + status: "completed", + outputs: {}, + }, + { + nodeId: "B", + status: "error", + error: "Something went wrong", + }, + ]); + }); + + it("should build execution list with skipped nodes", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-3", + name: "Test Workflow", + handle: 
"test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([["A", {}]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(["B"]), + nodeErrors: new Map(), + executionPlan: [], + status: "completed", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + const result = persistence.buildNodeExecutions(runtimeState); + + expect(result).toEqual([ + { + nodeId: "A", + status: "completed", + outputs: {}, + }, + { + nodeId: "B", + status: "skipped", + }, + ]); + }); + + it("should mark unprocessed nodes as executing", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-4", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([["A", {}]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + const result = persistence.buildNodeExecutions(runtimeState); + + expect(result).toEqual([ + { + nodeId: "A", + status: "completed", + outputs: {}, + }, + { + nodeId: "B", + status: "executing", + }, + ]); + }); + }); + + describe("saveExecutionState", () => { + it("should save execution state to database", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map([["A", {}]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "completed", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + const startedAt = new Date("2024-01-01T00:00:00Z"); + const endedAt = new Date("2024-01-01T00:01:00Z"); + + const result = await persistence.saveExecutionState( + "user-123", + "org-123", + "workflow-123", + "exec-456", + runtimeState, + startedAt, + endedAt + ); + + expect(saveExecution).toHaveBeenCalledWith( + {}, + expect.objectContaining({ + id: "exec-456", + workflowId: "workflow-123", + userId: "user-123", + organizationId: "org-123", + status: "completed", + startedAt, + endedAt, + }) + ); + + expect(result.id).toBe("exec-456"); + expect(result.status).toBe("completed"); + }); + + it("should handle errors with multiple node errors", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map([ + ["A", "Error 1"], + ["B", "Error 2"], + ]), + executionPlan: [], + status: "error", + }; + + const persistence = new ExecutionPersistence(createMockEnv()); + + const result = await persistence.saveExecutionState( + "user-123", + "org-123", + "workflow-123", + "exec-456", + runtimeState + ); + + 
expect(result.error).toBe("Error 1, Error 2"); + }); + + it("should handle database save failure gracefully", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "completed", + }; + + vi.mocked(saveExecution).mockRejectedValueOnce( + new Error("Database error") + ); + + const persistence = new ExecutionPersistence(createMockEnv()); + + const result = await persistence.saveExecutionState( + "user-123", + "org-123", + "workflow-123", + "exec-456", + runtimeState + ); + + // Should return execution record even when database fails + expect(result.id).toBe("exec-456"); + expect(result.workflowId).toBe("workflow-123"); + }); + }); + + describe("sendExecutionUpdateToSession", () => { + it("should send execution update via WebSocket", async () => { + const env = createMockEnv(); + const persistence = new ExecutionPersistence(env); + + const execution: WorkflowExecution = { + id: "exec-123", + workflowId: "workflow-456", + status: "executing", + nodeExecutions: [], + }; + + await persistence.sendExecutionUpdateToSession("session-789", execution); + + expect(env.WORKFLOW_SESSION.idFromName).toHaveBeenCalledWith( + "session-789" + ); + expect(env.WORKFLOW_SESSION.get).toHaveBeenCalledWith("mock-id"); + }); + + it("should handle WebSocket send failure gracefully", async () => { + const env = createMockEnv(); + const mockGet = env.WORKFLOW_SESSION.get as any; + mockGet.mockReturnValue({ + fetch: vi.fn().mockRejectedValue(new Error("Connection failed")), + }); + + const persistence = new ExecutionPersistence(env); + + const execution: WorkflowExecution = { + id: "exec-123", + workflowId: "workflow-456", + status: "executing", + nodeExecutions: [], + }; + + // Should not throw + await expect( + persistence.sendExecutionUpdateToSession("session-789", execution) + ).resolves.toBeUndefined(); + }); + }); +}); diff --git a/apps/api/src/runtime/execution-planner.test.ts b/apps/api/src/runtime/execution-planner.test.ts new file mode 100644 index 00000000..ae01d577 --- /dev/null +++ b/apps/api/src/runtime/execution-planner.test.ts @@ -0,0 +1,268 @@ +import type { Workflow } from "@dafthunk/types"; +import { describe, expect, it, vi } from "vitest"; + +import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry"; +import { ExecutionPlanner } from "./execution-planner"; + +describe("ExecutionPlanner", () => { + const createMockRegistry = ( + nodeTypes: Record + ): CloudflareNodeRegistry => { + return { + getNodeType: vi.fn((type: string) => nodeTypes[type] || {}), + } as any; + }; + + describe("createTopologicalOrder", () => { + it("should create correct topological order for simple linear workflow", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + { id: "C", type: "text", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "B", sourceOutput: "out", target: "C", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({}); + const planner = new ExecutionPlanner(registry); + + const result = planner.createTopologicalOrder(workflow); + + 
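+      // A strict chain A -> B -> C admits exactly one valid topological order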
expect(result).toEqual(["A", "B", "C"]); + }); + + it("should handle branching workflows (fan-out)", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + { id: "C", type: "text", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "A", sourceOutput: "out", target: "C", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({}); + const planner = new ExecutionPlanner(registry); + + const result = planner.createTopologicalOrder(workflow); + + expect(result[0]).toBe("A"); + expect(result).toContain("B"); + expect(result).toContain("C"); + expect(result).toHaveLength(3); + }); + + it("should handle merging workflows (fan-in)", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + { id: "C", type: "text", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "C", targetInput: "in1" }, + { source: "B", sourceOutput: "out", target: "C", targetInput: "in2" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({}); + const planner = new ExecutionPlanner(registry); + + const result = planner.createTopologicalOrder(workflow); + + expect(result[2]).toBe("C"); + expect(result).toContain("A"); + expect(result).toContain("B"); + expect(result).toHaveLength(3); + }); + + it("should detect cycles and return empty array", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + { id: "C", type: "text", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "B", sourceOutput: "out", target: "C", targetInput: "in" }, + { source: "C", sourceOutput: "out", target: "A", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({}); + const planner = new ExecutionPlanner(registry); + + const result = planner.createTopologicalOrder(workflow); + + expect(result).toEqual([]); + }); + + it("should handle workflow with no edges", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + ], + edges: [], + } as unknown as Workflow; + + const registry = createMockRegistry({}); + const planner = new ExecutionPlanner(registry); + + const result = planner.createTopologicalOrder(workflow); + + expect(result).toHaveLength(2); + expect(result).toContain("A"); + expect(result).toContain("B"); + }); + }); + + describe("createExecutionPlan", () => { + it("should create individual execution units for non-inlinable nodes", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "text", inputs: [], outputs: [] }, + { id: "B", type: "text", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({ + text: { inlinable: false }, + }); + const planner = new ExecutionPlanner(registry); + + const ordered = ["A", "B"]; + const result = 
planner.createExecutionPlan(workflow, ordered); + + expect(result).toEqual([ + { type: "individual", nodeId: "A" }, + { type: "individual", nodeId: "B" }, + ]); + }); + + it("should group consecutive inlinable nodes", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "math", inputs: [], outputs: [] }, + { id: "B", type: "math", inputs: [], outputs: [] }, + { id: "C", type: "math", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "B", sourceOutput: "out", target: "C", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({ + math: { inlinable: true }, + }); + const planner = new ExecutionPlanner(registry); + + const ordered = ["A", "B", "C"]; + const result = planner.createExecutionPlan(workflow, ordered); + + expect(result).toEqual([{ type: "inline", nodeIds: ["A", "B", "C"] }]); + }); + + it("should handle mixed inlinable and non-inlinable nodes", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "math", inputs: [], outputs: [] }, + { id: "B", type: "ai", inputs: [], outputs: [] }, + { id: "C", type: "math", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "B", sourceOutput: "out", target: "C", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({ + math: { inlinable: true }, + ai: { inlinable: false }, + }); + const planner = new ExecutionPlanner(registry); + + const ordered = ["A", "B", "C"]; + const result = planner.createExecutionPlan(workflow, ordered); + + expect(result).toEqual([ + { type: "individual", nodeId: "A" }, + { type: "individual", nodeId: "B" }, + { type: "individual", nodeId: "C" }, + ]); + }); + + it("should group fan-out pattern of inlinable nodes", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [ + { id: "A", type: "math", inputs: [], outputs: [] }, + { id: "B", type: "math", inputs: [], outputs: [] }, + { id: "C", type: "math", inputs: [], outputs: [] }, + ], + edges: [ + { source: "A", sourceOutput: "out", target: "B", targetInput: "in" }, + { source: "A", sourceOutput: "out", target: "C", targetInput: "in" }, + ], + } as unknown as Workflow; + + const registry = createMockRegistry({ + math: { inlinable: true }, + }); + const planner = new ExecutionPlanner(registry); + + const ordered = ["A", "B", "C"]; + const result = planner.createExecutionPlan(workflow, ordered); + + expect(result).toEqual([{ type: "inline", nodeIds: ["A", "B", "C"] }]); + }); + + it("should handle single inlinable node as individual", () => { + const workflow: Workflow = { + id: "test", + name: "test", + nodes: [{ id: "A", type: "math", inputs: [], outputs: [] }], + edges: [], + } as unknown as Workflow; + + const registry = createMockRegistry({ + math: { inlinable: true }, + }); + const planner = new ExecutionPlanner(registry); + + const ordered = ["A"]; + const result = planner.createExecutionPlan(workflow, ordered); + + expect(result).toEqual([{ type: "individual", nodeId: "A" }]); + }); + }); +}); diff --git a/apps/api/src/runtime/node-input-mapper.test.ts b/apps/api/src/runtime/node-input-mapper.test.ts new file mode 100644 index 00000000..6367b486 --- /dev/null +++ b/apps/api/src/runtime/node-input-mapper.test.ts @@ -0,0 +1,414 @@ +import type { Workflow } from "@dafthunk/types"; +import { describe, expect, it, vi } from 
"vitest"; + +import type { CloudflareNodeRegistry } from "../nodes/cloudflare-node-registry"; +import { NodeInputMapper } from "./node-input-mapper"; +import type { ObjectStore } from "./object-store"; +import type { RuntimeState } from "./runtime"; + +describe("NodeInputMapper", () => { + const createMockRegistry = ( + nodeTypes: Record< + string, + { + inputs?: Array<{ + name: string; + repeated?: boolean; + required?: boolean; + }>; + } + > + ): CloudflareNodeRegistry => { + return { + createExecutableNode: vi.fn((node) => ({ + constructor: { + nodeType: nodeTypes[node.type], + }, + })), + } as any; + }; + + describe("collectNodeInputs", () => { + it("should collect default values from node definition", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-1", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [ + { name: "text", type: "string", value: "hello" }, + { name: "count", type: "number", value: 42 }, + ], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { + inputs: [ + { name: "text", required: false }, + { name: "count", required: false }, + ], + }, + }); + const mapper = new NodeInputMapper(registry); + + const result = mapper.collectNodeInputs(runtimeState, "A"); + + expect(result).toEqual({ + text: "hello", + count: 42, + }); + }); + + it("should collect values from connected nodes", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-2", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [{ name: "result", type: "string" }], + }, + { + id: "B", + type: "text", + inputs: [{ name: "input", type: "string" }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "result", + target: "B", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([["A", { result: "test value" }]]), + executedNodes: new Set(["A"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { inputs: [{ name: "input", required: false }] }, + }); + const mapper = new NodeInputMapper(registry); + + const result = mapper.collectNodeInputs(runtimeState, "B"); + + expect(result).toEqual({ + input: "test value", + }); + }); + + it("should handle repeated parameters as array", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-3", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [{ name: "out", type: "string" }], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [{ name: "out", type: "string" }], + }, + { + id: "C", + type: "merge", + inputs: [{ name: "values", type: "string" }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "out", + target: "C", + targetInput: "values", + }, + { + source: "B", + sourceOutput: "out", + target: "C", + targetInput: "values", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([ + ["A", { out: "value1" }], + ["B", { out: "value2" }], + ]), + executedNodes: new Set(["A", "B"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: 
"executing", + }; + + const registry = createMockRegistry({ + merge: { inputs: [{ name: "values", repeated: true }] }, + }); + const mapper = new NodeInputMapper(registry); + + const result = mapper.collectNodeInputs(runtimeState, "C"); + + expect(result).toEqual({ + values: ["value1", "value2"], + }); + }); + + it("should use last value for non-repeated parameters with multiple connections", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-4", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [{ name: "out", type: "string" }], + }, + { + id: "B", + type: "text", + inputs: [], + outputs: [{ name: "out", type: "string" }], + }, + { + id: "C", + type: "text", + inputs: [{ name: "input", type: "string" }], + outputs: [], + }, + ], + edges: [ + { + source: "A", + sourceOutput: "out", + target: "C", + targetInput: "input", + }, + { + source: "B", + sourceOutput: "out", + target: "C", + targetInput: "input", + }, + ], + } as unknown as Workflow, + nodeOutputs: new Map([ + ["A", { out: "value1" }], + ["B", { out: "value2" }], + ]), + executedNodes: new Set(["A", "B"]), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { inputs: [{ name: "input", repeated: false }] }, + }); + const mapper = new NodeInputMapper(registry); + + const result = mapper.collectNodeInputs(runtimeState, "C"); + + expect(result).toEqual({ + input: "value2", // Last value + }); + }); + + it("should return empty object for node not found", () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-5", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({}); + const mapper = new NodeInputMapper(registry); + + const result = mapper.collectNodeInputs(runtimeState, "NonExistent"); + + expect(result).toEqual({}); + }); + }); + + describe("mapRuntimeToNodeInputs", () => { + it("should throw error for missing required input", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-6", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [{ name: "required", type: "string", required: true }], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { inputs: [{ name: "required", required: true }] }, + }); + const mapper = new NodeInputMapper(registry); + const mockStore = {} as ObjectStore; + + await expect( + mapper.mapRuntimeToNodeInputs(runtimeState, "A", {}, mockStore) + ).rejects.toThrow("Required input 'required' missing for node A"); + }); + + it("should skip undefined and null values", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-7", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [ + { name: "optional1", type: "string", required: false }, + { name: "optional2", type: "string", required: false }, + ], + outputs: [], + }, 
+ ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + text: { + inputs: [ + { name: "optional1", required: false }, + { name: "optional2", required: false }, + ], + }, + }); + const mapper = new NodeInputMapper(registry); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapRuntimeToNodeInputs( + runtimeState, + "A", + { optional1: undefined, optional2: null }, + mockStore + ); + + expect(result).toEqual({}); + }); + + it("should process repeated parameters as array", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-8", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "merge", + inputs: [{ name: "values", type: "string", required: false }], + outputs: [], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const registry = createMockRegistry({ + merge: { inputs: [{ name: "values", repeated: true }] }, + }); + const mapper = new NodeInputMapper(registry); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapRuntimeToNodeInputs( + runtimeState, + "A", + { values: ["a", "b", "c"] }, + mockStore + ); + + expect(result).toEqual({ + values: ["a", "b", "c"], + }); + }); + }); +}); diff --git a/apps/api/src/runtime/node-output-mapper.test.ts b/apps/api/src/runtime/node-output-mapper.test.ts new file mode 100644 index 00000000..84b0de51 --- /dev/null +++ b/apps/api/src/runtime/node-output-mapper.test.ts @@ -0,0 +1,284 @@ +import type { Workflow } from "@dafthunk/types"; +import { describe, expect, it, vi } from "vitest"; + +import { NodeOutputMapper } from "./node-output-mapper"; +import type { ObjectStore } from "./object-store"; +import type { RuntimeState } from "./runtime"; + +// Mock the parameter mapper module +vi.mock("../nodes/parameter-mapper", () => ({ + nodeToApiParameter: vi.fn(async (type, value) => { + // Simple mock that returns the value as-is for most types + if (type === "object") { + return { id: "mock-object-ref", mimeType: "application/octet-stream" }; + } + return value; + }), +})); + +describe("NodeOutputMapper", () => { + describe("mapNodeToRuntimeOutputs", () => { + it("should map basic output values", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-1", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [ + { name: "text", type: "string" }, + { name: "count", type: "number" }, + ], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapNodeToRuntimeOutputs( + runtimeState, + "A", + { text: "hello", count: 42 }, + mockStore, + "org-123", + "exec-456" + ); + + expect(result).toEqual({ + text: "hello", + count: 42, + }); + }); + + it("should skip undefined and null output values", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-2", + name: "Test Workflow", + handle: "test-workflow", + type: 
"manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [ + { name: "output1", type: "string" }, + { name: "output2", type: "string" }, + { name: "output3", type: "string" }, + ], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapNodeToRuntimeOutputs( + runtimeState, + "A", + { output1: "value", output2: undefined, output3: null }, + mockStore, + "org-123", + "exec-456" + ); + + expect(result).toEqual({ + output1: "value", + }); + }); + + it("should throw error when node not found", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-3", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + await expect( + mapper.mapNodeToRuntimeOutputs( + runtimeState, + "NonExistent", + {}, + mockStore, + "org-123", + "exec-456" + ) + ).rejects.toThrow("Node NonExistent not found"); + }); + + it("should handle multiple output types", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-4", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "multi", + inputs: [], + outputs: [ + { name: "text", type: "string" }, + { name: "number", type: "number" }, + { name: "bool", type: "boolean" }, + { name: "json", type: "json" }, + ], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapNodeToRuntimeOutputs( + runtimeState, + "A", + { + text: "hello", + number: 42, + bool: true, + json: { key: "value" }, + }, + mockStore, + "org-123", + "exec-456" + ); + + expect(result).toEqual({ + text: "hello", + number: 42, + bool: true, + json: { key: "value" }, + }); + }); + + it("should handle empty outputs object", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-5", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [{ name: "output", type: "string" }], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapNodeToRuntimeOutputs( + runtimeState, + "A", + {}, + mockStore, + "org-123", + "exec-456" + ); + + expect(result).toEqual({}); + }); + + it("should handle outputs with no matching values", async () => { + const runtimeState: RuntimeState = { + workflow: { + id: "workflow-6", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [ + { + id: "A", + type: "text", + inputs: [], + outputs: [ + { name: "output1", type: "string" }, + { name: "output2", 
type: "string" }, + ], + }, + ], + edges: [], + } as unknown as Workflow, + nodeOutputs: new Map(), + executedNodes: new Set(), + skippedNodes: new Set(), + nodeErrors: new Map(), + executionPlan: [], + status: "executing", + }; + + const mapper = new NodeOutputMapper(); + const mockStore = {} as ObjectStore; + + const result = await mapper.mapNodeToRuntimeOutputs( + runtimeState, + "A", + { someOtherOutput: "value" }, + mockStore, + "org-123", + "exec-456" + ); + + expect(result).toEqual({}); + }); + }); +}); diff --git a/apps/api/src/runtime/object-store.test.ts b/apps/api/src/runtime/object-store.test.ts new file mode 100644 index 00000000..3a1ada07 --- /dev/null +++ b/apps/api/src/runtime/object-store.test.ts @@ -0,0 +1,455 @@ +import type { + ObjectReference, + Workflow, + WorkflowExecution, +} from "@dafthunk/types"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { ObjectStore } from "./object-store"; + +describe("ObjectStore", () => { + let mockBucket: any; + + beforeEach(() => { + mockBucket = { + put: vi.fn().mockResolvedValue({ etag: "mock-etag" }), + get: vi.fn(), + delete: vi.fn().mockResolvedValue(undefined), + list: vi.fn(), + }; + }); + + describe("Binary Object Storage", () => { + describe("writeObject", () => { + it("should write object and return reference with generated ID", async () => { + const store = new ObjectStore(mockBucket); + const data = new Uint8Array([1, 2, 3]); + + const result = await store.writeObject( + data, + "image/png", + "org-123", + "exec-456" + ); + + expect(result).toHaveProperty("id"); + expect(result.mimeType).toBe("image/png"); + expect(mockBucket.put).toHaveBeenCalledWith( + expect.stringContaining("objects/"), + data, + expect.objectContaining({ + httpMetadata: expect.objectContaining({ + contentType: "image/png", + }), + }) + ); + }); + }); + + describe("writeObjectWithId", () => { + it("should write object with specific ID", async () => { + const store = new ObjectStore(mockBucket); + const data = new Uint8Array([1, 2, 3]); + + const result = await store.writeObjectWithId( + "custom-id", + data, + "image/jpeg", + "org-123" + ); + + expect(result.id).toBe("custom-id"); + expect(result.mimeType).toBe("image/jpeg"); + expect(mockBucket.put).toHaveBeenCalledWith( + "objects/custom-id/object.data", + data, + expect.objectContaining({ + customMetadata: expect.objectContaining({ + id: "custom-id", + organizationId: "org-123", + }), + }) + ); + }); + + it("should include executionId when provided", async () => { + const store = new ObjectStore(mockBucket); + const data = new Uint8Array([1, 2, 3]); + + await store.writeObjectWithId( + "custom-id", + data, + "image/png", + "org-123", + "exec-456" + ); + + expect(mockBucket.put).toHaveBeenCalledWith( + expect.any(String), + expect.any(Uint8Array), + expect.objectContaining({ + customMetadata: expect.objectContaining({ + executionId: "exec-456", + }), + }) + ); + }); + }); + + describe("readObject", () => { + it("should read object and return data with metadata", async () => { + const mockData = new Uint8Array([1, 2, 3]); + mockBucket.get.mockResolvedValue({ + arrayBuffer: vi.fn().mockResolvedValue(mockData.buffer), + size: 3, + customMetadata: { organizationId: "org-123" }, + }); + + const store = new ObjectStore(mockBucket); + const reference: ObjectReference = { + id: "obj-123", + mimeType: "image/png", + }; + + const result = await store.readObject(reference); + + expect(result).not.toBeNull(); + expect(result?.data).toEqual(mockData); + expect(result?.metadata).toEqual({ 
organizationId: "org-123" }); + expect(mockBucket.get).toHaveBeenCalledWith( + "objects/obj-123/object.data" + ); + }); + + it("should return null when object not found", async () => { + mockBucket.get.mockResolvedValue(null); + + const store = new ObjectStore(mockBucket); + const reference: ObjectReference = { + id: "obj-123", + mimeType: "image/png", + }; + + const result = await store.readObject(reference); + + expect(result).toBeNull(); + }); + }); + + describe("deleteObject", () => { + it("should delete object", async () => { + const store = new ObjectStore(mockBucket); + const reference: ObjectReference = { + id: "obj-123", + mimeType: "image/png", + }; + + await store.deleteObject(reference); + + expect(mockBucket.delete).toHaveBeenCalledWith( + "objects/obj-123/object.data" + ); + }); + }); + + describe("listObjects", () => { + it("should list objects for organization", async () => { + mockBucket.list.mockResolvedValue({ + objects: [ + { + key: "objects/obj-1/object.data", + size: 100, + httpMetadata: { contentType: "image/png" }, + customMetadata: { + organizationId: "org-123", + createdAt: "2024-01-01T00:00:00Z", + executionId: "exec-1", + }, + }, + { + key: "objects/obj-2/object.data", + size: 200, + httpMetadata: { contentType: "image/jpeg" }, + customMetadata: { + organizationId: "org-123", + createdAt: "2024-01-02T00:00:00Z", + }, + }, + { + key: "objects/obj-3/object.data", + size: 300, + httpMetadata: { contentType: "image/gif" }, + customMetadata: { + organizationId: "org-456", // Different org + createdAt: "2024-01-03T00:00:00Z", + }, + }, + ], + }); + + const store = new ObjectStore(mockBucket); + const result = await store.listObjects("org-123"); + + expect(result).toHaveLength(2); + expect(result[0]).toEqual({ + id: "obj-1", + mimeType: "image/png", + size: 100, + createdAt: new Date("2024-01-01T00:00:00Z"), + organizationId: "org-123", + executionId: "exec-1", + }); + expect(result[1].id).toBe("obj-2"); + }); + }); + }); + + describe("Workflow Storage", () => { + describe("writeWorkflow", () => { + it("should write workflow to storage", async () => { + const store = new ObjectStore(mockBucket); + const workflow: Workflow = { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow; + + const result = await store.writeWorkflow(workflow); + + expect(result).toBe("workflow-123"); + expect(mockBucket.put).toHaveBeenCalledWith( + "workflows/workflow-123/workflow.json", + JSON.stringify(workflow), + expect.objectContaining({ + httpMetadata: expect.objectContaining({ + contentType: "application/json", + }), + }) + ); + }); + }); + + describe("readWorkflow", () => { + it("should read workflow from storage", async () => { + const workflow: Workflow = { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow; + + mockBucket.get.mockResolvedValue({ + text: vi.fn().mockResolvedValue(JSON.stringify(workflow)), + size: 100, + }); + + const store = new ObjectStore(mockBucket); + const result = await store.readWorkflow("workflow-123"); + + expect(result).toEqual(workflow); + expect(mockBucket.get).toHaveBeenCalledWith( + "workflows/workflow-123/workflow.json" + ); + }); + + it("should throw error when workflow not found", async () => { + mockBucket.get.mockResolvedValue(null); + + const store = new ObjectStore(mockBucket); + + await expect(store.readWorkflow("workflow-123")).rejects.toThrow( + "Workflow not 
found: workflow-123" + ); + }); + }); + + describe("deleteWorkflow", () => { + it("should delete workflow from storage", async () => { + const store = new ObjectStore(mockBucket); + + await store.deleteWorkflow("workflow-123"); + + expect(mockBucket.delete).toHaveBeenCalledWith( + "workflows/workflow-123/workflow.json" + ); + }); + }); + }); + + describe("Execution Workflow Storage", () => { + describe("writeExecutionWorkflow", () => { + it("should write execution workflow snapshot", async () => { + const store = new ObjectStore(mockBucket); + const workflow: Workflow = { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow; + + const result = await store.writeExecutionWorkflow("exec-456", workflow); + + expect(result).toBe("exec-456"); + expect(mockBucket.put).toHaveBeenCalledWith( + "executions/exec-456/workflow.json", + JSON.stringify(workflow), + expect.objectContaining({ + customMetadata: expect.objectContaining({ + executionId: "exec-456", + workflowId: "workflow-123", + }), + }) + ); + }); + }); + + describe("readExecutionWorkflow", () => { + it("should read execution workflow snapshot", async () => { + const workflow: Workflow = { + id: "workflow-123", + name: "Test Workflow", + handle: "test-workflow", + type: "manual", + nodes: [], + edges: [], + } as unknown as Workflow; + + mockBucket.get.mockResolvedValue({ + text: vi.fn().mockResolvedValue(JSON.stringify(workflow)), + size: 100, + }); + + const store = new ObjectStore(mockBucket); + const result = await store.readExecutionWorkflow("exec-456"); + + expect(result).toEqual(workflow); + expect(mockBucket.get).toHaveBeenCalledWith( + "executions/exec-456/workflow.json" + ); + }); + + it("should throw error when execution workflow not found", async () => { + mockBucket.get.mockResolvedValue(null); + + const store = new ObjectStore(mockBucket); + + await expect(store.readExecutionWorkflow("exec-456")).rejects.toThrow( + "Workflow not found for execution: exec-456" + ); + }); + }); + }); + + describe("Execution Storage", () => { + describe("writeExecution", () => { + it("should write execution to storage", async () => { + const store = new ObjectStore(mockBucket); + const execution: WorkflowExecution = { + id: "exec-456", + workflowId: "workflow-123", + status: "completed", + nodeExecutions: [], + }; + + const result = await store.writeExecution(execution); + + expect(result).toBe("exec-456"); + expect(mockBucket.put).toHaveBeenCalledWith( + "executions/exec-456/execution.json", + JSON.stringify(execution), + expect.objectContaining({ + customMetadata: expect.objectContaining({ + workflowId: "workflow-123", + status: "completed", + }), + }) + ); + }); + }); + + describe("readExecution", () => { + it("should read execution from storage", async () => { + const execution: WorkflowExecution = { + id: "exec-456", + workflowId: "workflow-123", + status: "completed", + nodeExecutions: [], + }; + + mockBucket.get.mockResolvedValue({ + text: vi.fn().mockResolvedValue(JSON.stringify(execution)), + size: 100, + }); + + const store = new ObjectStore(mockBucket); + const result = await store.readExecution("exec-456"); + + expect(result).toEqual(execution); + expect(mockBucket.get).toHaveBeenCalledWith( + "executions/exec-456/execution.json" + ); + }); + + it("should throw error when execution not found", async () => { + mockBucket.get.mockResolvedValue(null); + + const store = new ObjectStore(mockBucket); + + await 
expect(store.readExecution("exec-456")).rejects.toThrow(
+          "Execution not found: exec-456"
+        );
+      });
+    });
+
+    describe("deleteExecution", () => {
+      it("should delete execution from storage", async () => {
+        const store = new ObjectStore(mockBucket);
+
+        await store.deleteExecution("exec-456");
+
+        expect(mockBucket.delete).toHaveBeenCalledWith(
+          "executions/exec-456/execution.json"
+        );
+      });
+    });
+  });
+
+  describe("Error Handling", () => {
+    it("should throw error when bucket not initialized", async () => {
+      // Constructing the store without a bucket binding should fail fast
+      const store = new ObjectStore(null as any);
+
+      await expect(
+        store.writeObject(new Uint8Array([1, 2, 3]), "image/png", "org-123")
+      ).rejects.toThrow("R2 bucket is not initialized");
+    });
+
+    it("should handle bucket put failure", async () => {
+      // Underlying R2 errors are propagated to the caller rather than swallowed
+      mockBucket.put.mockRejectedValue(new Error("Storage error"));
+
+      const store = new ObjectStore(mockBucket);
+
+      await expect(
+        store.writeObject(new Uint8Array([1, 2, 3]), "image/png", "org-123")
+      ).rejects.toThrow("Storage error");
+    });
+
+    it("should handle bucket get failure", async () => {
+      mockBucket.get.mockRejectedValue(new Error("Read error"));
+
+      const store = new ObjectStore(mockBucket);
+      const reference: ObjectReference = {
+        id: "obj-123",
+        mimeType: "image/png",
+      };
+
+      await expect(store.readObject(reference)).rejects.toThrow("Read error");
+    });
+  });
+});