diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..8b42ac42 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,160 @@ +# AGENTS.md + +## Project Overview + +Dafthunk is a visual workflow automation platform built on Cloudflare infrastructure (Workers, D1, R2, AI). Users create workflows by connecting 50+ node types in a visual editor (React Flow). + +**Monorepo structure** (pnpm workspaces): +- `apps/api` - Backend (Hono on Cloudflare Workers) +- `apps/web` - Frontend (React 19 + React Router v7 + Vite) +- `packages/types` - Shared TypeScript types +- `packages/utils` - Shared utilities + +## Development Commands + +### Common commands +```bash +pnpm dev # Start all services +pnpm build # Build all packages and apps +pnpm typecheck # Type check all workspaces +pnpm lint # Lint and type check +pnpm fix # Auto-fix linting + format +pnpm test # Run tests + +# Workspace-specific (use --filter) +pnpm --filter '@dafthunk/api' dev # API dev server (port 3001) +pnpm --filter '@dafthunk/web' dev # Web dev server (port 3000) +pnpm --filter '@dafthunk/api' test:integration # Integration tests + +# Database migrations +pnpm --filter '@dafthunk/api' db:migrate # Apply migrations locally +pnpm --filter '@dafthunk/api' db:generate # Generate new migrations +pnpm --filter '@dafthunk/api' db:prod:migrate # Apply to production +``` + +## Architecture + +### Backend: API (`apps/api/`) + +**Routes** (`src/routes/`) +- Organized by feature (workflows, executions, objects, etc.) +- Stateless: each request is self-contained +- Auth in `src/auth.ts` (JWT + API Keys) +- Multi-tenant: always scope by `organizationId` from context (`c.get("organizationId")`) +- Validate with Zod + `@hono/zod-validator` + +**Database** (`src/db/`) +- D1 (SQLite) + Drizzle ORM +- Schema: `schema/index.ts` +- Queries: `queries.ts` +- Migrations: `migrations/` (generate with `drizzle-kit`) +- Convention: `snake_case` in SQL, `camelCase` in TypeScript + +**Workflow Runtime** (`src/runtime/`) +- `runtime.ts` - Cloudflare Workflows for durable execution +- Durable Objects manage state +- `object-store.ts` - Node outputs (R2 + transient storage) +- Executes nodes by graph topology + +**Node System** (`src/nodes/`) +- node types in category folders: `text/`, `image/`, `audio/`, `browser/`, `logic/`, `math/`, `javascript/`, `anthropic/`, `openai/`, `gemini/`, `3d/`, `date/`, `document/`, `email/`, `geo/`, `json/`, `net/`, `parameter/`, `rag/` +- Registry: `base-node-registry.ts` and `cloudflare-node-registry.ts` +- All implement common interface from `packages/types` + +### Frontend: Web (`apps/web/`) + +**Structure** +- Pages: `src/pages/` (one file per route) +- Components: `src/components/` (`ui/` = shadcn/ui, `workflow/` = React Flow editor) +- Routes: `src/routes.tsx` (React Router v7) +- Services: `src/services/` (API clients) + +**Patterns** +- Data fetching: SWR (consolidate related calls) +- Styling: Tailwind CSS only (use `cn()` utility) +- State: Avoid `useEffect`, prefer derived state + +### Shared: Types (`packages/types/`) +- Single source of truth for data structures +- Backend serializes, frontend deserializes/validates +- Ensures type safety across stack + +## Design Principles + +When writing or refactoring code: + +### Simplify Interfaces +- Export only what's necessary—hide everything else +- Keep public APIs small (fewer exports = less complexity) +- Use barrel exports (`index.ts`) to define module boundaries +- If a function/class can't be described in one sentence, split it + +### Manage Complexity +- Push complexity into lower-level modules with simple APIs +- Eliminate unnecessary state, conditionals, and abstractions +- Keep related logic together; separate unrelated concerns +- Depend on interfaces/types, not concrete implementations + +### Prioritize Maintainability +- Write the calling code you want first, then implement to match +- After code works, refactor to simplify the interface +- Use comments for *why* (design decisions, trade-offs), not *what* (code explains itself) +- Front-load architectural decisions (module boundaries, data flow); defer details (naming, parameters) + +## Code Guidelines + +### TypeScript Style +- Strict mode: never use `any` or `unknown` +- Prefer `interface` over `type` for object shapes +- Always use `import type` for type-only imports +- Use early returns to avoid deep nesting + +### Naming Conventions +``` +Files: kebab-case.tsx +Functions: camelCase() +Hooks: useCamelCase() +Event handlers: handleClick() +Components: PascalCase +``` + +### React (apps/web) +```tsx +// ✓ Correct +import { Link } from 'react-router' // not react-router-dom +import type { User } from '@dafthunk/types' +export function MyComponent() { ... } // functional component + +// Data fetching +const { data } = useSWR(['/users', '/posts'], fetchAll) // consolidate + +// Styling +
+ +// Avoid useEffect - prefer derived state or move logic outside React +``` + +### Hono API (apps/api) +```ts +// Routes by feature +const workflows = new Hono() +workflows.get('/', zValidator('query', schema), (c) => { + const orgId = c.get('organizationId') // always scope by org + // ... +}) +app.route('/workflows', workflows) + +// Database +const users = sqliteTable('users', { + createdAt: text('created_at'), // snake_case in DB +}) +export type User = InferModel +``` + +### Testing +```ts +// Unit tests: *.test.ts +import { describe, it, expect } from 'vitest' + +// Integration tests: *.integration.ts +``` diff --git a/apps/api/.gitignore b/apps/api/.gitignore index ce8386dc..e31c93df 100644 --- a/apps/api/.gitignore +++ b/apps/api/.gitignore @@ -170,6 +170,7 @@ dist .dev.vars .wrangler/ +worker-configuration.d.ts # macOS .DS_Store diff --git a/apps/api/eslint.config.mjs b/apps/api/eslint.config.mjs index 393124e0..7e867cac 100644 --- a/apps/api/eslint.config.mjs +++ b/apps/api/eslint.config.mjs @@ -6,7 +6,14 @@ import globals from "globals"; import tseslint from "typescript-eslint"; export default defineConfig([ - { ignores: ["dist", ".wrangler/**", "node_modules/**"] }, + { + ignores: [ + "dist", + ".wrangler/**", + "node_modules/**", + "worker-configuration.d.ts", + ], + }, { files: ["**/*.{js,mjs,cjs,ts}"], plugins: { diff --git a/apps/api/package.json b/apps/api/package.json index 9468d629..db99e9a7 100644 --- a/apps/api/package.json +++ b/apps/api/package.json @@ -24,7 +24,6 @@ }, "devDependencies": { "@cloudflare/vitest-pool-workers": "^0.8.58", - "@cloudflare/workers-types": "^4.20250726.0", "@eslint/js": "^9.26.0", "@types/mailparser": "^3.4.6", "@types/node": "^22.15.3", @@ -38,7 +37,7 @@ "typescript": "^5.8.3", "typescript-eslint": "^8.31.1", "vitest": "^3.2.4", - "wrangler": "^4.26.1" + "wrangler": "^4.42.0" }, "dependencies": { "@anthropic-ai/sdk": "^0.62.0", diff --git a/apps/api/src/context.ts b/apps/api/src/context.ts index 8832023c..d3e2f21e 100644 --- a/apps/api/src/context.ts +++ b/apps/api/src/context.ts @@ -1,5 +1,6 @@ import { JWTTokenPayload } from "@dafthunk/types"; +import { WorkflowSession } from "./durable-objects/workflow-session"; import { RuntimeParams } from "./runtime/runtime"; export interface Bindings { @@ -9,11 +10,12 @@ export interface Bindings { RATE_LIMIT_AUTH: RateLimit; RATE_LIMIT_EXECUTE: RateLimit; EXECUTE: Workflow; + WORKFLOW_SESSION: DurableObjectNamespace; RESSOURCES: R2Bucket; DATASETS: R2Bucket; DATASETS_AUTORAG: string; AI: Ai; - BROWSER: Fetcher; + BROWSER?: Fetcher; COMPUTE: AnalyticsEngineDataset; WEB_HOST: string; EMAIL_DOMAIN: string; diff --git a/apps/api/src/cron.ts b/apps/api/src/cron.ts index 458af3b4..9cc53659 100644 --- a/apps/api/src/cron.ts +++ b/apps/api/src/cron.ts @@ -1,4 +1,3 @@ -import { ExecutionContext } from "@cloudflare/workers-types"; import { Node, Workflow as WorkflowType } from "@dafthunk/types"; import CronParser from "cron-parser"; @@ -51,7 +50,6 @@ async function executeWorkflow( nodes: workflowData.nodes, edges: workflowData.edges, }, - monitorProgress: false, deploymentId: deploymentId, }, }); diff --git a/apps/api/src/db/index.ts b/apps/api/src/db/index.ts index 00348fd8..a22154a3 100644 --- a/apps/api/src/db/index.ts +++ b/apps/api/src/db/index.ts @@ -1,4 +1,3 @@ -import type { D1Database } from "@cloudflare/workers-types"; import { drizzle } from "drizzle-orm/d1"; import { type DrizzleD1Database } from "drizzle-orm/d1"; diff --git a/apps/api/src/db/queries.ts b/apps/api/src/db/queries.ts index a5d098bc..d55d03d1 100644 --- a/apps/api/src/db/queries.ts +++ b/apps/api/src/db/queries.ts @@ -302,6 +302,42 @@ export async function getWorkflow( return workflow?.workflows; } +/** + * Get a workflow that the user has access to through their organization memberships + * + * @param db Database instance + * @param workflowIdOrHandle Workflow ID or handle + * @param userId User ID to check access for + * @returns The workflow and organization ID if user has access, undefined otherwise + */ +export async function getWorkflowWithUserAccess( + db: ReturnType, + workflowIdOrHandle: string, + userId: string +): Promise<{ workflow: WorkflowRow; organizationId: string } | undefined> { + const [result] = await db + .select({ + workflow: workflows, + organizationId: workflows.organizationId, + }) + .from(workflows) + .innerJoin( + memberships, + eq(workflows.organizationId, memberships.organizationId) + ) + .where( + and( + eq(memberships.userId, userId), + getWorkflowCondition(workflowIdOrHandle) + ) + ) + .limit(1); + + return result + ? { workflow: result.workflow, organizationId: result.organizationId } + : undefined; +} + /** * Get the latest deployment for a workflow * diff --git a/apps/api/src/durable-objects/workflow-session.ts b/apps/api/src/durable-objects/workflow-session.ts new file mode 100644 index 00000000..791537c4 --- /dev/null +++ b/apps/api/src/durable-objects/workflow-session.ts @@ -0,0 +1,508 @@ +/** + * WorkflowSession Durable Object + * + * Manages workflow state synchronization and execution coordination via WebSocket. + * Clients connect via WebSocket to sync state and receive realtime execution updates. + */ + +import { + WorkflowErrorMessage, + WorkflowExecuteMessage, + WorkflowExecution, + WorkflowExecutionUpdateMessage, + WorkflowInitMessage, + WorkflowMessage, + WorkflowState, + WorkflowType, + WorkflowUpdateMessage, +} from "@dafthunk/types"; +import { DurableObject } from "cloudflare:workers"; + +import { Bindings } from "../context"; +import { createDatabase, ExecutionStatus, saveExecution } from "../db/index"; +import { + getOrganizationComputeCredits, + getWorkflowWithUserAccess, + updateWorkflow, +} from "../db/queries"; + +export class WorkflowSession extends DurableObject { + private static readonly PERSIST_DEBOUNCE_MS = 500; + + private state: WorkflowState | null = null; + private organizationId: string | null = null; + private userId: string | null = null; + private pendingPersistTimeout: number | undefined = undefined; + private connectedUsers: Set = new Set(); + private executions: Map = new Map(); + private executionIdToWebSocket: Map = new Map(); + + constructor(ctx: DurableObjectState, env: Bindings) { + super(ctx, env); + } + + /** + * Load workflow from D1 database with user access verification + */ + private async loadState(workflowId: string, userId: string): Promise { + console.log(`Loading workflow ${workflowId} for user ${userId}`); + const db = createDatabase(this.env.DB); + const result = await getWorkflowWithUserAccess(db, workflowId, userId); + + if (!result) { + throw new Error( + `User ${userId} does not have access to workflow ${workflowId}` + ); + } + + const { workflow, organizationId } = result; + + const { name, handle, type, nodes, edges, timestamp } = + this.extractWorkflowData(workflow, workflowId); + + this.state = { + id: workflowId, + name, + handle, + type, + nodes, + edges, + timestamp, + }; + + this.organizationId = organizationId; + } + + private extractWorkflowData(workflow: any, workflowId: string) { + return { + name: workflow?.name || "New Workflow", + handle: workflow?.handle || workflowId, + type: (workflow?.data?.type || "manual") as WorkflowType, + nodes: workflow?.data?.nodes || [], + edges: workflow?.data?.edges || [], + timestamp: workflow?.updatedAt?.getTime() || Date.now(), + }; + } + + /** + * Get state from memory + */ + async getState(): Promise { + if (!this.state) { + throw new Error("Workflow not loaded"); + } + + return this.state; + } + + async updateState(state: WorkflowState): Promise { + if (!this.state) { + throw new Error("Workflow not loaded"); + } + + // Validate incoming state matches current state + if (state.id !== this.state.id) { + throw new Error( + `Workflow ID mismatch: expected ${this.state.id}, got ${state.id}` + ); + } + + // Validate required fields + if (!state.name || !state.handle || !state.type) { + throw new Error( + "Invalid state: missing required fields (name, handle, or type)" + ); + } + + // Validate arrays are present + if (!Array.isArray(state.nodes) || !Array.isArray(state.edges)) { + throw new Error("Invalid state: nodes and edges must be arrays"); + } + + this.state = state; + + // Broadcast to all connected users + this.broadcast(state); + + // Debounce persistence to reduce D1 writes on rapid updates + this.schedulePersist(); + } + + /** + * Broadcast state update to all connected users + */ + private broadcast(state: WorkflowState): void { + const updateMsg: WorkflowUpdateMessage = { + type: "update", + state, + }; + const message = JSON.stringify(updateMsg); + + for (const ws of this.connectedUsers) { + try { + ws.send(message); + } catch (error) { + console.error("Error broadcasting to WebSocket:", error); + } + } + } + + /** + * Schedule a debounced persist + */ + private schedulePersist(): void { + // Clear any existing timeout + if (this.pendingPersistTimeout !== undefined) { + clearTimeout(this.pendingPersistTimeout); + } + + // Schedule new persist + this.pendingPersistTimeout = setTimeout(() => { + this.persistToDatabase(); + this.pendingPersistTimeout = undefined; + }, WorkflowSession.PERSIST_DEBOUNCE_MS) as unknown as number; + } + + /** + * Persist state back to D1 database + */ + private async persistToDatabase(): Promise { + if (!this.state || !this.organizationId) { + return; + } + + try { + const db = createDatabase(this.env.DB); + await updateWorkflow(db, this.state.id, this.organizationId, { + name: this.state.name, + data: { + id: this.state.id, + name: this.state.name, + handle: this.state.handle, + type: this.state.type, + nodes: this.state.nodes, + edges: this.state.edges, + }, + }); + + console.log(`Persisted workflow ${this.state.id} to D1 database`); + } catch (error) { + console.error("Error persisting workflow to database:", error); + } + } + + async fetch(request: Request): Promise { + const url = new URL(request.url); + + // This endpoint is ONLY called by the Runtime (Cloudflare Workflow) + // to send execution progress updates. Clients never call this directly. + if (url.pathname.endsWith("/execution") && request.method === "POST") { + return this.handleExecutionUpdate(request); + } + + // This endpoint is called by the api to establish a WebSocket connection. + // It requires authentication and userId. + // It extracts workflowId from the URL path. + const pathParts = url.pathname.split("/").filter(Boolean); + const workflowId = pathParts[pathParts.length - 1] || ""; + + // Extract userId from custom header + const userId = request.headers.get("X-User-Id") || ""; + + if (!workflowId) { + return new Response("Missing workflowId in path", { + status: 400, + }); + } + + if (!userId) { + return new Response("Missing userId header", { + status: 401, + }); + } + + if (!this.state) { + try { + await this.loadState(workflowId, userId); + this.userId = userId; + } catch (error) { + console.error("Error loading workflow:", error); + return Response.json( + { + error: "Failed to load workflow", + details: error instanceof Error ? error.message : "Unknown error", + }, + { status: 403 } + ); + } + } + + if (url.pathname.endsWith("/state") && request.method === "GET") { + return this.handleStateRequest(); + } + + const upgradeHeader = request.headers.get("Upgrade"); + if (upgradeHeader === "websocket") { + return this.handleWebSocketUpgrade(request); + } + + return new Response("Expected /state GET or WebSocket upgrade", { + status: 400, + }); + } + + private async handleStateRequest(): Promise { + try { + const state = await this.getState(); + return Response.json(state); + } catch (error) { + console.error("Error getting workflow state:", error); + return Response.json( + { + error: "Failed to get workflow state", + details: error instanceof Error ? error.message : "Unknown error", + }, + { status: 500 } + ); + } + } + + /** + * Handle execution updates from Runtime (internal endpoint) + */ + private async handleExecutionUpdate(request: Request): Promise { + try { + const execution = (await request.json()) as WorkflowExecution; + + const ws = this.executionIdToWebSocket.get(execution.id); + if (!ws) { + console.warn( + `No WebSocket connection found for execution ${execution.id}` + ); + return Response.json({ ok: true }); + } + + this.executions.set(ws, execution); + + const updateMessage: WorkflowExecutionUpdateMessage = { + type: "execution_update", + executionId: execution.id, + status: execution.status, + nodeExecutions: execution.nodeExecutions, + error: execution.error, + }; + + ws.send(JSON.stringify(updateMessage)); + + return Response.json({ ok: true }); + } catch (error) { + console.error("Error handling execution update:", error); + return Response.json( + { + error: "Failed to handle execution update", + details: error instanceof Error ? error.message : "Unknown error", + }, + { status: 500 } + ); + } + } + + private async handleWebSocketUpgrade(_request: Request): Promise { + const webSocketPair = new WebSocketPair(); + const [client, server] = Object.values(webSocketPair); + + this.ctx.acceptWebSocket(server); + this.connectedUsers.add(server); + this.executions.set(server, null); // Initialize with no execution + + const initState = await this.getState(); + const initMessage: WorkflowInitMessage = { + type: "init", + state: initState, + }; + server.send(JSON.stringify(initMessage)); + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + /** + * Handle WebSocket messages from client + * + * Supports two message types: + * 1. WorkflowUpdateMessage - Update workflow state (nodes/edges) + * 2. WorkflowExecuteMessage - Trigger workflow execution or register for updates + */ + async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { + try { + if (typeof message !== "string") { + const errorMsg: WorkflowErrorMessage = { + error: "Expected string message", + }; + ws.send(JSON.stringify(errorMsg)); + return; + } + + const data = JSON.parse(message) as WorkflowMessage; + + if ("type" in data && data.type === "update") { + const updateMsg = data as WorkflowUpdateMessage; + await this.updateState(updateMsg.state); + } else if ("type" in data && data.type === "execute") { + const executeMsg = data as WorkflowExecuteMessage; + + if (executeMsg.executionId) { + this.executionIdToWebSocket.set(executeMsg.executionId, ws); + console.log( + `Registered execution ${executeMsg.executionId} for WebSocket updates` + ); + } else { + await this.handleExecuteWorkflow(ws, executeMsg.parameters); + } + } + } catch (error) { + console.error("WebSocket message error:", error); + const errorMsg: WorkflowErrorMessage = { + error: "Failed to process message", + details: error instanceof Error ? error.message : "Unknown error", + }; + ws.send(JSON.stringify(errorMsg)); + } + } + + /** + * Handle workflow execution triggered via WebSocket + */ + private async handleExecuteWorkflow( + ws: WebSocket, + _parameters?: Record + ): Promise { + if (!this.state || !this.organizationId || !this.userId) { + const errorMsg: WorkflowErrorMessage = { + error: "Workflow not initialized", + }; + ws.send(JSON.stringify(errorMsg)); + return; + } + + try { + const db = createDatabase(this.env.DB); + + // Get organization compute credits + const computeCredits = await getOrganizationComputeCredits( + db, + this.organizationId + ); + if (computeCredits === undefined) { + const errorMsg: WorkflowErrorMessage = { + error: "Organization not found", + }; + ws.send(JSON.stringify(errorMsg)); + return; + } + + // Validate workflow has nodes + if (!this.state.nodes || this.state.nodes.length === 0) { + const errorMsg: WorkflowErrorMessage = { + error: + "Cannot execute an empty workflow. Please add nodes to the workflow.", + }; + ws.send(JSON.stringify(errorMsg)); + return; + } + + const executionParams = { + workflow: { + id: this.state.id, + name: this.state.name, + handle: this.state.handle, + type: this.state.type, + nodes: this.state.nodes, + edges: this.state.edges, + }, + userId: this.userId, + organizationId: this.organizationId, + computeCredits, + workflowSessionId: this.state.id, + }; + + // Start workflow execution + const instance = await this.env.EXECUTE.create({ + params: executionParams, + }); + const executionId = instance.id; + + // Register this WebSocket for execution updates + this.executionIdToWebSocket.set(executionId, ws); + + // Build initial nodeExecutions + const nodeExecutions = this.state.nodes.map((node) => ({ + nodeId: node.id, + status: "executing" as const, + })); + + // Save initial execution record + const initialExecution = await saveExecution(db, { + id: executionId, + workflowId: this.state.id, + userId: this.userId, + organizationId: this.organizationId, + status: ExecutionStatus.EXECUTING, + nodeExecutions, + createdAt: new Date(), + updatedAt: new Date(), + }); + + // Store execution for this WebSocket + this.executions.set(ws, { + id: initialExecution.id, + workflowId: initialExecution.workflowId, + status: "submitted", + nodeExecutions: initialExecution.nodeExecutions, + }); + + // Send execution started message + const updateMessage: WorkflowExecutionUpdateMessage = { + type: "execution_update", + executionId: initialExecution.id, + status: "submitted", + nodeExecutions: initialExecution.nodeExecutions, + }; + ws.send(JSON.stringify(updateMessage)); + + console.log( + `Started workflow execution ${executionId} for workflow ${this.state.id}` + ); + } catch (error) { + console.error("Failed to execute workflow:", error); + const errorMsg: WorkflowErrorMessage = { + error: "Failed to execute workflow", + details: error instanceof Error ? error.message : "Unknown error", + }; + ws.send(JSON.stringify(errorMsg)); + } + } + + async webSocketClose( + ws: WebSocket, + _code: number, + _reason: string, + _wasClean: boolean + ) { + this.connectedUsers.delete(ws); + + const execution = this.executions.get(ws); + if (execution) { + this.executionIdToWebSocket.delete(execution.id); + } + this.executions.delete(ws); + + if (this.pendingPersistTimeout !== undefined) { + clearTimeout(this.pendingPersistTimeout); + await this.persistToDatabase(); + this.pendingPersistTimeout = undefined; + } + } +} diff --git a/apps/api/src/email.ts b/apps/api/src/email.ts index b3139172..bc75fb10 100644 --- a/apps/api/src/email.ts +++ b/apps/api/src/email.ts @@ -1,4 +1,3 @@ -import { ExecutionContext } from "@cloudflare/workers-types"; import { Node, Workflow as WorkflowType } from "@dafthunk/types"; import { Bindings } from "./context"; @@ -160,7 +159,6 @@ export async function handleIncomingEmail( nodes: workflowData.nodes, edges: workflowData.edges, }, - monitorProgress: false, deploymentId, emailMessage: { from, diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index c260d8f5..304088e1 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -1,8 +1,9 @@ import { Hono } from "hono"; -export { Runtime } from "./runtime/runtime"; + import auth from "./auth"; import { ApiContext } from "./context"; import { handleCronTriggers } from "./cron"; +import { WorkflowSession } from "./durable-objects/workflow-session"; import { handleIncomingEmail } from "./email"; import { corsMiddleware } from "./middleware/cors"; import { createRateLimitMiddleware } from "./middleware/rate-limit"; @@ -21,6 +22,8 @@ import secretRoutes from "./routes/secrets"; import typeRoutes from "./routes/types"; import usageRoutes from "./routes/usage"; import workflowRoutes from "./routes/workflows"; +import wsRoutes from "./routes/ws"; +import { Runtime } from "./runtime/runtime"; // Initialize Hono app with types const app = new Hono(); @@ -65,6 +68,9 @@ app.route("/:organizationIdOrHandle/secrets", secretRoutes); app.route("/:organizationIdOrHandle/workflows", workflowRoutes); app.route("/:organizationIdOrHandle/objects", objectRoutes); app.route("/:organizationIdOrHandle/usage", usageRoutes); +app.route("/:organizationIdOrHandle/ws", wsRoutes); + +export { Runtime, WorkflowSession }; export default { scheduled: handleCronTriggers, diff --git a/apps/api/src/middleware/developer-mode.ts b/apps/api/src/middleware/developer-mode.ts deleted file mode 100644 index cc01e3cf..00000000 --- a/apps/api/src/middleware/developer-mode.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Context } from "hono"; - -import { ApiContext } from "../context"; - -export const developerModeMiddleware = async ( - c: Context, - next: () => Promise -) => { - const jwtPayload = c.get("jwtPayload"); - - if (!jwtPayload?.developerMode) { - return c.json( - { - error: - "This feature is under development and accessible only to developers.", - }, - 403 - ); - } - - await next(); -}; diff --git a/apps/api/src/nodes/audio/whisper-large-v3-turbo-node.ts b/apps/api/src/nodes/audio/whisper-large-v3-turbo-node.ts index 994f906f..7d587717 100644 --- a/apps/api/src/nodes/audio/whisper-large-v3-turbo-node.ts +++ b/apps/api/src/nodes/audio/whisper-large-v3-turbo-node.ts @@ -1,4 +1,3 @@ -import type { Ai_Cf_Openai_Whisper_Large_V3_Turbo_Input } from "@cloudflare/workers-types/experimental"; import { NodeExecution, NodeType } from "@dafthunk/types"; import { NodeContext } from "../types"; @@ -106,17 +105,15 @@ export class WhisperLargeV3TurboNode extends ExecutableNode { const audioBase64 = btoa(String.fromCharCode(...audio.data)); // Prepare the request parameters - const params: Ai_Cf_Openai_Whisper_Large_V3_Turbo_Input = { + const params = { audio: audioBase64, + ...(task && { task }), + ...(language && { language }), + ...(vad_filter !== undefined && { vad_filter }), + ...(initial_prompt && { initial_prompt }), + ...(prefix && { prefix }), }; - // Add optional parameters if provided - if (task) params.task = task; - if (language) params.language = language; - if (vad_filter !== undefined) params.vad_filter = vad_filter; - if (initial_prompt) params.initial_prompt = initial_prompt; - if (prefix) params.prefix = prefix; - // Call Cloudflare AI Whisper Large V3 Turbo model const response = await context.env.AI.run( "@cf/openai/whisper-large-v3-turbo", @@ -124,6 +121,11 @@ export class WhisperLargeV3TurboNode extends ExecutableNode { context.env.AI_OPTIONS ); + // Handle streaming response + if (response instanceof ReadableStream) { + throw new Error("Streaming response not supported for whisper model"); + } + // Extract the results const output = { text: response.text, diff --git a/apps/api/src/routes/datasets.ts b/apps/api/src/routes/datasets.ts index efa64c16..66ebaab9 100644 --- a/apps/api/src/routes/datasets.ts +++ b/apps/api/src/routes/datasets.ts @@ -26,7 +26,6 @@ import { getDatasets, updateDataset, } from "../db"; -import { developerModeMiddleware } from "../middleware/developer-mode"; // Extend the ApiContext with our custom variable type ExtendedApiContext = ApiContext & { @@ -38,7 +37,7 @@ type ExtendedApiContext = ApiContext & { const datasetRoutes = new Hono(); // Apply early access middleware to all dataset routes -datasetRoutes.use("*", jwtMiddleware, developerModeMiddleware); +datasetRoutes.use("*", jwtMiddleware); /** * List all datasets for the current organization diff --git a/apps/api/src/routes/deployments.ts b/apps/api/src/routes/deployments.ts index 56ee2214..7b0ee33e 100644 --- a/apps/api/src/routes/deployments.ts +++ b/apps/api/src/routes/deployments.ts @@ -263,9 +263,6 @@ deploymentRoutes.post( const deploymentId = c.req.param("deploymentId"); const db = createDatabase(c.env.DB); - const monitorProgress = - new URL(c.req.url).searchParams.get("monitorProgress") === "true"; - // Get organization compute credits const computeCredits = await getOrganizationComputeCredits( db, @@ -333,7 +330,6 @@ deploymentRoutes.post( nodes: workflowData.nodes, edges: workflowData.edges, }, - monitorProgress, deploymentId: deployment.id, httpRequest: { url, diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index e5683536..f85e4e04 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -157,28 +157,39 @@ workflowRoutes.post( */ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { const id = c.req.param("id"); - const db = createDatabase(c.env.DB); - const organizationId = c.get("organizationId")!; - const workflow = await getWorkflow(db, id, organizationId); - if (!workflow) { - return c.json({ error: "Workflow not found" }, 404); + const userId = c.var.jwtPayload?.sub; + + if (!userId) { + return c.json({ error: "Unauthorized" }, 401); } - const workflowData = workflow.data; - - const response: GetWorkflowResponse = { - id: workflow.id, - name: workflow.name, - handle: workflow.handle, - type: workflowData.type, - createdAt: workflow.createdAt, - updatedAt: workflow.updatedAt, - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], - }; + const db = createDatabase(c.env.DB); - return c.json(response); + try { + // Get metadata from database for timestamps + const workflow = await getWorkflow(db, id, organizationId); + + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); + } + + const response: GetWorkflowResponse = { + id: workflow!.id, + name: workflow!.name, + handle: workflow!.handle, + type: workflow!.data.type, + createdAt: workflow?.createdAt || new Date(), + updatedAt: workflow?.updatedAt || new Date(), + nodes: workflow?.data.nodes || [], + edges: workflow?.data.edges || [], + }; + + return c.json(response); + } catch (error) { + console.error("Error fetching workflow:", error); + return c.json({ error: "Failed to fetch workflow" }, 500); + } }); /** @@ -450,8 +461,6 @@ workflowRoutes.post( const workflowIdOrHandle = c.req.param("workflowIdOrHandle"); const version = c.req.param("version"); const db = createDatabase(c.env.DB); - const monitorProgress = - new URL(c.req.url).searchParams.get("monitorProgress") === "true"; // Get organization compute credits const computeCredits = await getOrganizationComputeCredits( @@ -482,8 +491,11 @@ workflowRoutes.post( let deploymentId: string | undefined; if (version === "dev") { - // Get workflow data directly + // Fallback to database workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); + if (!workflow) { + return c.json({ error: "Workflow not found" }, 404); + } workflowData = workflow.data; } else { // Get deployment based on version @@ -593,7 +605,6 @@ workflowRoutes.post( userId, organizationId, computeCredits, - monitorProgress, deploymentId, }; diff --git a/apps/api/src/routes/ws.ts b/apps/api/src/routes/ws.ts new file mode 100644 index 00000000..3e314dc7 --- /dev/null +++ b/apps/api/src/routes/ws.ts @@ -0,0 +1,40 @@ +import { Hono } from "hono"; + +import { jwtMiddleware } from "../auth"; +import { ApiContext } from "../context"; + +const wsRoutes = new Hono(); + +// WebSocket endpoint for real-time workflow state synchronization +wsRoutes.get("/:workflowId", jwtMiddleware, async (c) => { + const upgradeHeader = c.req.header("Upgrade"); + + if (!upgradeHeader || upgradeHeader !== "websocket") { + return c.json({ error: "Expected WebSocket connection" }, 426); + } + + const userId = c.var.jwtPayload?.sub; + + if (!userId) { + return c.json({ error: "Unauthorized" }, 401); + } + + const workflowId = c.req.param("workflowId"); + + // Create a unique DO ID for this workflow + const doId = c.env.WORKFLOW_SESSION.idFromName(workflowId); + const stub = c.env.WORKFLOW_SESSION.get(doId); + + // Pass the original request with userId in a custom header + const headers = new Headers(c.req.raw.headers); + headers.set("X-User-Id", userId); + const newReq = new Request(c.req.url, { + method: c.req.method, + headers, + body: c.req.raw.body, + }); + + return stub.fetch(newReq); +}); + +export default wsRoutes; diff --git a/apps/api/src/runtime/object-store.ts b/apps/api/src/runtime/object-store.ts index 637487d7..235d89d8 100644 --- a/apps/api/src/runtime/object-store.ts +++ b/apps/api/src/runtime/object-store.ts @@ -1,4 +1,3 @@ -import type { R2Object } from "@cloudflare/workers-types"; import { ObjectReference, Workflow, WorkflowExecution } from "@dafthunk/types"; import { v7 as uuid } from "uuid"; diff --git a/apps/api/src/runtime/runtime.ts b/apps/api/src/runtime/runtime.ts index bb43f6c7..1219cf1b 100644 --- a/apps/api/src/runtime/runtime.ts +++ b/apps/api/src/runtime/runtime.ts @@ -2,7 +2,6 @@ import { JsonArray, JsonObject, Node, - NodeExecutionStatus, ObjectReference, Workflow, WorkflowExecution, @@ -74,7 +73,7 @@ export type RuntimeParams = { userId: string; organizationId: string; computeCredits: number; - monitorProgress?: boolean; + workflowSessionId?: string; deploymentId?: string; httpRequest?: HttpRequest; emailMessage?: EmailMessage; @@ -180,7 +179,7 @@ export class Runtime extends WorkflowEntrypoint { workflow, userId, organizationId, - monitorProgress = false, + workflowSessionId, httpRequest, emailMessage, computeCredits, @@ -313,27 +312,19 @@ export class Runtime extends WorkflowEntrypoint { ); } - // Persist progress after each execution unit if monitoring is enabled - if (monitorProgress) { - const unitDescription = - executionUnit.type === "individual" - ? executionUnit.nodeId - : `inline group [${executionUnit.nodeIds.join(", ")}]`; - - executionRecord = await step.do( - `persist after ${unitDescription}`, - Runtime.defaultStepConfig, - async () => - this.saveExecutionState( - userId, - organizationId, - workflow.id, - instanceId, - runtimeState, - executionRecord.startedAt, - executionRecord.endedAt - ) - ); + if (workflowSessionId) { + executionRecord = { + ...executionRecord, + status: runtimeState.status, + nodeExecutions: this.buildNodeExecutions(runtimeState), + }; + + this.sendExecutionUpdateToSession( + workflowSessionId, + executionRecord + ).catch((error) => { + console.error("Failed to send execution update to session:", error); + }); } } } catch (error) { @@ -376,6 +367,15 @@ export class Runtime extends WorkflowEntrypoint { ); } ); + + if (workflowSessionId) { + this.sendExecutionUpdateToSession( + workflowSessionId, + executionRecord + ).catch((error) => { + console.error("Failed to send execution update to session:", error); + }); + } } return executionRecord; @@ -412,6 +412,29 @@ export class Runtime extends WorkflowEntrypoint { }, 0); } + private async sendExecutionUpdateToSession( + workflowSessionId: string, + execution: WorkflowExecution + ): Promise { + try { + const id = this.env.WORKFLOW_SESSION.idFromName(workflowSessionId); + const stub = this.env.WORKFLOW_SESSION.get(id); + + await stub.fetch(`https://workflow-session/execution`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(execution), + }); + } catch (error) { + console.error( + `Failed to send execution update to session ${workflowSessionId}:`, + error + ); + } + } + /** * Preloads all organization secrets for synchronous access during workflow execution */ @@ -1188,44 +1211,50 @@ export class Runtime extends WorkflowEntrypoint { } /** - * Persists the workflow execution state to the database. + * Builds node execution list from runtime state */ - private async saveExecutionState( - userId: string, - organizationId: string, - workflowId: string, - instanceId: string, - runtimeState: RuntimeState, - startedAt?: Date, - endedAt?: Date - ): Promise { - // Build node execution list with explicit status for each node. - const nodeExecutionList = runtimeState.workflow.nodes.map((node) => { + private buildNodeExecutions(runtimeState: RuntimeState) { + return runtimeState.workflow.nodes.map((node) => { if (runtimeState.executedNodes.has(node.id)) { return { nodeId: node.id, - status: "completed" as NodeExecutionStatus, - outputs: runtimeState.nodeOutputs.get(node.id), + status: "completed" as const, + outputs: runtimeState.nodeOutputs.get(node.id) || {}, }; } if (runtimeState.nodeErrors.has(node.id)) { return { nodeId: node.id, - status: "error" as NodeExecutionStatus, + status: "error" as const, error: runtimeState.nodeErrors.get(node.id), }; } if (runtimeState.skippedNodes.has(node.id)) { return { nodeId: node.id, - status: "skipped" as NodeExecutionStatus, + status: "skipped" as const, }; } return { nodeId: node.id, - status: "executing" as NodeExecutionStatus, + status: "executing" as const, }; }); + } + + /** + * Persists the workflow execution state to the database. + */ + private async saveExecutionState( + userId: string, + organizationId: string, + workflowId: string, + instanceId: string, + runtimeState: RuntimeState, + startedAt?: Date, + endedAt?: Date + ): Promise { + const nodeExecutionList = this.buildNodeExecutions(runtimeState); const executionStatus = runtimeState.status; const errorMsg = diff --git a/apps/api/src/utils/encryption.test.ts b/apps/api/src/utils/encryption.test.ts index 2061c30f..430c8997 100644 --- a/apps/api/src/utils/encryption.test.ts +++ b/apps/api/src/utils/encryption.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it } from "vitest"; import { Bindings } from "../context"; +import { WorkflowSession } from "../durable-objects/workflow-session"; import { decryptSecret, encryptSecret } from "./encryption"; // Mock Bindings for testing @@ -20,6 +21,7 @@ const createMockEnv = (masterKey?: string): Bindings => ({ RATE_LIMIT_AUTH: {} as RateLimit, RATE_LIMIT_EXECUTE: {} as RateLimit, EXECUTE: {} as Workflow, + WORKFLOW_SESSION: {} as DurableObjectNamespace, RESSOURCES: {} as R2Bucket, DATASETS: {} as R2Bucket, DATASETS_AUTORAG: "", diff --git a/apps/api/tsconfig.json b/apps/api/tsconfig.json index 709d55a8..27744b58 100644 --- a/apps/api/tsconfig.json +++ b/apps/api/tsconfig.json @@ -34,8 +34,6 @@ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ // "typeRoots": [], /* Specify multiple folders that act like `./node_modules/@types`. */ "types": [ - "@cloudflare/workers-types/experimental", - "@cloudflare/workers-types/2023-07-01", "@cloudflare/vitest-pool-workers" ] /* Specify type package names to be included without being referenced in a source file. */, // "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */ diff --git a/apps/api/wrangler.jsonc b/apps/api/wrangler.jsonc index 842cab78..74bc7926 100644 --- a/apps/api/wrangler.jsonc +++ b/apps/api/wrangler.jsonc @@ -20,7 +20,6 @@ * Docs: https://developers.cloudflare.com/workers/configuration/smart-placement/#smart-placement */ // "placement": { "mode": "smart" }, - /** * Bindings * Bindings allow your Worker to interact with resources on the Cloudflare Developer Platform, including @@ -65,7 +64,25 @@ } ], "analytics_engine_datasets": [ - { "binding": "COMPUTE", "dataset": "dafthunk-compute-development" } + { + "binding": "COMPUTE", + "dataset": "dafthunk-compute-development" + } + ], + "durable_objects": { + "bindings": [ + { + "name": "WORKFLOW_SESSION", + "class_name": "WorkflowSession", + "script_name": "dafthunk-api" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_classes": ["WorkflowSession"] + } ], "unsafe": { "bindings": [ @@ -98,7 +115,6 @@ } ] }, - /** * Environment Variables * https://developers.cloudflare.com/workers/wrangler/configuration/#environment-variables @@ -153,7 +169,25 @@ } ], "analytics_engine_datasets": [ - { "binding": "COMPUTE", "dataset": "dafthunk-compute-production" } + { + "binding": "COMPUTE", + "dataset": "dafthunk-compute-production" + } + ], + "durable_objects": { + "bindings": [ + { + "name": "WORKFLOW_SESSION", + "class_name": "WorkflowSession", + "script_name": "dafthunk-api" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_classes": ["WorkflowSession"] + } ], "unsafe": { "bindings": [ @@ -195,13 +229,11 @@ * Note: Use secrets to store sensitive data. * https://developers.cloudflare.com/workers/configuration/secrets/ */ - /** * Static Assets * https://developers.cloudflare.com/workers/static-assets/binding/ */ // "assets": { "directory": "./public/", "binding": "ASSETS" }, - /** * Service Bindings (communicate between multiple Workers) * https://developers.cloudflare.com/workers/wrangler/configuration/#service-bindings diff --git a/apps/web/src/components/workflow/use-workflow-state.ts b/apps/web/src/components/workflow/use-workflow-state.ts index 5d6c0c0b..e5811c81 100644 --- a/apps/web/src/components/workflow/use-workflow-state.ts +++ b/apps/web/src/components/workflow/use-workflow-state.ts @@ -65,6 +65,10 @@ interface UseWorkflowStateReturn { onConnect: OnConnect; onConnectStart: OnConnectStart; onConnectEnd: OnConnectEnd; + onNodeDragStop: ( + event: React.MouseEvent, + node: ReactFlowNode + ) => void; connectionValidationState: ConnectionValidationState; isValidConnection: IsValidConnection>; handleAddNode: () => void; @@ -95,11 +99,12 @@ interface UseWorkflowStateReturn { // Helper functions to replace workflowNodeStateService const stripExecutionFields = ( data: WorkflowNodeType -): Omit & { +): Omit & { outputs: Omit[]; inputs: Omit[]; } => { - const { executionState, error, ...rest } = data; + // Exclude nodeTemplates from comparison - it's UI metadata that shouldn't trigger persistence + const { executionState, error, nodeTemplates, ...rest } = data; return { ...rest, @@ -339,33 +344,36 @@ export function useWorkflowState({ } }, [initialEdges, readonly, setEdges]); - // Effect to notify parent of changes for nodes + // Effect to notify parent of changes for nodes (excluding position changes during drag) useEffect(() => { if (readonly) return; const nodeCountChanged = nodes.length !== initialNodes.length; - const hasDataOrPositionChanges = nodes.some((node) => { - const initialNode = initialNodes.find((n) => n.id === node.id); - if (!initialNode) return true; - if ( - node.position.x !== initialNode.position.x || - node.position.y !== initialNode.position.y - ) { - return true; + // Check for data changes (excluding position) + let hasDataChanges = false; + nodes.forEach((node) => { + const initialNode = initialNodes.find((n) => n.id === node.id); + if (!initialNode) { + hasDataChanges = true; + return; } + // Check data changes (not position) const nodeData = stripExecutionFields(node.data); const initialNodeData = stripExecutionFields(initialNode.data); - return JSON.stringify(nodeData) !== JSON.stringify(initialNodeData); + if (JSON.stringify(nodeData) !== JSON.stringify(initialNodeData)) { + hasDataChanges = true; + } }); - // Check for deleted nodes by looking for initialNodes that don't exist in the current nodes + // Check for deleted nodes const hasDeletedNodes = initialNodes.some( (initialNode) => !nodes.some((node) => node.id === initialNode.id) ); - if (nodeCountChanged || hasDataOrPositionChanges || hasDeletedNodes) { + // Save for data changes or node add/delete (position changes handled by onNodeDragStop) + if (nodeCountChanged || hasDataChanges || hasDeletedNodes) { onNodesChangePersistCallback?.(nodes); } }, [nodes, onNodesChangePersistCallback, initialNodes, readonly]); @@ -432,6 +440,16 @@ export function useWorkflowState({ setConnectionValidationState("default"); }, [readonly]); + // Handle node drag stop - save positions after drag completes + const onNodeDragStop = useCallback( + (_event: React.MouseEvent, _node: ReactFlowNode) => { + if (readonly) return; + // Save with current node positions after drag completes + onNodesChangePersistCallback?.(nodes); + }, + [readonly, nodes, onNodesChangePersistCallback] + ); + // Function to validate connection based on type compatibility const isValidConnection = useCallback( (connection: any) => { @@ -1128,6 +1146,7 @@ export function useWorkflowState({ onConnect, onConnectStart, onConnectEnd, + onNodeDragStop: readonly ? () => {} : onNodeDragStop, connectionValidationState, isValidConnection, handleAddNode, diff --git a/apps/web/src/components/workflow/workflow-builder.tsx b/apps/web/src/components/workflow/workflow-builder.tsx index 469879b0..40dccb18 100644 --- a/apps/web/src/components/workflow/workflow-builder.tsx +++ b/apps/web/src/components/workflow/workflow-builder.tsx @@ -125,6 +125,7 @@ export function WorkflowBuilder({ cutSelected, pasteFromClipboard, hasClipboardData, + onNodeDragStop, } = useWorkflowState({ initialNodes, initialEdges, @@ -431,6 +432,7 @@ export function WorkflowBuilder({ onConnectStart={readonly ? () => {} : onConnectStart} onConnectEnd={readonly ? () => {} : onConnectEnd} onNodeDoubleClick={handleNodeDoubleClick} + onNodeDragStop={onNodeDragStop} onInit={setReactFlowInstance} onAddNode={readonly ? undefined : handleAddNode} onAction={ diff --git a/apps/web/src/components/workflow/workflow-canvas.tsx b/apps/web/src/components/workflow/workflow-canvas.tsx index f39fcf22..d7777c0c 100644 --- a/apps/web/src/components/workflow/workflow-canvas.tsx +++ b/apps/web/src/components/workflow/workflow-canvas.tsx @@ -152,6 +152,10 @@ export interface WorkflowCanvasProps { onConnect: OnConnect; onConnectStart: OnConnectStart; onConnectEnd: OnConnectEnd; + onNodeDragStop: ( + event: React.MouseEvent, + node: ReactFlowNode + ) => void; onNodeDoubleClick?: (event: React.MouseEvent) => void; onInit: ( instance: ReactFlowInstance< @@ -695,6 +699,7 @@ export function WorkflowCanvas({ onConnectStart, onConnectEnd, onNodeDoubleClick, + onNodeDragStop, onInit, onAddNode, onAction, @@ -765,6 +770,7 @@ export function WorkflowCanvas({ onConnectStart={readonly ? () => {} : onConnectStart} onConnectEnd={readonly ? () => {} : onConnectEnd} onNodeDoubleClick={onNodeDoubleClick} + onNodeDragStop={onNodeDragStop} nodeTypes={nodeTypes} edgeTypes={edgeTypes} connectionMode={ConnectionMode.Strict} diff --git a/apps/web/src/hooks/use-editable-workflow.ts b/apps/web/src/hooks/use-editable-workflow.ts index 0285e6e3..44bc858a 100644 --- a/apps/web/src/hooks/use-editable-workflow.ts +++ b/apps/web/src/hooks/use-editable-workflow.ts @@ -1,143 +1,192 @@ -import type { Parameter, ParameterType, Workflow } from "@dafthunk/types"; +import type { + Parameter, + ParameterType, + WorkflowExecution, +} from "@dafthunk/types"; import type { Edge, Node } from "@xyflow/react"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { useAuth } from "@/components/auth-context"; import type { NodeTemplate, WorkflowEdgeType, WorkflowNodeType, -} from "@/components/workflow/workflow-types"; // Corrected import path -import { updateWorkflow } from "@/services/workflow-service"; +} from "@/components/workflow/workflow-types"; +import { + connectWorkflowWS, + WorkflowState, + WorkflowWebSocket, +} from "@/services/workflow-session-service.ts"; import { adaptDeploymentNodesToReactFlowNodes } from "@/utils/utils"; -import { debounce } from "@/utils/utils"; interface UseEditableWorkflowProps { workflowId: string | undefined; - currentWorkflow: Workflow | null | undefined; - isWorkflowDetailsLoading: boolean; - workflowDetailsError: Error | null; nodeTemplates?: NodeTemplate[]; + onExecutionUpdate?: (execution: WorkflowExecution) => void; } export function useEditableWorkflow({ workflowId, - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, nodeTemplates = [], + onExecutionUpdate, }: UseEditableWorkflowProps) { const [nodes, setNodes] = useState[]>([]); const [edges, setEdges] = useState[]>([]); const [isInitializing, setIsInitializing] = useState(true); const [processingError, setProcessingError] = useState(null); const [savingError, setSavingError] = useState(null); + const wsRef = useRef(null); + const [isWSConnected, setIsWSConnected] = useState(false); + const [workflowMetadata, setWorkflowMetadata] = useState<{ + id: string; + name: string; + handle: string; + type: string; + } | null>(null); - // Get the organization from the auth context at the hook level const { organization } = useAuth(); - // Effect to initialize nodes and edges from currentWorkflow + // WebSocket connection effect useEffect(() => { - if (isWorkflowDetailsLoading) { - setIsInitializing(true); + if (!workflowId || !organization?.handle) { + setIsInitializing(false); return; } - if (workflowDetailsError || !currentWorkflow) { - setIsInitializing(false); - if (workflowDetailsError) { - setProcessingError( - workflowDetailsError.message || "Failed to load workflow data." - ); - } - setNodes([]); - setEdges([]); + // Prevent duplicate connections if already connected + if (wsRef.current?.isConnected()) { return; } - try { - const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( - currentWorkflow.nodes, - nodeTemplates - ); - const reactFlowEdges = currentWorkflow.edges.map((edge, index) => ({ - id: `e${index}`, - source: edge.source, - target: edge.target, - sourceHandle: edge.sourceOutput, - targetHandle: edge.targetInput, - type: "workflowEdge", - data: { - isValid: true, - sourceType: edge.sourceOutput, - targetType: edge.targetInput, + setIsInitializing(true); + + // Add a small delay to avoid race conditions during React strict mode double-mount + const timeoutId = setTimeout(() => { + // Double-check we're not already connected after the delay + if (wsRef.current?.isConnected()) { + return; + } + + const handleStateUpdate = (state: WorkflowState) => { + try { + // Store workflow metadata + if (state.id && state.type) { + setWorkflowMetadata({ + id: state.id, + name: state.name || "", + handle: state.handle || "", + type: state.type, + }); + } + + // Convert to ReactFlow format + const reactFlowNodes = adaptDeploymentNodesToReactFlowNodes( + state.nodes, + nodeTemplates + ); + const reactFlowEdges = state.edges.map( + (edge: any, index: number) => ({ + id: `e${index}`, + source: edge.source, + target: edge.target, + sourceHandle: edge.sourceOutput, + targetHandle: edge.targetInput, + type: "workflowEdge", + data: { + isValid: true, + sourceType: edge.sourceOutput, + targetType: edge.targetInput, + }, + }) + ); + + setNodes(reactFlowNodes); + setEdges(reactFlowEdges); + setProcessingError(null); + } catch (error) { + console.error("Error processing WebSocket state:", error); + setProcessingError("Failed to load state from WebSocket"); + } + }; + + const ws = connectWorkflowWS(organization.handle, workflowId, { + onInit: (state: WorkflowState) => { + handleStateUpdate(state); + setIsInitializing(false); }, - })); - - setNodes(reactFlowNodes); - setEdges(reactFlowEdges); - setProcessingError(null); - } catch (error) { - console.error("Error processing workflow data into React Flow:", error); - setProcessingError( - error instanceof Error - ? error.message - : "Error adapting workflow data for editor." - ); - setNodes([]); - setEdges([]); - } finally { - setIsInitializing(false); - } - }, [ - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, - nodeTemplates, - ]); + onUpdate: (state: WorkflowState) => { + // Handle broadcasts from other users + handleStateUpdate(state); + }, + onOpen: () => { + setIsWSConnected(true); + }, + onClose: () => { + setIsWSConnected(false); + }, + onError: (error) => { + console.error("WebSocket error:", error); + setSavingError(`WebSocket error: ${error}`); + setProcessingError(`WebSocket error: ${error}`); + setIsInitializing(false); + }, + onExecutionUpdate: (execution: WorkflowExecution) => { + // Forward execution updates to parent component + onExecutionUpdate?.(execution); + }, + }); + + wsRef.current = ws; + }, 100); // Small delay to avoid double-mount issues + + return () => { + clearTimeout(timeoutId); + if (wsRef.current) { + wsRef.current.disconnect(); + wsRef.current = null; + } + }; + // eslint-disable-next-line react-compiler/react-compiler + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [workflowId, organization?.handle]); const saveWorkflowInternal = useCallback( async ( nodesToSave: Node[], edgesToSave: Edge[] ) => { - if (!workflowId || !currentWorkflow) { - setSavingError( - "Workflow ID or current workflow data is missing, cannot save." - ); + // Block saves during initialization to prevent race condition where + // nodeTemplates load before edges, causing empty edges to be saved + if (isInitializing) { return; } - setSavingError(null); - try { - // Check if any node is currently executing, purely for logging/awareness. - // The actual node.data.executionState should be handled by the UI layer (use-workflow-state) - // and those updated nodes/edges are what we receive in nodesToSave/edgesToSave. - if ( - nodesToSave.some((node) => node.data.executionState === "executing") - ) { - console.log( - "Workflow elements are in an executing state during save." - ); - } + if (!workflowId) { + setSavingError("Workflow ID is missing, cannot save."); + return; + } + setSavingError(null); - const workflowNodes = nodesToSave.map((node) => { - const incomingEdges = edgesToSave.filter( - (edge) => edge.target === node.id - ); - return { - id: node.id, - name: node.data.name, - type: node.data.nodeType || "default", // Ensure nodeType is present - position: node.position, - icon: node.data.icon, - functionCalling: node.data.functionCalling, - inputs: node.data.inputs.map((input) => { - const isConnected = incomingEdges.some( - (edge) => edge.targetHandle === input.id - ); - const parameterBase: Omit & { value?: any } = - { + if (wsRef.current?.isConnected()) { + try { + const workflowNodes = nodesToSave.map((node) => { + const incomingEdges = edgesToSave.filter( + (edge) => edge.target === node.id + ); + return { + id: node.id, + name: node.data.name, + type: node.data.nodeType || "default", + position: node.position, + icon: node.data.icon, + functionCalling: node.data.functionCalling, + inputs: node.data.inputs.map((input) => { + const isConnected = incomingEdges.some( + (edge) => edge.targetHandle === input.id + ); + const parameterBase: Omit & { + value?: any; + } = { name: input.id, type: input.type as ParameterType["type"], description: input.name, @@ -145,77 +194,57 @@ export function useEditableWorkflow({ required: input.required, repeated: input.repeated, }; - if (!isConnected && typeof input.value !== "undefined") { - parameterBase.value = input.value; - } - return parameterBase as Parameter; - }), - outputs: node.data.outputs.map((output) => { - const parameter: Parameter = { - name: output.id, - type: output.type as ParameterType["type"], - description: output.name, - hidden: output.hidden, - // value is not part of output parameters definition in the backend model here - }; - return parameter; - }), - }; - }); - - const workflowEdges = edgesToSave.map((edge) => ({ - source: edge.source, - target: edge.target, - sourceOutput: edge.sourceHandle || "", - targetInput: edge.targetHandle || "", - })); - - const workflowToSave: Workflow = { - ...currentWorkflow, // Base workflow details like name, description etc. - id: workflowId, // Ensure the ID is correctly set - nodes: workflowNodes, - edges: workflowEdges, - }; - - console.log( - "Saving workflow via useEditableWorkflow:", - workflowId, - workflowToSave - ); - - const orgHandle = organization?.handle; - - if (!orgHandle) { - throw new Error("Organization handle is required to save workflow"); - } + if (!isConnected && typeof input.value !== "undefined") { + parameterBase.value = input.value; + } + return parameterBase as Parameter; + }), + outputs: node.data.outputs.map((output) => { + const parameter: Parameter = { + name: output.id, + type: output.type as ParameterType["type"], + description: output.name, + hidden: output.hidden, + }; + return parameter; + }), + }; + }); - await updateWorkflow(workflowId, workflowToSave, orgHandle); - } catch (error) { - console.error("Error saving workflow via useEditableWorkflow:", error); + const workflowEdges = edgesToSave.map((edge) => ({ + source: edge.source, + target: edge.target, + sourceOutput: edge.sourceHandle || "", + targetInput: edge.targetHandle || "", + })); - // If it's an authentication error, the user might need to refresh/login again - if (error instanceof Error && error.message.includes("Unauthorized")) { - setSavingError( - "Authentication expired. Please refresh the page or login again." - ); - } else { - setSavingError( - error instanceof Error ? error.message : "Failed to save workflow." - ); + wsRef.current.send(workflowNodes, workflowEdges); + return; + } catch (error) { + console.error("Error saving via WebSocket:", error); + setSavingError("Failed to save via WebSocket"); } } + + console.warn( + "WebSocket not available, workflow changes may not be saved" + ); + setSavingError("WebSocket not connected. Please refresh the page."); }, - [workflowId, organization, currentWorkflow] + [workflowId, isInitializing] ); - const saveWorkflow = useMemo( - () => - debounce( - (nodes: Node[], edges: Edge[]) => - saveWorkflowInternal(nodes, edges), - 1000 - ), - [saveWorkflowInternal] + const saveWorkflow = saveWorkflowInternal; + + const executeWorkflow = useCallback( + (options?: { parameters?: Record }) => { + if (!wsRef.current?.isConnected()) { + console.warn("WebSocket is not connected, cannot execute workflow"); + return; + } + wsRef.current.executeWorkflow(options); + }, + [] ); return { @@ -225,5 +254,8 @@ export function useEditableWorkflow({ processingError, savingError, saveWorkflow, + isWSConnected, + workflowMetadata, + executeWorkflow, }; } diff --git a/apps/web/src/pages/editor-page.tsx b/apps/web/src/pages/editor-page.tsx index 5dca9634..63e75a05 100644 --- a/apps/web/src/pages/editor-page.tsx +++ b/apps/web/src/pages/editor-page.tsx @@ -1,6 +1,7 @@ +import type { WorkflowType } from "@dafthunk/types"; import type { Connection, Edge, Node } from "@xyflow/react"; import { ReactFlowProvider } from "@xyflow/react"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useNavigate, useParams } from "react-router"; import { toast } from "sonner"; @@ -35,7 +36,6 @@ import { useNodeTypes } from "@/services/type-service"; import { upsertCronTrigger, useCronTrigger, - useWorkflow, useWorkflowExecution, } from "@/services/workflow-service"; @@ -54,25 +54,9 @@ export function EditorPage() { const [isEmailTriggerDialogOpen, setIsEmailTriggerDialogOpen] = useState(false); - const { - workflow: currentWorkflow, - workflowError: workflowDetailsError, - isWorkflowLoading: isWorkflowDetailsLoading, - } = useWorkflow(id || null, { revalidateOnFocus: false }); - - const { cronTrigger, isCronTriggerLoading, mutateCronTrigger } = - useCronTrigger(currentWorkflow?.type === "cron" && id ? id : null, { - revalidateOnFocus: false, - }); - - const { - deployments: deploymentHistory, - isDeploymentHistoryLoading, - mutateHistory: mutateDeploymentHistory, - } = useDeploymentHistory(id!, { revalidateOnFocus: false }); - + // Fetch all node types initially (no filter) const { nodeTypes, nodeTypesError, isNodeTypesLoading } = useNodeTypes( - currentWorkflow?.type, + undefined, // Fetch all node types initially { revalidateOnFocus: false } ); @@ -110,17 +94,53 @@ export function EditorPage() { return templates; }, [nodeTypes]); + const executionCallbackRef = useRef< + ((execution: WorkflowExecution) => void) | null + >(null); + + const handleExecutionUpdate = useCallback((execution: WorkflowExecution) => { + if (executionCallbackRef.current) { + executionCallbackRef.current(execution); + } + }, []); + + const { + nodes: initialNodesForUI, + edges: initialEdgesForUI, + isInitializing: isWorkflowInitializing, + processingError: workflowProcessingError, + savingError: workflowSavingError, + saveWorkflow, + isWSConnected: _isWSConnected, + workflowMetadata, + executeWorkflow: wsExecuteWorkflow, + } = useEditableWorkflow({ + workflowId: id, + nodeTemplates, + onExecutionUpdate: handleExecutionUpdate, + }); + + // Now we can use workflowMetadata for cron trigger + const { cronTrigger, isCronTriggerLoading, mutateCronTrigger } = + useCronTrigger(workflowMetadata?.type === "cron" && id ? id : null, { + revalidateOnFocus: false, + }); + + const { + deployments: deploymentHistory, + isDeploymentHistoryLoading, + mutateHistory: mutateDeploymentHistory, + } = useDeploymentHistory(id!, { revalidateOnFocus: false }); + const deploymentVersions = useMemo( () => deploymentHistory.map((d) => d.version).sort((a, b) => b - a), [deploymentHistory] ); - const [latestUiNodes, setLatestUiNodes] = useState[]>( - [] - ); - const [latestUiEdges, setLatestUiEdges] = useState[]>( - [] - ); + // Use refs for both nodes and edges to avoid stale closures and unnecessary re-renders + // Both are only used for validation, execution, and saving - no UI rendering depends on them + const latestUiNodesRef = useRef[]>([]); + const latestUiEdgesRef = useRef[]>([]); const handleOpenSetCronDialog = useCallback(() => { mutateDeploymentHistory(); @@ -153,51 +173,38 @@ export function EditorPage() { [id, orgHandle, mutateCronTrigger] ); - const { - nodes: initialNodesForUI, - edges: initialEdgesForUI, - isInitializing: isWorkflowInitializing, - processingError: workflowProcessingError, - savingError: workflowSavingError, - saveWorkflow, - } = useEditableWorkflow({ - workflowId: id, - currentWorkflow, - isWorkflowDetailsLoading, - workflowDetailsError, - nodeTemplates, - }); - useEffect(() => { if (initialNodesForUI) { - setLatestUiNodes(initialNodesForUI); + latestUiNodesRef.current = initialNodesForUI; } }, [initialNodesForUI]); useEffect(() => { if (initialEdgesForUI) { - setLatestUiEdges(initialEdgesForUI); + latestUiEdgesRef.current = initialEdgesForUI; } }, [initialEdgesForUI]); const handleUiNodesChanged = useCallback( (updatedNodesFromUI: Node[]) => { - setLatestUiNodes(updatedNodesFromUI); - if (currentWorkflow) { - saveWorkflow(updatedNodesFromUI, latestUiEdges); + latestUiNodesRef.current = updatedNodesFromUI; + if (workflowMetadata) { + // Use refs for both to get current values without stale closures + saveWorkflow(updatedNodesFromUI, latestUiEdgesRef.current); } }, - [latestUiEdges, saveWorkflow, currentWorkflow] + [saveWorkflow, workflowMetadata] ); const handleUiEdgesChanged = useCallback( (updatedEdgesFromUI: Edge[]) => { - setLatestUiEdges(updatedEdgesFromUI); - if (currentWorkflow) { - saveWorkflow(latestUiNodes, updatedEdgesFromUI); + latestUiEdgesRef.current = updatedEdgesFromUI; + if (workflowMetadata) { + // Use refs for both to get current values without stale closures + saveWorkflow(latestUiNodesRef.current, updatedEdgesFromUI); } }, - [latestUiNodes, saveWorkflow, currentWorkflow] + [saveWorkflow, workflowMetadata] ); const { @@ -211,22 +218,22 @@ export function EditorPage() { closeExecutionForm, isEmailFormDialogVisible, submitEmailFormData, - } = useWorkflowExecution(orgHandle); + } = useWorkflowExecution(orgHandle, wsExecuteWorkflow); usePageBreadcrumbs( [ { label: "Workflows", to: getOrgUrl("workflows") }, - { label: currentWorkflow?.name || "Workflow" }, + { label: workflowMetadata?.name || "Workflow" }, ], - [currentWorkflow?.name] + [workflowMetadata?.name] ); const validateConnection = useCallback( (connection: Connection) => { - const sourceNode = latestUiNodes.find( + const sourceNode = latestUiNodesRef.current.find( (node) => node.id === connection.source ); - const targetNode = latestUiNodes.find( + const targetNode = latestUiNodesRef.current.find( (node) => node.id === connection.target ); if (!sourceNode || !targetNode) return false; @@ -246,7 +253,7 @@ export function EditorPage() { return typesMatch; }, - [latestUiNodes] + [] // No dependencies since we're using refs ); const editorExecuteWorkflow = useCallback( @@ -254,15 +261,17 @@ export function EditorPage() { workflowIdFromBuilder: string, onExecutionFromBuilder: (execution: WorkflowExecution) => void ) => { + executionCallbackRef.current = onExecutionFromBuilder; + return executeWorkflow( workflowIdFromBuilder, onExecutionFromBuilder, - latestUiNodes, + latestUiNodesRef.current, nodeTemplates as any, - currentWorkflow?.type + workflowMetadata?.type ); }, - [executeWorkflow, latestUiNodes, nodeTemplates, currentWorkflow?.type] + [executeWorkflow, nodeTemplates, workflowMetadata?.type] ); const handleRetryLoading = () => { @@ -293,17 +302,6 @@ export function EditorPage() { setWorkflowBuilderKey(Date.now()); }; - if (workflowDetailsError) { - return ( - - ); - } - if (nodeTypesError) { return ( navigate(getOrgUrl("workflows"))} /> ); @@ -363,19 +358,19 @@ export function EditorPage() { setIsHttpIntegrationDialogOpen(true) : undefined } onShowEmailTrigger={ - currentWorkflow?.type === "email_message" + workflowMetadata?.type === "email_message" ? () => setIsEmailTriggerDialogOpen(true) : undefined } @@ -390,18 +385,18 @@ export function EditorPage() { createObjectUrl={createObjectUrl} />
- {currentWorkflow?.type === "http_request" && ( + {workflowMetadata?.type === "http_request" && ( setIsHttpIntegrationDialogOpen(false)} orgHandle={orgHandle} workflowId={id!} deploymentVersion="dev" - nodes={latestUiNodes} + nodes={latestUiNodesRef.current} nodeTemplates={nodeTemplates} /> )} - {currentWorkflow?.type === "http_request" && + {workflowMetadata?.type === "http_request" && executionFormParameters.length > 0 && ( )} - {currentWorkflow?.type === "http_request" && + {workflowMetadata?.type === "http_request" && executionJsonBodyParameters.length > 0 && ( )} - {currentWorkflow?.type === "email_message" && ( + {workflowMetadata?.type === "email_message" && ( setIsEmailTriggerDialogOpen(false)} orgHandle={orgHandle} - workflowHandle={currentWorkflow.handle} + workflowHandle={workflowMetadata.handle} deploymentVersion="dev" /> )} - {currentWorkflow?.type === "email_message" && ( + {workflowMetadata?.type === "email_message" && ( )} - {currentWorkflow?.type === "cron" && ( + {workflowMetadata?.type === "cron" && ( )} diff --git a/apps/web/src/services/workflow-service.ts b/apps/web/src/services/workflow-service.ts index 8e691f3f..2761e6c1 100644 --- a/apps/web/src/services/workflow-service.ts +++ b/apps/web/src/services/workflow-service.ts @@ -275,7 +275,10 @@ const wouldCreateIndirectCycle = ( /** * Hook to manage workflow execution, including parameter forms and status polling. */ -export function useWorkflowExecution(orgHandle: string) { +export function useWorkflowExecution( + orgHandle: string, + wsExecuteFn?: (options?: { parameters?: Record }) => void +) { const [isFormDialogVisible, setIsFormDialogVisible] = useState(false); const [isJsonBodyDialogVisible, setIsJsonBodyDialogVisible] = useState(false); const [isEmailFormDialogVisible, setIsEmailFormDialogVisible] = @@ -323,7 +326,7 @@ export function useWorkflowExecution(orgHandle: string) { const response = await makeOrgRequest( orgHandle, API_ENDPOINT_BASE, - `/${id}/execute/dev?monitorProgress=${request?.monitorProgress ?? true}`, + `/${id}/execute/dev`, { method: "POST", ...(request?.parameters && @@ -419,11 +422,31 @@ export function useWorkflowExecution(orgHandle: string) { cleanup(); pollingRef.current.cancelled = false; - executeAndPollWorkflow(id, { monitorProgress: true, ...request }) + if (wsExecuteFn) { + try { + wsExecuteFn({ + parameters: request?.parameters, + }); + } catch (error) { + console.error("WebSocket execution failed:", error); + onExecutionUpdate({ + id: "", + workflowId: id, + status: "error", + nodeExecutions: [], + error: + error instanceof Error + ? error.message + : "WebSocket execution failed", + }); + } + return cancelCurrentExecution; + } + + executeAndPollWorkflow(id, request) .then((initialExecution: WorkflowExecution) => { if (pollingRef.current.cancelled) return; - // Track the current execution pollingRef.current.currentExecutionId = initialExecution.id; pollingRef.current.currentWorkflowId = id; @@ -487,7 +510,13 @@ export function useWorkflowExecution(orgHandle: string) { return cancelCurrentExecution; }, - [executeAndPollWorkflow, cancelCurrentExecution, orgHandle, cleanup] + [ + wsExecuteFn, + executeAndPollWorkflow, + cancelCurrentExecution, + orgHandle, + cleanup, + ] ); const executeWorkflowWithForm = useCallback( diff --git a/apps/web/src/services/workflow-session-service.ts b/apps/web/src/services/workflow-session-service.ts new file mode 100644 index 00000000..456cf8c1 --- /dev/null +++ b/apps/web/src/services/workflow-session-service.ts @@ -0,0 +1,230 @@ +import type { + Edge, + Node, + WorkflowErrorMessage, + WorkflowExecution, + WorkflowExecutionUpdateMessage, + WorkflowInitMessage, + WorkflowState, + WorkflowUpdateMessage, +} from "@dafthunk/types"; + +import { getApiBaseUrl } from "@/config/api"; + +// Re-export for convenience +export type { WorkflowState }; + +type WebSocketMessage = + | WorkflowInitMessage + | WorkflowUpdateMessage + | WorkflowErrorMessage + | WorkflowExecutionUpdateMessage; + +export interface WorkflowWSOptions { + onInit?: (state: WorkflowState) => void; + onUpdate?: (state: WorkflowState) => void; + onError?: (error: string) => void; + onClose?: () => void; + onOpen?: () => void; + onExecutionUpdate?: (execution: WorkflowExecution) => void; +} + +export class WorkflowWebSocket { + private ws: WebSocket | null = null; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 1000; // Start with 1 second + private shouldReconnect = true; + private currentState: WorkflowState | null = null; + + constructor( + private orgHandle: string, + private workflowId: string, + private options: WorkflowWSOptions = {} + ) {} + + connect(): void { + if ( + this.ws?.readyState === WebSocket.OPEN || + this.ws?.readyState === WebSocket.CONNECTING + ) { + return; + } + + const apiBaseUrl = getApiBaseUrl(); + const wsBaseUrl = apiBaseUrl.replace(/^http/, "ws"); + const url = `${wsBaseUrl}/${this.orgHandle}/ws/${this.workflowId}`; + + try { + this.ws = new WebSocket(url); + + this.ws.onopen = () => { + console.log("WebSocket connected"); + this.reconnectAttempts = 0; + this.reconnectDelay = 1000; + this.options.onOpen?.(); + }; + + this.ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data) as WebSocketMessage; + + if ("error" in message) { + console.error("WebSocket error message:", message.error); + this.options.onError?.(message.error || ""); + } else if (message.type === "init") { + this.currentState = message.state; + this.options.onInit?.(message.state); + } else if (message.type === "update") { + this.currentState = message.state; + this.options.onUpdate?.(message.state); + } else if (message.type === "execution_update") { + this.options.onExecutionUpdate?.({ + id: message.executionId, + workflowId: this.workflowId, + status: message.status, + nodeExecutions: message.nodeExecutions, + error: message.error, + }); + } + } catch (error) { + console.error("Failed to parse WebSocket message:", error); + this.options.onError?.("Failed to parse message"); + } + }; + + this.ws.onerror = (error) => { + console.error("WebSocket error:", error); + this.options.onError?.("WebSocket connection error"); + }; + + this.ws.onclose = () => { + console.log("WebSocket closed"); + this.options.onClose?.(); + + if ( + this.shouldReconnect && + this.reconnectAttempts < this.maxReconnectAttempts + ) { + this.reconnectAttempts++; + console.log( + `Reconnecting... Attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts}` + ); + + setTimeout(() => { + this.connect(); + }, this.reconnectDelay); + + // Exponential backoff + this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000); + } + }; + } catch (error) { + console.error("Failed to create WebSocket:", error); + this.options.onError?.("Failed to create WebSocket connection"); + } + } + + send(nodes: Node[], edges: Edge[]): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn("WebSocket is not open, cannot send message"); + return; + } + + if (!this.currentState) { + console.warn("No current state available, cannot send update"); + return; + } + + try { + const updatedState: WorkflowState = { + ...this.currentState, + nodes, + edges, + timestamp: Date.now(), + }; + + const updateMsg: WorkflowUpdateMessage = { + type: "update", + state: updatedState, + }; + + this.currentState = updatedState; + this.ws.send(JSON.stringify(updateMsg)); + } catch (error) { + console.error("Failed to send WebSocket message:", error); + this.options.onError?.("Failed to send message"); + } + } + + /** + * Execute workflow and receive realtime updates via WebSocket + */ + executeWorkflow(options?: { parameters?: Record }): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn("WebSocket is not open, cannot execute workflow"); + this.options.onError?.("WebSocket is not connected"); + return; + } + + try { + const executeMsg = { + type: "execute", + parameters: options?.parameters, + }; + this.ws.send(JSON.stringify(executeMsg)); + } catch (error) { + console.error("Failed to execute workflow:", error); + this.options.onError?.("Failed to execute workflow"); + } + } + + /** + * Register to receive updates for an existing execution + */ + registerForExecutionUpdates(executionId: string): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + console.warn( + "WebSocket is not open, cannot register for execution updates" + ); + return; + } + + try { + const executeMsg = { + type: "execute", + executionId, + }; + this.ws.send(JSON.stringify(executeMsg)); + } catch (error) { + console.error("Failed to register for execution updates:", error); + this.options.onError?.("Failed to register for execution updates"); + } + } + + disconnect(): void { + this.shouldReconnect = false; + if (this.ws) { + this.ws.close(); + this.ws = null; + } + } + + isConnected(): boolean { + return this.ws?.readyState === WebSocket.OPEN; + } + + getWorkflowId(): string { + return this.workflowId; + } +} + +export const connectWorkflowWS = ( + orgHandle: string, + workflowId: string, + options: WorkflowWSOptions = {} +): WorkflowWebSocket => { + const ws = new WorkflowWebSocket(orgHandle, workflowId, options); + ws.connect(); + return ws; +}; diff --git a/apps/web/src/utils/utils.ts b/apps/web/src/utils/utils.ts index 97bf3983..438f3c20 100644 --- a/apps/web/src/utils/utils.ts +++ b/apps/web/src/utils/utils.ts @@ -18,17 +18,6 @@ export function cn(...inputs: ClassValue[]) { return twMerge(clsx(inputs)); } -export const debounce = ) => ReturnType>( - func: T, - wait: number -): ((...args: Parameters) => void) => { - let timeout: ReturnType; - return (...args: Parameters) => { - clearTimeout(timeout); - timeout = setTimeout(() => func(...args), wait); - }; -}; - // Helper function to extract and format parameters for the execution dialog export function extractDialogParametersFromNodes( nodes: Node[], @@ -118,7 +107,7 @@ export function adaptDeploymentNodesToReactFlowNodes( // Find the icon from nodeTemplates by matching the node type const template = nodeTemplates.find((t) => t.type === depNode.type); - const icon = template?.icon || "circle"; // fallback icon + const icon = depNode.icon || template?.icon || "circle"; // fallback icon return { id: depNode.id, diff --git a/packages/types/src/workflow.ts b/packages/types/src/workflow.ts index 3f8a0296..01a48b7c 100644 --- a/packages/types/src/workflow.ts +++ b/packages/types/src/workflow.ts @@ -395,7 +395,6 @@ export interface DeleteWorkflowResponse { * Request to execute a workflow */ export interface ExecuteWorkflowRequest { - monitorProgress?: boolean; // eslint-disable-next-line @typescript-eslint/no-explicit-any parameters?: Record; } @@ -465,3 +464,71 @@ export interface GetCronTriggerResponse { * Returns the full trigger information. */ export type UpsertCronTriggerResponse = GetCronTriggerResponse; + +/** + * WebSocket message types for websocket synchronization + */ + +/** + * Workflow state + */ +export interface WorkflowState extends Workflow { + timestamp: number; +} + +/** + * Message sent from server to client with initial state + */ +export interface WorkflowInitMessage { + type: "init"; + state: WorkflowState; +} + +/** + * Message sent from client to server to update state + */ +export interface WorkflowUpdateMessage { + type: "update"; + state: WorkflowState; +} + +/** + * Error message sent from server to client + */ +export interface WorkflowErrorMessage { + error: string; + details?: string; +} + +/** + * Message sent from client to server to start workflow execution + * or register for execution updates + */ +export interface WorkflowExecuteMessage { + type: "execute"; + /** If provided, register for updates on this execution. If not provided, start a new execution. */ + executionId?: string; + /** Additional parameters for workflow execution */ + parameters?: Record; +} + +/** + * Message sent from server to client with execution progress updates + */ +export interface WorkflowExecutionUpdateMessage { + type: "execution_update"; + executionId: string; + status: WorkflowExecutionStatus; + nodeExecutions: NodeExecution[]; + error?: string; +} + +/** + * All possible WebSocket messages + */ +export type WorkflowMessage = + | WorkflowInitMessage + | WorkflowUpdateMessage + | WorkflowErrorMessage + | WorkflowExecuteMessage + | WorkflowExecutionUpdateMessage; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 558bbe69..415fa7b0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -120,9 +120,6 @@ importers: '@cloudflare/vitest-pool-workers': specifier: ^0.8.58 version: 0.8.71(@cloudflare/workers-types@4.20250726.0)(@vitest/runner@3.2.4)(@vitest/snapshot@3.2.4)(vitest@3.2.4(@types/debug@4.1.12)(@types/node@22.15.31)(jiti@2.5.1)(tsx@4.20.3)(yaml@2.8.0)) - '@cloudflare/workers-types': - specifier: ^4.20250726.0 - version: 4.20250726.0 '@eslint/js': specifier: ^9.26.0 version: 9.29.0 @@ -163,8 +160,8 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.31)(jiti@2.5.1)(tsx@4.20.3)(yaml@2.8.0) wrangler: - specifier: ^4.26.1 - version: 4.26.1(@cloudflare/workers-types@4.20250726.0) + specifier: ^4.42.0 + version: 4.42.0(@cloudflare/workers-types@4.20250726.0) apps/web: dependencies: @@ -737,6 +734,15 @@ packages: workerd: optional: true + '@cloudflare/unenv-preset@2.7.6': + resolution: {integrity: sha512-ykG2nd3trk6jbknRCH69xL3RpGLLbKCrbTbWSOvKEq7s4jH06yLrQlRr/q9IU+dK9p1JY1EXqhFK7VG5KqhzmQ==} + peerDependencies: + unenv: 2.0.0-rc.21 + workerd: ^1.20250927.0 + peerDependenciesMeta: + workerd: + optional: true + '@cloudflare/vitest-pool-workers@0.8.71': resolution: {integrity: sha512-keu2HCLQfRNwbmLBCDXJgCFpANTaYnQpE01fBOo4CNwiWHUT7SZGN7w64RKiSWRHyYppStXBuE5Ng7F42+flpg==} peerDependencies: @@ -756,6 +762,12 @@ packages: cpu: [x64] os: [darwin] + '@cloudflare/workerd-darwin-64@1.20251001.0': + resolution: {integrity: sha512-y1ST/cCscaRewWRnsHZdWbgiLJbki5UMGd0hMo/FLqjlztwPeDgQ5CGm5jMiCDdw/IBCpWxEukftPYR34rWNog==} + engines: {node: '>=16'} + cpu: [x64] + os: [darwin] + '@cloudflare/workerd-darwin-arm64@1.20250726.0': resolution: {integrity: sha512-I+TOQ+YQahxL/K7eS2GJzv5CZzSVaZoyqfB15Q71MT/+wyzPCaFDTt+fg3uXdwpaIQEMUfqFNpTQSqbKHAYNgA==} engines: {node: '>=16'} @@ -768,6 +780,12 @@ packages: cpu: [arm64] os: [darwin] + '@cloudflare/workerd-darwin-arm64@1.20251001.0': + resolution: {integrity: sha512-+z4QHHZ/Yix82zLFYS+ZS2UV09IENFPwDCEKUWfnrM9Km2jOOW3Ua4hJNob1EgQUYs8fFZo7k5O/tpwxMsSbbQ==} + engines: {node: '>=16'} + cpu: [arm64] + os: [darwin] + '@cloudflare/workerd-linux-64@1.20250726.0': resolution: {integrity: sha512-WSCv4o2uOW6b++ROVazrEW+jjZdBqCmXmmt7uVVfvjVxlzoYVwK9IvV2IXe4gsJ99HG9I0YCa7AT743cZ7TNNg==} engines: {node: '>=16'} @@ -780,6 +798,12 @@ packages: cpu: [x64] os: [linux] + '@cloudflare/workerd-linux-64@1.20251001.0': + resolution: {integrity: sha512-hGS+O2V9Mm2XjJUaB9ZHMA5asDUaDjKko42e+accbew0PQR7zrAl1afdII6hMqCLV4tk4GAjvhv281pN4g48rg==} + engines: {node: '>=16'} + cpu: [x64] + os: [linux] + '@cloudflare/workerd-linux-arm64@1.20250726.0': resolution: {integrity: sha512-jNokAGL3EQqH+31b0dX8+tlbKdjt/0UtTLvgD1e+7bOD92lzjYMa/CixHyMIY/FVvhsN4TNqfiz4cqroABTlhg==} engines: {node: '>=16'} @@ -792,6 +816,12 @@ packages: cpu: [arm64] os: [linux] + '@cloudflare/workerd-linux-arm64@1.20251001.0': + resolution: {integrity: sha512-QYaMK+pRgt28N7CX1JlJ+ToegJF9LxzqdT7MjWqPgVj9D2WTyIhBVYl3wYjJRcgOlnn+DRt42+li4T64CPEeuA==} + engines: {node: '>=16'} + cpu: [arm64] + os: [linux] + '@cloudflare/workerd-windows-64@1.20250726.0': resolution: {integrity: sha512-DiPTY63TNh6/ylvfutNQzYZi688x6NJDjQoqf5uiCp7xHweWx+GpVs42sZPeeXqCNvhm4dYjHjuigXJNh7t8Uw==} engines: {node: '>=16'} @@ -804,6 +834,12 @@ packages: cpu: [x64] os: [win32] + '@cloudflare/workerd-windows-64@1.20251001.0': + resolution: {integrity: sha512-ospnDR/FlyRvrv9DSHuxDAXmzEBLDUiAHQrQHda1iUH9HqxnNQ8giz9VlPfq7NIRc7bQ1ZdIYPGLJOY4Q366Ng==} + engines: {node: '>=16'} + cpu: [x64] + os: [win32] + '@cloudflare/workers-types@4.20250726.0': resolution: {integrity: sha512-NtM1yVBKJFX4LgSoZkVU0EDhWWvSb1vt6REO+uMYZRgx1HAfQz9GDN6bBB0B+fm2ZIxzt6FzlDbmrXpGJ2M/4Q==} @@ -4694,6 +4730,11 @@ packages: engines: {node: '>=18.0.0'} hasBin: true + miniflare@4.20251001.0: + resolution: {integrity: sha512-OHd31D2LT8JH+85nVXClV0Z18jxirCohzKNAcZs/fgt4mIkUDtidX3VqR3ovAM0jWooNxrFhB9NSs3iDbiJF7Q==} + engines: {node: '>=18.0.0'} + hasBin: true + minimatch@3.1.2: resolution: {integrity: sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==} @@ -5741,6 +5782,11 @@ packages: engines: {node: '>=16'} hasBin: true + workerd@1.20251001.0: + resolution: {integrity: sha512-oT/K4YWNhmwpVmGeaHNmF7mLRfgjszlVr7lJtpS4jx5khmxmMzWZEEQRrJEpgzeHP6DOq9qWLPNT0bjMK7TchQ==} + engines: {node: '>=16'} + hasBin: true + wrangler@4.26.1: resolution: {integrity: sha512-zGFEtHrjTAWOngm+zwEvYCxFwMSIBrzHa3Yu6rAxYMEzsT8PPvo2rdswyUJiUkpE9s2Depr37opceaY7JxEYFw==} engines: {node: '>=18.0.0'} @@ -5761,6 +5807,16 @@ packages: '@cloudflare/workers-types': optional: true + wrangler@4.42.0: + resolution: {integrity: sha512-OZXiUSfGD66OVkncDbjZtqrsH6bWPRQMYc6RmMbkzYm/lEvJ8lvARKcqDgEyq8zDAgJAivlMQLyPtKQoVjQ/4g==} + engines: {node: '>=18.0.0'} + hasBin: true + peerDependencies: + '@cloudflare/workers-types': ^4.20251001.0 + peerDependenciesMeta: + '@cloudflare/workers-types': + optional: true + wrap-ansi@7.0.0: resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==} engines: {node: '>=10'} @@ -6425,6 +6481,12 @@ snapshots: optionalDependencies: workerd: 1.20250906.0 + '@cloudflare/unenv-preset@2.7.6(unenv@2.0.0-rc.21)(workerd@1.20251001.0)': + dependencies: + unenv: 2.0.0-rc.21 + optionalDependencies: + workerd: 1.20251001.0 + '@cloudflare/vitest-pool-workers@0.8.71(@cloudflare/workers-types@4.20250726.0)(@vitest/runner@3.2.4)(@vitest/snapshot@3.2.4)(vitest@3.2.4(@types/debug@4.1.12)(@types/node@22.15.31)(jiti@2.5.1)(tsx@4.20.3)(yaml@2.8.0))': dependencies: '@vitest/runner': 3.2.4 @@ -6448,30 +6510,45 @@ snapshots: '@cloudflare/workerd-darwin-64@1.20250906.0': optional: true + '@cloudflare/workerd-darwin-64@1.20251001.0': + optional: true + '@cloudflare/workerd-darwin-arm64@1.20250726.0': optional: true '@cloudflare/workerd-darwin-arm64@1.20250906.0': optional: true + '@cloudflare/workerd-darwin-arm64@1.20251001.0': + optional: true + '@cloudflare/workerd-linux-64@1.20250726.0': optional: true '@cloudflare/workerd-linux-64@1.20250906.0': optional: true + '@cloudflare/workerd-linux-64@1.20251001.0': + optional: true + '@cloudflare/workerd-linux-arm64@1.20250726.0': optional: true '@cloudflare/workerd-linux-arm64@1.20250906.0': optional: true + '@cloudflare/workerd-linux-arm64@1.20251001.0': + optional: true + '@cloudflare/workerd-windows-64@1.20250726.0': optional: true '@cloudflare/workerd-windows-64@1.20250906.0': optional: true + '@cloudflare/workerd-windows-64@1.20251001.0': + optional: true + '@cloudflare/workers-types@4.20250726.0': {} '@cspotcode/source-map-support@0.8.1': @@ -6993,7 +7070,7 @@ snapshots: '@jridgewell/trace-mapping@0.3.25': dependencies: '@jridgewell/resolve-uri': 3.1.2 - '@jridgewell/sourcemap-codec': 1.5.0 + '@jridgewell/sourcemap-codec': 1.5.4 '@jridgewell/trace-mapping@0.3.9': dependencies: @@ -11218,6 +11295,24 @@ snapshots: - bufferutil - utf-8-validate + miniflare@4.20251001.0: + dependencies: + '@cspotcode/source-map-support': 0.8.1 + acorn: 8.14.0 + acorn-walk: 8.3.2 + exit-hook: 2.2.1 + glob-to-regexp: 0.4.1 + sharp: 0.33.5 + stoppable: 1.1.0 + undici: 7.14.0 + workerd: 1.20251001.0 + ws: 8.18.0 + youch: 4.1.0-beta.10 + zod: 3.25.76 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + minimatch@3.1.2: dependencies: brace-expansion: 1.1.12 @@ -12348,6 +12443,14 @@ snapshots: '@cloudflare/workerd-linux-arm64': 1.20250906.0 '@cloudflare/workerd-windows-64': 1.20250906.0 + workerd@1.20251001.0: + optionalDependencies: + '@cloudflare/workerd-darwin-64': 1.20251001.0 + '@cloudflare/workerd-darwin-arm64': 1.20251001.0 + '@cloudflare/workerd-linux-64': 1.20251001.0 + '@cloudflare/workerd-linux-arm64': 1.20251001.0 + '@cloudflare/workerd-windows-64': 1.20251001.0 + wrangler@4.26.1(@cloudflare/workers-types@4.20250726.0): dependencies: '@cloudflare/kv-asset-handler': 0.4.0 @@ -12382,6 +12485,23 @@ snapshots: - bufferutil - utf-8-validate + wrangler@4.42.0(@cloudflare/workers-types@4.20250726.0): + dependencies: + '@cloudflare/kv-asset-handler': 0.4.0 + '@cloudflare/unenv-preset': 2.7.6(unenv@2.0.0-rc.21)(workerd@1.20251001.0) + blake3-wasm: 2.1.5 + esbuild: 0.25.4 + miniflare: 4.20251001.0 + path-to-regexp: 6.3.0 + unenv: 2.0.0-rc.21 + workerd: 1.20251001.0 + optionalDependencies: + '@cloudflare/workers-types': 4.20250726.0 + fsevents: 2.3.3 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + wrap-ansi@7.0.0: dependencies: ansi-styles: 4.3.0