Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
312 changes: 311 additions & 1 deletion apps/sim/app/api/workflows/[id]/state/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { webhook, workflow, workflowSchedule } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
Expand All @@ -10,6 +10,13 @@ import { generateRequestId } from '@/lib/utils'
import { extractAndPersistCustomTools } from '@/lib/workflows/custom-tools-persistence'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { getWorkflowAccessContext } from '@/lib/workflows/utils'
import { getTrigger } from '@/triggers'
import {
calculateNextRunTime,
generateCronExpression,
getScheduleTimeValues,
validateCronExpression,
} from '@/lib/schedules/utils'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/validation'
import type { BlockState } from '@/stores/workflows/workflow/types'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
Expand Down Expand Up @@ -202,6 +209,9 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
)
}

await syncWorkflowWebhooks(workflowId, workflowState.blocks)
await syncWorkflowSchedules(workflowId, workflowState.blocks)

// Extract and persist custom tools to database
try {
const workspaceId = workflowData.workspaceId
Expand Down Expand Up @@ -287,3 +297,303 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

function getSubBlockValue<T = unknown>(block: BlockState, subBlockId: string): T | undefined {
const value = block.subBlocks?.[subBlockId]?.value
if (value === undefined || value === null) {
return undefined
}
return value as T
}

async function syncWorkflowWebhooks(
workflowId: string,
blocks: Record<string, any>
): Promise<void> {
await syncBlockResources(workflowId, blocks, {
resourceName: 'webhook',
subBlockId: 'webhookId',
buildMetadata: buildWebhookMetadata,
applyMetadata: upsertWebhookRecord,
})
}

type ScheduleBlockInput = Parameters<typeof getScheduleTimeValues>[0]

async function syncWorkflowSchedules(
workflowId: string,
blocks: Record<string, any>
): Promise<void> {
await syncBlockResources(workflowId, blocks, {
resourceName: 'schedule',
subBlockId: 'scheduleId',
buildMetadata: buildScheduleMetadata,
applyMetadata: upsertScheduleRecord,
})
}

interface ScheduleMetadata {
cronExpression: string | null
nextRunAt: Date | null
timezone: string
}

function buildScheduleMetadata(block: BlockState): ScheduleMetadata | null {
const scheduleType = getSubBlockValue<string>(block, 'scheduleType') || 'daily'
const scheduleBlock = convertToScheduleBlock(block)

const scheduleValues = getScheduleTimeValues(scheduleBlock)
const sanitizedValues =
scheduleType !== 'custom' ? { ...scheduleValues, cronExpression: null } : scheduleValues

try {
const cronExpression = generateCronExpression(scheduleType, sanitizedValues)
const timezone = scheduleValues.timezone || 'UTC'

if (cronExpression) {
const validation = validateCronExpression(cronExpression, timezone)
if (!validation.isValid) {
logger.warn('Invalid cron expression while syncing schedule', {
blockId: block.id,
cronExpression,
error: validation.error,
})
return null
}
}

const nextRunAt = calculateNextRunTime(scheduleType, sanitizedValues)

return {
cronExpression,
timezone,
nextRunAt,
}
} catch (error) {
logger.error('Failed to build schedule metadata during sync', {
blockId: block.id,
error,
})
return null
}
}

function convertToScheduleBlock(block: BlockState): ScheduleBlockInput {
const subBlocks: ScheduleBlockInput['subBlocks'] = {}

Object.entries(block.subBlocks || {}).forEach(([id, subBlock]) => {
subBlocks[id] = { value: stringifySubBlockValue(subBlock?.value) }
})

return {
type: block.type,
subBlocks,
}
}

interface WebhookMetadata {
triggerPath: string
provider: string | null
providerConfig: Record<string, any>
}

function buildWebhookMetadata(block: BlockState): WebhookMetadata | null {
const triggerId =
getSubBlockValue<string>(block, 'triggerId') ||
getSubBlockValue<string>(block, 'selectedTriggerId')
const triggerConfig = getSubBlockValue<Record<string, any>>(block, 'triggerConfig') || {}
const triggerCredentials = getSubBlockValue<string>(block, 'triggerCredentials')
const triggerPath = getSubBlockValue<string>(block, 'triggerPath') || block.id

const triggerDef = triggerId ? getTrigger(triggerId) : undefined
const provider = triggerDef?.provider || null

const providerConfig = {
...(typeof triggerConfig === 'object' ? triggerConfig : {}),
...(triggerCredentials ? { credentialId: triggerCredentials } : {}),
...(triggerId ? { triggerId } : {}),
}

return {
triggerPath,
provider,
providerConfig,
}
}

async function upsertWebhookRecord(
workflowId: string,
block: BlockState,
webhookId: string,
metadata: WebhookMetadata
): Promise<void> {
const [existing] = await db.select().from(webhook).where(eq(webhook.id, webhookId)).limit(1)

if (existing) {
const needsUpdate =
existing.blockId !== block.id ||
existing.workflowId !== workflowId ||
existing.path !== metadata.triggerPath

if (needsUpdate) {
await db
.update(webhook)
.set({
workflowId,
blockId: block.id,
path: metadata.triggerPath,
provider: metadata.provider || existing.provider,
providerConfig: Object.keys(metadata.providerConfig).length
? metadata.providerConfig
: existing.providerConfig,
isActive: true,
updatedAt: new Date(),
})
.where(eq(webhook.id, webhookId))
}
return
}

await db.insert(webhook).values({
id: webhookId,
workflowId,
blockId: block.id,
path: metadata.triggerPath,
provider: metadata.provider,
providerConfig: metadata.providerConfig,
isActive: true,
createdAt: new Date(),
updatedAt: new Date(),
})

logger.info('Recreated missing webhook after workflow save', {
workflowId,
blockId: block.id,
webhookId,
})
}

async function upsertScheduleRecord(
workflowId: string,
block: BlockState,
scheduleId: string,
metadata: ScheduleMetadata
): Promise<void> {
const now = new Date()
const [existing] = await db
.select({
id: workflowSchedule.id,
nextRunAt: workflowSchedule.nextRunAt,
})
.from(workflowSchedule)
.where(eq(workflowSchedule.id, scheduleId))
.limit(1)

if (existing) {
await db
.update(workflowSchedule)
.set({
workflowId,
blockId: block.id,
cronExpression: metadata.cronExpression,
nextRunAt: metadata.nextRunAt ?? existing.nextRunAt,
timezone: metadata.timezone,
updatedAt: now,
})
.where(eq(workflowSchedule.id, scheduleId))
return
}

await db.insert(workflowSchedule).values({
id: scheduleId,
workflowId,
blockId: block.id,
cronExpression: metadata.cronExpression,
nextRunAt: metadata.nextRunAt ?? null,
triggerType: 'schedule',
timezone: metadata.timezone,
status: 'active',
failedCount: 0,
createdAt: now,
updatedAt: now,
})

logger.info('Recreated missing schedule after workflow save', {
workflowId,
blockId: block.id,
scheduleId,
})
}

interface BlockResourceSyncConfig<T> {
resourceName: string
subBlockId: string
buildMetadata: (block: BlockState, resourceId: string) => T | null
applyMetadata: (
workflowId: string,
block: BlockState,
resourceId: string,
metadata: T
) => Promise<void>
}

async function syncBlockResources<T>(
workflowId: string,
blocks: Record<string, any>,
config: BlockResourceSyncConfig<T>
): Promise<void> {
const blockEntries = Object.values(blocks || {}).filter(Boolean) as BlockState[]
if (blockEntries.length === 0) return

for (const block of blockEntries) {
const resourceId = getSubBlockValue<string>(block, config.subBlockId)
if (!resourceId) continue

const metadata = config.buildMetadata(block, resourceId)
if (!metadata) {
logger.warn(`Skipping ${config.resourceName} sync due to invalid configuration`, {
workflowId,
blockId: block.id,
resourceId,
resourceName: config.resourceName,
})
continue
}

try {
await config.applyMetadata(workflowId, block, resourceId, metadata)
} catch (error) {
logger.error(`Failed to sync ${config.resourceName}`, {
workflowId,
blockId: block.id,
resourceId,
resourceName: config.resourceName,
error,
})
}
}
}

function stringifySubBlockValue(value: unknown): string {
if (value === undefined || value === null) {
return ''
}

if (typeof value === 'string') {
return value
}

if (typeof value === 'number' || typeof value === 'boolean') {
return String(value)
}

if (value instanceof Date) {
return value.toISOString()
}

try {
return JSON.stringify(value)
} catch {
return String(value)
}
}
Loading
Loading