Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions apps/api/src/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
saveExecution,
updateCronTriggerRunTimes,
} from "./db/queries";
import { ObjectStore } from "./runtime/object-store";

// This function will now handle the actual execution triggering and initial record saving
async function executeWorkflow(
Expand All @@ -22,7 +23,8 @@ async function executeWorkflow(
deploymentId: string | undefined,
db: ReturnType<typeof createDatabase>,
env: Bindings,
_ctx: ExecutionContext
_ctx: ExecutionContext,
objectStore: ObjectStore
): Promise<void> {
console.log(`Attempting to execute workflow ${workflowInfo.id} via cron.`);

Expand Down Expand Up @@ -64,7 +66,7 @@ async function executeWorkflow(
status: "idle" as const,
}));

await saveExecution(db, {
const initialExecution = await saveExecution(db, {
id: executionId,
workflowId: workflowInfo.id,
deploymentId: deploymentId,
Expand All @@ -77,6 +79,13 @@ async function executeWorkflow(
startedAt: new Date(),
});
console.log(`Initial execution record saved for ${executionId}`);

// Save execution data to R2
try {
await objectStore.writeExecution(initialExecution);
} catch (error) {
console.error(`Failed to save execution to R2: ${executionId}`, error);
}
} catch (execError) {
console.error(
`Error executing workflow ${workflowInfo.id} or saving initial record:`,
Expand All @@ -92,6 +101,7 @@ export async function handleCronTriggers(
): Promise<void> {
console.log(`Cron event triggered at: ${new Date(event.scheduledTime)}`);
const db = createDatabase(env.DB);
const objectStore = new ObjectStore(env.RESSOURCES);
const now = new Date();

try {
Expand All @@ -117,9 +127,29 @@ export async function handleCronTriggers(

try {
if (versionAlias === "dev") {
workflowToExecute = workflow.data;
// Load workflow data from R2
try {
workflowToExecute = await objectStore.readWorkflow(workflow.id);
} catch (error) {
console.error(
`Failed to load workflow data from R2 for ${workflow.id}:`,
error
);
continue;
}
} else if (deployment) {
workflowToExecute = deployment.workflowData;
// Load deployment workflow snapshot from R2
try {
workflowToExecute = await objectStore.readDeploymentWorkflow(
deployment.id
);
} catch (error) {
console.error(
`Failed to load deployment workflow data from R2 for ${deployment.id}:`,
error
);
continue;
}
deploymentIdToExecute = deployment.id;
} else {
console.error(
Expand All @@ -142,7 +172,8 @@ export async function handleCronTriggers(
deploymentIdToExecute,
db,
env,
ctx
ctx,
objectStore
);
} else {
// This case should ideally not be reached due to the checks above,
Expand Down
25 changes: 25 additions & 0 deletions apps/api/src/db/migrations/0012_migrate_to_r2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
PRAGMA foreign_keys=OFF;--> statement-breakpoint
CREATE TABLE `__new_secrets` (
`id` text PRIMARY KEY NOT NULL,
`name` text NOT NULL,
`encrypted_value` text NOT NULL,
`organization_id` text NOT NULL,
`created_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL,
`updated_at` integer DEFAULT CURRENT_TIMESTAMP NOT NULL,
FOREIGN KEY (`organization_id`) REFERENCES `organizations`(`id`) ON UPDATE no action ON DELETE cascade
);
--> statement-breakpoint
INSERT INTO `__new_secrets`("id", "name", "encrypted_value", "organization_id", "created_at", "updated_at") SELECT "id", "name", "encrypted_value", "organization_id", "created_at", "updated_at" FROM `secrets`;--> statement-breakpoint
DROP TABLE `secrets`;--> statement-breakpoint
ALTER TABLE `__new_secrets` RENAME TO `secrets`;--> statement-breakpoint
PRAGMA foreign_keys=ON;--> statement-breakpoint
CREATE INDEX `secrets_name_idx` ON `secrets` (`name`);--> statement-breakpoint
CREATE INDEX `secrets_organization_id_idx` ON `secrets` (`organization_id`);--> statement-breakpoint
CREATE INDEX `secrets_created_at_idx` ON `secrets` (`created_at`);--> statement-breakpoint
CREATE UNIQUE INDEX `secrets_organization_id_name_unique_idx` ON `secrets` (`organization_id`,`name`);--> statement-breakpoint
DROP INDEX `executions_visibility_idx`;--> statement-breakpoint
ALTER TABLE `executions` DROP COLUMN `data`;--> statement-breakpoint
ALTER TABLE `executions` DROP COLUMN `visibility`;--> statement-breakpoint
ALTER TABLE `executions` DROP COLUMN `og_image_generated`;--> statement-breakpoint
ALTER TABLE `deployments` DROP COLUMN `workflow_data`;--> statement-breakpoint
ALTER TABLE `workflows` DROP COLUMN `data`;
Loading