From 5edd132c7612d682c304f12117480e404e2916b1 Mon Sep 17 00:00:00 2001 From: leonardcser <73912641+leonardcser@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:40:40 +0200 Subject: [PATCH 1/8] feat: implement durable objects for editor page --- apps/api/src/context.ts | 2 + apps/api/src/durable-objects/workflow-do.ts | 435 +++++++++++++++++++ apps/api/src/index.ts | 3 + apps/api/src/routes/workflows.ts | 107 ++++- apps/api/src/routes/ws.ts | 31 ++ apps/api/src/runtime/runtime.ts | 48 +- apps/api/src/utils/encryption.test.ts | 1 + apps/api/wrangler.jsonc | 24 + apps/web/src/hooks/use-editable-workflow.ts | 348 ++++++++------- apps/web/src/pages/editor-page.tsx | 139 +++--- apps/web/src/services/workflow-do-service.ts | 184 ++++++++ apps/web/src/utils/utils.ts | 2 +- packages/types/src/workflow.ts | 74 ++++ 13 files changed, 1141 insertions(+), 257 deletions(-) create mode 100644 apps/api/src/durable-objects/workflow-do.ts create mode 100644 apps/api/src/routes/ws.ts create mode 100644 apps/web/src/services/workflow-do-service.ts diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index 8832023c..f189ecfe 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -1,6 +1,7 @@ import { JWTTokenPayload } from "@dafthunk/types"; import { RuntimeParams } from "./runtime/runtime"; +import { WorkflowDO } from "./durable-objects/workflow-do"; export interface Bindings { DB: D1Database; @@ -9,6 +10,7 @@ export interface Bindings { RATE_LIMIT_AUTH: RateLimit; RATE_LIMIT_EXECUTE: RateLimit; EXECUTE: Workflow; + WORKFLOW_DO: DurableObjectNamespace; RESSOURCES: R2Bucket; DATASETS: R2Bucket; DATASETS_AUTORAG: string; diff --git a/apps/api/src/durable-objects/workflow-do.ts b/apps/api/src/durable-objects/workflow-do.ts new file mode 100644 index 00000000..f567e5a0 --- /dev/null +++ b/apps/api/src/durable-objects/workflow-do.ts @@ -0,0 +1,435 @@ +import { + WorkflowDOAckMessage, + WorkflowDOErrorMessage, + WorkflowDOExecuteMessage, + WorkflowDOExecutionUpdateMessage, + WorkflowDOInitMessage, + WorkflowDOMessage, + WorkflowDOState, + WorkflowDOUpdateMessage, + WorkflowExecution, + WorkflowType, +} from "@dafthunk/types"; +import { DurableObject } from "cloudflare:workers"; + +import { Bindings } from "../context"; +import { createDatabase } from "../db/index"; +import { getWorkflow, updateWorkflow } from "../db/queries"; + +export class WorkflowDO extends DurableObject { + private sql: SqlStorage; + private connectedClients: Set = new Set(); + private currentExecution: WorkflowExecution | null = null; + private workflowId: string = ""; + private organizationId: string = ""; + private loaded: boolean = false; + private dirty: boolean = false; + + constructor(ctx: DurableObjectState, env: Bindings) { + super(ctx, env); + this.sql = this.ctx.storage.sql; + this.initDatabase(); + } + + private initDatabase() { + this.sql.exec(` + CREATE TABLE IF NOT EXISTS states ( + id TEXT PRIMARY KEY, + nodes TEXT NOT NULL, + edges TEXT NOT NULL, + timestamp INTEGER NOT NULL + ) + `); + this.sql.exec(` + CREATE TABLE IF NOT EXISTS metadata ( + id TEXT PRIMARY KEY, + workflow_id TEXT NOT NULL, + organization_id TEXT NOT NULL, + workflow_name TEXT NOT NULL, + workflow_handle TEXT NOT NULL, + workflow_type TEXT NOT NULL + ) + `); + } + + /** + * Load workflow from database into DO storage if not already loaded + */ + private async ensureLoaded( + workflowId: string, + organizationId: string + ): Promise { + if (this.loaded) { + return; + } + + this.workflowId = workflowId; + this.organizationId = organizationId; + + // Ensure metadata exists + let metadataRow = this.sql + .exec("SELECT * FROM metadata WHERE id = ?", "default") + .toArray()[0]; + + if (!metadataRow) { + try { + const db = createDatabase(this.env.DB); + const workflow = await getWorkflow(db, workflowId, organizationId); + if (workflow) { + const workflowData = workflow.data as any; + this.sql.exec( + `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + workflow_id = excluded.workflow_id, + organization_id = excluded.organization_id, + workflow_name = excluded.workflow_name, + workflow_handle = excluded.workflow_handle, + workflow_type = excluded.workflow_type`, + "default", + workflowId, + organizationId, + workflow.name, + workflow.handle, + (workflowData.type || "manual") as WorkflowType + ); + } else { + // Minimal metadata for new workflow + this.sql.exec( + `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) + VALUES (?, ?, ?, ?, ?, ?)`, + "default", + workflowId, + organizationId, + "New Workflow", + workflowId, + "manual" as WorkflowType + ); + } + } catch (error) { + console.error("Error loading workflow metadata:", error); + } + } + + // Ensure states entry exists + const statesRow = this.sql + .exec("SELECT * FROM states WHERE id = ?", "default") + .toArray()[0]; + if (!statesRow) { + const timestamp = Date.now(); + this.sql.exec( + `INSERT INTO states (id, nodes, edges, timestamp) + VALUES (?, ?, ?, ?)`, + "default", + JSON.stringify([]), + JSON.stringify([]), + timestamp + ); + } + + this.loaded = true; + } + + /** + * Get state from DO storage (internal use) + */ + private async getStateInternal(): Promise { + const statesCursor = this.sql.exec( + "SELECT nodes, edges, timestamp FROM states WHERE id = ?", + "default" + ); + const statesRow = statesCursor.toArray()[0]; + + const metadataCursor = this.sql.exec( + "SELECT workflow_id as id, workflow_name as name, workflow_handle as handle, workflow_type as type FROM metadata WHERE id = ?", + "default" + ); + const metadataRow = metadataCursor.toArray()[0]; + + if (!statesRow || !metadataRow) { + throw new Error("State or metadata missing; call ensureLoaded first"); + } + + return { + id: metadataRow.id as string, + name: metadataRow.name as string, + handle: metadataRow.handle as string, + type: metadataRow.type as WorkflowType, + nodes: JSON.parse(statesRow.nodes as string), + edges: JSON.parse(statesRow.edges as string), + timestamp: statesRow.timestamp as number, + }; + } + + /** + * Get state (public API) + */ + async getState(): Promise { + return await this.getStateInternal(); + } + + async updateState(nodes: unknown[], edges: unknown[]): Promise { + const timestamp = Date.now(); + this.sql.exec( + `INSERT INTO states (id, nodes, edges, timestamp) + VALUES (?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + nodes = excluded.nodes, + edges = excluded.edges, + timestamp = excluded.timestamp`, + "default", + JSON.stringify(nodes), + JSON.stringify(edges), + timestamp + ); + + this.dirty = true; + + // Schedule an alarm to persist to database in 60 seconds if not already scheduled + const currentAlarm = await this.ctx.storage.getAlarm(); + if (currentAlarm === null) { + await this.ctx.storage.setAlarm(Date.now() + 60000); + } + } + + /** + * Persist DO state back to database + */ + private async persistToDatabase(): Promise { + if (!this.dirty || !this.workflowId || !this.organizationId) { + return; + } + + try { + const state = await this.getState(); + const db = createDatabase(this.env.DB); + await updateWorkflow(db, this.workflowId, this.organizationId, { + name: state.name, + data: { + id: state.id, + name: state.name, + handle: state.handle, + type: state.type, + nodes: state.nodes, + edges: state.edges, + }, + }); + + this.dirty = false; + console.log(`Persisted workflow ${this.workflowId} to database`); + } catch (error) { + console.error("Error persisting workflow to database:", error); + } + } + + /** + * Alarm handler - called when alarm fires + */ + async alarm(): Promise { + console.log("Alarm fired for WorkflowDO"); + await this.persistToDatabase(); + + // If still dirty (updates happened during persist), schedule another alarm + if (this.dirty) { + await this.ctx.storage.setAlarm(Date.now() + 60000); + } + } + + private broadcastExecutionUpdate(execution: WorkflowExecution) { + const message: WorkflowDOExecutionUpdateMessage = { + type: "execution_update", + executionId: execution.id, + status: execution.status, + nodeExecutions: execution.nodeExecutions, + error: execution.error, + }; + + const messageStr = JSON.stringify(message); + for (const client of this.connectedClients) { + try { + client.send(messageStr); + } catch (error) { + console.error("Error broadcasting to client:", error); + } + } + } + + async fetch(request: Request): Promise { + const url = new URL(request.url); + + // Extract workflowId and organizationId from query params + const workflowId = url.searchParams.get("workflowId") || ""; + const organizationId = url.searchParams.get("organizationId") || ""; + + // Ensure workflow is loaded from database + if (workflowId && organizationId) { + await this.ensureLoaded(workflowId, organizationId); + } + + // Handle GET request for workflow state + if (url.pathname === "/state" && request.method === "GET") { + try { + const state = await this.getState(); + + return new Response(JSON.stringify(state), { + headers: { "Content-Type": "application/json" }, + }); + } catch (error) { + console.error("Error getting workflow state:", error); + return new Response( + JSON.stringify({ + error: "Failed to get workflow state", + details: error instanceof Error ? error.message : "Unknown error", + }), + { + status: 500, + headers: { "Content-Type": "application/json" }, + } + ); + } + } + + // Handle execution updates from the runtime + if (url.pathname === "/execution" && request.method === "POST") { + try { + const execution = (await request.json()) as WorkflowExecution; + await this.updateExecution(execution); + return new Response(JSON.stringify({ success: true }), { + headers: { "Content-Type": "application/json" }, + }); + } catch (error) { + console.error("Error updating execution:", error); + return new Response( + JSON.stringify({ + error: "Failed to update execution", + details: error instanceof Error ? error.message : "Unknown error", + }), + { + status: 500, + headers: { "Content-Type": "application/json" }, + } + ); + } + } + + // Handle WebSocket connections (ensureLoaded called earlier if params present) + const upgradeHeader = request.headers.get("Upgrade"); + if (!upgradeHeader || upgradeHeader !== "websocket") { + return new Response("Expected WebSocket or /state GET request", { + status: 426, + }); + } + + const webSocketPair = new WebSocketPair(); + const [client, server] = Object.values(webSocketPair); + + this.ctx.acceptWebSocket(server); + this.connectedClients.add(server); + + // Send initial state + let initState: WorkflowDOState; + try { + initState = await this.getState(); + } catch { + // Fallback minimal state + initState = { + id: workflowId, + name: "New Workflow", + handle: workflowId, + type: "manual", + nodes: [], + edges: [], + timestamp: Date.now(), + }; + } + const initMessage: WorkflowDOInitMessage = { + type: "init", + state: initState, + }; + server.send(JSON.stringify(initMessage)); + + // If there's an ongoing execution, send the current state + if (this.currentExecution) { + this.broadcastExecutionUpdate(this.currentExecution); + } + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { + try { + if (typeof message !== "string") { + const errorMsg: WorkflowDOErrorMessage = { + error: "Expected string message", + }; + ws.send(JSON.stringify(errorMsg)); + return; + } + + const data = JSON.parse(message) as WorkflowDOMessage; + + if ("type" in data && data.type === "update") { + const updateMsg = data as WorkflowDOUpdateMessage; + await this.updateState(updateMsg.nodes, updateMsg.edges); + + // Optionally echo back confirmation + const ackMsg: WorkflowDOAckMessage = { + type: "ack", + timestamp: Date.now(), + }; + ws.send(JSON.stringify(ackMsg)); + } else if ("type" in data && data.type === "execute") { + const executeMsg = data as WorkflowDOExecuteMessage; + + // Store the execution ID so we can track updates from the runtime + this.currentExecution = { + id: executeMsg.executionId, + workflowId: this.workflowId, + status: "submitted", + nodeExecutions: [], + }; + + // Broadcast initial execution state to all clients + this.broadcastExecutionUpdate(this.currentExecution); + } + } catch (error) { + console.error("WebSocket message error:", error); + const errorMsg: WorkflowDOErrorMessage = { + error: "Failed to process message", + details: error instanceof Error ? error.message : "Unknown error", + }; + ws.send(JSON.stringify(errorMsg)); + } + } + + async webSocketClose( + ws: WebSocket, + code: number, + reason: string, + _wasClean: boolean + ) { + this.connectedClients.delete(ws); + ws.close(code, reason); + } + + /** + * Called by the runtime workflow to push execution updates to connected clients + */ + async updateExecution(execution: WorkflowExecution) { + this.currentExecution = execution; + this.broadcastExecutionUpdate(execution); + + // Clear current execution if it's in a terminal state + if ( + execution.status === "completed" || + execution.status === "error" || + execution.status === "cancelled" || + execution.status === "exhausted" + ) { + this.currentExecution = null; + } + } +} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index c260d8f5..bc19c600 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -1,5 +1,6 @@ import { Hono } from "hono"; export { Runtime } from "./runtime/runtime"; +export { WorkflowDO } from "./durable-objects/workflow-do"; import auth from "./auth"; import { ApiContext } from "./context"; import { handleCronTriggers } from "./cron"; @@ -21,6 +22,7 @@ import secretRoutes from "./routes/secrets"; import typeRoutes from "./routes/types"; import usageRoutes from "./routes/usage"; import workflowRoutes from "./routes/workflows"; +import wsRoutes from "./routes/ws"; // Initialize Hono app with types const app = new Hono(); @@ -65,6 +67,7 @@ app.route("/:organizationIdOrHandle/secrets", secretRoutes); app.route("/:organizationIdOrHandle/workflows", workflowRoutes); app.route("/:organizationIdOrHandle/objects", objectRoutes); app.route("/:organizationIdOrHandle/usage", usageRoutes); +app.route("/:organizationIdOrHandle/ws", wsRoutes); export default { scheduled: handleCronTriggers, diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index e5683536..7d6c1a9c 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -157,28 +157,63 @@ workflowRoutes.post( */ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { const id = c.req.param("id"); - const db = createDatabase(c.env.DB); - const organizationId = c.get("organizationId")!; - const workflow = await getWorkflow(db, id, organizationId); - if (!workflow) { - return c.json({ error: "Workflow not found" }, 404); + const userId = c.var.jwtPayload?.sub; + + if (!userId) { + return c.json({ error: "Unauthorized" }, 401); } - const workflowData = workflow.data; - - const response: GetWorkflowResponse = { - id: workflow.id, - name: workflow.name, - handle: workflow.handle, - type: workflowData.type, - createdAt: workflow.createdAt, - updatedAt: workflow.updatedAt, - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], - }; + try { + // Get workflow from Durable Object + const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${id}`); + const workflowData = await c.env.WORKFLOW_DO.get(doId).getState(); + + if (!workflowData) { + // If DO doesn't have it, fall back to database + const db = createDatabase(c.env.DB); + const workflow = await getWorkflow(db, id, organizationId); + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); + } - return c.json(response); + const workflowData = workflow.data; + + const response: GetWorkflowResponse = { + id: workflow.id, + name: workflow.name, + handle: workflow.handle, + type: workflowData.type, + createdAt: workflow.createdAt, + updatedAt: workflow.updatedAt, + nodes: workflowData.nodes || [], + edges: workflowData.edges || [], + }; + + return c.json(response); + } + + // Get metadata from database for createdAt/updatedAt + const db = createDatabase(c.env.DB); + const workflow = await getWorkflow(db, id, organizationId); + + const response: GetWorkflowResponse = { + id: workflowData.id, + name: workflowData.name, + handle: workflowData.handle, + type: workflowData.type, + createdAt: workflow?.createdAt || new Date(), + updatedAt: workflow?.updatedAt || new Date(), + // @ts-ignore + nodes: workflowData.nodes || [], + edges: workflowData.edges || [], + }; + + return c.json(response); + } catch (error) { + console.error("Error fetching workflow:", error); + return c.json({ error: "Failed to fetch workflow" }, 500); + } }); /** @@ -482,9 +517,39 @@ workflowRoutes.post( let deploymentId: string | undefined; if (version === "dev") { - // Get workflow data directly - workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); - workflowData = workflow.data; + // Get workflow data from Durable Object first + let userId: string; + const jwtPayload = c.get("jwtPayload") as JWTTokenPayload | undefined; + if (jwtPayload) { + userId = jwtPayload.sub || "anonymous"; + } else { + userId = "api"; // Use a placeholder for API-triggered executions + } + + const doId = c.env.WORKFLOW_DO.idFromName( + `${userId}-${workflowIdOrHandle}` + ); + const state = await c.env.WORKFLOW_DO.get(doId).getState(); + + if (state) { + workflowData = { + type: state.type, + nodes: state.nodes || [], + edges: state.edges || [], + }; + workflow = { + id: workflowIdOrHandle, + name: state.name, + handle: state.handle, + }; + } else { + // Fallback to database + workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); + } + workflowData = workflow.data; + } } else { // Get deployment based on version let deployment; diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts new file mode 100644 index 00000000..2db982a0 --- /dev/null +++ b/apps/api/src/routes/ws.ts @@ -0,0 +1,31 @@ +import { Hono } from "hono"; +import { jwtMiddleware } from "../auth"; +import { ApiContext } from "../context"; + +const wsRoutes = new Hono(); + +// WebSocket endpoint for real-time workflow state synchronization +wsRoutes.get("/", jwtMiddleware, async (c) => { + const upgradeHeader = c.req.header("Upgrade"); + + if (!upgradeHeader || upgradeHeader !== "websocket") { + return c.json({ error: "Expected WebSocket connection" }, 426); + } + + const userId = c.var.jwtPayload?.sub; + const workflowId = c.req.query("workflowId"); + + if (!userId || !workflowId) { + console.error("Missing userId or workflowId:", { userId, workflowId }); + return c.json({ error: "Missing userId or workflowId" }, 400); + } + + // Create a unique DO ID for this user + workflow combination + const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${workflowId}`); + const stub = c.env.WORKFLOW_DO.get(doId); + + // Proxy the WebSocket connection to the Durable Object + return stub.fetch(c.req.raw); +}); + +export default wsRoutes; diff --git a/apps/api/src/runtime/runtime.ts b/apps/api/src/runtime/runtime.ts index bb43f6c7..d8d8b42a 100644 --- a/apps/api/src/runtime/runtime.ts +++ b/apps/api/src/runtime/runtime.ts @@ -1187,6 +1187,29 @@ export class Runtime extends WorkflowEntrypoint { return ordered.length === workflow.nodes.length ? ordered : []; } + /** + * Pushes execution update to the Durable Object for real-time updates + */ + private async pushExecutionUpdateToDO( + userId: string, + workflowId: string, + execution: WorkflowExecution + ): Promise { + try { + // Create the Durable Object ID from userId + workflowId + const doId = this.env.WORKFLOW_DO.idFromName(`${userId}-${workflowId}`); + const stub = this.env.WORKFLOW_DO.get(doId); + + await stub.updateExecution(execution); + } catch (error) { + console.error( + "Failed to push execution update to Durable Object:", + error + ); + // Don't throw - this is a non-critical operation + } + } + /** * Persists the workflow execution state to the database. */ @@ -1233,9 +1256,19 @@ export class Runtime extends WorkflowEntrypoint { ? Array.from(runtimeState.nodeErrors.values()).join(", ") : undefined; + const execution: WorkflowExecution = { + id: instanceId, + workflowId, + status: executionStatus, + nodeExecutions: nodeExecutionList, + error: errorMsg, + startedAt, + endedAt, + }; + try { const db = createDatabase(this.env.DB); - return await saveExecution(db, { + await saveExecution(db, { id: instanceId, workflowId, userId, @@ -1252,15 +1285,10 @@ export class Runtime extends WorkflowEntrypoint { // Continue without interrupting the workflow. } - return { - id: instanceId, - workflowId, - status: executionStatus, - nodeExecutions: nodeExecutionList, - error: errorMsg, - startedAt, - endedAt, - }; + // Push update to Durable Object for real-time updates + await this.pushExecutionUpdateToDO(userId, workflowId, execution); + + return execution; } /** diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index 2061c30f..4eef7c78 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -20,6 +20,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({ RATE_LIMIT_AUTH: {} as RateLimit, RATE_LIMIT_EXECUTE: {} as RateLimit, EXECUTE: {} as Workflow, + WORKFLOW_DO: {} as DurableObjectNamespace, RESSOURCES: {} as R2Bucket, DATASETS: {} as R2Bucket, DATASETS_AUTORAG: "", diff --git a/apps/api/wrangler.jsonc b/apps/api/wrangler.jsonc index 842cab78..a9a1a8bd 100644 --- a/apps/api/wrangler.jsonc +++ b/apps/api/wrangler.jsonc @@ -67,6 +67,21 @@ "analytics_engine_datasets": [ { "binding": "COMPUTE", "dataset": "dafthunk-compute-development" } ], + "durable_objects": { + "bindings": [ + { + "name": "WORKFLOW_DO", + "class_name": "WorkflowDO", + "script_name": "dafthunk-api" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": ["WorkflowDO"] + } + ], "unsafe": { "bindings": [ { @@ -155,6 +170,15 @@ "analytics_engine_datasets": [ { "binding": "COMPUTE", "dataset": "dafthunk-compute-production" } ], + "durable_objects": { + "bindings": [ + { + "name": "WORKFLOW_DO", + "class_name": "WorkflowDO", + "script_name": "dafthunk-api" + } + ] + }, "unsafe": { "bindings": [ { diff --git a/apps/web/src/hooks/use-editable-workflow.ts b/apps/web/src/hooks/use-editable-workflow.ts index 0285e6e3..f3f42d22 100644 --- a/apps/web/src/hooks/use-editable-workflow.ts +++ b/apps/web/src/hooks/use-editable-workflow.ts @@ -1,211 +1,234 @@ -import type { Parameter, ParameterType, Workflow } from "@dafthunk/types"; +import type { Parameter, ParameterType, WorkflowExecution } from "@dafthunk/types"; import type { Edge, Node } from "@xyflow/react"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useAuth } from "@/components/auth-context"; import type { NodeTemplate, WorkflowEdgeType, WorkflowNodeType, -} from "@/components/workflow/workflow-types"; // Corrected import path -import { updateWorkflow } from "@/services/workflow-service"; +} from "@/components/workflow/workflow-types"; +import { + connectWorkflowWS, + WorkflowDOState, + WorkflowWebSocket, +} from "@/services/workflow-do-service"; import { adaptDeploymentNodesToReactFlowNodes } from "@/utils/utils"; import { debounce } from "@/utils/utils"; interface UseEditableWorkflowProps { workflowId: string | undefined; - currentWorkflow: Workflow | null | undefined; - isWorkflowDetailsLoading: boolean; - workflowDetailsError: Error | null; nodeTemplates?: NodeTemplate[]; + enableWebSocket?: boolean; + onExecutionUpdate?: (execution: WorkflowExecution) => void; } export function useEditableWorkflow({ workflowId, - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, nodeTemplates = [], + enableWebSocket = true, // Enable by default now + onExecutionUpdate, }: UseEditableWorkflowProps) { const [nodes, setNodes] = useState[]>([]); const [edges, setEdges] = useState[]>([]); const [isInitializing, setIsInitializing] = useState(true); const [processingError, setProcessingError] = useState(null); const [savingError, setSavingError] = useState(null); + const wsRef = useRef(null); + const [isWSConnected, setIsWSConnected] = useState(false); + const [workflowMetadata, setWorkflowMetadata] = useState<{ + id: string; + name: string; + handle: string; + type: string; + } | null>(null); // Get the organization from the auth context at the hook level const { organization } = useAuth(); - // Effect to initialize nodes and edges from currentWorkflow + // WebSocket is now the primary source of workflow data + // No effect needed here - state is set via WebSocket onInit callback + + // WebSocket connection effect useEffect(() => { - if (isWorkflowDetailsLoading) { - setIsInitializing(true); + // Don't connect if WebSocket is not enabled or missing required data + if (!enableWebSocket || !workflowId || !organization?.handle) { + setIsInitializing(false); return; } - if (workflowDetailsError || !currentWorkflow) { - setIsInitializing(false); - if (workflowDetailsError) { - setProcessingError( - workflowDetailsError.message || "Failed to load workflow data." - ); - } - setNodes([]); - setEdges([]); + // Prevent duplicate connections if already connected + if (wsRef.current?.isConnected()) { return; } - try { - const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( - currentWorkflow.nodes, - nodeTemplates - ); - const reactFlowEdges = currentWorkflow.edges.map((edge, index) => ({ - id: `e${index}`, - source: edge.source, - target: edge.target, - sourceHandle: edge.sourceOutput, - targetHandle: edge.targetInput, - type: "workflowEdge", - data: { - isValid: true, - sourceType: edge.sourceOutput, - targetType: edge.targetInput, - }, - })); - - setNodes(reactFlowNodes); - setEdges(reactFlowEdges); - setProcessingError(null); - } catch (error) { - console.error("Error processing workflow data into React Flow:", error); - setProcessingError( - error instanceof Error - ? error.message - : "Error adapting workflow data for editor." - ); - setNodes([]); - setEdges([]); - } finally { - setIsInitializing(false); - } - }, [ - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, - nodeTemplates, - ]); + // Start initializing + setIsInitializing(true); + + // Add a small delay to avoid race conditions during React strict mode double-mount + const timeoutId = setTimeout(() => { + // Double-check we're not already connected after the delay + if (wsRef.current?.isConnected()) { + return; + } + + const ws = connectWorkflowWS(organization.handle, workflowId, { + onInit: (state: WorkflowDOState) => { + console.log("WebSocket received initial state:", state); + try { + // Store workflow metadata - id and type are required, name and handle can be empty + if (state.id && state.type) { + setWorkflowMetadata({ + id: state.id, + name: state.name || "", + handle: state.handle || "", + type: state.type, + }); + } + + const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( + state.nodes, + nodeTemplates + ); + const reactFlowEdges = state.edges.map((edge: any, index: number) => ({ + id: `e${index}`, + source: edge.source, + target: edge.target, + sourceHandle: edge.sourceOutput, + targetHandle: edge.targetInput, + type: "workflowEdge", + data: { + isValid: true, + sourceType: edge.sourceOutput, + targetType: edge.targetInput, + }, + })); + + setNodes(reactFlowNodes); + setEdges(reactFlowEdges); + setProcessingError(null); + setIsInitializing(false); + } catch (error) { + console.error("Error processing WebSocket state:", error); + setProcessingError("Failed to load state from WebSocket"); + setIsInitializing(false); + } + }, + onOpen: () => { + console.log("WebSocket connected"); + setIsWSConnected(true); + }, + onClose: () => { + console.log("WebSocket disconnected"); + setIsWSConnected(false); + }, + onError: (error) => { + console.error("WebSocket error:", error); + setSavingError(`WebSocket error: ${error}`); + setProcessingError(`WebSocket error: ${error}`); + setIsInitializing(false); + }, + onExecutionUpdate: (execution: WorkflowExecution) => { + console.log("WebSocket received execution update:", execution); + if (onExecutionUpdate) { + // Add workflowId to the execution object + onExecutionUpdate({ + ...execution, + workflowId: workflowId, + }); + } + }, + }); + + wsRef.current = ws; + }, 100); // Small delay to avoid double-mount issues + + return () => { + clearTimeout(timeoutId); + if (wsRef.current) { + wsRef.current.disconnect(); + wsRef.current = null; + } + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [enableWebSocket, workflowId, organization?.handle]); const saveWorkflowInternal = useCallback( async ( nodesToSave: Node[], edgesToSave: Edge[] ) => { - if (!workflowId || !currentWorkflow) { - setSavingError( - "Workflow ID or current workflow data is missing, cannot save." - ); + if (!workflowId) { + setSavingError("Workflow ID is missing, cannot save."); return; } setSavingError(null); - try { - // Check if any node is currently executing, purely for logging/awareness. - // The actual node.data.executionState should be handled by the UI layer (use-workflow-state) - // and those updated nodes/edges are what we receive in nodesToSave/edgesToSave. - if ( - nodesToSave.some((node) => node.data.executionState === "executing") - ) { - console.log( - "Workflow elements are in an executing state during save." - ); - } - - const workflowNodes = nodesToSave.map((node) => { - const incomingEdges = edgesToSave.filter( - (edge) => edge.target === node.id - ); - return { - id: node.id, - name: node.data.name, - type: node.data.nodeType || "default", // Ensure nodeType is present - position: node.position, - icon: node.data.icon, - functionCalling: node.data.functionCalling, - inputs: node.data.inputs.map((input) => { - const isConnected = incomingEdges.some( - (edge) => edge.targetHandle === input.id - ); - const parameterBase: Omit & { value?: any } = - { - name: input.id, - type: input.type as ParameterType["type"], - description: input.name, - hidden: input.hidden, - required: input.required, - repeated: input.repeated, + // If WebSocket is enabled and connected, use it instead of REST API + if (enableWebSocket && wsRef.current?.isConnected()) { + try { + const workflowNodes = nodesToSave.map((node) => { + const incomingEdges = edgesToSave.filter( + (edge) => edge.target === node.id + ); + return { + id: node.id, + name: node.data.name, + type: node.data.nodeType || "default", + position: node.position, + icon: node.data.icon, + functionCalling: node.data.functionCalling, + inputs: node.data.inputs.map((input) => { + const isConnected = incomingEdges.some( + (edge) => edge.targetHandle === input.id + ); + const parameterBase: Omit & { value?: any } = + { + name: input.id, + type: input.type as ParameterType["type"], + description: input.name, + hidden: input.hidden, + required: input.required, + repeated: input.repeated, + }; + if (!isConnected && typeof input.value !== "undefined") { + parameterBase.value = input.value; + } + return parameterBase as Parameter; + }), + outputs: node.data.outputs.map((output) => { + const parameter: Parameter = { + name: output.id, + type: output.type as ParameterType["type"], + description: output.name, + hidden: output.hidden, }; - if (!isConnected && typeof input.value !== "undefined") { - parameterBase.value = input.value; - } - return parameterBase as Parameter; - }), - outputs: node.data.outputs.map((output) => { - const parameter: Parameter = { - name: output.id, - type: output.type as ParameterType["type"], - description: output.name, - hidden: output.hidden, - // value is not part of output parameters definition in the backend model here - }; - return parameter; - }), - }; - }); - - const workflowEdges = edgesToSave.map((edge) => ({ - source: edge.source, - target: edge.target, - sourceOutput: edge.sourceHandle || "", - targetInput: edge.targetHandle || "", - })); - - const workflowToSave: Workflow = { - ...currentWorkflow, // Base workflow details like name, description etc. - id: workflowId, // Ensure the ID is correctly set - nodes: workflowNodes, - edges: workflowEdges, - }; - - console.log( - "Saving workflow via useEditableWorkflow:", - workflowId, - workflowToSave - ); - - const orgHandle = organization?.handle; - - if (!orgHandle) { - throw new Error("Organization handle is required to save workflow"); - } + return parameter; + }), + }; + }); - await updateWorkflow(workflowId, workflowToSave, orgHandle); - } catch (error) { - console.error("Error saving workflow via useEditableWorkflow:", error); + const workflowEdges = edgesToSave.map((edge) => ({ + source: edge.source, + target: edge.target, + sourceOutput: edge.sourceHandle || "", + targetInput: edge.targetHandle || "", + })); - // If it's an authentication error, the user might need to refresh/login again - if (error instanceof Error && error.message.includes("Unauthorized")) { - setSavingError( - "Authentication expired. Please refresh the page or login again." - ); - } else { - setSavingError( - error instanceof Error ? error.message : "Failed to save workflow." - ); + wsRef.current.send(workflowNodes, workflowEdges); + return; + } catch (error) { + console.error("Error saving via WebSocket:", error); + setSavingError("Failed to save via WebSocket, falling back to REST API"); + // Fall through to REST API } } + + // WebSocket not available - cannot save + console.warn("WebSocket not available, workflow changes may not be saved"); + setSavingError("WebSocket not connected. Please refresh the page."); }, - [workflowId, organization, currentWorkflow] + [workflowId, organization, enableWebSocket] ); const saveWorkflow = useMemo( @@ -218,6 +241,14 @@ export function useEditableWorkflow({ [saveWorkflowInternal] ); + const startExecution = useCallback((executionId: string) => { + if (wsRef.current?.isConnected()) { + wsRef.current.executeWorkflow(executionId); + } else { + console.warn("WebSocket not connected, cannot start execution via WebSocket"); + } + }, []); + return { nodes, edges, @@ -225,5 +256,8 @@ export function useEditableWorkflow({ processingError, savingError, saveWorkflow, + isWSConnected, + startExecution, + workflowMetadata, }; } diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index 5dca9634..438638aa 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -23,6 +23,7 @@ import type { WorkflowExecution, WorkflowNodeType, } from "@/components/workflow/workflow-types"; +import type { WorkflowType } from "@dafthunk/types"; import { useEditableWorkflow } from "@/hooks/use-editable-workflow"; import { useOrgUrl } from "@/hooks/use-org-url"; import { usePageBreadcrumbs } from "@/hooks/use-page"; @@ -35,7 +36,6 @@ import { useNodeTypes } from "@/services/type-service"; import { upsertCronTrigger, useCronTrigger, - useWorkflow, useWorkflowExecution, } from "@/services/workflow-service"; @@ -54,25 +54,17 @@ export function EditorPage() { const [isEmailTriggerDialogOpen, setIsEmailTriggerDialogOpen] = useState(false); - const { - workflow: currentWorkflow, - workflowError: workflowDetailsError, - isWorkflowLoading: isWorkflowDetailsLoading, - } = useWorkflow(id || null, { revalidateOnFocus: false }); - - const { cronTrigger, isCronTriggerLoading, mutateCronTrigger } = - useCronTrigger(currentWorkflow?.type === "cron" && id ? id : null, { - revalidateOnFocus: false, - }); - - const { - deployments: deploymentHistory, - isDeploymentHistoryLoading, - mutateHistory: mutateDeploymentHistory, - } = useDeploymentHistory(id!, { revalidateOnFocus: false }); + // No longer fetching workflow via REST - using WebSocket in use-editable-workflow + // const { + // workflow: currentWorkflow, + // workflowError: workflowDetailsError, + // isWorkflowLoading: isWorkflowDetailsLoading, + // } = useWorkflow(id || null, { revalidateOnFocus: false }); + // We need workflowMetadata early, but useEditableWorkflow needs nodeTemplates + // Fetch all node types initially (no filter) const { nodeTypes, nodeTypesError, isNodeTypesLoading } = useNodeTypes( - currentWorkflow?.type, + undefined, // Fetch all node types initially { revalidateOnFocus: false } ); @@ -110,6 +102,34 @@ export function EditorPage() { return templates; }, [nodeTypes]); + // Get workflow metadata from WebSocket connection + const { + nodes: initialNodesForUI, + edges: initialEdgesForUI, + isInitializing: isWorkflowInitializing, + processingError: workflowProcessingError, + savingError: workflowSavingError, + saveWorkflow, + isWSConnected: _isWSConnected, + workflowMetadata, + } = useEditableWorkflow({ + workflowId: id, + nodeTemplates, + enableWebSocket: true, + }); + + // Now we can use workflowMetadata for cron trigger + const { cronTrigger, isCronTriggerLoading, mutateCronTrigger } = + useCronTrigger(workflowMetadata?.type === "cron" && id ? id : null, { + revalidateOnFocus: false, + }); + + const { + deployments: deploymentHistory, + isDeploymentHistoryLoading, + mutateHistory: mutateDeploymentHistory, + } = useDeploymentHistory(id!, { revalidateOnFocus: false }); + const deploymentVersions = useMemo( () => deploymentHistory.map((d) => d.version).sort((a, b) => b - a), [deploymentHistory] @@ -153,21 +173,6 @@ export function EditorPage() { [id, orgHandle, mutateCronTrigger] ); - const { - nodes: initialNodesForUI, - edges: initialEdgesForUI, - isInitializing: isWorkflowInitializing, - processingError: workflowProcessingError, - savingError: workflowSavingError, - saveWorkflow, - } = useEditableWorkflow({ - workflowId: id, - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, - nodeTemplates, - }); - useEffect(() => { if (initialNodesForUI) { setLatestUiNodes(initialNodesForUI); @@ -183,21 +188,21 @@ export function EditorPage() { const handleUiNodesChanged = useCallback( (updatedNodesFromUI: Node[]) => { setLatestUiNodes(updatedNodesFromUI); - if (currentWorkflow) { + if (workflowMetadata) { saveWorkflow(updatedNodesFromUI, latestUiEdges); } }, - [latestUiEdges, saveWorkflow, currentWorkflow] + [latestUiEdges, saveWorkflow, workflowMetadata] ); const handleUiEdgesChanged = useCallback( (updatedEdgesFromUI: Edge[]) => { setLatestUiEdges(updatedEdgesFromUI); - if (currentWorkflow) { + if (workflowMetadata) { saveWorkflow(latestUiNodes, updatedEdgesFromUI); } }, - [latestUiNodes, saveWorkflow, currentWorkflow] + [latestUiNodes, saveWorkflow, workflowMetadata] ); const { @@ -216,9 +221,9 @@ export function EditorPage() { usePageBreadcrumbs( [ { label: "Workflows", to: getOrgUrl("workflows") }, - { label: currentWorkflow?.name || "Workflow" }, + { label: workflowMetadata?.name || "Workflow" }, ], - [currentWorkflow?.name] + [workflowMetadata?.name] ); const validateConnection = useCallback( @@ -259,10 +264,10 @@ export function EditorPage() { onExecutionFromBuilder, latestUiNodes, nodeTemplates as any, - currentWorkflow?.type + workflowMetadata?.type ); }, - [executeWorkflow, latestUiNodes, nodeTemplates, currentWorkflow?.type] + [executeWorkflow, latestUiNodes, nodeTemplates, workflowMetadata?.type] ); const handleRetryLoading = () => { @@ -293,16 +298,17 @@ export function EditorPage() { setWorkflowBuilderKey(Date.now()); }; - if (workflowDetailsError) { - return ( - - ); - } + // No longer using REST fetch for workflow details - handled by WebSocket + // if (workflowDetailsError) { + // return ( + // + // ); + // } if (nodeTypesError) { return ( @@ -327,7 +333,6 @@ export function EditorPage() { } if ( - isWorkflowDetailsLoading || isNodeTypesLoading || isWorkflowInitializing || isCronTriggerLoading || @@ -337,16 +342,14 @@ export function EditorPage() { } if ( - !currentWorkflow && - !isWorkflowDetailsLoading && - !workflowDetailsError && + !workflowMetadata && !isNodeTypesLoading && !nodeTypesError && !isWorkflowInitializing ) { return ( navigate(getOrgUrl("workflows"))} /> ); @@ -363,19 +366,19 @@ export function EditorPage() { setIsHttpIntegrationDialogOpen(true) : undefined } onShowEmailTrigger={ - currentWorkflow?.type === "email_message" + workflowMetadata?.type === "email_message" ? () => setIsEmailTriggerDialogOpen(true) : undefined } @@ -390,7 +393,7 @@ export function EditorPage() { createObjectUrl={createObjectUrl} /> - {currentWorkflow?.type === "http_request" && ( + {workflowMetadata?.type === "http_request" && ( setIsHttpIntegrationDialogOpen(false)} @@ -401,7 +404,7 @@ export function EditorPage() { nodeTemplates={nodeTemplates} /> )} - {currentWorkflow?.type === "http_request" && + {workflowMetadata?.type === "http_request" && executionFormParameters.length > 0 && ( )} - {currentWorkflow?.type === "http_request" && + {workflowMetadata?.type === "http_request" && executionJsonBodyParameters.length > 0 && ( )} - {currentWorkflow?.type === "email_message" && ( + {workflowMetadata?.type === "email_message" && ( setIsEmailTriggerDialogOpen(false)} orgHandle={orgHandle} - workflowHandle={currentWorkflow.handle} + workflowHandle={workflowMetadata.handle} deploymentVersion="dev" /> )} - {currentWorkflow?.type === "email_message" && ( + {workflowMetadata?.type === "email_message" && ( )} - {currentWorkflow?.type === "cron" && ( + {workflowMetadata?.type === "cron" && ( )} diff --git a/apps/web/src/services/workflow-do-service.ts b/apps/web/src/services/workflow-do-service.ts new file mode 100644 index 00000000..1ef0f962 --- /dev/null +++ b/apps/web/src/services/workflow-do-service.ts @@ -0,0 +1,184 @@ +import type { + Edge, + Node, + WorkflowDOAckMessage, + WorkflowDOErrorMessage, + WorkflowDOExecutionUpdateMessage, + WorkflowDOInitMessage, + WorkflowDOState, + WorkflowDOUpdateMessage, + WorkflowExecution, +} from "@dafthunk/types"; + +import { getApiBaseUrl } from "@/config/api"; + +// Re-export for convenience +export type { WorkflowDOState }; + +type WebSocketMessage = + | WorkflowDOInitMessage + | WorkflowDOAckMessage + | WorkflowDOErrorMessage + | WorkflowDOExecutionUpdateMessage; + +export interface WorkflowWSOptions { + onInit?: (state: WorkflowDOState) => void; + onAck?: (timestamp: number) => void; + onError?: (error: string) => void; + onClose?: () => void; + onOpen?: () => void; + onExecutionUpdate?: (execution: WorkflowExecution) => void; +} + +export class WorkflowWebSocket { + private ws: WebSocket | null = null; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 1000; // Start with 1 second + private shouldReconnect = true; + + constructor( + private orgHandle: string, + private workflowId: string, + private options: WorkflowWSOptions = {} + ) {} + + connect(): void { + if ( + this.ws?.readyState === WebSocket.OPEN || + this.ws?.readyState === WebSocket.CONNECTING + ) { + return; + } + + const apiBaseUrl = getApiBaseUrl(); + const wsBaseUrl = apiBaseUrl.replace(/^http/, "ws"); + const url = `${wsBaseUrl}/${this.orgHandle}/ws?workflowId=${this.workflowId}`; + + try { + this.ws = new WebSocket(url); + + this.ws.onopen = () => { + console.log("WebSocket connected"); + this.reconnectAttempts = 0; + this.reconnectDelay = 1000; + this.options.onOpen?.(); + }; + + this.ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data) as WebSocketMessage; + + if ("error" in message) { + console.error("WebSocket error message:", message.error); + this.options.onError?.(message.error || ""); + } else if (message.type === "init") { + this.options.onInit?.(message.state); + } else if (message.type === "ack") { + this.options.onAck?.(message.timestamp); + } else if (message.type === "execution_update") { + this.options.onExecutionUpdate?.({ + id: message.executionId, + workflowId: "", // Will be filled from context + status: message.status, + nodeExecutions: message.nodeExecutions, + error: message.error, + }); + } + } catch (error) { + console.error("Failed to parse WebSocket message:", error); + this.options.onError?.("Failed to parse message"); + } + }; + + this.ws.onerror = (error) => { + console.error("WebSocket error:", error); + this.options.onError?.("WebSocket connection error"); + }; + + this.ws.onclose = () => { + console.log("WebSocket closed"); + this.options.onClose?.(); + + if ( + this.shouldReconnect && + this.reconnectAttempts < this.maxReconnectAttempts + ) { + this.reconnectAttempts++; + console.log( + `Reconnecting... Attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts}` + ); + + setTimeout(() => { + this.connect(); + }, this.reconnectDelay); + + // Exponential backoff + this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); + } + }; + } catch (error) { + console.error("Failed to create WebSocket:", error); + this.options.onError?.("Failed to create WebSocket connection"); + } + } + + send(nodes: Node[], edges: Edge[]): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn("WebSocket is not open, cannot send message"); + return; + } + + try { + const updateMsg: WorkflowDOUpdateMessage = { + type: "update", + nodes, + edges, + }; + this.ws.send(JSON.stringify(updateMsg)); + } catch (error) { + console.error("Failed to send WebSocket message:", error); + this.options.onError?.("Failed to send message"); + } + } + + executeWorkflow(executionId: string): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn("WebSocket is not open, cannot send execute message"); + return; + } + + try { + const executeMsg = { + type: "execute", + executionId, + }; + this.ws.send(JSON.stringify(executeMsg)); + } catch (error) { + console.error("Failed to send execute message:", error); + this.options.onError?.("Failed to send execute message"); + } + } + + disconnect(): void { + this.shouldReconnect = false; + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } + + isConnected(): boolean { + return this.ws?.readyState === WebSocket.OPEN; + } +} + +export const connectWorkflowWS = ( + orgHandle: string, + workflowId: string, + options: WorkflowWSOptions = {} +): WorkflowWebSocket => { + const ws = new WorkflowWebSocket(orgHandle, workflowId, options); + ws.connect(); + return ws; +}; diff --git a/apps/web/src/utils/utils.ts b/apps/web/src/utils/utils.ts index 97bf3983..71ad94e3 100644 --- a/apps/web/src/utils/utils.ts +++ b/apps/web/src/utils/utils.ts @@ -118,7 +118,7 @@ export function adaptDeploymentNodesToReactFlowNodes( // Find the icon from nodeTemplates by matching the node type const template = nodeTemplates.find((t) => t.type === depNode.type); - const icon = template?.icon || "circle"; // fallback icon + const icon = depNode.icon || template?.icon || "circle"; // prefer persisted icon return { id: depNode.id, diff --git a/packages/types/src/workflow.ts b/packages/types/src/workflow.ts index 3f8a0296..aefcf527 100644 --- a/packages/types/src/workflow.ts +++ b/packages/types/src/workflow.ts @@ -465,3 +465,77 @@ export interface GetCronTriggerResponse { * Returns the full trigger information. */ export type UpsertCronTriggerResponse = GetCronTriggerResponse; + +/** + * WebSocket message types for Durable Object real-time sync + */ + +/** + * Workflow state stored in Durable Object + */ +export interface WorkflowDOState extends Workflow { + timestamp: number; +} + +/** + * Message sent from DO to client with initial state + */ +export interface WorkflowDOInitMessage { + type: "init"; + state: WorkflowDOState; +} + +/** + * Message sent from client to DO to update state + */ +export interface WorkflowDOUpdateMessage { + type: "update"; + nodes: Node[]; + edges: Edge[]; +} + +/** + * Acknowledgment message sent from DO to client + */ +export interface WorkflowDOAckMessage { + type: "ack"; + timestamp: number; +} + +/** + * Error message sent from DO to client + */ +export interface WorkflowDOErrorMessage { + error: string; + details?: string; +} + +/** + * Message sent from client to DO to start workflow execution + */ +export interface WorkflowDOExecuteMessage { + type: "execute"; + executionId: string; +} + +/** + * Message sent from DO to client with execution progress updates + */ +export interface WorkflowDOExecutionUpdateMessage { + type: "execution_update"; + executionId: string; + status: WorkflowExecutionStatus; + nodeExecutions: NodeExecution[]; + error?: string; +} + +/** + * All possible WebSocket messages + */ +export type WorkflowDOMessage = + | WorkflowDOInitMessage + | WorkflowDOUpdateMessage + | WorkflowDOAckMessage + | WorkflowDOErrorMessage + | WorkflowDOExecuteMessage + | WorkflowDOExecutionUpdateMessage; From e9c051ff22748b170cb095b0ff07caf89d493a0e Mon Sep 17 00:00:00 2001 From: leonardcser <73912641+leonardcser@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:54:54 +0200 Subject: [PATCH 2/8] style: fix lint errors --- apps/api/src/context.ts | 2 +- apps/api/src/durable-objects/workflow-do.ts | 196 ++++++-------------- apps/api/src/index.ts | 2 +- apps/api/src/routes/workflows.ts | 7 +- apps/api/src/routes/ws.ts | 25 ++- apps/api/src/runtime/runtime.ts | 48 +---- apps/api/src/utils/encryption.test.ts | 3 +- apps/web/src/hooks/use-editable-workflow.ts | 176 ++++++++---------- apps/web/src/pages/editor-page.tsx | 23 +-- apps/web/src/utils/utils.ts | 2 +- 10 files changed, 174 insertions(+), 310 deletions(-) diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index f189ecfe..fed53f5a 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -1,7 +1,7 @@ import { JWTTokenPayload } from "@dafthunk/types"; -import { RuntimeParams } from "./runtime/runtime"; import { WorkflowDO } from "./durable-objects/workflow-do"; +import { RuntimeParams } from "./runtime/runtime"; export interface Bindings { DB: D1Database; diff --git a/apps/api/src/durable-objects/workflow-do.ts b/apps/api/src/durable-objects/workflow-do.ts index f567e5a0..be5f3151 100644 --- a/apps/api/src/durable-objects/workflow-do.ts +++ b/apps/api/src/durable-objects/workflow-do.ts @@ -1,13 +1,10 @@ import { WorkflowDOAckMessage, WorkflowDOErrorMessage, - WorkflowDOExecuteMessage, - WorkflowDOExecutionUpdateMessage, WorkflowDOInitMessage, WorkflowDOMessage, WorkflowDOState, WorkflowDOUpdateMessage, - WorkflowExecution, WorkflowType, } from "@dafthunk/types"; import { DurableObject } from "cloudflare:workers"; @@ -18,8 +15,7 @@ import { getWorkflow, updateWorkflow } from "../db/queries"; export class WorkflowDO extends DurableObject { private sql: SqlStorage; - private connectedClients: Set = new Set(); - private currentExecution: WorkflowExecution | null = null; + private workflowId: string = ""; private organizationId: string = ""; private loaded: boolean = false; @@ -66,65 +62,71 @@ export class WorkflowDO extends DurableObject { this.workflowId = workflowId; this.organizationId = organizationId; - // Ensure metadata exists - let metadataRow = this.sql - .exec("SELECT * FROM metadata WHERE id = ?", "default") - .toArray()[0]; - - if (!metadataRow) { - try { - const db = createDatabase(this.env.DB); - const workflow = await getWorkflow(db, workflowId, organizationId); - if (workflow) { - const workflowData = workflow.data as any; - this.sql.exec( - `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - workflow_id = excluded.workflow_id, - organization_id = excluded.organization_id, - workflow_name = excluded.workflow_name, - workflow_handle = excluded.workflow_handle, - workflow_type = excluded.workflow_type`, - "default", - workflowId, - organizationId, - workflow.name, - workflow.handle, - (workflowData.type || "manual") as WorkflowType - ); - } else { - // Minimal metadata for new workflow - this.sql.exec( - `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) - VALUES (?, ?, ?, ?, ?, ?)`, - "default", - workflowId, - organizationId, - "New Workflow", - workflowId, - "manual" as WorkflowType - ); - } - } catch (error) { - console.error("Error loading workflow metadata:", error); + try { + const db = createDatabase(this.env.DB); + const workflow = await getWorkflow(db, workflowId, organizationId); + + const nodes = workflow + ? JSON.stringify((workflow.data as any).nodes || []) + : JSON.stringify([]); + const edges = workflow + ? JSON.stringify((workflow.data as any).edges || []) + : JSON.stringify([]); + const timestamp = workflow + ? workflow.updatedAt + ? workflow.updatedAt.getTime() + : Date.now() + : Date.now(); + + // Upsert metadata + if (workflow) { + this.sql.exec( + `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + workflow_id = excluded.workflow_id, + organization_id = excluded.organization_id, + workflow_name = excluded.workflow_name, + workflow_handle = excluded.workflow_handle, + workflow_type = excluded.workflow_type`, + "default", + workflowId, + organizationId, + workflow.name, + workflow.handle, + ((workflow.data as any).type || "manual") as WorkflowType + ); + } else { + // Minimal metadata for new workflow + this.sql.exec( + `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) + VALUES (?, ?, ?, ?, ?, ?)`, + "default", + workflowId, + organizationId, + "New Workflow", + workflowId, + "manual" as WorkflowType + ); } - } - // Ensure states entry exists - const statesRow = this.sql - .exec("SELECT * FROM states WHERE id = ?", "default") - .toArray()[0]; - if (!statesRow) { - const timestamp = Date.now(); + // Upsert states this.sql.exec( `INSERT INTO states (id, nodes, edges, timestamp) - VALUES (?, ?, ?, ?)`, + VALUES (?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + nodes = excluded.nodes, + edges = excluded.edges, + timestamp = excluded.timestamp`, "default", - JSON.stringify([]), - JSON.stringify([]), + nodes, + edges, timestamp ); + + this.dirty = false; + } catch (error) { + console.error("Error loading workflow:", error); } this.loaded = true; @@ -235,25 +237,6 @@ export class WorkflowDO extends DurableObject { } } - private broadcastExecutionUpdate(execution: WorkflowExecution) { - const message: WorkflowDOExecutionUpdateMessage = { - type: "execution_update", - executionId: execution.id, - status: execution.status, - nodeExecutions: execution.nodeExecutions, - error: execution.error, - }; - - const messageStr = JSON.stringify(message); - for (const client of this.connectedClients) { - try { - client.send(messageStr); - } catch (error) { - console.error("Error broadcasting to client:", error); - } - } - } - async fetch(request: Request): Promise { const url = new URL(request.url); @@ -289,29 +272,6 @@ export class WorkflowDO extends DurableObject { } } - // Handle execution updates from the runtime - if (url.pathname === "/execution" && request.method === "POST") { - try { - const execution = (await request.json()) as WorkflowExecution; - await this.updateExecution(execution); - return new Response(JSON.stringify({ success: true }), { - headers: { "Content-Type": "application/json" }, - }); - } catch (error) { - console.error("Error updating execution:", error); - return new Response( - JSON.stringify({ - error: "Failed to update execution", - details: error instanceof Error ? error.message : "Unknown error", - }), - { - status: 500, - headers: { "Content-Type": "application/json" }, - } - ); - } - } - // Handle WebSocket connections (ensureLoaded called earlier if params present) const upgradeHeader = request.headers.get("Upgrade"); if (!upgradeHeader || upgradeHeader !== "websocket") { @@ -324,7 +284,6 @@ export class WorkflowDO extends DurableObject { const [client, server] = Object.values(webSocketPair); this.ctx.acceptWebSocket(server); - this.connectedClients.add(server); // Send initial state let initState: WorkflowDOState; @@ -348,11 +307,6 @@ export class WorkflowDO extends DurableObject { }; server.send(JSON.stringify(initMessage)); - // If there's an ongoing execution, send the current state - if (this.currentExecution) { - this.broadcastExecutionUpdate(this.currentExecution); - } - return new Response(null, { status: 101, webSocket: client, @@ -381,19 +335,6 @@ export class WorkflowDO extends DurableObject { timestamp: Date.now(), }; ws.send(JSON.stringify(ackMsg)); - } else if ("type" in data && data.type === "execute") { - const executeMsg = data as WorkflowDOExecuteMessage; - - // Store the execution ID so we can track updates from the runtime - this.currentExecution = { - id: executeMsg.executionId, - workflowId: this.workflowId, - status: "submitted", - nodeExecutions: [], - }; - - // Broadcast initial execution state to all clients - this.broadcastExecutionUpdate(this.currentExecution); } } catch (error) { console.error("WebSocket message error:", error); @@ -411,25 +352,6 @@ export class WorkflowDO extends DurableObject { reason: string, _wasClean: boolean ) { - this.connectedClients.delete(ws); ws.close(code, reason); } - - /** - * Called by the runtime workflow to push execution updates to connected clients - */ - async updateExecution(execution: WorkflowExecution) { - this.currentExecution = execution; - this.broadcastExecutionUpdate(execution); - - // Clear current execution if it's in a terminal state - if ( - execution.status === "completed" || - execution.status === "error" || - execution.status === "cancelled" || - execution.status === "exhausted" - ) { - this.currentExecution = null; - } - } } diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index bc19c600..21ad19a7 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -1,6 +1,6 @@ import { Hono } from "hono"; -export { Runtime } from "./runtime/runtime"; export { WorkflowDO } from "./durable-objects/workflow-do"; +export { Runtime } from "./runtime/runtime"; import auth from "./auth"; import { ApiContext } from "./context"; import { handleCronTriggers } from "./cron"; diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index 7d6c1a9c..c0073914 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -167,7 +167,9 @@ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { try { // Get workflow from Durable Object const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${id}`); - const workflowData = await c.env.WORKFLOW_DO.get(doId).getState(); + const stub = c.env.WORKFLOW_DO.get(doId); + // @ts-ignore + const workflowData = await stub.getState(); if (!workflowData) { // If DO doesn't have it, fall back to database @@ -529,7 +531,8 @@ workflowRoutes.post( const doId = c.env.WORKFLOW_DO.idFromName( `${userId}-${workflowIdOrHandle}` ); - const state = await c.env.WORKFLOW_DO.get(doId).getState(); + const stub = c.env.WORKFLOW_DO.get(doId); + const state = await stub.getState(); if (state) { workflowData = { diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index 2db982a0..263c7770 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -1,4 +1,5 @@ import { Hono } from "hono"; + import { jwtMiddleware } from "../auth"; import { ApiContext } from "../context"; @@ -14,18 +15,30 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { const userId = c.var.jwtPayload?.sub; const workflowId = c.req.query("workflowId"); - - if (!userId || !workflowId) { - console.error("Missing userId or workflowId:", { userId, workflowId }); - return c.json({ error: "Missing userId or workflowId" }, 400); + const organizationId = c.get("organizationId")!; + + if (!userId || !workflowId || !organizationId) { + console.error("Missing userId, workflowId or organizationId:", { + userId, + workflowId, + organizationId, + }); + return c.json( + { error: "Missing userId, workflowId or organizationId" }, + 400 + ); } // Create a unique DO ID for this user + workflow combination const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${workflowId}`); const stub = c.env.WORKFLOW_DO.get(doId); - // Proxy the WebSocket connection to the Durable Object - return stub.fetch(c.req.raw); + // Reconstruct request with required query params for DO + const url = new URL(c.req.url); + url.searchParams.set("organizationId", organizationId); + url.searchParams.set("workflowId", workflowId); + const newReq = new Request(url.toString(), c.req.raw); + return stub.fetch(newReq); }); export default wsRoutes; diff --git a/apps/api/src/runtime/runtime.ts b/apps/api/src/runtime/runtime.ts index d8d8b42a..bb43f6c7 100644 --- a/apps/api/src/runtime/runtime.ts +++ b/apps/api/src/runtime/runtime.ts @@ -1187,29 +1187,6 @@ export class Runtime extends WorkflowEntrypoint { return ordered.length === workflow.nodes.length ? ordered : []; } - /** - * Pushes execution update to the Durable Object for real-time updates - */ - private async pushExecutionUpdateToDO( - userId: string, - workflowId: string, - execution: WorkflowExecution - ): Promise { - try { - // Create the Durable Object ID from userId + workflowId - const doId = this.env.WORKFLOW_DO.idFromName(`${userId}-${workflowId}`); - const stub = this.env.WORKFLOW_DO.get(doId); - - await stub.updateExecution(execution); - } catch (error) { - console.error( - "Failed to push execution update to Durable Object:", - error - ); - // Don't throw - this is a non-critical operation - } - } - /** * Persists the workflow execution state to the database. */ @@ -1256,19 +1233,9 @@ export class Runtime extends WorkflowEntrypoint { ? Array.from(runtimeState.nodeErrors.values()).join(", ") : undefined; - const execution: WorkflowExecution = { - id: instanceId, - workflowId, - status: executionStatus, - nodeExecutions: nodeExecutionList, - error: errorMsg, - startedAt, - endedAt, - }; - try { const db = createDatabase(this.env.DB); - await saveExecution(db, { + return await saveExecution(db, { id: instanceId, workflowId, userId, @@ -1285,10 +1252,15 @@ export class Runtime extends WorkflowEntrypoint { // Continue without interrupting the workflow. } - // Push update to Durable Object for real-time updates - await this.pushExecutionUpdateToDO(userId, workflowId, execution); - - return execution; + return { + id: instanceId, + workflowId, + status: executionStatus, + nodeExecutions: nodeExecutionList, + error: errorMsg, + startedAt, + endedAt, + }; } /** diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index 4eef7c78..f322a3a2 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it } from "vitest"; import { Bindings } from "../context"; +import { WorkflowDO } from "../durable-objects/workflow-do"; import { decryptSecret, encryptSecret } from "./encryption"; // Mock Bindings for testing @@ -20,7 +21,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({ RATE_LIMIT_AUTH: {} as RateLimit, RATE_LIMIT_EXECUTE: {} as RateLimit, EXECUTE: {} as Workflow, - WORKFLOW_DO: {} as DurableObjectNamespace, + WORKFLOW_DO: {} as DurableObjectNamespace, RESSOURCES: {} as R2Bucket, DATASETS: {} as R2Bucket, DATASETS_AUTORAG: "", diff --git a/apps/web/src/hooks/use-editable-workflow.ts b/apps/web/src/hooks/use-editable-workflow.ts index f3f42d22..e1ac5450 100644 --- a/apps/web/src/hooks/use-editable-workflow.ts +++ b/apps/web/src/hooks/use-editable-workflow.ts @@ -1,4 +1,4 @@ -import type { Parameter, ParameterType, WorkflowExecution } from "@dafthunk/types"; +import type { Parameter, ParameterType } from "@dafthunk/types"; import type { Edge, Node } from "@xyflow/react"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; @@ -19,15 +19,11 @@ import { debounce } from "@/utils/utils"; interface UseEditableWorkflowProps { workflowId: string | undefined; nodeTemplates?: NodeTemplate[]; - enableWebSocket?: boolean; - onExecutionUpdate?: (execution: WorkflowExecution) => void; } export function useEditableWorkflow({ workflowId, nodeTemplates = [], - enableWebSocket = true, // Enable by default now - onExecutionUpdate, }: UseEditableWorkflowProps) { const [nodes, setNodes] = useState[]>([]); const [edges, setEdges] = useState[]>([]); @@ -43,16 +39,11 @@ export function useEditableWorkflow({ type: string; } | null>(null); - // Get the organization from the auth context at the hook level const { organization } = useAuth(); - // WebSocket is now the primary source of workflow data - // No effect needed here - state is set via WebSocket onInit callback - // WebSocket connection effect useEffect(() => { - // Don't connect if WebSocket is not enabled or missing required data - if (!enableWebSocket || !workflowId || !organization?.handle) { + if (!workflowId || !organization?.handle) { setIsInitializing(false); return; } @@ -62,7 +53,6 @@ export function useEditableWorkflow({ return; } - // Start initializing setIsInitializing(true); // Add a small delay to avoid race conditions during React strict mode double-mount @@ -73,71 +63,63 @@ export function useEditableWorkflow({ } const ws = connectWorkflowWS(organization.handle, workflowId, { - onInit: (state: WorkflowDOState) => { - console.log("WebSocket received initial state:", state); - try { - // Store workflow metadata - id and type are required, name and handle can be empty - if (state.id && state.type) { - setWorkflowMetadata({ - id: state.id, - name: state.name || "", - handle: state.handle || "", - type: state.type, - }); - } - - const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( - state.nodes, - nodeTemplates - ); - const reactFlowEdges = state.edges.map((edge: any, index: number) => ({ - id: `e${index}`, - source: edge.source, - target: edge.target, - sourceHandle: edge.sourceOutput, - targetHandle: edge.targetInput, - type: "workflowEdge", - data: { - isValid: true, - sourceType: edge.sourceOutput, - targetType: edge.targetInput, - }, - })); + onInit: (state: WorkflowDOState) => { + console.log("WebSocket received initial state:", state); + try { + // Store workflow metadata - id and type are required, name and handle can be empty + if (state.id && state.type) { + setWorkflowMetadata({ + id: state.id, + name: state.name || "", + handle: state.handle || "", + type: state.type, + }); + } + + const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( + state.nodes, + nodeTemplates + ); + const reactFlowEdges = state.edges.map( + (edge: any, index: number) => ({ + id: `e${index}`, + source: edge.source, + target: edge.target, + sourceHandle: edge.sourceOutput, + targetHandle: edge.targetInput, + type: "workflowEdge", + data: { + isValid: true, + sourceType: edge.sourceOutput, + targetType: edge.targetInput, + }, + }) + ); - setNodes(reactFlowNodes); - setEdges(reactFlowEdges); - setProcessingError(null); - setIsInitializing(false); - } catch (error) { - console.error("Error processing WebSocket state:", error); - setProcessingError("Failed to load state from WebSocket"); + setNodes(reactFlowNodes); + setEdges(reactFlowEdges); + setProcessingError(null); + setIsInitializing(false); + } catch (error) { + console.error("Error processing WebSocket state:", error); + setProcessingError("Failed to load state from WebSocket"); + setIsInitializing(false); + } + }, + onOpen: () => { + console.log("WebSocket connected"); + setIsWSConnected(true); + }, + onClose: () => { + console.log("WebSocket disconnected"); + setIsWSConnected(false); + }, + onError: (error) => { + console.error("WebSocket error:", error); + setSavingError(`WebSocket error: ${error}`); + setProcessingError(`WebSocket error: ${error}`); setIsInitializing(false); - } - }, - onOpen: () => { - console.log("WebSocket connected"); - setIsWSConnected(true); - }, - onClose: () => { - console.log("WebSocket disconnected"); - setIsWSConnected(false); - }, - onError: (error) => { - console.error("WebSocket error:", error); - setSavingError(`WebSocket error: ${error}`); - setProcessingError(`WebSocket error: ${error}`); - setIsInitializing(false); - }, - onExecutionUpdate: (execution: WorkflowExecution) => { - console.log("WebSocket received execution update:", execution); - if (onExecutionUpdate) { - // Add workflowId to the execution object - onExecutionUpdate({ - ...execution, - workflowId: workflowId, - }); - } - }, + }, }); wsRef.current = ws; @@ -150,8 +132,9 @@ export function useEditableWorkflow({ wsRef.current = null; } }; + // eslint-disable-next-line react-compiler/react-compiler // eslint-disable-next-line react-hooks/exhaustive-deps - }, [enableWebSocket, workflowId, organization?.handle]); + }, [workflowId, organization?.handle]); const saveWorkflowInternal = useCallback( async ( @@ -164,8 +147,7 @@ export function useEditableWorkflow({ } setSavingError(null); - // If WebSocket is enabled and connected, use it instead of REST API - if (enableWebSocket && wsRef.current?.isConnected()) { + if (wsRef.current?.isConnected()) { try { const workflowNodes = nodesToSave.map((node) => { const incomingEdges = edgesToSave.filter( @@ -182,15 +164,16 @@ export function useEditableWorkflow({ const isConnected = incomingEdges.some( (edge) => edge.targetHandle === input.id ); - const parameterBase: Omit & { value?: any } = - { - name: input.id, - type: input.type as ParameterType["type"], - description: input.name, - hidden: input.hidden, - required: input.required, - repeated: input.repeated, - }; + const parameterBase: Omit & { + value?: any; + } = { + name: input.id, + type: input.type as ParameterType["type"], + description: input.name, + hidden: input.hidden, + required: input.required, + repeated: input.repeated, + }; if (!isConnected && typeof input.value !== "undefined") { parameterBase.value = input.value; } @@ -219,16 +202,16 @@ export function useEditableWorkflow({ return; } catch (error) { console.error("Error saving via WebSocket:", error); - setSavingError("Failed to save via WebSocket, falling back to REST API"); - // Fall through to REST API + setSavingError("Failed to save via WebSocket"); } } - // WebSocket not available - cannot save - console.warn("WebSocket not available, workflow changes may not be saved"); + console.warn( + "WebSocket not available, workflow changes may not be saved" + ); setSavingError("WebSocket not connected. Please refresh the page."); }, - [workflowId, organization, enableWebSocket] + [workflowId] ); const saveWorkflow = useMemo( @@ -241,14 +224,6 @@ export function useEditableWorkflow({ [saveWorkflowInternal] ); - const startExecution = useCallback((executionId: string) => { - if (wsRef.current?.isConnected()) { - wsRef.current.executeWorkflow(executionId); - } else { - console.warn("WebSocket not connected, cannot start execution via WebSocket"); - } - }, []); - return { nodes, edges, @@ -257,7 +232,6 @@ export function useEditableWorkflow({ savingError, saveWorkflow, isWSConnected, - startExecution, workflowMetadata, }; } diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index 438638aa..cfee25fc 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -1,3 +1,4 @@ +import type { WorkflowType } from "@dafthunk/types"; import type { Connection, Edge, Node } from "@xyflow/react"; import { ReactFlowProvider } from "@xyflow/react"; import { useCallback, useEffect, useMemo, useState } from "react"; @@ -23,7 +24,6 @@ import type { WorkflowExecution, WorkflowNodeType, } from "@/components/workflow/workflow-types"; -import type { WorkflowType } from "@dafthunk/types"; import { useEditableWorkflow } from "@/hooks/use-editable-workflow"; import { useOrgUrl } from "@/hooks/use-org-url"; import { usePageBreadcrumbs } from "@/hooks/use-page"; @@ -54,14 +54,6 @@ export function EditorPage() { const [isEmailTriggerDialogOpen, setIsEmailTriggerDialogOpen] = useState(false); - // No longer fetching workflow via REST - using WebSocket in use-editable-workflow - // const { - // workflow: currentWorkflow, - // workflowError: workflowDetailsError, - // isWorkflowLoading: isWorkflowDetailsLoading, - // } = useWorkflow(id || null, { revalidateOnFocus: false }); - - // We need workflowMetadata early, but useEditableWorkflow needs nodeTemplates // Fetch all node types initially (no filter) const { nodeTypes, nodeTypesError, isNodeTypesLoading } = useNodeTypes( undefined, // Fetch all node types initially @@ -115,7 +107,6 @@ export function EditorPage() { } = useEditableWorkflow({ workflowId: id, nodeTemplates, - enableWebSocket: true, }); // Now we can use workflowMetadata for cron trigger @@ -298,18 +289,6 @@ export function EditorPage() { setWorkflowBuilderKey(Date.now()); }; - // No longer using REST fetch for workflow details - handled by WebSocket - // if (workflowDetailsError) { - // return ( - // - // ); - // } - if (nodeTypesError) { return ( t.type === depNode.type); - const icon = depNode.icon || template?.icon || "circle"; // prefer persisted icon + const icon = depNode.icon || template?.icon || "circle"; // fallback icon return { id: depNode.id, From 9d363fa28f3594bade4d5592323a7f5b84a671d1 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Tue, 30 Sep 2025 23:20:36 +0200 Subject: [PATCH 3/8] feat(api): replace durable objects with websocket worker --- apps/api/src/context.ts | 2 - apps/api/src/durable-objects/workflow-do.ts | 357 -------------------- apps/api/src/index.ts | 1 - apps/api/src/routes/workflows.ts | 85 +---- apps/api/src/routes/ws.ts | 133 +++++++- apps/api/src/utils/encryption.test.ts | 2 - apps/api/wrangler.jsonc | 24 -- 7 files changed, 140 insertions(+), 464 deletions(-) delete mode 100644 apps/api/src/durable-objects/workflow-do.ts diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index fed53f5a..8832023c 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -1,6 +1,5 @@ import { JWTTokenPayload } from "@dafthunk/types"; -import { WorkflowDO } from "./durable-objects/workflow-do"; import { RuntimeParams } from "./runtime/runtime"; export interface Bindings { @@ -10,7 +9,6 @@ export interface Bindings { RATE_LIMIT_AUTH: RateLimit; RATE_LIMIT_EXECUTE: RateLimit; EXECUTE: Workflow; - WORKFLOW_DO: DurableObjectNamespace; RESSOURCES: R2Bucket; DATASETS: R2Bucket; DATASETS_AUTORAG: string; diff --git a/apps/api/src/durable-objects/workflow-do.ts b/apps/api/src/durable-objects/workflow-do.ts deleted file mode 100644 index be5f3151..00000000 --- a/apps/api/src/durable-objects/workflow-do.ts +++ /dev/null @@ -1,357 +0,0 @@ -import { - WorkflowDOAckMessage, - WorkflowDOErrorMessage, - WorkflowDOInitMessage, - WorkflowDOMessage, - WorkflowDOState, - WorkflowDOUpdateMessage, - WorkflowType, -} from "@dafthunk/types"; -import { DurableObject } from "cloudflare:workers"; - -import { Bindings } from "../context"; -import { createDatabase } from "../db/index"; -import { getWorkflow, updateWorkflow } from "../db/queries"; - -export class WorkflowDO extends DurableObject { - private sql: SqlStorage; - - private workflowId: string = ""; - private organizationId: string = ""; - private loaded: boolean = false; - private dirty: boolean = false; - - constructor(ctx: DurableObjectState, env: Bindings) { - super(ctx, env); - this.sql = this.ctx.storage.sql; - this.initDatabase(); - } - - private initDatabase() { - this.sql.exec(` - CREATE TABLE IF NOT EXISTS states ( - id TEXT PRIMARY KEY, - nodes TEXT NOT NULL, - edges TEXT NOT NULL, - timestamp INTEGER NOT NULL - ) - `); - this.sql.exec(` - CREATE TABLE IF NOT EXISTS metadata ( - id TEXT PRIMARY KEY, - workflow_id TEXT NOT NULL, - organization_id TEXT NOT NULL, - workflow_name TEXT NOT NULL, - workflow_handle TEXT NOT NULL, - workflow_type TEXT NOT NULL - ) - `); - } - - /** - * Load workflow from database into DO storage if not already loaded - */ - private async ensureLoaded( - workflowId: string, - organizationId: string - ): Promise { - if (this.loaded) { - return; - } - - this.workflowId = workflowId; - this.organizationId = organizationId; - - try { - const db = createDatabase(this.env.DB); - const workflow = await getWorkflow(db, workflowId, organizationId); - - const nodes = workflow - ? JSON.stringify((workflow.data as any).nodes || []) - : JSON.stringify([]); - const edges = workflow - ? JSON.stringify((workflow.data as any).edges || []) - : JSON.stringify([]); - const timestamp = workflow - ? workflow.updatedAt - ? workflow.updatedAt.getTime() - : Date.now() - : Date.now(); - - // Upsert metadata - if (workflow) { - this.sql.exec( - `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - workflow_id = excluded.workflow_id, - organization_id = excluded.organization_id, - workflow_name = excluded.workflow_name, - workflow_handle = excluded.workflow_handle, - workflow_type = excluded.workflow_type`, - "default", - workflowId, - organizationId, - workflow.name, - workflow.handle, - ((workflow.data as any).type || "manual") as WorkflowType - ); - } else { - // Minimal metadata for new workflow - this.sql.exec( - `INSERT INTO metadata (id, workflow_id, organization_id, workflow_name, workflow_handle, workflow_type) - VALUES (?, ?, ?, ?, ?, ?)`, - "default", - workflowId, - organizationId, - "New Workflow", - workflowId, - "manual" as WorkflowType - ); - } - - // Upsert states - this.sql.exec( - `INSERT INTO states (id, nodes, edges, timestamp) - VALUES (?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - nodes = excluded.nodes, - edges = excluded.edges, - timestamp = excluded.timestamp`, - "default", - nodes, - edges, - timestamp - ); - - this.dirty = false; - } catch (error) { - console.error("Error loading workflow:", error); - } - - this.loaded = true; - } - - /** - * Get state from DO storage (internal use) - */ - private async getStateInternal(): Promise { - const statesCursor = this.sql.exec( - "SELECT nodes, edges, timestamp FROM states WHERE id = ?", - "default" - ); - const statesRow = statesCursor.toArray()[0]; - - const metadataCursor = this.sql.exec( - "SELECT workflow_id as id, workflow_name as name, workflow_handle as handle, workflow_type as type FROM metadata WHERE id = ?", - "default" - ); - const metadataRow = metadataCursor.toArray()[0]; - - if (!statesRow || !metadataRow) { - throw new Error("State or metadata missing; call ensureLoaded first"); - } - - return { - id: metadataRow.id as string, - name: metadataRow.name as string, - handle: metadataRow.handle as string, - type: metadataRow.type as WorkflowType, - nodes: JSON.parse(statesRow.nodes as string), - edges: JSON.parse(statesRow.edges as string), - timestamp: statesRow.timestamp as number, - }; - } - - /** - * Get state (public API) - */ - async getState(): Promise { - return await this.getStateInternal(); - } - - async updateState(nodes: unknown[], edges: unknown[]): Promise { - const timestamp = Date.now(); - this.sql.exec( - `INSERT INTO states (id, nodes, edges, timestamp) - VALUES (?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - nodes = excluded.nodes, - edges = excluded.edges, - timestamp = excluded.timestamp`, - "default", - JSON.stringify(nodes), - JSON.stringify(edges), - timestamp - ); - - this.dirty = true; - - // Schedule an alarm to persist to database in 60 seconds if not already scheduled - const currentAlarm = await this.ctx.storage.getAlarm(); - if (currentAlarm === null) { - await this.ctx.storage.setAlarm(Date.now() + 60000); - } - } - - /** - * Persist DO state back to database - */ - private async persistToDatabase(): Promise { - if (!this.dirty || !this.workflowId || !this.organizationId) { - return; - } - - try { - const state = await this.getState(); - const db = createDatabase(this.env.DB); - await updateWorkflow(db, this.workflowId, this.organizationId, { - name: state.name, - data: { - id: state.id, - name: state.name, - handle: state.handle, - type: state.type, - nodes: state.nodes, - edges: state.edges, - }, - }); - - this.dirty = false; - console.log(`Persisted workflow ${this.workflowId} to database`); - } catch (error) { - console.error("Error persisting workflow to database:", error); - } - } - - /** - * Alarm handler - called when alarm fires - */ - async alarm(): Promise { - console.log("Alarm fired for WorkflowDO"); - await this.persistToDatabase(); - - // If still dirty (updates happened during persist), schedule another alarm - if (this.dirty) { - await this.ctx.storage.setAlarm(Date.now() + 60000); - } - } - - async fetch(request: Request): Promise { - const url = new URL(request.url); - - // Extract workflowId and organizationId from query params - const workflowId = url.searchParams.get("workflowId") || ""; - const organizationId = url.searchParams.get("organizationId") || ""; - - // Ensure workflow is loaded from database - if (workflowId && organizationId) { - await this.ensureLoaded(workflowId, organizationId); - } - - // Handle GET request for workflow state - if (url.pathname === "/state" && request.method === "GET") { - try { - const state = await this.getState(); - - return new Response(JSON.stringify(state), { - headers: { "Content-Type": "application/json" }, - }); - } catch (error) { - console.error("Error getting workflow state:", error); - return new Response( - JSON.stringify({ - error: "Failed to get workflow state", - details: error instanceof Error ? error.message : "Unknown error", - }), - { - status: 500, - headers: { "Content-Type": "application/json" }, - } - ); - } - } - - // Handle WebSocket connections (ensureLoaded called earlier if params present) - const upgradeHeader = request.headers.get("Upgrade"); - if (!upgradeHeader || upgradeHeader !== "websocket") { - return new Response("Expected WebSocket or /state GET request", { - status: 426, - }); - } - - const webSocketPair = new WebSocketPair(); - const [client, server] = Object.values(webSocketPair); - - this.ctx.acceptWebSocket(server); - - // Send initial state - let initState: WorkflowDOState; - try { - initState = await this.getState(); - } catch { - // Fallback minimal state - initState = { - id: workflowId, - name: "New Workflow", - handle: workflowId, - type: "manual", - nodes: [], - edges: [], - timestamp: Date.now(), - }; - } - const initMessage: WorkflowDOInitMessage = { - type: "init", - state: initState, - }; - server.send(JSON.stringify(initMessage)); - - return new Response(null, { - status: 101, - webSocket: client, - }); - } - - async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { - try { - if (typeof message !== "string") { - const errorMsg: WorkflowDOErrorMessage = { - error: "Expected string message", - }; - ws.send(JSON.stringify(errorMsg)); - return; - } - - const data = JSON.parse(message) as WorkflowDOMessage; - - if ("type" in data && data.type === "update") { - const updateMsg = data as WorkflowDOUpdateMessage; - await this.updateState(updateMsg.nodes, updateMsg.edges); - - // Optionally echo back confirmation - const ackMsg: WorkflowDOAckMessage = { - type: "ack", - timestamp: Date.now(), - }; - ws.send(JSON.stringify(ackMsg)); - } - } catch (error) { - console.error("WebSocket message error:", error); - const errorMsg: WorkflowDOErrorMessage = { - error: "Failed to process message", - details: error instanceof Error ? error.message : "Unknown error", - }; - ws.send(JSON.stringify(errorMsg)); - } - } - - async webSocketClose( - ws: WebSocket, - code: number, - reason: string, - _wasClean: boolean - ) { - ws.close(code, reason); - } -} diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 21ad19a7..308ea8f7 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -1,5 +1,4 @@ import { Hono } from "hono"; -export { WorkflowDO } from "./durable-objects/workflow-do"; export { Runtime } from "./runtime/runtime"; import auth from "./auth"; import { ApiContext } from "./context"; diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index c0073914..4ff309b2 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -165,48 +165,23 @@ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { } try { - // Get workflow from Durable Object - const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${id}`); - const stub = c.env.WORKFLOW_DO.get(doId); - // @ts-ignore - const workflowData = await stub.getState(); - - if (!workflowData) { - // If DO doesn't have it, fall back to database - const db = createDatabase(c.env.DB); - const workflow = await getWorkflow(db, id, organizationId); - if (!workflow) { - return c.json({ error: "Workflow not found" }, 404); - } - - const workflowData = workflow.data; - - const response: GetWorkflowResponse = { - id: workflow.id, - name: workflow.name, - handle: workflow.handle, - type: workflowData.type, - createdAt: workflow.createdAt, - updatedAt: workflow.updatedAt, - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], - }; + // Get workflow from database + const db = createDatabase(c.env.DB); + const workflow = await getWorkflow(db, id, organizationId); - return c.json(response); + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); } - // Get metadata from database for createdAt/updatedAt - const db = createDatabase(c.env.DB); - const workflow = await getWorkflow(db, id, organizationId); + const workflowData = workflow.data; const response: GetWorkflowResponse = { - id: workflowData.id, - name: workflowData.name, - handle: workflowData.handle, + id: workflow.id, + name: workflow.name, + handle: workflow.handle, type: workflowData.type, - createdAt: workflow?.createdAt || new Date(), - updatedAt: workflow?.updatedAt || new Date(), - // @ts-ignore + createdAt: workflow.createdAt, + updatedAt: workflow.updatedAt, nodes: workflowData.nodes || [], edges: workflowData.edges || [], }; @@ -519,40 +494,12 @@ workflowRoutes.post( let deploymentId: string | undefined; if (version === "dev") { - // Get workflow data from Durable Object first - let userId: string; - const jwtPayload = c.get("jwtPayload") as JWTTokenPayload | undefined; - if (jwtPayload) { - userId = jwtPayload.sub || "anonymous"; - } else { - userId = "api"; // Use a placeholder for API-triggered executions - } - - const doId = c.env.WORKFLOW_DO.idFromName( - `${userId}-${workflowIdOrHandle}` - ); - const stub = c.env.WORKFLOW_DO.get(doId); - const state = await stub.getState(); - - if (state) { - workflowData = { - type: state.type, - nodes: state.nodes || [], - edges: state.edges || [], - }; - workflow = { - id: workflowIdOrHandle, - name: state.name, - handle: state.handle, - }; - } else { - // Fallback to database - workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); - if (!workflow) { - return c.json({ error: "Workflow not found" }, 404); - } - workflowData = workflow.data; + // Get workflow data directly from database + workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); } + workflowData = workflow.data; } else { // Get deployment based on version let deployment; diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index 263c7770..bd1e1337 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -1,7 +1,16 @@ +import { + WorkflowDOAckMessage, + WorkflowDOErrorMessage, + WorkflowDOInitMessage, + WorkflowDOState, + WorkflowDOUpdateMessage, + WorkflowType, +} from "@dafthunk/types"; import { Hono } from "hono"; import { jwtMiddleware } from "../auth"; import { ApiContext } from "../context"; +import { createDatabase, getWorkflow, updateWorkflow } from "../db"; const wsRoutes = new Hono(); @@ -29,16 +38,122 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { ); } - // Create a unique DO ID for this user + workflow combination - const doId = c.env.WORKFLOW_DO.idFromName(`${userId}-${workflowId}`); - const stub = c.env.WORKFLOW_DO.get(doId); + // Load workflow from database + const db = createDatabase(c.env.DB); + let workflow; + try { + workflow = await getWorkflow(db, workflowId, organizationId); + } catch (error) { + console.error("Error loading workflow:", error); + return c.json({ error: "Failed to load workflow" }, 500); + } + + // Create WebSocket pair + const pair = new WebSocketPair(); + const [client, server] = Object.values(pair); + + // Accept the WebSocket connection + server.accept(); + + // Prepare initial state + const initialState: WorkflowDOState = workflow + ? { + id: workflow.id, + name: workflow.name, + handle: workflow.handle, + type: ((workflow.data as any).type || "manual") as WorkflowType, + nodes: (workflow.data as any).nodes || [], + edges: (workflow.data as any).edges || [], + timestamp: workflow.updatedAt + ? workflow.updatedAt.getTime() + : Date.now(), + } + : { + id: workflowId, + name: "New Workflow", + handle: workflowId, + type: "manual" as WorkflowType, + nodes: [], + edges: [], + timestamp: Date.now(), + }; + + // Send initial state + const initMessage: WorkflowDOInitMessage = { + type: "init", + state: initialState, + }; + server.send(JSON.stringify(initMessage)); + + // Handle incoming messages + server.addEventListener("message", async (event: MessageEvent) => { + try { + if (typeof event.data !== "string") { + const errorMsg: WorkflowDOErrorMessage = { + error: "Expected string message", + }; + server.send(JSON.stringify(errorMsg)); + return; + } + + const data = JSON.parse(event.data); + + if ("type" in data && data.type === "update") { + const updateMsg = data as WorkflowDOUpdateMessage; + + // Update workflow in database + try { + await updateWorkflow(db, workflowId, organizationId, { + data: { + id: workflowId, + name: initialState.name, + handle: initialState.handle, + type: initialState.type, + nodes: updateMsg.nodes, + edges: updateMsg.edges, + }, + }); + + // Send acknowledgment + const ackMsg: WorkflowDOAckMessage = { + type: "ack", + timestamp: Date.now(), + }; + server.send(JSON.stringify(ackMsg)); + } catch (error) { + console.error("Error updating workflow:", error); + const errorMsg: WorkflowDOErrorMessage = { + error: "Failed to update workflow", + details: error instanceof Error ? error.message : "Unknown error", + }; + server.send(JSON.stringify(errorMsg)); + } + } + } catch (error) { + console.error("WebSocket message error:", error); + const errorMsg: WorkflowDOErrorMessage = { + error: "Failed to process message", + details: error instanceof Error ? error.message : "Unknown error", + }; + server.send(JSON.stringify(errorMsg)); + } + }); + + // Handle errors + server.addEventListener("error", (event: Event) => { + console.error("WebSocket error:", event); + }); + + // Handle close + server.addEventListener("close", (event: CloseEvent) => { + console.log("WebSocket closed:", event.code, event.reason); + }); - // Reconstruct request with required query params for DO - const url = new URL(c.req.url); - url.searchParams.set("organizationId", organizationId); - url.searchParams.set("workflowId", workflowId); - const newReq = new Request(url.toString(), c.req.raw); - return stub.fetch(newReq); + // Return response with WebSocket + return new Response(null, { + status: 101, + webSocket: client, + }); }); export default wsRoutes; diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index f322a3a2..2061c30f 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -5,7 +5,6 @@ import { beforeEach, describe, expect, it } from "vitest"; import { Bindings } from "../context"; -import { WorkflowDO } from "../durable-objects/workflow-do"; import { decryptSecret, encryptSecret } from "./encryption"; // Mock Bindings for testing @@ -21,7 +20,6 @@ const createMockEnv = (masterKey?: string): Bindings => ({ RATE_LIMIT_AUTH: {} as RateLimit, RATE_LIMIT_EXECUTE: {} as RateLimit, EXECUTE: {} as Workflow, - WORKFLOW_DO: {} as DurableObjectNamespace, RESSOURCES: {} as R2Bucket, DATASETS: {} as R2Bucket, DATASETS_AUTORAG: "", diff --git a/apps/api/wrangler.jsonc b/apps/api/wrangler.jsonc index a9a1a8bd..842cab78 100644 --- a/apps/api/wrangler.jsonc +++ b/apps/api/wrangler.jsonc @@ -67,21 +67,6 @@ "analytics_engine_datasets": [ { "binding": "COMPUTE", "dataset": "dafthunk-compute-development" } ], - "durable_objects": { - "bindings": [ - { - "name": "WORKFLOW_DO", - "class_name": "WorkflowDO", - "script_name": "dafthunk-api" - } - ] - }, - "migrations": [ - { - "tag": "v1", - "new_sqlite_classes": ["WorkflowDO"] - } - ], "unsafe": { "bindings": [ { @@ -170,15 +155,6 @@ "analytics_engine_datasets": [ { "binding": "COMPUTE", "dataset": "dafthunk-compute-production" } ], - "durable_objects": { - "bindings": [ - { - "name": "WORKFLOW_DO", - "class_name": "WorkflowDO", - "script_name": "dafthunk-api" - } - ] - }, "unsafe": { "bindings": [ { From 699ca84b471b00bdf3ad70eb14c17f7aebc0915d Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 1 Oct 2025 11:59:45 +0200 Subject: [PATCH 4/8] Remove the reference to durable objects --- apps/api/src/routes/ws.ts | 24 +++++----- apps/web/src/services/workflow-do-service.ts | 26 +++++------ packages/types/src/workflow.ts | 46 ++++++++++---------- 3 files changed, 48 insertions(+), 48 deletions(-) diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index bd1e1337..0511b3b3 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -1,9 +1,9 @@ import { - WorkflowDOAckMessage, - WorkflowDOErrorMessage, - WorkflowDOInitMessage, - WorkflowDOState, - WorkflowDOUpdateMessage, + WorkflowAckMessage, + WorkflowErrorMessage, + WorkflowInitMessage, + WorkflowState, + WorkflowUpdateMessage, WorkflowType, } from "@dafthunk/types"; import { Hono } from "hono"; @@ -56,7 +56,7 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { server.accept(); // Prepare initial state - const initialState: WorkflowDOState = workflow + const initialState: WorkflowState = workflow ? { id: workflow.id, name: workflow.name, @@ -79,7 +79,7 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { }; // Send initial state - const initMessage: WorkflowDOInitMessage = { + const initMessage: WorkflowInitMessage = { type: "init", state: initialState, }; @@ -89,7 +89,7 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { server.addEventListener("message", async (event: MessageEvent) => { try { if (typeof event.data !== "string") { - const errorMsg: WorkflowDOErrorMessage = { + const errorMsg: WorkflowErrorMessage = { error: "Expected string message", }; server.send(JSON.stringify(errorMsg)); @@ -99,7 +99,7 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { const data = JSON.parse(event.data); if ("type" in data && data.type === "update") { - const updateMsg = data as WorkflowDOUpdateMessage; + const updateMsg = data as WorkflowUpdateMessage; // Update workflow in database try { @@ -115,14 +115,14 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { }); // Send acknowledgment - const ackMsg: WorkflowDOAckMessage = { + const ackMsg: WorkflowAckMessage = { type: "ack", timestamp: Date.now(), }; server.send(JSON.stringify(ackMsg)); } catch (error) { console.error("Error updating workflow:", error); - const errorMsg: WorkflowDOErrorMessage = { + const errorMsg: WorkflowErrorMessage = { error: "Failed to update workflow", details: error instanceof Error ? error.message : "Unknown error", }; @@ -131,7 +131,7 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { } } catch (error) { console.error("WebSocket message error:", error); - const errorMsg: WorkflowDOErrorMessage = { + const errorMsg: WorkflowErrorMessage = { error: "Failed to process message", details: error instanceof Error ? error.message : "Unknown error", }; diff --git a/apps/web/src/services/workflow-do-service.ts b/apps/web/src/services/workflow-do-service.ts index 1ef0f962..77d4257a 100644 --- a/apps/web/src/services/workflow-do-service.ts +++ b/apps/web/src/services/workflow-do-service.ts @@ -1,28 +1,28 @@ import type { Edge, Node, - WorkflowDOAckMessage, - WorkflowDOErrorMessage, - WorkflowDOExecutionUpdateMessage, - WorkflowDOInitMessage, - WorkflowDOState, - WorkflowDOUpdateMessage, + WorkflowAckMessage, + WorkflowErrorMessage, + WorkflowExecutionUpdateMessage, + WorkflowInitMessage, + WorkflowState, + WorkflowUpdateMessage, WorkflowExecution, } from "@dafthunk/types"; import { getApiBaseUrl } from "@/config/api"; // Re-export for convenience -export type { WorkflowDOState }; +export type { WorkflowState }; type WebSocketMessage = - | WorkflowDOInitMessage - | WorkflowDOAckMessage - | WorkflowDOErrorMessage - | WorkflowDOExecutionUpdateMessage; + | WorkflowInitMessage + | WorkflowAckMessage + | WorkflowErrorMessage + | WorkflowExecutionUpdateMessage; export interface WorkflowWSOptions { - onInit?: (state: WorkflowDOState) => void; + onInit?: (state: WorkflowState) => void; onAck?: (timestamp: number) => void; onError?: (error: string) => void; onClose?: () => void; @@ -130,7 +130,7 @@ export class WorkflowWebSocket { } try { - const updateMsg: WorkflowDOUpdateMessage = { + const updateMsg: WorkflowUpdateMessage = { type: "update", nodes, edges, diff --git a/packages/types/src/workflow.ts b/packages/types/src/workflow.ts index aefcf527..0196e94a 100644 --- a/packages/types/src/workflow.ts +++ b/packages/types/src/workflow.ts @@ -467,61 +467,61 @@ export interface GetCronTriggerResponse { export type UpsertCronTriggerResponse = GetCronTriggerResponse; /** - * WebSocket message types for Durable Object real-time sync + * WebSocket message types for real-time sync */ /** - * Workflow state stored in Durable Object + * Workflow state stored in real-time session */ -export interface WorkflowDOState extends Workflow { +export interface WorkflowState extends Workflow { timestamp: number; } /** - * Message sent from DO to client with initial state + * Message sent to client with initial state */ -export interface WorkflowDOInitMessage { +export interface WorkflowInitMessage { type: "init"; - state: WorkflowDOState; + state: WorkflowState; } /** - * Message sent from client to DO to update state + * Message sent from client to update state */ -export interface WorkflowDOUpdateMessage { +export interface WorkflowUpdateMessage { type: "update"; nodes: Node[]; edges: Edge[]; } /** - * Acknowledgment message sent from DO to client + * Acknowledgment message sent to client */ -export interface WorkflowDOAckMessage { +export interface WorkflowAckMessage { type: "ack"; timestamp: number; } /** - * Error message sent from DO to client + * Error message sent to client */ -export interface WorkflowDOErrorMessage { +export interface WorkflowErrorMessage { error: string; details?: string; } /** - * Message sent from client to DO to start workflow execution + * Message sent from client to start workflow execution */ -export interface WorkflowDOExecuteMessage { +export interface WorkflowExecuteMessage { type: "execute"; executionId: string; } /** - * Message sent from DO to client with execution progress updates + * Message sent to client with execution progress updates */ -export interface WorkflowDOExecutionUpdateMessage { +export interface WorkflowExecutionUpdateMessage { type: "execution_update"; executionId: string; status: WorkflowExecutionStatus; @@ -532,10 +532,10 @@ export interface WorkflowDOExecutionUpdateMessage { /** * All possible WebSocket messages */ -export type WorkflowDOMessage = - | WorkflowDOInitMessage - | WorkflowDOUpdateMessage - | WorkflowDOAckMessage - | WorkflowDOErrorMessage - | WorkflowDOExecuteMessage - | WorkflowDOExecutionUpdateMessage; +export type WorkflowMessage = + | WorkflowInitMessage + | WorkflowUpdateMessage + | WorkflowAckMessage + | WorkflowErrorMessage + | WorkflowExecuteMessage + | WorkflowExecutionUpdateMessage; From f595702a001bd3cc76fcc4e6280001e3cda15d16 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 1 Oct 2025 15:27:03 +0200 Subject: [PATCH 5/8] Fix some bugs --- apps/api/src/routes/ws.ts | 32 +++++++++++++++-- .../components/workflow/use-workflow-state.ts | 4 +-- apps/web/src/hooks/use-editable-workflow.ts | 28 +++++++++++++-- apps/web/src/pages/editor-page.tsx | 34 ++++++++++++++++--- 4 files changed, 86 insertions(+), 12 deletions(-) diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index 0511b3b3..4c3677ba 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -101,19 +101,45 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { if ("type" in data && data.type === "update") { const updateMsg = data as WorkflowUpdateMessage; + console.log("🟒 [WS SERVER] Received update message"); + console.log(" - Nodes:", updateMsg.nodes.length); + console.log(" - Edges:", updateMsg.edges.length); + // Update workflow in database try { + // Fetch the latest workflow state to avoid using stale initialState + const currentWorkflow = await getWorkflow( + db, + workflowId, + organizationId + ); + if (!currentWorkflow) { + const errorMsg: WorkflowErrorMessage = { + error: "Workflow not found", + }; + server.send(JSON.stringify(errorMsg)); + return; + } + + const currentData = currentWorkflow.data as any; + console.log( + " - Current edges in DB:", + currentData.edges?.length || 0 + ); + await updateWorkflow(db, workflowId, organizationId, { data: { id: workflowId, - name: initialState.name, - handle: initialState.handle, - type: initialState.type, + name: currentData.name || currentWorkflow.name, + handle: currentData.handle || currentWorkflow.handle, + type: currentData.type || "manual", nodes: updateMsg.nodes, edges: updateMsg.edges, }, }); + console.log(" - βœ… Saved to database"); + // Send acknowledgment const ackMsg: WorkflowAckMessage = { type: "ack", diff --git a/apps/web/src/components/workflow/use-workflow-state.ts b/apps/web/src/components/workflow/use-workflow-state.ts index 5d6c0c0b..932df9d7 100644 --- a/apps/web/src/components/workflow/use-workflow-state.ts +++ b/apps/web/src/components/workflow/use-workflow-state.ts @@ -95,11 +95,11 @@ interface UseWorkflowStateReturn { // Helper functions to replace workflowNodeStateService const stripExecutionFields = ( data: WorkflowNodeType -): Omit & { +): Omit & { outputs: Omit[]; inputs: Omit[]; } => { - const { executionState, error, ...rest } = data; + const { executionState, error, nodeTemplates, ...rest } = data; return { ...rest, diff --git a/apps/web/src/hooks/use-editable-workflow.ts b/apps/web/src/hooks/use-editable-workflow.ts index e1ac5450..064fae4f 100644 --- a/apps/web/src/hooks/use-editable-workflow.ts +++ b/apps/web/src/hooks/use-editable-workflow.ts @@ -64,7 +64,9 @@ export function useEditableWorkflow({ const ws = connectWorkflowWS(organization.handle, workflowId, { onInit: (state: WorkflowDOState) => { - console.log("WebSocket received initial state:", state); + console.log("πŸ”΅ [WS] Received initial state"); + console.log(" - Nodes:", state.nodes.length); + console.log(" - Edges:", state.edges.length, state.edges); try { // Store workflow metadata - id and type are required, name and handle can be empty if (state.id && state.type) { @@ -96,10 +98,16 @@ export function useEditableWorkflow({ }) ); + console.log( + " - Converted to ReactFlow edges:", + reactFlowEdges.length + ); setNodes(reactFlowNodes); setEdges(reactFlowEdges); + console.log(" - Set edges in state"); setProcessingError(null); setIsInitializing(false); + console.log(" - Initialization complete"); } catch (error) { console.error("Error processing WebSocket state:", error); setProcessingError("Failed to load state from WebSocket"); @@ -141,6 +149,17 @@ export function useEditableWorkflow({ nodesToSave: Node[], edgesToSave: Edge[] ) => { + console.log("🟑 [SAVE] saveWorkflowInternal called"); + console.log(" - isInitializing:", isInitializing); + console.log(" - Nodes to save:", nodesToSave.length); + console.log(" - Edges to save:", edgesToSave.length, edgesToSave); + + // Don't save while still loading initial state from WebSocket + if (isInitializing) { + console.log(" - ⏸️ SKIPPED (still initializing)"); + return; + } + if (!workflowId) { setSavingError("Workflow ID is missing, cannot save."); return; @@ -198,6 +217,11 @@ export function useEditableWorkflow({ targetInput: edge.targetHandle || "", })); + console.log( + " - πŸ“€ Sending to WebSocket:", + workflowEdges.length, + "edges" + ); wsRef.current.send(workflowNodes, workflowEdges); return; } catch (error) { @@ -211,7 +235,7 @@ export function useEditableWorkflow({ ); setSavingError("WebSocket not connected. Please refresh the page."); }, - [workflowId] + [workflowId, isInitializing] ); const saveWorkflow = useMemo( diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index cfee25fc..064a9516 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -1,7 +1,7 @@ import type { WorkflowType } from "@dafthunk/types"; import type { Connection, Edge, Node } from "@xyflow/react"; import { ReactFlowProvider } from "@xyflow/react"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useNavigate, useParams } from "react-router"; import { toast } from "sonner"; @@ -133,6 +133,10 @@ export function EditorPage() { [] ); + // Use refs to always have the latest values without causing callback recreation + const latestUiNodesRef = useRef[]>([]); + const latestUiEdgesRef = useRef[]>([]); + const handleOpenSetCronDialog = useCallback(() => { mutateDeploymentHistory(); mutateCronTrigger(); @@ -166,34 +170,54 @@ export function EditorPage() { useEffect(() => { if (initialNodesForUI) { + console.log( + "πŸ”΅ [EDITOR] Setting initial nodes:", + initialNodesForUI.length + ); setLatestUiNodes(initialNodesForUI); + latestUiNodesRef.current = initialNodesForUI; } }, [initialNodesForUI]); useEffect(() => { if (initialEdgesForUI) { + console.log( + "πŸ”΅ [EDITOR] Setting initial edges:", + initialEdgesForUI.length + ); setLatestUiEdges(initialEdgesForUI); + latestUiEdgesRef.current = initialEdgesForUI; } }, [initialEdgesForUI]); const handleUiNodesChanged = useCallback( (updatedNodesFromUI: Node[]) => { + console.log("πŸ”΄ [EDITOR] handleUiNodesChanged"); + console.log(" - New nodes:", updatedNodesFromUI.length); + console.log(" - Current edges (ref):", latestUiEdgesRef.current.length); setLatestUiNodes(updatedNodesFromUI); + latestUiNodesRef.current = updatedNodesFromUI; if (workflowMetadata) { - saveWorkflow(updatedNodesFromUI, latestUiEdges); + // Use ref to get latest edges, not closure variable + saveWorkflow(updatedNodesFromUI, latestUiEdgesRef.current); } }, - [latestUiEdges, saveWorkflow, workflowMetadata] + [saveWorkflow, workflowMetadata] ); const handleUiEdgesChanged = useCallback( (updatedEdgesFromUI: Edge[]) => { + console.log("🟣 [EDITOR] handleUiEdgesChanged"); + console.log(" - New edges:", updatedEdgesFromUI.length); + console.log(" - Current nodes (ref):", latestUiNodesRef.current.length); setLatestUiEdges(updatedEdgesFromUI); + latestUiEdgesRef.current = updatedEdgesFromUI; if (workflowMetadata) { - saveWorkflow(latestUiNodes, updatedEdgesFromUI); + // Use ref to get latest nodes, not closure variable + saveWorkflow(latestUiNodesRef.current, updatedEdgesFromUI); } }, - [latestUiNodes, saveWorkflow, workflowMetadata] + [saveWorkflow, workflowMetadata] ); const { From a5406674651db8e574273e8ac62217e4a0bfb9ee Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 1 Oct 2025 15:43:51 +0200 Subject: [PATCH 6/8] Refactor and simplify the code --- apps/api/src/routes/ws.ts | 12 +----- .../components/workflow/use-workflow-state.ts | 1 + apps/web/src/hooks/use-editable-workflow.ts | 42 ++++--------------- apps/web/src/pages/editor-page.tsx | 16 ------- 4 files changed, 9 insertions(+), 62 deletions(-) diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index 4c3677ba..c3da6032 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -101,13 +101,9 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { if ("type" in data && data.type === "update") { const updateMsg = data as WorkflowUpdateMessage; - console.log("🟒 [WS SERVER] Received update message"); - console.log(" - Nodes:", updateMsg.nodes.length); - console.log(" - Edges:", updateMsg.edges.length); - // Update workflow in database try { - // Fetch the latest workflow state to avoid using stale initialState + // Fetch the latest workflow to avoid using stale initialState captured at connection time const currentWorkflow = await getWorkflow( db, workflowId, @@ -122,10 +118,6 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { } const currentData = currentWorkflow.data as any; - console.log( - " - Current edges in DB:", - currentData.edges?.length || 0 - ); await updateWorkflow(db, workflowId, organizationId, { data: { @@ -138,8 +130,6 @@ wsRoutes.get("/", jwtMiddleware, async (c) => { }, }); - console.log(" - βœ… Saved to database"); - // Send acknowledgment const ackMsg: WorkflowAckMessage = { type: "ack", diff --git a/apps/web/src/components/workflow/use-workflow-state.ts b/apps/web/src/components/workflow/use-workflow-state.ts index 932df9d7..835e13cb 100644 --- a/apps/web/src/components/workflow/use-workflow-state.ts +++ b/apps/web/src/components/workflow/use-workflow-state.ts @@ -99,6 +99,7 @@ const stripExecutionFields = ( outputs: Omit[]; inputs: Omit[]; } => { + // Exclude nodeTemplates from comparison - it's UI metadata that shouldn't trigger persistence const { executionState, error, nodeTemplates, ...rest } = data; return { diff --git a/apps/web/src/hooks/use-editable-workflow.ts b/apps/web/src/hooks/use-editable-workflow.ts index 064fae4f..bbc2029a 100644 --- a/apps/web/src/hooks/use-editable-workflow.ts +++ b/apps/web/src/hooks/use-editable-workflow.ts @@ -1,6 +1,6 @@ import type { Parameter, ParameterType } from "@dafthunk/types"; import type { Edge, Node } from "@xyflow/react"; -import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { useAuth } from "@/components/auth-context"; import type { @@ -14,7 +14,6 @@ import { WorkflowWebSocket, } from "@/services/workflow-do-service"; import { adaptDeploymentNodesToReactFlowNodes } from "@/utils/utils"; -import { debounce } from "@/utils/utils"; interface UseEditableWorkflowProps { workflowId: string | undefined; @@ -64,11 +63,8 @@ export function useEditableWorkflow({ const ws = connectWorkflowWS(organization.handle, workflowId, { onInit: (state: WorkflowDOState) => { - console.log("πŸ”΅ [WS] Received initial state"); - console.log(" - Nodes:", state.nodes.length); - console.log(" - Edges:", state.edges.length, state.edges); try { - // Store workflow metadata - id and type are required, name and handle can be empty + // Store workflow metadata if (state.id && state.type) { setWorkflowMetadata({ id: state.id, @@ -78,6 +74,7 @@ export function useEditableWorkflow({ }); } + // Convert to ReactFlow format const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( state.nodes, nodeTemplates @@ -98,16 +95,10 @@ export function useEditableWorkflow({ }) ); - console.log( - " - Converted to ReactFlow edges:", - reactFlowEdges.length - ); setNodes(reactFlowNodes); setEdges(reactFlowEdges); - console.log(" - Set edges in state"); setProcessingError(null); setIsInitializing(false); - console.log(" - Initialization complete"); } catch (error) { console.error("Error processing WebSocket state:", error); setProcessingError("Failed to load state from WebSocket"); @@ -115,11 +106,9 @@ export function useEditableWorkflow({ } }, onOpen: () => { - console.log("WebSocket connected"); setIsWSConnected(true); }, onClose: () => { - console.log("WebSocket disconnected"); setIsWSConnected(false); }, onError: (error) => { @@ -149,14 +138,9 @@ export function useEditableWorkflow({ nodesToSave: Node[], edgesToSave: Edge[] ) => { - console.log("🟑 [SAVE] saveWorkflowInternal called"); - console.log(" - isInitializing:", isInitializing); - console.log(" - Nodes to save:", nodesToSave.length); - console.log(" - Edges to save:", edgesToSave.length, edgesToSave); - - // Don't save while still loading initial state from WebSocket + // Block saves during initialization to prevent race condition where + // nodeTemplates load before edges, causing empty edges to be saved if (isInitializing) { - console.log(" - ⏸️ SKIPPED (still initializing)"); return; } @@ -217,11 +201,6 @@ export function useEditableWorkflow({ targetInput: edge.targetHandle || "", })); - console.log( - " - πŸ“€ Sending to WebSocket:", - workflowEdges.length, - "edges" - ); wsRef.current.send(workflowNodes, workflowEdges); return; } catch (error) { @@ -238,15 +217,8 @@ export function useEditableWorkflow({ [workflowId, isInitializing] ); - const saveWorkflow = useMemo( - () => - debounce( - (nodes: Node[], edges: Edge[]) => - saveWorkflowInternal(nodes, edges), - 1000 - ), - [saveWorkflowInternal] - ); + // No debouncing needed - WebSocket handles message batching naturally + const saveWorkflow = saveWorkflowInternal; return { nodes, diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index 064a9516..900cedf1 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -170,10 +170,6 @@ export function EditorPage() { useEffect(() => { if (initialNodesForUI) { - console.log( - "πŸ”΅ [EDITOR] Setting initial nodes:", - initialNodesForUI.length - ); setLatestUiNodes(initialNodesForUI); latestUiNodesRef.current = initialNodesForUI; } @@ -181,10 +177,6 @@ export function EditorPage() { useEffect(() => { if (initialEdgesForUI) { - console.log( - "πŸ”΅ [EDITOR] Setting initial edges:", - initialEdgesForUI.length - ); setLatestUiEdges(initialEdgesForUI); latestUiEdgesRef.current = initialEdgesForUI; } @@ -192,13 +184,9 @@ export function EditorPage() { const handleUiNodesChanged = useCallback( (updatedNodesFromUI: Node[]) => { - console.log("πŸ”΄ [EDITOR] handleUiNodesChanged"); - console.log(" - New nodes:", updatedNodesFromUI.length); - console.log(" - Current edges (ref):", latestUiEdgesRef.current.length); setLatestUiNodes(updatedNodesFromUI); latestUiNodesRef.current = updatedNodesFromUI; if (workflowMetadata) { - // Use ref to get latest edges, not closure variable saveWorkflow(updatedNodesFromUI, latestUiEdgesRef.current); } }, @@ -207,13 +195,9 @@ export function EditorPage() { const handleUiEdgesChanged = useCallback( (updatedEdgesFromUI: Edge[]) => { - console.log("🟣 [EDITOR] handleUiEdgesChanged"); - console.log(" - New edges:", updatedEdgesFromUI.length); - console.log(" - Current nodes (ref):", latestUiNodesRef.current.length); setLatestUiEdges(updatedEdgesFromUI); latestUiEdgesRef.current = updatedEdgesFromUI; if (workflowMetadata) { - // Use ref to get latest nodes, not closure variable saveWorkflow(latestUiNodesRef.current, updatedEdgesFromUI); } }, From 38b71ae4874a331aebba96e047ac829500c82f52 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 1 Oct 2025 16:08:50 +0200 Subject: [PATCH 7/8] Use onNodeDragStop to trigger events --- .../components/workflow/use-workflow-state.ts | 42 +++++++++++++------ .../components/workflow/workflow-builder.tsx | 2 + .../components/workflow/workflow-canvas.tsx | 6 +++ 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/apps/web/src/components/workflow/use-workflow-state.ts b/apps/web/src/components/workflow/use-workflow-state.ts index 835e13cb..8f00a1ef 100644 --- a/apps/web/src/components/workflow/use-workflow-state.ts +++ b/apps/web/src/components/workflow/use-workflow-state.ts @@ -65,6 +65,10 @@ interface UseWorkflowStateReturn { onConnect: OnConnect; onConnectStart: OnConnectStart; onConnectEnd: OnConnectEnd; + onNodeDragStop: ( + event: React.MouseEvent, + node: ReactFlowNode + ) => void; connectionValidationState: ConnectionValidationState; isValidConnection: IsValidConnection>; handleAddNode: () => void; @@ -340,33 +344,36 @@ export function useWorkflowState({ } }, [initialEdges, readonly, setEdges]); - // Effect to notify parent of changes for nodes + // Effect to notify parent of changes for nodes (excluding position changes during drag) useEffect(() => { if (readonly) return; const nodeCountChanged = nodes.length !== initialNodes.length; - const hasDataOrPositionChanges = nodes.some((node) => { - const initialNode = initialNodes.find((n) => n.id === node.id); - if (!initialNode) return true; - if ( - node.position.x !== initialNode.position.x || - node.position.y !== initialNode.position.y - ) { - return true; + // Check for data changes (excluding position) + let hasDataChanges = false; + nodes.forEach((node) => { + const initialNode = initialNodes.find((n) => n.id === node.id); + if (!initialNode) { + hasDataChanges = true; + return; } + // Check data changes (not position) const nodeData = stripExecutionFields(node.data); const initialNodeData = stripExecutionFields(initialNode.data); - return JSON.stringify(nodeData) !== JSON.stringify(initialNodeData); + if (JSON.stringify(nodeData) !== JSON.stringify(initialNodeData)) { + hasDataChanges = true; + } }); - // Check for deleted nodes by looking for initialNodes that don't exist in the current nodes + // Check for deleted nodes const hasDeletedNodes = initialNodes.some( (initialNode) => !nodes.some((node) => node.id === initialNode.id) ); - if (nodeCountChanged || hasDataOrPositionChanges || hasDeletedNodes) { + // Save for data changes or node add/delete (position changes handled by onNodeDragStop) + if (nodeCountChanged || hasDataChanges || hasDeletedNodes) { onNodesChangePersistCallback?.(nodes); } }, [nodes, onNodesChangePersistCallback, initialNodes, readonly]); @@ -433,6 +440,16 @@ export function useWorkflowState({ setConnectionValidationState("default"); }, [readonly]); + // Handle node drag stop - save positions after drag completes + const onNodeDragStop = useCallback( + (event: React.MouseEvent, node: ReactFlowNode) => { + if (readonly) return; + // Save with current node positions after drag completes + onNodesChangePersistCallback?.(nodes); + }, + [readonly, nodes, onNodesChangePersistCallback] + ); + // Function to validate connection based on type compatibility const isValidConnection = useCallback( (connection: any) => { @@ -1129,6 +1146,7 @@ export function useWorkflowState({ onConnect, onConnectStart, onConnectEnd, + onNodeDragStop: readonly ? () => {} : onNodeDragStop, connectionValidationState, isValidConnection, handleAddNode, diff --git a/apps/web/src/components/workflow/workflow-builder.tsx b/apps/web/src/components/workflow/workflow-builder.tsx index 469879b0..40dccb18 100644 --- a/apps/web/src/components/workflow/workflow-builder.tsx +++ b/apps/web/src/components/workflow/workflow-builder.tsx @@ -125,6 +125,7 @@ export function WorkflowBuilder({ cutSelected, pasteFromClipboard, hasClipboardData, + onNodeDragStop, } = useWorkflowState({ initialNodes, initialEdges, @@ -431,6 +432,7 @@ export function WorkflowBuilder({ onConnectStart={readonly ? () => {} : onConnectStart} onConnectEnd={readonly ? () => {} : onConnectEnd} onNodeDoubleClick={handleNodeDoubleClick} + onNodeDragStop={onNodeDragStop} onInit={setReactFlowInstance} onAddNode={readonly ? undefined : handleAddNode} onAction={ diff --git a/apps/web/src/components/workflow/workflow-canvas.tsx b/apps/web/src/components/workflow/workflow-canvas.tsx index f39fcf22..d7777c0c 100644 --- a/apps/web/src/components/workflow/workflow-canvas.tsx +++ b/apps/web/src/components/workflow/workflow-canvas.tsx @@ -152,6 +152,10 @@ export interface WorkflowCanvasProps { onConnect: OnConnect; onConnectStart: OnConnectStart; onConnectEnd: OnConnectEnd; + onNodeDragStop: ( + event: React.MouseEvent, + node: ReactFlowNode + ) => void; onNodeDoubleClick?: (event: React.MouseEvent) => void; onInit: ( instance: ReactFlowInstance< @@ -695,6 +699,7 @@ export function WorkflowCanvas({ onConnectStart, onConnectEnd, onNodeDoubleClick, + onNodeDragStop, onInit, onAddNode, onAction, @@ -765,6 +770,7 @@ export function WorkflowCanvas({ onConnectStart={readonly ? () => {} : onConnectStart} onConnectEnd={readonly ? () => {} : onConnectEnd} onNodeDoubleClick={onNodeDoubleClick} + onNodeDragStop={onNodeDragStop} nodeTypes={nodeTypes} edgeTypes={edgeTypes} connectionMode={ConnectionMode.Strict} From 5075f0f2f29df150df9648460b679309fa626812 Mon Sep 17 00:00:00 2001 From: Bertil Chapuis Date: Wed, 1 Oct 2025 16:11:42 +0200 Subject: [PATCH 8/8] Fix formatting and linting errors --- apps/api/src/routes/ws.ts | 2 +- apps/web/src/components/workflow/use-workflow-state.ts | 2 +- apps/web/src/pages/editor-page.tsx | 2 +- apps/web/src/services/workflow-do-service.ts | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts index c3da6032..42004526 100644 --- a/apps/api/src/routes/ws.ts +++ b/apps/api/src/routes/ws.ts @@ -3,8 +3,8 @@ import { WorkflowErrorMessage, WorkflowInitMessage, WorkflowState, - WorkflowUpdateMessage, WorkflowType, + WorkflowUpdateMessage, } from "@dafthunk/types"; import { Hono } from "hono"; diff --git a/apps/web/src/components/workflow/use-workflow-state.ts b/apps/web/src/components/workflow/use-workflow-state.ts index 8f00a1ef..e5811c81 100644 --- a/apps/web/src/components/workflow/use-workflow-state.ts +++ b/apps/web/src/components/workflow/use-workflow-state.ts @@ -442,7 +442,7 @@ export function useWorkflowState({ // Handle node drag stop - save positions after drag completes const onNodeDragStop = useCallback( - (event: React.MouseEvent, node: ReactFlowNode) => { + (_event: React.MouseEvent, _node: ReactFlowNode) => { if (readonly) return; // Save with current node positions after drag completes onNodesChangePersistCallback?.(nodes); diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index 900cedf1..b32baff9 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -129,7 +129,7 @@ export function EditorPage() { const [latestUiNodes, setLatestUiNodes] = useState[]>( [] ); - const [latestUiEdges, setLatestUiEdges] = useState[]>( + const [_latestUiEdges, setLatestUiEdges] = useState[]>( [] ); diff --git a/apps/web/src/services/workflow-do-service.ts b/apps/web/src/services/workflow-do-service.ts index 77d4257a..77541c65 100644 --- a/apps/web/src/services/workflow-do-service.ts +++ b/apps/web/src/services/workflow-do-service.ts @@ -3,11 +3,11 @@ import type { Node, WorkflowAckMessage, WorkflowErrorMessage, + WorkflowExecution, WorkflowExecutionUpdateMessage, WorkflowInitMessage, WorkflowState, WorkflowUpdateMessage, - WorkflowExecution, } from "@dafthunk/types"; import { getApiBaseUrl } from "@/config/api";