From f6010e8c08393888888c8cbf183e8e7b24f8f119 Mon Sep 17 00:00:00 2001
From: leonardcser <73912641+leonardcser@users.noreply.github.com>
Date: Tue, 7 Oct 2025 10:54:36 +0200
Subject: [PATCH 1/3] refactor(api): update workflow object store endpoint

---
 apps/api/src/runtime/object-store.test.ts | 12 +++++-------
 apps/api/src/runtime/object-store.ts      | 15 ++++++---------
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/apps/api/src/runtime/object-store.test.ts b/apps/api/src/runtime/object-store.test.ts
index 3a1ada07..26220dec 100644
--- a/apps/api/src/runtime/object-store.test.ts
+++ b/apps/api/src/runtime/object-store.test.ts
@@ -221,7 +221,7 @@ describe("ObjectStore", () => {
       expect(result).toBe("workflow-123");
 
       expect(mockBucket.put).toHaveBeenCalledWith(
-        "workflows/workflow-123/workflow.json",
+        "workflows/workflow-123.json",
         JSON.stringify(workflow),
         expect.objectContaining({
           httpMetadata: expect.objectContaining({
@@ -253,7 +253,7 @@ describe("ObjectStore", () => {
       expect(result).toEqual(workflow);
 
       expect(mockBucket.get).toHaveBeenCalledWith(
-        "workflows/workflow-123/workflow.json"
+        "workflows/workflow-123.json"
       );
     });
@@ -275,7 +275,7 @@ describe("ObjectStore", () => {
       await store.deleteWorkflow("workflow-123");
 
       expect(mockBucket.delete).toHaveBeenCalledWith(
-        "workflows/workflow-123/workflow.json"
+        "workflows/workflow-123.json"
       );
     });
   });
@@ -298,7 +298,7 @@ describe("ObjectStore", () => {
       expect(result).toBe("exec-456");
 
       expect(mockBucket.put).toHaveBeenCalledWith(
-        "executions/exec-456/workflow.json",
+        "executions/exec-456.json",
         JSON.stringify(workflow),
         expect.objectContaining({
           customMetadata: expect.objectContaining({
@@ -330,9 +330,7 @@ describe("ObjectStore", () => {
       const result = await store.readExecutionWorkflow("exec-456");
 
       expect(result).toEqual(workflow);
-      expect(mockBucket.get).toHaveBeenCalledWith(
-        "executions/exec-456/workflow.json"
-      );
+      expect(mockBucket.get).toHaveBeenCalledWith("executions/exec-456.json");
     });
 
     it("should throw error when execution workflow not found", async () => {
diff --git a/apps/api/src/runtime/object-store.ts b/apps/api/src/runtime/object-store.ts
index ecb4693b..a39c98ff 100644
--- a/apps/api/src/runtime/object-store.ts
+++ b/apps/api/src/runtime/object-store.ts
@@ -112,7 +112,7 @@ export class ObjectStore {
 
   async writeWorkflow(workflow: Workflow): Promise<string> {
     await this.writeToR2(
-      `workflows/${workflow.id}/workflow.json`,
+      `workflows/${workflow.id}.json`,
       JSON.stringify(workflow),
       {
         httpMetadata: {
@@ -130,7 +130,7 @@ export class ObjectStore {
 
   async readWorkflow(workflowId: string): Promise<Workflow> {
     const object = await this.readFromR2(
-      `workflows/${workflowId}/workflow.json`,
+      `workflows/${workflowId}.json`,
       "readWorkflow"
     );
@@ -143,10 +143,7 @@ export class ObjectStore {
   }
 
   async deleteWorkflow(workflowId: string): Promise<void> {
-    await this.deleteFromR2(
-      `workflows/${workflowId}/workflow.json`,
-      "deleteWorkflow"
-    );
+    await this.deleteFromR2(`workflows/${workflowId}.json`, "deleteWorkflow");
   }
@@ -154,7 +151,7 @@ export class ObjectStore {
   async writeExecutionWorkflow(
     executionId: string,
     workflow: Workflow
   ): Promise<string> {
     await this.writeToR2(
-      `executions/${executionId}/workflow.json`,
+      `executions/${executionId}.json`,
       JSON.stringify(workflow),
       {
         httpMetadata: {
@@ -174,7 +171,7 @@ export class ObjectStore {
 
   async readExecutionWorkflow(executionId: string): Promise<Workflow> {
     const object = await this.readFromR2(
-      `executions/${executionId}/workflow.json`,
+      `executions/${executionId}.json`,
       "readExecutionWorkflow"
     );
@@ -188,7 +185,7 @@ export class ObjectStore {
   async deleteExecutionWorkflow(executionId: string): Promise<void> {
     await this.deleteFromR2(
-      `executions/${executionId}/workflow.json`,
+      `executions/${executionId}.json`,
       "deleteExecutionWorkflow"
     );
   }

From 36198cf97fe5676dc4fdae0e19049f04d2773a24 Mon Sep 17 00:00:00 2001
From: leonardcser <73912641+leonardcser@users.noreply.github.com>
Date: Tue, 7 Oct 2025 12:00:03 +0200
Subject: [PATCH 2/3] refactor(api): migrate workflows, deployments and
 executions to R2

---
 apps/api/src/cron.ts                          |   41 +-
 .../src/db/migrations/0012_migrate_to_r2.sql  |   25 +
 .../src/db/migrations/meta/0012_snapshot.json | 1104 +++++++++++++++++
 apps/api/src/db/migrations/meta/_journal.json |    7 +
 apps/api/src/db/queries.ts                    |  136 +-
 apps/api/src/db/schema/index.ts               |   14 +-
 .../src/durable-objects/workflow-session.ts   |   89 +-
 apps/api/src/email.ts                         |   38 +-
 apps/api/src/routes/deployments.ts            |  118 +-
 apps/api/src/routes/executions.ts             |   71 +-
 apps/api/src/routes/workflows.ts              |  210 +++-
 apps/api/src/runtime/execution-persistence.ts |   18 +-
 apps/api/src/runtime/object-store.ts          |   44 +
 13 files changed, 1778 insertions(+), 137 deletions(-)
 create mode 100644 apps/api/src/db/migrations/0012_migrate_to_r2.sql
 create mode 100644 apps/api/src/db/migrations/meta/0012_snapshot.json

diff --git a/apps/api/src/cron.ts b/apps/api/src/cron.ts
index 9cc53659..179cb885 100644
--- a/apps/api/src/cron.ts
+++ b/apps/api/src/cron.ts
@@ -9,6 +9,7 @@ import {
   saveExecution,
   updateCronTriggerRunTimes,
 } from "./db/queries";
+import { ObjectStore } from "./runtime/object-store";
 
 // This function will now handle the actual execution triggering and initial record saving
 async function executeWorkflow(
@@ -22,7 +23,8 @@
   deploymentId: string | undefined,
   db: ReturnType<typeof createDatabase>,
   env: Bindings,
-  _ctx: ExecutionContext
+  _ctx: ExecutionContext,
+  objectStore: ObjectStore
 ): Promise<void> {
   console.log(`Attempting to execute workflow ${workflowInfo.id} via cron.`);
@@ -64,7 +66,7 @@
       status: "idle" as const,
     }));
 
-    await saveExecution(db, {
+    const initialExecution = await saveExecution(db, {
       id: executionId,
       workflowId: workflowInfo.id,
       deploymentId: deploymentId,
@@ -77,6 +79,13 @@
       startedAt: new Date(),
     });
     console.log(`Initial execution record saved for ${executionId}`);
+
+    // Save execution data to R2
+    try {
+      await objectStore.writeExecution(initialExecution);
+    } catch (error) {
+      console.error(`Failed to save execution to R2: ${executionId}`, error);
+    }
   } catch (execError) {
     console.error(
       `Error executing workflow ${workflowInfo.id} or saving initial record:`,
@@ -92,6 +101,7 @@ export async function handleCronTriggers(
 ): Promise<void> {
   console.log(`Cron event triggered at: ${new Date(event.scheduledTime)}`);
   const db = createDatabase(env.DB);
+  const objectStore = new ObjectStore(env.RESSOURCES);
   const now = new Date();
 
   try {
@@ -117,9 +127,29 @@
 
       try {
         if (versionAlias === "dev") {
-          workflowToExecute = workflow.data;
+          // Load workflow data from R2
+          try {
+            workflowToExecute = await objectStore.readWorkflow(workflow.id);
+          } catch (error) {
+            console.error(
+              `Failed to load workflow data from R2 for ${workflow.id}:`,
+              error
+            );
+            continue;
+          }
         } else if (deployment) {
-          workflowToExecute = deployment.workflowData;
+          // Load deployment workflow snapshot from R2
+          try {
+            workflowToExecute = await objectStore.readDeploymentWorkflow(
+              deployment.id
+            );
+          } catch (error) {
+            console.error(
+              `Failed to 
load deployment workflow data from R2 for ${deployment.id}:`, + error + ); + continue; + } deploymentIdToExecute = deployment.id; } else { console.error( @@ -142,7 +172,8 @@ export async function handleCronTriggers( deploymentIdToExecute, db, env, - ctx + ctx, + objectStore ); } else { // This case should ideally not be reached due to the checks above, diff --git a/apps/api/src/db/migrations/0012_migrate_to_r2.sql b/apps/api/src/db/migrations/0012_migrate_to_r2.sql new file mode 100644 index 00000000..5807ab15 --- /dev/null +++ b/apps/api/src/db/migrations/0012_migrate_to_r2.sql @@ -0,0 +1,25 @@ +PRAGMA foreign_keys=OFF;--> statement-breakpoint +CREATE TABLE `__new_secrets` ( + `id` text PRIMARY KEY NOT NULL, + `name` text NOT NULL, + `encrypted_value` text NOT NULL, + `organization_id` text NOT NULL, + `created_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL, + `updated_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL, + FOREIGN KEY (`organization_id`) REFERENCES `organizations`(`id`) ON UPDATE no action ON DELETE cascade +); +--> statement-breakpoint +INSERT INTO `__new_secrets`("id", "name", "encrypted_value", "organization_id", "created_at", "updated_at") SELECT "id", "name", "encrypted_value", "organization_id", "created_at", "updated_at" FROM `secrets`;--> statement-breakpoint +DROP TABLE `secrets`;--> statement-breakpoint +ALTER TABLE `__new_secrets` RENAME TO `secrets`;--> statement-breakpoint +PRAGMA foreign_keys=ON;--> statement-breakpoint +CREATE INDEX `secrets_name_idx` ON `secrets` (`name`);--> statement-breakpoint +CREATE INDEX `secrets_organization_id_idx` ON `secrets` (`organization_id`);--> statement-breakpoint +CREATE INDEX `secrets_created_at_idx` ON `secrets` (`created_at`);--> statement-breakpoint +CREATE UNIQUE INDEX `secrets_organization_id_name_unique_idx` ON `secrets` (`organization_id`,`name`);--> statement-breakpoint +DROP INDEX `executions_visibility_idx`;--> statement-breakpoint +ALTER TABLE `executions` DROP COLUMN `data`;--> statement-breakpoint +ALTER TABLE `executions` DROP COLUMN `visibility`;--> statement-breakpoint +ALTER TABLE `executions` DROP COLUMN `og_image_generated`;--> statement-breakpoint +ALTER TABLE `deployments` DROP COLUMN `workflow_data`;--> statement-breakpoint +ALTER TABLE `workflows` DROP COLUMN `data`; \ No newline at end of file diff --git a/apps/api/src/db/migrations/meta/0012_snapshot.json b/apps/api/src/db/migrations/meta/0012_snapshot.json new file mode 100644 index 00000000..5fa7cca3 --- /dev/null +++ b/apps/api/src/db/migrations/meta/0012_snapshot.json @@ -0,0 +1,1104 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "773573a6-db23-45fd-a3b5-ff69d98b20e6", + "prevId": "c51aea42-08b7-43f3-8fee-366375e07f75", + "tables": { + "api_keys": { + "name": "api_keys", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "key": { + "name": "key", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, 
+ "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "api_keys_key_unique": { + "name": "api_keys_key_unique", + "columns": ["key"], + "isUnique": true + }, + "api_keys_name_idx": { + "name": "api_keys_name_idx", + "columns": ["name"], + "isUnique": false + }, + "api_keys_organization_id_idx": { + "name": "api_keys_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "api_keys_created_at_idx": { + "name": "api_keys_created_at_idx", + "columns": ["created_at"], + "isUnique": false + } + }, + "foreignKeys": { + "api_keys_organization_id_organizations_id_fk": { + "name": "api_keys_organization_id_organizations_id_fk", + "tableFrom": "api_keys", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "cron_triggers": { + "name": "cron_triggers", + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "cron_expression": { + "name": "cron_expression", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "version_alias": { + "name": "version_alias", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'dev'" + }, + "version_number": { + "name": "version_number", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "active": { + "name": "active", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": true + }, + "last_run": { + "name": "last_run", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "next_run_at": { + "name": "next_run_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "cron_triggers_workflow_id_idx": { + "name": "cron_triggers_workflow_id_idx", + "columns": ["workflow_id"], + "isUnique": false + }, + "cron_triggers_version_alias_idx": { + "name": "cron_triggers_version_alias_idx", + "columns": ["version_alias"], + "isUnique": false + }, + "cron_triggers_version_number_idx": { + "name": "cron_triggers_version_number_idx", + "columns": ["version_number"], + "isUnique": false + }, + "cron_triggers_active_idx": { + "name": "cron_triggers_active_idx", + "columns": ["active"], + "isUnique": false + }, + "cron_triggers_last_run_idx": { + "name": "cron_triggers_last_run_idx", + "columns": ["last_run"], + "isUnique": false + }, + "cron_triggers_next_run_at_idx": { + "name": "cron_triggers_next_run_at_idx", + "columns": ["next_run_at"], + "isUnique": false + }, + "cron_triggers_created_at_idx": { + "name": "cron_triggers_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "cron_triggers_updated_at_idx": { + "name": "cron_triggers_updated_at_idx", + "columns": ["updated_at"], + "isUnique": false + } + }, + "foreignKeys": { + "cron_triggers_workflow_id_workflows_id_fk": { + "name": 
"cron_triggers_workflow_id_workflows_id_fk", + "tableFrom": "cron_triggers", + "tableTo": "workflows", + "columnsFrom": ["workflow_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "datasets": { + "name": "datasets", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "handle": { + "name": "handle", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "datasets_name_idx": { + "name": "datasets_name_idx", + "columns": ["name"], + "isUnique": false + }, + "datasets_handle_idx": { + "name": "datasets_handle_idx", + "columns": ["handle"], + "isUnique": false + }, + "datasets_organization_id_idx": { + "name": "datasets_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "datasets_created_at_idx": { + "name": "datasets_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "datasets_organization_id_handle_unique_idx": { + "name": "datasets_organization_id_handle_unique_idx", + "columns": ["organization_id", "handle"], + "isUnique": true + } + }, + "foreignKeys": { + "datasets_organization_id_organizations_id_fk": { + "name": "datasets_organization_id_organizations_id_fk", + "tableFrom": "datasets", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "deployments": { + "name": "deployments", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "workflow_id": { + "name": "workflow_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "version": { + "name": "version", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "deployments_organization_id_idx": { + "name": "deployments_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "deployments_workflow_id_idx": { + "name": "deployments_workflow_id_idx", + "columns": ["workflow_id"], + "isUnique": false + }, + "deployments_version_idx": { + "name": "deployments_version_idx", + "columns": 
["version"], + "isUnique": false + }, + "deployments_created_at_idx": { + "name": "deployments_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "deployments_workflow_id_version_idx": { + "name": "deployments_workflow_id_version_idx", + "columns": ["workflow_id", "version"], + "isUnique": false + } + }, + "foreignKeys": { + "deployments_organization_id_organizations_id_fk": { + "name": "deployments_organization_id_organizations_id_fk", + "tableFrom": "deployments", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "deployments_workflow_id_workflows_id_fk": { + "name": "deployments_workflow_id_workflows_id_fk", + "tableFrom": "deployments", + "tableTo": "workflows", + "columnsFrom": ["workflow_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "executions": { + "name": "executions", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "workflow_id": { + "name": "workflow_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "deployment_id": { + "name": "deployment_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'started'" + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "ended_at": { + "name": "ended_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "executions_workflow_id_idx": { + "name": "executions_workflow_id_idx", + "columns": ["workflow_id"], + "isUnique": false + }, + "executions_organization_id_idx": { + "name": "executions_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "executions_status_idx": { + "name": "executions_status_idx", + "columns": ["status"], + "isUnique": false + }, + "executions_deployment_id_idx": { + "name": "executions_deployment_id_idx", + "columns": ["deployment_id"], + "isUnique": false + }, + "executions_created_at_idx": { + "name": "executions_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "executions_started_at_idx": { + "name": "executions_started_at_idx", + "columns": ["started_at"], + "isUnique": false + }, + "executions_ended_at_idx": { + "name": "executions_ended_at_idx", + "columns": ["ended_at"], + "isUnique": false + }, + "executions_organization_id_status_idx": { + "name": "executions_organization_id_status_idx", + "columns": ["organization_id", "status"], + "isUnique": false + }, + 
"executions_workflow_id_status_idx": { + "name": "executions_workflow_id_status_idx", + "columns": ["workflow_id", "status"], + "isUnique": false + } + }, + "foreignKeys": { + "executions_workflow_id_workflows_id_fk": { + "name": "executions_workflow_id_workflows_id_fk", + "tableFrom": "executions", + "tableTo": "workflows", + "columnsFrom": ["workflow_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "executions_deployment_id_deployments_id_fk": { + "name": "executions_deployment_id_deployments_id_fk", + "tableFrom": "executions", + "tableTo": "deployments", + "columnsFrom": ["deployment_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "executions_organization_id_organizations_id_fk": { + "name": "executions_organization_id_organizations_id_fk", + "tableFrom": "executions", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "memberships": { + "name": "memberships", + "columns": { + "user_id": { + "name": "user_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'member'" + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "memberships_role_idx": { + "name": "memberships_role_idx", + "columns": ["role"], + "isUnique": false + }, + "memberships_user_id_idx": { + "name": "memberships_user_id_idx", + "columns": ["user_id"], + "isUnique": false + }, + "memberships_organization_id_idx": { + "name": "memberships_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "memberships_created_at_idx": { + "name": "memberships_created_at_idx", + "columns": ["created_at"], + "isUnique": false + } + }, + "foreignKeys": { + "memberships_user_id_users_id_fk": { + "name": "memberships_user_id_users_id_fk", + "tableFrom": "memberships", + "tableTo": "users", + "columnsFrom": ["user_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "memberships_organization_id_organizations_id_fk": { + "name": "memberships_organization_id_organizations_id_fk", + "tableFrom": "memberships", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "memberships_user_id_organization_id_pk": { + "columns": ["user_id", "organization_id"], + "name": "memberships_user_id_organization_id_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "organizations": { + "name": "organizations", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "handle": { + "name": 
"handle", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "compute_credits": { + "name": "compute_credits", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 1000 + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "organizations_handle_unique": { + "name": "organizations_handle_unique", + "columns": ["handle"], + "isUnique": true + }, + "organizations_name_idx": { + "name": "organizations_name_idx", + "columns": ["name"], + "isUnique": false + }, + "organizations_handle_idx": { + "name": "organizations_handle_idx", + "columns": ["handle"], + "isUnique": false + }, + "organizations_created_at_idx": { + "name": "organizations_created_at_idx", + "columns": ["created_at"], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "secrets": { + "name": "secrets", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "encrypted_value": { + "name": "encrypted_value", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "secrets_name_idx": { + "name": "secrets_name_idx", + "columns": ["name"], + "isUnique": false + }, + "secrets_organization_id_idx": { + "name": "secrets_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "secrets_created_at_idx": { + "name": "secrets_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "secrets_organization_id_name_unique_idx": { + "name": "secrets_organization_id_name_unique_idx", + "columns": ["organization_id", "name"], + "isUnique": true + } + }, + "foreignKeys": { + "secrets_organization_id_organizations_id_fk": { + "name": "secrets_organization_id_organizations_id_fk", + "tableFrom": "secrets", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "users": { + "name": "users", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "github_id": { + "name": "github_id", + "type": "text", + "primaryKey": false, 
+ "notNull": false, + "autoincrement": false + }, + "google_id": { + "name": "google_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "avatar_url": { + "name": "avatar_url", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "plan": { + "name": "plan", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'trial'" + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'user'" + }, + "developer_mode": { + "name": "developer_mode", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "users_email_unique": { + "name": "users_email_unique", + "columns": ["email"], + "isUnique": true + }, + "users_github_id_unique": { + "name": "users_github_id_unique", + "columns": ["github_id"], + "isUnique": true + }, + "users_google_id_unique": { + "name": "users_google_id_unique", + "columns": ["google_id"], + "isUnique": true + }, + "users_github_id_idx": { + "name": "users_github_id_idx", + "columns": ["github_id"], + "isUnique": false + }, + "users_google_id_idx": { + "name": "users_google_id_idx", + "columns": ["google_id"], + "isUnique": false + }, + "users_organization_id_idx": { + "name": "users_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "users_email_idx": { + "name": "users_email_idx", + "columns": ["email"], + "isUnique": false + }, + "users_name_idx": { + "name": "users_name_idx", + "columns": ["name"], + "isUnique": false + }, + "users_plan_idx": { + "name": "users_plan_idx", + "columns": ["plan"], + "isUnique": false + }, + "users_role_idx": { + "name": "users_role_idx", + "columns": ["role"], + "isUnique": false + }, + "users_developer_mode_idx": { + "name": "users_developer_mode_idx", + "columns": ["developer_mode"], + "isUnique": false + }, + "users_created_at_idx": { + "name": "users_created_at_idx", + "columns": ["created_at"], + "isUnique": false + } + }, + "foreignKeys": { + "users_organization_id_organizations_id_fk": { + "name": "users_organization_id_organizations_id_fk", + "tableFrom": "users", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "workflows": { + "name": "workflows", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "handle": { + "name": "handle", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'manual'" + }, + 
"organization_id": { + "name": "organization_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "CURRENT_TIMESTAMP" + } + }, + "indexes": { + "workflows_name_idx": { + "name": "workflows_name_idx", + "columns": ["name"], + "isUnique": false + }, + "workflows_type_idx": { + "name": "workflows_type_idx", + "columns": ["type"], + "isUnique": false + }, + "workflows_organization_id_idx": { + "name": "workflows_organization_id_idx", + "columns": ["organization_id"], + "isUnique": false + }, + "workflows_created_at_idx": { + "name": "workflows_created_at_idx", + "columns": ["created_at"], + "isUnique": false + }, + "workflows_updated_at_idx": { + "name": "workflows_updated_at_idx", + "columns": ["updated_at"], + "isUnique": false + }, + "workflows_organization_id_handle_unique_idx": { + "name": "workflows_organization_id_handle_unique_idx", + "columns": ["organization_id", "handle"], + "isUnique": true + } + }, + "foreignKeys": { + "workflows_organization_id_organizations_id_fk": { + "name": "workflows_organization_id_organizations_id_fk", + "tableFrom": "workflows", + "tableTo": "organizations", + "columnsFrom": ["organization_id"], + "columnsTo": ["id"], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/api/src/db/migrations/meta/_journal.json b/apps/api/src/db/migrations/meta/_journal.json index 8d349463..3bfff1d4 100644 --- a/apps/api/src/db/migrations/meta/_journal.json +++ b/apps/api/src/db/migrations/meta/_journal.json @@ -85,6 +85,13 @@ "when": 1758263902291, "tag": "0011_add_secrets", "breakpoints": true + }, + { + "idx": 12, + "version": "6", + "when": 1759828484317, + "tag": "0012_migrate_to_r2", + "breakpoints": true } ] } diff --git a/apps/api/src/db/queries.ts b/apps/api/src/db/queries.ts index d55d03d1..6a482679 100644 --- a/apps/api/src/db/queries.ts +++ b/apps/api/src/db/queries.ts @@ -11,6 +11,7 @@ import { alias } from "drizzle-orm/sqlite-core"; import { v7 as uuidv7 } from "uuid"; import { Bindings } from "../context"; +import { ObjectStore } from "../runtime/object-store"; import { encryptSecret } from "../utils/encryption"; import { type ApiKeyInsert, @@ -260,7 +261,7 @@ export async function getWorkflows( id: workflows.id, name: workflows.name, handle: workflows.handle, - data: workflows.data, + type: workflows.type, createdAt: workflows.createdAt, updatedAt: workflows.updatedAt, }) @@ -414,19 +415,20 @@ export async function createWorkflow( } /** - * Update a workflow, ensuring it belongs to the specified organization + * Update a workflow (metadata only), ensuring it belongs to the specified organization + * Note: Full workflow data should be saved to R2 separately by the caller * * @param db Database instance * @param id Workflow ID * @param organizationId Organization ID - * @param data Updated workflow data + * @param data Updated workflow metadata (name, type, timestamps, etc.) 
* @returns Updated workflow record */ export async function updateWorkflow( db: ReturnType, id: string, organizationId: string, - data: Partial & { data: WorkflowType } + data: Partial ): Promise { const [workflow] = await db .update(workflows) @@ -495,7 +497,8 @@ export async function getExecution( } /** - * Save an execution record + * Save an execution record (metadata only) + * Note: Full execution data should be saved to R2 separately by the caller * * @param db Database instance * @param record Execution data to save @@ -508,7 +511,7 @@ export async function saveExecution( const now = new Date(); const { nodeExecutions, userId, deploymentId, ...dbFields } = record; - // Create the execution object that will be stored as JSON in the data field + // Create the execution object that will be returned (and saved to R2 by caller) const executionData: WorkflowExecution = { id: record.id, workflowId: record.workflowId, @@ -520,11 +523,10 @@ export async function saveExecution( endedAt: record.endedAt, }; - // Create the record to insert into the database + // Create the metadata record to insert into the database (no data field) const dbRecord = { ...dbFields, deploymentId: deploymentId, - data: executionData, updatedAt: record.updatedAt ?? now, createdAt: record.createdAt ?? now, startedAt: record.startedAt, @@ -1954,3 +1956,121 @@ export async function getOrganizationMembershipsWithUsers( }, })); } + +/** + * Get workflow metadata from DB and full workflow data from R2 + * + * @param db Database instance + * @param objectStore ObjectStore instance for R2 operations + * @param workflowIdOrHandle Workflow ID or handle + * @param organizationIdOrHandle Organization ID or handle + * @returns Workflow metadata with full data from R2 + */ +export async function getWorkflowWithData( + db: ReturnType, + objectStore: ObjectStore, + workflowIdOrHandle: string, + organizationIdOrHandle: string +): Promise<(WorkflowRow & { data: WorkflowType }) | undefined> { + const workflow = await getWorkflow( + db, + workflowIdOrHandle, + organizationIdOrHandle + ); + + if (!workflow) { + return undefined; + } + + try { + const workflowData = await objectStore.readWorkflow(workflow.id); + return { + ...workflow, + data: workflowData, + }; + } catch (error) { + console.error( + `Failed to read workflow data from R2 for ${workflow.id}:`, + error + ); + throw error; + } +} + +/** + * Get deployment metadata from DB and workflow snapshot from R2 + * + * @param db Database instance + * @param objectStore ObjectStore instance for R2 operations + * @param deploymentId Deployment ID + * @param organizationIdOrHandle Organization ID or handle + * @returns Deployment metadata with workflow data from R2 + */ +export async function getDeploymentWithData( + db: ReturnType, + objectStore: ObjectStore, + deploymentId: string, + organizationIdOrHandle: string +): Promise<(DeploymentRow & { workflowData: WorkflowType }) | undefined> { + const deployment = await getDeployment( + db, + deploymentId, + organizationIdOrHandle + ); + + if (!deployment) { + return undefined; + } + + try { + const workflowData = await objectStore.readDeploymentWorkflow( + deployment.id + ); + return { + ...deployment, + workflowData, + }; + } catch (error) { + console.error( + `Failed to read deployment workflow from R2 for ${deployment.id}:`, + error + ); + throw error; + } +} + +/** + * Get execution metadata from DB and full execution data from R2 + * + * @param db Database instance + * @param objectStore ObjectStore instance for R2 operations + * 
@param executionId Execution ID + * @param organizationIdOrHandle Organization ID or handle + * @returns Execution metadata with full data from R2 + */ +export async function getExecutionWithData( + db: ReturnType, + objectStore: ObjectStore, + executionId: string, + organizationIdOrHandle: string +): Promise<(ExecutionRow & { data: WorkflowExecution }) | undefined> { + const execution = await getExecution(db, executionId, organizationIdOrHandle); + + if (!execution) { + return undefined; + } + + try { + const executionData = await objectStore.readExecution(execution.id); + return { + ...execution, + data: executionData, + }; + } catch (error) { + console.error( + `Failed to read execution data from R2 for ${execution.id}:`, + error + ); + throw error; + } +} diff --git a/apps/api/src/db/schema/index.ts b/apps/api/src/db/schema/index.ts index 1f308b6d..99320700 100644 --- a/apps/api/src/db/schema/index.ts +++ b/apps/api/src/db/schema/index.ts @@ -1,7 +1,3 @@ -import { - Workflow as WorkflowType, - WorkflowExecution as WorkflowExecutionType, -} from "@dafthunk/types"; import { sql } from "drizzle-orm"; import { relations } from "drizzle-orm"; import { @@ -201,13 +197,13 @@ export const apiKeys = sqliteTable( ); // Workflows - Workflow definitions created and edited by users +// Note: Full workflow data is stored in R2, only metadata is in the database export const workflows = sqliteTable( "workflows", { id: text("id").primaryKey(), name: text("name").notNull(), handle: text("handle").notNull(), - data: text("data", { mode: "json" }).$type().notNull(), type: text("type") .$type() .notNull() @@ -232,6 +228,7 @@ export const workflows = sqliteTable( ); // Deployments - Versioned workflow definitions ready for execution +// Note: Workflow snapshot is stored in R2, only metadata is in the database export const deployments = sqliteTable( "deployments", { @@ -243,9 +240,6 @@ export const deployments = sqliteTable( onDelete: "cascade", }), version: integer("version").notNull(), - workflowData: text("workflow_data", { mode: "json" }) - .$type() - .notNull(), createdAt: createCreatedAt(), updatedAt: createUpdatedAt(), }, @@ -263,6 +257,7 @@ export const deployments = sqliteTable( ); // Executions - Records of workflow runs with status and results +// Note: Full execution data is stored in R2, only metadata is in the database export const executions = sqliteTable( "executions", { @@ -280,9 +275,6 @@ export const executions = sqliteTable( .$type() .notNull() .default(ExecutionStatus.STARTED), - data: text("data", { mode: "json" }) - .$type() - .notNull(), error: text("error"), startedAt: integer("started_at", { mode: "timestamp" }), endedAt: integer("ended_at", { mode: "timestamp" }), diff --git a/apps/api/src/durable-objects/workflow-session.ts b/apps/api/src/durable-objects/workflow-session.ts index 791537c4..c31ade5f 100644 --- a/apps/api/src/durable-objects/workflow-session.ts +++ b/apps/api/src/durable-objects/workflow-session.ts @@ -13,7 +13,6 @@ import { WorkflowInitMessage, WorkflowMessage, WorkflowState, - WorkflowType, WorkflowUpdateMessage, } from "@dafthunk/types"; import { DurableObject } from "cloudflare:workers"; @@ -25,6 +24,7 @@ import { getWorkflowWithUserAccess, updateWorkflow, } from "../db/queries"; +import { ObjectStore } from "../runtime/object-store"; export class WorkflowSession extends DurableObject { private static readonly PERSIST_DEBOUNCE_MS = 500; @@ -42,7 +42,7 @@ export class WorkflowSession extends DurableObject { } /** - * Load workflow from D1 database with user 
access verification + * Load workflow from D1 database (metadata) and R2 (full data) with user access verification */ private async loadState(workflowId: string, userId: string): Promise { console.log(`Loading workflow ${workflowId} for user ${userId}`); @@ -57,33 +57,40 @@ export class WorkflowSession extends DurableObject { const { workflow, organizationId } = result; - const { name, handle, type, nodes, edges, timestamp } = - this.extractWorkflowData(workflow, workflowId); + // Load full workflow data from R2 + const objectStore = new ObjectStore(this.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readWorkflow(workflowId); + } catch (error) { + console.error( + `Failed to load workflow data from R2 for ${workflowId}:`, + error + ); + // Fall back to empty workflow structure + workflowData = { + id: workflowId, + name: workflow.name, + handle: workflow.handle, + type: workflow.type, + nodes: [], + edges: [], + }; + } this.state = { id: workflowId, - name, - handle, - type, - nodes, - edges, - timestamp, + name: workflowData.name, + handle: workflowData.handle, + type: workflowData.type, + nodes: workflowData.nodes, + edges: workflowData.edges, + timestamp: workflow?.updatedAt?.getTime() || Date.now(), }; this.organizationId = organizationId; } - private extractWorkflowData(workflow: any, workflowId: string) { - return { - name: workflow?.name || "New Workflow", - handle: workflow?.handle || workflowId, - type: (workflow?.data?.type || "manual") as WorkflowType, - nodes: workflow?.data?.nodes || [], - edges: workflow?.data?.edges || [], - timestamp: workflow?.updatedAt?.getTime() || Date.now(), - }; - } - /** * Get state from memory */ @@ -164,7 +171,7 @@ export class WorkflowSession extends DurableObject { } /** - * Persist state back to D1 database + * Persist state back to D1 database (metadata) and R2 (full data) */ private async persistToDatabase(): Promise { if (!this.state || !this.organizationId) { @@ -173,21 +180,29 @@ export class WorkflowSession extends DurableObject { try { const db = createDatabase(this.env.DB); + const objectStore = new ObjectStore(this.env.RESSOURCES); + + // Save full workflow data to R2 + const workflowData = { + id: this.state.id, + name: this.state.name, + handle: this.state.handle, + type: this.state.type, + nodes: this.state.nodes, + edges: this.state.edges, + }; + + await objectStore.writeWorkflow(workflowData); + + // Save metadata to D1 database await updateWorkflow(db, this.state.id, this.organizationId, { name: this.state.name, - data: { - id: this.state.id, - name: this.state.name, - handle: this.state.handle, - type: this.state.type, - nodes: this.state.nodes, - edges: this.state.edges, - }, + type: this.state.type, }); - console.log(`Persisted workflow ${this.state.id} to D1 database`); + console.log(`Persisted workflow ${this.state.id} to D1 database and R2`); } catch (error) { - console.error("Error persisting workflow to database:", error); + console.error("Error persisting workflow:", error); } } @@ -443,7 +458,7 @@ export class WorkflowSession extends DurableObject { status: "executing" as const, })); - // Save initial execution record + // Save initial execution record (metadata to DB) const initialExecution = await saveExecution(db, { id: executionId, workflowId: this.state.id, @@ -455,6 +470,14 @@ export class WorkflowSession extends DurableObject { updatedAt: new Date(), }); + // Save execution data to R2 + const objectStore = new ObjectStore(this.env.RESSOURCES); + try { + await 
objectStore.writeExecution(initialExecution); + } catch (error) { + console.error(`Failed to save execution to R2: ${executionId}`, error); + } + // Store execution for this WebSocket this.executions.set(ws, { id: initialExecution.id, diff --git a/apps/api/src/email.ts b/apps/api/src/email.ts index bc75fb10..b4bdc99b 100644 --- a/apps/api/src/email.ts +++ b/apps/api/src/email.ts @@ -12,6 +12,7 @@ import { getLatestDeployment, saveExecution, } from "./db"; +import { ObjectStore } from "./runtime/object-store"; async function streamToString( stream: ReadableStream @@ -74,6 +75,7 @@ export async function handleIncomingEmail( console.log(`Parsed trigger type: ${triggerType}`); const db = createDatabase(env.DB); + const objectStore = new ObjectStore(env.RESSOURCES); // Get workflow data either from deployment or directly from workflow let workflowData: WorkflowType; @@ -81,7 +83,7 @@ export async function handleIncomingEmail( let deploymentId: string | undefined; if (version === "dev") { - // Get workflow data directly + // Get workflow metadata from DB workflow = await getWorkflow( db, workflowIdOrHandle, @@ -91,7 +93,17 @@ export async function handleIncomingEmail( console.error("Workflow not found"); return; } - workflowData = workflow.data as WorkflowType; + + // Load workflow data from R2 + try { + workflowData = await objectStore.readWorkflow(workflow.id); + } catch (error) { + console.error( + `Failed to load workflow data from R2 for ${workflow.id}:`, + error + ); + return; + } } else { // Get deployment based on version let deployment; @@ -119,7 +131,18 @@ export async function handleIncomingEmail( } deploymentId = deployment.id; - workflowData = deployment.workflowData as WorkflowType; + + // Load deployment workflow snapshot from R2 + try { + workflowData = await objectStore.readDeploymentWorkflow(deployment.id); + } catch (error) { + console.error( + `Failed to load deployment workflow data from R2 for ${deployment.id}:`, + error + ); + return; + } + workflow = { id: deployment.workflowId, name: workflowData.name, @@ -177,7 +200,7 @@ export async function handleIncomingEmail( })); // Save initial execution record - await saveExecution(db, { + const initialExecution = await saveExecution(db, { id: executionId, workflowId: workflow.id, deploymentId, @@ -188,4 +211,11 @@ export async function handleIncomingEmail( createdAt: new Date(), updatedAt: new Date(), }); + + // Save execution data to R2 + try { + await objectStore.writeExecution(initialExecution); + } catch (error) { + console.error(`Failed to save execution to R2: ${executionId}`, error); + } } diff --git a/apps/api/src/routes/deployments.ts b/apps/api/src/routes/deployments.ts index 7b0ee33e..78bdc8ef 100644 --- a/apps/api/src/routes/deployments.ts +++ b/apps/api/src/routes/deployments.ts @@ -6,7 +6,6 @@ import { GetWorkflowDeploymentsResponse, ListDeploymentsResponse, Node, - Workflow as WorkflowType, } from "@dafthunk/types"; import { JWTTokenPayload } from "@dafthunk/types"; import { Hono } from "hono"; @@ -27,6 +26,7 @@ import { saveExecution, } from "../db"; import { createRateLimitMiddleware } from "../middleware/rate-limit"; +import { ObjectStore } from "../runtime/object-store"; // Extend the ApiContext with our custom variable type ExtendedApiContext = ApiContext & { @@ -73,7 +73,18 @@ deploymentRoutes.get("/version/:deploymentId", jwtMiddleware, async (c) => { return c.json({ error: "Deployment not found" }, 404); } - const workflowData = deployment.workflowData as WorkflowType; + // Load deployment workflow 
snapshot from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readDeploymentWorkflow(deployment.id); + } catch (error) { + console.error( + `Failed to load deployment workflow from R2 for ${deployment.id}:`, + error + ); + return c.json({ error: "Failed to load deployment data" }, 500); + } // Transform to match WorkflowDeploymentVersion type const deploymentVersion: GetDeploymentVersionResponse = { @@ -115,7 +126,18 @@ deploymentRoutes.get("/:workflowIdOrHandle", jwtMiddleware, async (c) => { return c.json({ error: "No deployments found for this workflow" }, 404); } - const workflowData = deployment.workflowData as WorkflowType; + // Load deployment workflow snapshot from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readDeploymentWorkflow(deployment.id); + } catch (error) { + console.error( + `Failed to load deployment workflow from R2 for ${deployment.id}:`, + error + ); + return c.json({ error: "Failed to load deployment data" }, 500); + } // Transform to match WorkflowDeploymentVersion type const deploymentVersion: GetDeploymentVersionResponse = { @@ -147,7 +169,18 @@ deploymentRoutes.post("/:workflowIdOrHandle", jwtMiddleware, async (c) => { return c.json({ error: "Workflow not found" }, 404); } - const workflowData = workflow.data as WorkflowType; + // Load full workflow data from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readWorkflow(workflow.id); + } catch (error) { + console.error( + `Failed to load workflow data from R2 for ${workflow.id}:`, + error + ); + return c.json({ error: "Failed to load workflow data" }, 500); + } // Get the latest version number and increment const latestVersion = @@ -158,18 +191,29 @@ deploymentRoutes.post("/:workflowIdOrHandle", jwtMiddleware, async (c) => { )) || 0; const newVersion = latestVersion + 1; - // Create new deployment + // Create new deployment (metadata only in DB) const deploymentId = uuid(); const newDeployment = await createDeployment(db, { id: deploymentId, organizationId: organizationId, workflowId: workflowIdOrHandle, version: newVersion, - workflowData: workflowData, createdAt: now, updatedAt: now, }); + // Save workflow snapshot to R2 + try { + await objectStore.writeDeploymentWorkflow(deploymentId, workflowData); + } catch (error) { + console.error( + `Failed to save deployment workflow to R2 for ${deploymentId}:`, + error + ); + // Consider rolling back the deployment creation here + return c.json({ error: "Failed to save deployment data" }, 500); + } + // Transform to match WorkflowDeploymentVersion type const deploymentVersion: DeploymentVersion = { id: newDeployment.id, @@ -209,19 +253,35 @@ deploymentRoutes.get( organizationId ); - // Transform to match WorkflowDeploymentVersion type - const deploymentVersions = deploymentsList.map((deployment) => { - const workflowData = deployment.workflowData as WorkflowType; - return { - id: deployment.id, - workflowId: deployment.workflowId || "", - version: deployment.version, - createdAt: deployment.createdAt, - updatedAt: deployment.updatedAt, - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], - }; - }); + // Load all deployment workflow snapshots from R2 in parallel + const objectStore = new ObjectStore(c.env.RESSOURCES); + const deploymentVersions = await Promise.all( + deploymentsList.map(async (deployment) => { + let workflowData; 
+ try { + workflowData = await objectStore.readDeploymentWorkflow( + deployment.id + ); + } catch (error) { + console.error( + `Failed to load deployment workflow from R2 for ${deployment.id}:`, + error + ); + // Return deployment with empty nodes/edges if R2 read fails + workflowData = { nodes: [], edges: [] }; + } + + return { + id: deployment.id, + workflowId: deployment.workflowId || "", + version: deployment.version, + createdAt: deployment.createdAt, + updatedAt: deployment.updatedAt, + nodes: workflowData.nodes || [], + edges: workflowData.edges || [], + }; + }) + ); const response: GetWorkflowDeploymentsResponse = { workflow: { @@ -279,7 +339,18 @@ deploymentRoutes.post( return c.json({ error: "Deployment not found" }, 404); } - const workflowData = deployment.workflowData as WorkflowType; + // Load deployment workflow snapshot from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readDeploymentWorkflow(deployment.id); + } catch (error) { + console.error( + `Failed to load deployment workflow from R2 for ${deployment.id}:`, + error + ); + return c.json({ error: "Failed to load deployment data" }, 500); + } // Validate if workflow has nodes if (!workflowData.nodes || workflowData.nodes.length === 0) { @@ -365,6 +436,13 @@ deploymentRoutes.post( updatedAt: new Date(), }); + // Save execution data to R2 + try { + await objectStore.writeExecution(initialExecution); + } catch (error) { + console.error(`Failed to save execution to R2: ${executionId}`, error); + } + // Return the initial execution object, explicitly matching ExecuteDeploymentResponse const response: ExecuteDeploymentResponse = { id: initialExecution.id, diff --git a/apps/api/src/routes/executions.ts b/apps/api/src/routes/executions.ts index 512d0b67..faa5e90f 100644 --- a/apps/api/src/routes/executions.ts +++ b/apps/api/src/routes/executions.ts @@ -16,6 +16,7 @@ import { getWorkflowNames, listExecutions, } from "../db"; +import { ObjectStore } from "../runtime/object-store"; const executionRoutes = new Hono(); @@ -38,7 +39,25 @@ executionRoutes.get("/:id", apiKeyOrJwtMiddleware, async (c) => { organizationId ); - const executionData = execution.data as WorkflowExecution; + // Load execution data from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let executionData; + try { + executionData = await objectStore.readExecution(execution.id); + } catch (error) { + console.error( + `Failed to load execution data from R2 for ${execution.id}:`, + error + ); + // Fallback to empty data if R2 read fails + executionData = { + id: execution.id, + workflowId: execution.workflowId, + status: execution.status as WorkflowExecutionStatus, + nodeExecutions: [], + }; + } + const workflowExecution: WorkflowExecution = { id: execution.id, workflowId: execution.workflowId, @@ -84,21 +103,41 @@ executionRoutes.get("/", jwtMiddleware, async (c) => { const workflowNames = await getWorkflowNames(db, workflowIds); const workflowMap = new Map(workflowNames.map((w) => [w.id, w.name])); - // Map to WorkflowExecution type - const results = executions.map((execution) => { - const executionData = execution.data as WorkflowExecution; - return { - id: execution.id, - workflowId: execution.workflowId, - workflowName: workflowMap.get(execution.workflowId) || "Unknown Workflow", - deploymentId: execution.deploymentId ?? 
undefined, - status: execution.status as WorkflowExecutionStatus, - nodeExecutions: executionData.nodeExecutions || [], - error: execution.error || undefined, - startedAt: execution.startedAt ?? executionData.startedAt, - endedAt: execution.endedAt ?? executionData.endedAt, - }; - }); + // Load all execution data from R2 in parallel + const objectStore = new ObjectStore(c.env.RESSOURCES); + const results = await Promise.all( + executions.map(async (execution) => { + let executionData; + try { + executionData = await objectStore.readExecution(execution.id); + } catch (error) { + console.error( + `Failed to load execution data from R2 for ${execution.id}:`, + error + ); + // Fallback to empty data if R2 read fails + executionData = { + id: execution.id, + workflowId: execution.workflowId, + status: execution.status as WorkflowExecutionStatus, + nodeExecutions: [], + }; + } + + return { + id: execution.id, + workflowId: execution.workflowId, + workflowName: + workflowMap.get(execution.workflowId) || "Unknown Workflow", + deploymentId: execution.deploymentId ?? undefined, + status: execution.status as WorkflowExecutionStatus, + nodeExecutions: executionData.nodeExecutions || [], + error: execution.error || undefined, + startedAt: execution.startedAt ?? executionData.startedAt, + endedAt: execution.endedAt ?? executionData.endedAt, + }; + }) + ); const response: ListExecutionsResponse = { executions: results }; return c.json(response); diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index f85e4e04..8e6d6f21 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -43,6 +43,7 @@ import { type WorkflowInsert, } from "../db"; import { createRateLimitMiddleware } from "../middleware/rate-limit"; +import { ObjectStore } from "../runtime/object-store"; import { validateWorkflow } from "../utils/workflows"; // Extend the ApiContext with our custom variable @@ -71,7 +72,7 @@ workflowRoutes.get("/", jwtMiddleware, async (c) => { id: workflow.id, name: workflow.name, handle: workflow.handle, - type: workflow.data.type, + type: workflow.type, createdAt: workflow.createdAt, updatedAt: workflow.updatedAt, nodes: [], @@ -117,35 +118,43 @@ workflowRoutes.post( edges: Array.isArray(data.edges) ? 
data.edges : [], }; + const validationErrors = validateWorkflow(workflowData); + if (validationErrors.length > 0) { + return c.json({ errors: validationErrors }, 400); + } + + // Save metadata to database const newWorkflowData: WorkflowInsert = { id: workflowData.id, name: workflowData.name, handle: workflowData.handle, - data: workflowData, + type: workflowData.type, organizationId: organizationId, createdAt: now, updatedAt: now, }; - const validationErrors = validateWorkflow(workflowData); - if (validationErrors.length > 0) { - return c.json({ errors: validationErrors }, 400); - } - const db = createDatabase(c.env.DB); const newWorkflow = await createWorkflow(db, newWorkflowData); - const workflowDataFromDb = newWorkflow.data; + // Save full workflow data to R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + try { + await objectStore.writeWorkflow(workflowData); + } catch (error) { + console.error(`Failed to save workflow to R2: ${workflowId}`, error); + // Consider whether to rollback the database insert here + } const response: CreateWorkflowResponse = { id: newWorkflow.id, name: newWorkflow.name, handle: newWorkflow.handle, - type: workflowDataFromDb.type, + type: newWorkflow.type, createdAt: newWorkflow.createdAt, updatedAt: newWorkflow.updatedAt, - nodes: workflowDataFromDb.nodes, - edges: workflowDataFromDb.edges, + nodes: workflowData.nodes, + edges: workflowData.edges, }; return c.json(response, 201); @@ -167,22 +176,40 @@ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { const db = createDatabase(c.env.DB); try { - // Get metadata from database for timestamps + // Get metadata from database const workflow = await getWorkflow(db, id, organizationId); if (!workflow) { return c.json({ error: "Workflow not found" }, 404); } + // Load full workflow data from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + let workflowData; + try { + workflowData = await objectStore.readWorkflow(id); + } catch (error) { + console.error(`Failed to load workflow data from R2 for ${id}:`, error); + // Fallback to empty structure if R2 read fails + workflowData = { + id: workflow.id, + name: workflow.name, + handle: workflow.handle, + type: workflow.type, + nodes: [], + edges: [], + }; + } + const response: GetWorkflowResponse = { - id: workflow!.id, - name: workflow!.name, - handle: workflow!.handle, - type: workflow!.data.type, - createdAt: workflow?.createdAt || new Date(), - updatedAt: workflow?.updatedAt || new Date(), - nodes: workflow?.data.nodes || [], - edges: workflow?.data.edges || [], + id: workflow.id, + name: workflow.name, + handle: workflow.handle, + type: workflow.type, + createdAt: workflow.createdAt || new Date(), + updatedAt: workflow.updatedAt || new Date(), + nodes: workflowData.nodes || [], + edges: workflowData.edges || [], }; return c.json(response); @@ -222,6 +249,24 @@ workflowRoutes.put( const data = c.req.valid("json"); const now = new Date(); + // Load existing workflow data from R2 to get current state + const objectStore = new ObjectStore(c.env.RESSOURCES); + let existingWorkflowData; + try { + existingWorkflowData = await objectStore.readWorkflow(id); + } catch (error) { + console.error(`Failed to load workflow data from R2 for ${id}:`, error); + // Fallback to empty structure + existingWorkflowData = { + id: existingWorkflow.id, + name: existingWorkflow.name, + handle: existingWorkflow.handle, + type: existingWorkflow.type, + nodes: [], + edges: [], + }; + } + // Sanitize nodes to prevent saving binary data and connected values const 
sanitizedNodes = Array.isArray(data.nodes) ? data.nodes.map((node: any) => { @@ -249,15 +294,17 @@ workflowRoutes.put( : [], }; }) - : []; + : existingWorkflowData.nodes; const workflowToValidate = { id: existingWorkflow.id, name: data.name ?? existingWorkflow.name, handle: existingWorkflow.handle, - type: data.type || existingWorkflow.data?.type, + type: data.type || existingWorkflowData.type, nodes: sanitizedNodes, - edges: Array.isArray(data.edges) ? data.edges : [], + edges: Array.isArray(data.edges) + ? data.edges + : existingWorkflowData.edges, }; const validationErrors = validateWorkflow(workflowToValidate); if (validationErrors.length > 0) { @@ -268,28 +315,36 @@ workflowRoutes.put( id: existingWorkflow.id, name: data.name ?? existingWorkflow.name, handle: existingWorkflow.handle, - type: data.type || existingWorkflow.data?.type, + type: data.type || existingWorkflowData.type, nodes: sanitizedNodes, - edges: Array.isArray(data.edges) ? data.edges : [], + edges: Array.isArray(data.edges) + ? data.edges + : existingWorkflowData.edges, }; + // Save full workflow data to R2 + try { + await objectStore.writeWorkflow(updatedWorkflowData); + } catch (error) { + console.error(`Failed to save workflow to R2: ${id}`, error); + } + + // Update metadata in database const updatedWorkflow = await updateWorkflow(db, id, organizationId, { name: data.name, - data: updatedWorkflowData, + type: updatedWorkflowData.type, updatedAt: now, }); - const workflowDataFromDb = updatedWorkflow.data; - const response: UpdateWorkflowResponse = { id: updatedWorkflow.id, name: updatedWorkflow.name, handle: updatedWorkflow.handle, - type: workflowDataFromDb.type, + type: updatedWorkflowData.type, createdAt: updatedWorkflow.createdAt, updatedAt: updatedWorkflow.updatedAt, - nodes: workflowDataFromDb.nodes || [], - edges: workflowDataFromDb.edges || [], + nodes: updatedWorkflowData.nodes || [], + edges: updatedWorkflowData.edges || [], }; return c.json(response); @@ -317,6 +372,15 @@ workflowRoutes.delete("/:id", jwtMiddleware, async (c) => { return c.json({ error: "Failed to delete workflow" }, 500); } + // Delete workflow data from R2 + const objectStore = new ObjectStore(c.env.RESSOURCES); + try { + await objectStore.deleteWorkflow(id); + } catch (error) { + console.error(`Failed to delete workflow from R2: ${id}`, error); + // Continue even if R2 deletion fails + } + const response: DeleteWorkflowResponse = { id: deletedWorkflow.id }; return c.json(response); }); @@ -380,7 +444,7 @@ workflowRoutes.put( return c.json({ error: "Workflow not found" }, 404); } - if (workflow.data.type !== "cron") { + if (workflow.type !== "cron") { return c.json({ error: "Workflow is not a cron workflow" }, 400); } @@ -489,14 +553,25 @@ workflowRoutes.post( let workflowData; let workflow: any; let deploymentId: string | undefined; + const objectStore = new ObjectStore(c.env.RESSOURCES); if (version === "dev") { - // Fallback to database + // Load workflow metadata from database workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); if (!workflow) { return c.json({ error: "Workflow not found" }, 404); } - workflowData = workflow.data; + + // Load full workflow data from R2 + try { + workflowData = await objectStore.readWorkflow(workflow.id); + } catch (error) { + console.error( + `Failed to load workflow data from R2 for ${workflow.id}:`, + error + ); + return c.json({ error: "Failed to load workflow data" }, 500); + } } else { // Get deployment based on version let deployment; @@ -525,10 +600,23 @@ workflowRoutes.post( 
} deploymentId = deployment.id; - workflowData = deployment.workflowData; + + // Load deployment workflow snapshot from R2 + try { + workflowData = await objectStore.readDeploymentWorkflow(deployment.id); + } catch (error) { + console.error( + `Failed to load deployment workflow from R2 for ${deployment.id}:`, + error + ); + return c.json({ error: "Failed to load deployment data" }, 500); + } + workflow = { id: deployment.workflowId, name: workflowData.name, + handle: workflowData.handle, + type: workflowData.type, }; } @@ -697,6 +785,13 @@ workflowRoutes.post( updatedAt: new Date(), }); + // Save execution data to R2 + try { + await objectStore.writeExecution(initialExecution); + } catch (error) { + console.error(`Failed to save execution to R2: ${executionId}`, error); + } + const response: ExecuteWorkflowResponse = { id: initialExecution.id, workflowId: initialExecution.workflowId, @@ -735,6 +830,25 @@ workflowRoutes.post( ); } + // Load execution data from R2 to get nodeExecutions + const objectStore = new ObjectStore(c.env.RESSOURCES); + let executionData; + try { + executionData = await objectStore.readExecution(executionId); + } catch (error) { + console.error( + `Failed to load execution data from R2 for ${executionId}:`, + error + ); + // Fallback to empty nodeExecutions if R2 read fails + executionData = { + id: executionId, + workflowId: execution.workflowId, + status: execution.status, + nodeExecutions: [], + }; + } + try { // Get the workflow instance and terminate it const instance = await c.env.EXECUTE.get(executionId); @@ -749,13 +863,23 @@ workflowRoutes.post( userId: "cancelled", // Required by SaveExecutionRecord but not stored in DB organizationId: execution.organizationId, status: ExecutionStatus.CANCELLED, - nodeExecutions: execution.data.nodeExecutions || [], + nodeExecutions: executionData.nodeExecutions || [], error: execution.error ?? "Execution cancelled by user", updatedAt: now, endedAt: now, startedAt: execution.startedAt ?? undefined, }); + // Save updated execution to R2 + try { + await objectStore.writeExecution(updatedExecution); + } catch (error) { + console.error( + `Failed to save updated execution to R2: ${executionId}`, + error + ); + } + const response: CancelWorkflowExecutionResponse = { id: updatedExecution.id, status: "cancelled", @@ -767,20 +891,30 @@ workflowRoutes.post( // If the instance doesn't exist or can't be terminated, still update the database const now = new Date(); - await saveExecution(db, { + const updatedExecution = await saveExecution(db, { id: executionId, workflowId: execution.workflowId, deploymentId: execution.deploymentId ?? undefined, userId: "cancelled", // Required by SaveExecutionRecord but not stored in DB organizationId: execution.organizationId, status: ExecutionStatus.CANCELLED, - nodeExecutions: execution.data.nodeExecutions || [], + nodeExecutions: executionData.nodeExecutions || [], error: execution.error ?? "Execution cancelled by user", updatedAt: now, endedAt: now, startedAt: execution.startedAt ?? 
undefined, }); + // Save updated execution to R2 + try { + await objectStore.writeExecution(updatedExecution); + } catch (error) { + console.error( + `Failed to save updated execution to R2: ${executionId}`, + error + ); + } + const response: CancelWorkflowExecutionResponse = { id: executionId, status: "cancelled", diff --git a/apps/api/src/runtime/execution-persistence.ts b/apps/api/src/runtime/execution-persistence.ts index dfb93647..974bdfcc 100644 --- a/apps/api/src/runtime/execution-persistence.ts +++ b/apps/api/src/runtime/execution-persistence.ts @@ -2,6 +2,7 @@ import type { WorkflowExecution } from "@dafthunk/types"; import type { Bindings } from "../context"; import { createDatabase, type ExecutionStatusType, saveExecution } from "../db"; +import { ObjectStore } from "./object-store"; import type { RuntimeState } from "./runtime"; /** @@ -9,7 +10,11 @@ import type { RuntimeState } from "./runtime"; * Manages database storage and WebSocket updates to sessions. */ export class ExecutionPersistence { - constructor(private env: Bindings) {} + private objectStore: ObjectStore; + + constructor(private env: Bindings) { + this.objectStore = new ObjectStore(env.RESSOURCES); + } /** * Sends execution update to workflow session via WebSocket @@ -91,7 +96,7 @@ export class ExecutionPersistence { try { const db = createDatabase(this.env.DB); - return await saveExecution(db, { + const execution = await saveExecution(db, { id: instanceId, workflowId, userId, @@ -103,6 +108,15 @@ export class ExecutionPersistence { startedAt, endedAt, }); + + // Save execution data to R2 + try { + await this.objectStore.writeExecution(execution); + } catch (error) { + console.error(`Failed to save execution to R2: ${instanceId}`, error); + } + + return execution; } catch (error) { console.error("Failed to persist execution record:", error); // Continue without interrupting the workflow. 
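[The ExecutionPersistence hunk above is the clearest statement of the write-through pattern this patch applies at every call site: the D1 row stays the source of truth for execution metadata, and the full payload is mirrored to R2 on a best-effort basis. A minimal standalone sketch of that pattern, assuming the Bindings shape (DB, RESSOURCES) and the saveExecution / ObjectStore.writeExecution signatures used in the hunks above; persistWithMirror is an illustrative name, not part of the patch:

import type { WorkflowExecution } from "@dafthunk/types";
import type { Bindings } from "../context";
import { createDatabase, saveExecution } from "../db";
import { ObjectStore } from "./object-store";

// Sketch only: write the metadata row first, then mirror the full
// execution to R2; an R2 failure is logged but never fails the caller.
async function persistWithMirror(
  env: Bindings,
  record: Parameters<typeof saveExecution>[1]
): Promise<WorkflowExecution> {
  const db = createDatabase(env.DB);
  // D1 is authoritative: if this throws, the caller sees the error.
  const execution = await saveExecution(db, record);
  try {
    // The R2 mirror is best-effort: log and continue on failure.
    await new ObjectStore(env.RESSOURCES).writeExecution(execution);
  } catch (error) {
    console.error(`Failed to save execution to R2: ${execution.id}`, error);
  }
  return execution;
}

The same shape recurs in cron.ts, deployments.ts, workflows.ts, and ExecutionPersistence; only the contents of the record differ.]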
diff --git a/apps/api/src/runtime/object-store.ts b/apps/api/src/runtime/object-store.ts index a39c98ff..0c0fb9df 100644 --- a/apps/api/src/runtime/object-store.ts +++ b/apps/api/src/runtime/object-store.ts @@ -190,6 +190,50 @@ export class ObjectStore { ); } + async writeDeploymentWorkflow( + deploymentId: string, + workflow: Workflow + ): Promise { + await this.writeToR2( + `deployments/${deploymentId}/workflow.json`, + JSON.stringify(workflow), + { + httpMetadata: { + contentType: "application/json", + cacheControl: "public, max-age=31536000", + }, + customMetadata: { + deploymentId, + workflowId: workflow.id, + createdAt: new Date().toISOString(), + }, + }, + "writeDeploymentWorkflow" + ); + return deploymentId; + } + + async readDeploymentWorkflow(deploymentId: string): Promise { + const object = await this.readFromR2( + `deployments/${deploymentId}/workflow.json`, + "readDeploymentWorkflow" + ); + + if (!object) { + throw new Error(`Workflow not found for deployment: ${deploymentId}`); + } + + const text = await object.text(); + return JSON.parse(text) as Workflow; + } + + async deleteDeploymentWorkflow(deploymentId: string): Promise { + await this.deleteFromR2( + `deployments/${deploymentId}/workflow.json`, + "deleteDeploymentWorkflow" + ); + } + async writeExecution(execution: WorkflowExecution): Promise { await this.writeToR2( `executions/${execution.id}/execution.json`, From d4b36451bb42fd571774bc09d97073c9d606f3a1 Mon Sep 17 00:00:00 2001 From: leonardcser <73912641+leonardcser@users.noreply.github.com> Date: Tue, 7 Oct 2025 17:00:53 +0200 Subject: [PATCH 3/3] refactor(api): use queries functions --- apps/api/src/email.ts | 21 ++---- apps/api/src/routes/deployments.ts | 56 +++++---------- apps/api/src/routes/executions.ts | 35 +++------ apps/api/src/routes/workflows.ts | 111 ++++++++++------------------- 4 files changed, 74 insertions(+), 149 deletions(-) diff --git a/apps/api/src/email.ts b/apps/api/src/email.ts index b4bdc99b..3a9c5c49 100644 --- a/apps/api/src/email.ts +++ b/apps/api/src/email.ts @@ -4,7 +4,7 @@ import { Bindings } from "./context"; import { createDatabase, getOrganizationComputeCredits, - getWorkflow, + getWorkflowWithData, } from "./db"; import { ExecutionStatus, @@ -83,27 +83,20 @@ export async function handleIncomingEmail( let deploymentId: string | undefined; if (version === "dev") { - // Get workflow metadata from DB - workflow = await getWorkflow( + // Get workflow with data from DB and R2 + const workflowWithData = await getWorkflowWithData( db, + objectStore, workflowIdOrHandle, organizationIdOrHandle ); - if (!workflow) { + if (!workflowWithData) { console.error("Workflow not found"); return; } - // Load workflow data from R2 - try { - workflowData = await objectStore.readWorkflow(workflow.id); - } catch (error) { - console.error( - `Failed to load workflow data from R2 for ${workflow.id}:`, - error - ); - return; - } + workflow = workflowWithData; + workflowData = workflowWithData.data; } else { // Get deployment based on version let deployment; diff --git a/apps/api/src/routes/deployments.ts b/apps/api/src/routes/deployments.ts index 78bdc8ef..1f16c5cf 100644 --- a/apps/api/src/routes/deployments.ts +++ b/apps/api/src/routes/deployments.ts @@ -16,9 +16,9 @@ import { ApiContext } from "../context"; import { createDatabase, ExecutionStatus } from "../db"; import { createDeployment, - getDeployment, getDeployments, getDeploymentsGroupedByWorkflow, + getDeploymentWithData, getLatestDeployment, getLatestDeploymentsVersionNumbers, 
getOrganizationComputeCredits, @@ -66,26 +66,19 @@ deploymentRoutes.get("/version/:deploymentId", jwtMiddleware, async (c) => { const organizationId = c.get("organizationId")!; const deploymentId = c.req.param("deploymentId"); const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); - const deployment = await getDeployment(db, deploymentId, organizationId); + const deployment = await getDeploymentWithData( + db, + objectStore, + deploymentId, + organizationId + ); if (!deployment) { return c.json({ error: "Deployment not found" }, 404); } - // Load deployment workflow snapshot from R2 - const objectStore = new ObjectStore(c.env.RESSOURCES); - let workflowData; - try { - workflowData = await objectStore.readDeploymentWorkflow(deployment.id); - } catch (error) { - console.error( - `Failed to load deployment workflow from R2 for ${deployment.id}:`, - error - ); - return c.json({ error: "Failed to load deployment data" }, 500); - } - // Transform to match WorkflowDeploymentVersion type const deploymentVersion: GetDeploymentVersionResponse = { id: deployment.id, @@ -93,8 +86,8 @@ deploymentRoutes.get("/version/:deploymentId", jwtMiddleware, async (c) => { version: deployment.version, createdAt: deployment.createdAt, updatedAt: deployment.updatedAt, - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], + nodes: deployment.workflowData.nodes || [], + edges: deployment.workflowData.edges || [], }; return c.json(deploymentVersion); @@ -109,12 +102,6 @@ deploymentRoutes.get("/:workflowIdOrHandle", jwtMiddleware, async (c) => { const workflowIdOrHandle = c.req.param("workflowIdOrHandle"); const db = createDatabase(c.env.DB); - // Check if workflow exists and belongs to the organization - const workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); - if (!workflow) { - return c.json({ error: "Workflow not found" }, 404); - } - // Get the latest deployment const deployment = await getLatestDeployment( db, @@ -322,6 +309,7 @@ deploymentRoutes.post( const deploymentId = c.req.param("deploymentId"); const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); // Get organization compute credits const computeCredits = await getOrganizationComputeCredits( @@ -332,25 +320,19 @@ deploymentRoutes.post( return c.json({ error: "Organization not found" }, 404); } - // Get the deployment - const deployment = await getDeployment(db, deploymentId, organizationId); + // Get the deployment with workflow data + const deployment = await getDeploymentWithData( + db, + objectStore, + deploymentId, + organizationId + ); if (!deployment) { return c.json({ error: "Deployment not found" }, 404); } - // Load deployment workflow snapshot from R2 - const objectStore = new ObjectStore(c.env.RESSOURCES); - let workflowData; - try { - workflowData = await objectStore.readDeploymentWorkflow(deployment.id); - } catch (error) { - console.error( - `Failed to load deployment workflow from R2 for ${deployment.id}:`, - error - ); - return c.json({ error: "Failed to load deployment data" }, 500); - } + const workflowData = deployment.workflowData; // Validate if workflow has nodes if (!workflowData.nodes || workflowData.nodes.length === 0) { diff --git a/apps/api/src/routes/executions.ts b/apps/api/src/routes/executions.ts index faa5e90f..f3ca5f46 100644 --- a/apps/api/src/routes/executions.ts +++ b/apps/api/src/routes/executions.ts @@ -11,7 +11,7 @@ import { apiKeyOrJwtMiddleware, jwtMiddleware } from "../auth"; import { ApiContext } from 
"../context"; import { createDatabase, - getExecution, + getExecutionWithData, getWorkflowName, getWorkflowNames, listExecutions, @@ -24,9 +24,15 @@ executionRoutes.get("/:id", apiKeyOrJwtMiddleware, async (c) => { const organizationId = c.get("organizationId")!; const id = c.req.param("id"); const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); try { - const execution = await getExecution(db, id, organizationId); + const execution = await getExecutionWithData( + db, + objectStore, + id, + organizationId + ); if (!execution) { return c.json({ error: "Execution not found" }, 404); @@ -39,35 +45,16 @@ executionRoutes.get("/:id", apiKeyOrJwtMiddleware, async (c) => { organizationId ); - // Load execution data from R2 - const objectStore = new ObjectStore(c.env.RESSOURCES); - let executionData; - try { - executionData = await objectStore.readExecution(execution.id); - } catch (error) { - console.error( - `Failed to load execution data from R2 for ${execution.id}:`, - error - ); - // Fallback to empty data if R2 read fails - executionData = { - id: execution.id, - workflowId: execution.workflowId, - status: execution.status as WorkflowExecutionStatus, - nodeExecutions: [], - }; - } - const workflowExecution: WorkflowExecution = { id: execution.id, workflowId: execution.workflowId, workflowName: workflowName || "Unknown Workflow", deploymentId: execution.deploymentId ?? undefined, status: execution.status as WorkflowExecutionStatus, - nodeExecutions: executionData.nodeExecutions || [], + nodeExecutions: execution.data.nodeExecutions || [], error: execution.error || undefined, - startedAt: execution.startedAt ?? executionData.startedAt, - endedAt: execution.endedAt ?? executionData.endedAt, + startedAt: execution.startedAt ?? execution.data.startedAt, + endedAt: execution.endedAt ?? 
execution.data.endedAt, }; const response: GetExecutionResponse = { execution: workflowExecution }; diff --git a/apps/api/src/routes/workflows.ts b/apps/api/src/routes/workflows.ts index 8e6d6f21..f2833352 100644 --- a/apps/api/src/routes/workflows.ts +++ b/apps/api/src/routes/workflows.ts @@ -32,11 +32,12 @@ import { ExecutionStatus, getCronTrigger, getDeploymentByVersion, - getExecution, + getExecutionWithData, getLatestDeployment, getOrganizationComputeCredits, getWorkflow, getWorkflows, + getWorkflowWithData, saveExecution, updateWorkflow, upsertCronTrigger as upsertDbCronTrigger, @@ -174,33 +175,20 @@ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { } const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); try { - // Get metadata from database - const workflow = await getWorkflow(db, id, organizationId); + const workflow = await getWorkflowWithData( + db, + objectStore, + id, + organizationId + ); if (!workflow) { return c.json({ error: "Workflow not found" }, 404); } - // Load full workflow data from R2 - const objectStore = new ObjectStore(c.env.RESSOURCES); - let workflowData; - try { - workflowData = await objectStore.readWorkflow(id); - } catch (error) { - console.error(`Failed to load workflow data from R2 for ${id}:`, error); - // Fallback to empty structure if R2 read fails - workflowData = { - id: workflow.id, - name: workflow.name, - handle: workflow.handle, - type: workflow.type, - nodes: [], - edges: [], - }; - } - const response: GetWorkflowResponse = { id: workflow.id, name: workflow.name, @@ -208,8 +196,8 @@ workflowRoutes.get("/:id", jwtMiddleware, async (c) => { type: workflow.type, createdAt: workflow.createdAt || new Date(), updatedAt: workflow.updatedAt || new Date(), - nodes: workflowData.nodes || [], - edges: workflowData.edges || [], + nodes: workflow.data.nodes || [], + edges: workflow.data.edges || [], }; return c.json(response); @@ -237,10 +225,16 @@ workflowRoutes.put( async (c) => { const id = c.req.param("id"); const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); const organizationId = c.get("organizationId")!; - const existingWorkflow = await getWorkflow(db, id, organizationId); + const existingWorkflow = await getWorkflowWithData( + db, + objectStore, + id, + organizationId + ); if (!existingWorkflow) { return c.json({ error: "Workflow not found" }, 404); @@ -248,24 +242,7 @@ workflowRoutes.put( const data = c.req.valid("json"); const now = new Date(); - - // Load existing workflow data from R2 to get current state - const objectStore = new ObjectStore(c.env.RESSOURCES); - let existingWorkflowData; - try { - existingWorkflowData = await objectStore.readWorkflow(id); - } catch (error) { - console.error(`Failed to load workflow data from R2 for ${id}:`, error); - // Fallback to empty structure - existingWorkflowData = { - id: existingWorkflow.id, - name: existingWorkflow.name, - handle: existingWorkflow.handle, - type: existingWorkflow.type, - nodes: [], - edges: [], - }; - } + const existingWorkflowData = existingWorkflow.data; // Sanitize nodes to prevent saving binary data and connected values const sanitizedNodes = Array.isArray(data.nodes) @@ -556,22 +533,19 @@ workflowRoutes.post( const objectStore = new ObjectStore(c.env.RESSOURCES); if (version === "dev") { - // Load workflow metadata from database - workflow = await getWorkflow(db, workflowIdOrHandle, organizationId); - if (!workflow) { + // Load workflow with data from database and R2 + const workflowWithData = 
await getWorkflowWithData( + db, + objectStore, + workflowIdOrHandle, + organizationId + ); + if (!workflowWithData) { return c.json({ error: "Workflow not found" }, 404); } - // Load full workflow data from R2 - try { - workflowData = await objectStore.readWorkflow(workflow.id); - } catch (error) { - console.error( - `Failed to load workflow data from R2 for ${workflow.id}:`, - error - ); - return c.json({ error: "Failed to load workflow data" }, 500); - } + workflow = workflowWithData; + workflowData = workflowWithData.data; } else { // Get deployment based on version let deployment; @@ -813,9 +787,15 @@ workflowRoutes.post( const organizationId = c.get("organizationId")!; const executionId = c.req.param("executionId"); const db = createDatabase(c.env.DB); + const objectStore = new ObjectStore(c.env.RESSOURCES); // Get the execution to verify it exists and belongs to this organization - const execution = await getExecution(db, executionId, organizationId); + const execution = await getExecutionWithData( + db, + objectStore, + executionId, + organizationId + ); if (!execution) { return c.json({ error: "Execution not found" }, 404); } @@ -830,24 +810,7 @@ workflowRoutes.post( ); } - // Load execution data from R2 to get nodeExecutions - const objectStore = new ObjectStore(c.env.RESSOURCES); - let executionData; - try { - executionData = await objectStore.readExecution(executionId); - } catch (error) { - console.error( - `Failed to load execution data from R2 for ${executionId}:`, - error - ); - // Fallback to empty nodeExecutions if R2 read fails - executionData = { - id: executionId, - workflowId: execution.workflowId, - status: execution.status, - nodeExecutions: [], - }; - } + const executionData = execution.data; try { // Get the workflow instance and terminate it
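[Every handler in PATCH 3/3 now leans on the *WithData helpers imported from ../db, whose implementation (added to db/queries.ts in PATCH 2/3) is not visible in these hunks. From the call sites, a plausible standalone reconstruction of getWorkflowWithData, with imports written from a route file's perspective — the fallback shape mirrors the handler code this patch deletes, but the exact queries.ts implementation is an assumption:

import type { Workflow } from "@dafthunk/types";
import { createDatabase, getWorkflow } from "../db";
import { ObjectStore } from "../runtime/object-store";

export async function getWorkflowWithData(
  db: ReturnType<typeof createDatabase>,
  objectStore: ObjectStore,
  idOrHandle: string,
  organizationId: string
) {
  // Metadata row from D1 (undefined when missing or wrong organization).
  const workflow = await getWorkflow(db, idOrHandle, organizationId);
  if (!workflow) return undefined;

  let data: Workflow;
  try {
    // Full node/edge graph from R2.
    data = await objectStore.readWorkflow(workflow.id);
  } catch (error) {
    console.error(
      `Failed to load workflow data from R2 for ${workflow.id}:`,
      error
    );
    // Assumed fallback: an empty graph, matching the deleted handler logic.
    data = {
      id: workflow.id,
      name: workflow.name,
      handle: workflow.handle,
      type: workflow.type,
      nodes: [],
      edges: [],
    };
  }
  return { ...workflow, data };
}

getDeploymentWithData and getExecutionWithData presumably follow the same pattern over readDeploymentWorkflow and readExecution, attaching the payload as workflowData and data respectively, as the call sites above suggest.]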