Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import secretRoutes from "./routes/secrets";
import typeRoutes from "./routes/types";
import usageRoutes from "./routes/usage";
import workflowRoutes from "./routes/workflows";
import wsRoutes from "./routes/ws";

// Initialize Hono app with types
const app = new Hono<ApiContext>();
Expand Down Expand Up @@ -65,6 +66,7 @@ app.route("/:organizationIdOrHandle/secrets", secretRoutes);
app.route("/:organizationIdOrHandle/workflows", workflowRoutes);
app.route("/:organizationIdOrHandle/objects", objectRoutes);
app.route("/:organizationIdOrHandle/usage", usageRoutes);
app.route("/:organizationIdOrHandle/ws", wsRoutes);

export default {
scheduled: handleCronTriggers,
Expand Down
53 changes: 34 additions & 19 deletions apps/api/src/routes/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,40 @@ workflowRoutes.post(
*/
workflowRoutes.get("/:id", jwtMiddleware, async (c) => {
const id = c.req.param("id");
const db = createDatabase(c.env.DB);

const organizationId = c.get("organizationId")!;
const workflow = await getWorkflow(db, id, organizationId);
if (!workflow) {
return c.json({ error: "Workflow not found" }, 404);
const userId = c.var.jwtPayload?.sub;

if (!userId) {
return c.json({ error: "Unauthorized" }, 401);
}

const workflowData = workflow.data;

const response: GetWorkflowResponse = {
id: workflow.id,
name: workflow.name,
handle: workflow.handle,
type: workflowData.type,
createdAt: workflow.createdAt,
updatedAt: workflow.updatedAt,
nodes: workflowData.nodes || [],
edges: workflowData.edges || [],
};
try {
// Get workflow from database
const db = createDatabase(c.env.DB);
const workflow = await getWorkflow(db, id, organizationId);

return c.json(response);
if (!workflow) {
return c.json({ error: "Workflow not found" }, 404);
}

const workflowData = workflow.data;

const response: GetWorkflowResponse = {
id: workflow.id,
name: workflow.name,
handle: workflow.handle,
type: workflowData.type,
createdAt: workflow.createdAt,
updatedAt: workflow.updatedAt,
nodes: workflowData.nodes || [],
edges: workflowData.edges || [],
};

return c.json(response);
} catch (error) {
console.error("Error fetching workflow:", error);
return c.json({ error: "Failed to fetch workflow" }, 500);
}
});

/**
Expand Down Expand Up @@ -482,8 +494,11 @@ workflowRoutes.post(
let deploymentId: string | undefined;

if (version === "dev") {
// Get workflow data directly
// Get workflow data directly from database
workflow = await getWorkflow(db, workflowIdOrHandle, organizationId);
if (!workflow) {
return c.json({ error: "Workflow not found" }, 404);
}
workflowData = workflow.data;
} else {
// Get deployment based on version
Expand Down
175 changes: 175 additions & 0 deletions apps/api/src/routes/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import {
WorkflowAckMessage,
WorkflowErrorMessage,
WorkflowInitMessage,
WorkflowState,
WorkflowType,
WorkflowUpdateMessage,
} from "@dafthunk/types";
import { Hono } from "hono";

import { jwtMiddleware } from "../auth";
import { ApiContext } from "../context";
import { createDatabase, getWorkflow, updateWorkflow } from "../db";

const wsRoutes = new Hono<ApiContext>();

// WebSocket endpoint for real-time workflow state synchronization
wsRoutes.get("/", jwtMiddleware, async (c) => {
const upgradeHeader = c.req.header("Upgrade");

if (!upgradeHeader || upgradeHeader !== "websocket") {
return c.json({ error: "Expected WebSocket connection" }, 426);
}

const userId = c.var.jwtPayload?.sub;
const workflowId = c.req.query("workflowId");
const organizationId = c.get("organizationId")!;

if (!userId || !workflowId || !organizationId) {
console.error("Missing userId, workflowId or organizationId:", {
userId,
workflowId,
organizationId,
});
return c.json(
{ error: "Missing userId, workflowId or organizationId" },
400
);
}

// Load workflow from database
const db = createDatabase(c.env.DB);
let workflow;
try {
workflow = await getWorkflow(db, workflowId, organizationId);
} catch (error) {
console.error("Error loading workflow:", error);
return c.json({ error: "Failed to load workflow" }, 500);
}

// Create WebSocket pair
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

// Accept the WebSocket connection
server.accept();

// Prepare initial state
const initialState: WorkflowState = workflow
? {
id: workflow.id,
name: workflow.name,
handle: workflow.handle,
type: ((workflow.data as any).type || "manual") as WorkflowType,
nodes: (workflow.data as any).nodes || [],
edges: (workflow.data as any).edges || [],
timestamp: workflow.updatedAt
? workflow.updatedAt.getTime()
: Date.now(),
}
: {
id: workflowId,
name: "New Workflow",
handle: workflowId,
type: "manual" as WorkflowType,
nodes: [],
edges: [],
timestamp: Date.now(),
};

// Send initial state
const initMessage: WorkflowInitMessage = {
type: "init",
state: initialState,
};
server.send(JSON.stringify(initMessage));

// Handle incoming messages
server.addEventListener("message", async (event: MessageEvent) => {
try {
if (typeof event.data !== "string") {
const errorMsg: WorkflowErrorMessage = {
error: "Expected string message",
};
server.send(JSON.stringify(errorMsg));
return;
}

const data = JSON.parse(event.data);

if ("type" in data && data.type === "update") {
const updateMsg = data as WorkflowUpdateMessage;

// Update workflow in database
try {
// Fetch the latest workflow to avoid using stale initialState captured at connection time
const currentWorkflow = await getWorkflow(
db,
workflowId,
organizationId
);
if (!currentWorkflow) {
const errorMsg: WorkflowErrorMessage = {
error: "Workflow not found",
};
server.send(JSON.stringify(errorMsg));
return;
}

const currentData = currentWorkflow.data as any;

await updateWorkflow(db, workflowId, organizationId, {
data: {
id: workflowId,
name: currentData.name || currentWorkflow.name,
handle: currentData.handle || currentWorkflow.handle,
type: currentData.type || "manual",
nodes: updateMsg.nodes,
edges: updateMsg.edges,
},
});

// Send acknowledgment
const ackMsg: WorkflowAckMessage = {
type: "ack",
timestamp: Date.now(),
};
server.send(JSON.stringify(ackMsg));
} catch (error) {
console.error("Error updating workflow:", error);
const errorMsg: WorkflowErrorMessage = {
error: "Failed to update workflow",
details: error instanceof Error ? error.message : "Unknown error",
};
server.send(JSON.stringify(errorMsg));
}
}
} catch (error) {
console.error("WebSocket message error:", error);
const errorMsg: WorkflowErrorMessage = {
error: "Failed to process message",
details: error instanceof Error ? error.message : "Unknown error",
};
server.send(JSON.stringify(errorMsg));
}
});

// Handle errors
server.addEventListener("error", (event: Event) => {
console.error("WebSocket error:", event);
});

// Handle close
server.addEventListener("close", (event: CloseEvent) => {
console.log("WebSocket closed:", event.code, event.reason);
});

// Return response with WebSocket
return new Response(null, {
status: 101,
webSocket: client,
});
});

export default wsRoutes;
47 changes: 33 additions & 14 deletions apps/web/src/components/workflow/use-workflow-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ interface UseWorkflowStateReturn {
onConnect: OnConnect;
onConnectStart: OnConnectStart;
onConnectEnd: OnConnectEnd;
onNodeDragStop: (
event: React.MouseEvent,
node: ReactFlowNode<WorkflowNodeType>
) => void;
connectionValidationState: ConnectionValidationState;
isValidConnection: IsValidConnection<ReactFlowEdge<WorkflowEdgeType>>;
handleAddNode: () => void;
Expand Down Expand Up @@ -95,11 +99,12 @@ interface UseWorkflowStateReturn {
// Helper functions to replace workflowNodeStateService
const stripExecutionFields = (
data: WorkflowNodeType
): Omit<WorkflowNodeType, "executionState" | "error"> & {
): Omit<WorkflowNodeType, "executionState" | "error" | "nodeTemplates"> & {
outputs: Omit<WorkflowNodeType["outputs"][number], "value" | "isConnected">[];
inputs: Omit<WorkflowNodeType["inputs"][number], "isConnected">[];
} => {
const { executionState, error, ...rest } = data;
// Exclude nodeTemplates from comparison - it's UI metadata that shouldn't trigger persistence
const { executionState, error, nodeTemplates, ...rest } = data;

return {
...rest,
Expand Down Expand Up @@ -339,33 +344,36 @@ export function useWorkflowState({
}
}, [initialEdges, readonly, setEdges]);

// Effect to notify parent of changes for nodes
// Effect to notify parent of changes for nodes (excluding position changes during drag)
useEffect(() => {
if (readonly) return;

const nodeCountChanged = nodes.length !== initialNodes.length;
const hasDataOrPositionChanges = nodes.some((node) => {
const initialNode = initialNodes.find((n) => n.id === node.id);
if (!initialNode) return true;

if (
node.position.x !== initialNode.position.x ||
node.position.y !== initialNode.position.y
) {
return true;
// Check for data changes (excluding position)
let hasDataChanges = false;
nodes.forEach((node) => {
const initialNode = initialNodes.find((n) => n.id === node.id);
if (!initialNode) {
hasDataChanges = true;
return;
}

// Check data changes (not position)
const nodeData = stripExecutionFields(node.data);
const initialNodeData = stripExecutionFields(initialNode.data);
return JSON.stringify(nodeData) !== JSON.stringify(initialNodeData);
if (JSON.stringify(nodeData) !== JSON.stringify(initialNodeData)) {
hasDataChanges = true;
}
});

// Check for deleted nodes by looking for initialNodes that don't exist in the current nodes
// Check for deleted nodes
const hasDeletedNodes = initialNodes.some(
(initialNode) => !nodes.some((node) => node.id === initialNode.id)
);

if (nodeCountChanged || hasDataOrPositionChanges || hasDeletedNodes) {
// Save for data changes or node add/delete (position changes handled by onNodeDragStop)
if (nodeCountChanged || hasDataChanges || hasDeletedNodes) {
onNodesChangePersistCallback?.(nodes);
}
}, [nodes, onNodesChangePersistCallback, initialNodes, readonly]);
Expand Down Expand Up @@ -432,6 +440,16 @@ export function useWorkflowState({
setConnectionValidationState("default");
}, [readonly]);

// Handle node drag stop - save positions after drag completes
const onNodeDragStop = useCallback(
(_event: React.MouseEvent, _node: ReactFlowNode<WorkflowNodeType>) => {
if (readonly) return;
// Save with current node positions after drag completes
onNodesChangePersistCallback?.(nodes);
},
[readonly, nodes, onNodesChangePersistCallback]
);

// Function to validate connection based on type compatibility
const isValidConnection = useCallback(
(connection: any) => {
Expand Down Expand Up @@ -1128,6 +1146,7 @@ export function useWorkflowState({
onConnect,
onConnectStart,
onConnectEnd,
onNodeDragStop: readonly ? () => {} : onNodeDragStop,
connectionValidationState,
isValidConnection,
handleAddNode,
Expand Down
2 changes: 2 additions & 0 deletions apps/web/src/components/workflow/workflow-builder.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export function WorkflowBuilder({
cutSelected,
pasteFromClipboard,
hasClipboardData,
onNodeDragStop,
} = useWorkflowState({
initialNodes,
initialEdges,
Expand Down Expand Up @@ -431,6 +432,7 @@ export function WorkflowBuilder({
onConnectStart={readonly ? () => {} : onConnectStart}
onConnectEnd={readonly ? () => {} : onConnectEnd}
onNodeDoubleClick={handleNodeDoubleClick}
onNodeDragStop={onNodeDragStop}
onInit={setReactFlowInstance}
onAddNode={readonly ? undefined : handleAddNode}
onAction={
Expand Down
Loading
Loading