diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 03fd2313c9..2313594b54 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -145,6 +145,71 @@ jobs: - name: Run Unit Tests run: pnpm test --filter='!./docs' --filter='!./workbench/vitest' + chaos-e2e-vercel: + name: Chaos E2E (${{ matrix.chaos_mode }} / ${{ matrix.app.name }}) + runs-on: ubuntu-latest + timeout-minutes: 45 + # Skip when workflow-server-test label is present (cross-repo testing). + # In that scenario, normal E2E tests run against a workflow-server preview + # deployment. Running chaos tests against the same deployment could cause + # collateral failures (especially with future chaos modes like process crash). + if: ${{ !contains(github.event.pull_request.labels.*.name, 'workflow-server-test') }} + strategy: + fail-fast: false + matrix: + chaos_mode: [random-500, random-429] + app: + - name: "nextjs-turbopack" + project-id: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi" + project-slug: "example-nextjs-workflow-turbopack" + - name: "hono" + project-id: "prj_p0GIEsfl53L7IwVbosPvi9rPSOYW" + project-slug: "workbench-hono-workflow" + env: + TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} + TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' + WORKFLOW_CHAOS: ${{ matrix.chaos_mode }} + WORKFLOW_CHAOS_SEED: ${{ github.sha }} + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Setup environment + uses: ./.github/actions/setup-workflow-dev + with: + build-packages: 'false' + + - name: Build CLI + run: pnpm turbo run build --filter='@workflow/cli' + + - name: Waiting for the Vercel deployment + id: waitForDeployment + uses: ./.github/actions/wait-for-vercel-project + with: + team-id: "team_nO2mCG4W8IxPIeKoSsqwAxxB" + project-id: ${{ matrix.app.project-id }} + vercel-token: ${{ secrets.VERCEL_LABS_TOKEN }} + timeout: 1000 + check-interval: 15 + environment: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }} + + - name: Run E2E Tests (Chaos) + run: pnpm run test:e2e --reporter=default + env: + NODE_OPTIONS: "--enable-source-maps" + DEPLOYMENT_URL: ${{ steps.waitForDeployment.outputs.deployment-url }} + VERCEL_DEPLOYMENT_ID: ${{ steps.waitForDeployment.outputs.deployment-id }} + APP_NAME: ${{ matrix.app.name }} + WORKFLOW_VERCEL_ENV: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }} + WORKFLOW_VERCEL_AUTH_TOKEN: ${{ secrets.VERCEL_LABS_TOKEN }} + WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB" + WORKFLOW_VERCEL_PROJECT: ${{ matrix.app.project-id }} + WORKFLOW_VERCEL_PROJECT_SLUG: ${{ matrix.app.project-slug }} + VERCEL_AUTOMATION_BYPASS_SECRET: ${{ secrets.VERCEL_AUTOMATION_BYPASS_SECRET }} + build-error-messages: name: Node.js Module Build Errors Test runs-on: ubuntu-latest diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ad3df0059a..5e2fb83c06 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -2,6 +2,7 @@ import { WorkflowAPIError, WorkflowRuntimeError } from '@workflow/errors'; import { parseWorkflowName } from '@workflow/utils/parse-name'; import { type Event, + requestContext, SPEC_VERSION_CURRENT, WorkflowInvokePayloadSchema, type WorkflowRun, @@ -102,307 +103,182 @@ export function workflowEntrypoint( traceCarrier: traceContext, requestedAt, serverErrorRetryCount, + chaos, + chaosSeed, } = WorkflowInvokePayloadSchema.parse(message_); // Extract the workflow name from the topic name const workflowName = metadata.queueName.slice('__wkf_workflow_'.length); const spanLinks = await linkToCurrentContext(); - // Invoke user workflow within the propagated trace context and baggage - return await withTraceContext(traceContext, async () => { - // Set workflow context as baggage for automatic propagation - return await withWorkflowBaggage( - { workflowRunId: runId, workflowName }, - async () => { - const world = getWorld(); - return trace( - `WORKFLOW ${workflowName}`, - { links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('execute'), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - // TODO: validate `workflowName` exists before consuming message? - - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowTracePropagated(!!traceContext), - }); - - return await withThrottleRetry(async () => { - let workflowStartedAt = -1; - let workflowRun = await world.runs.get(runId); - try { - if (workflowRun.status === 'pending') { - // Transition run to 'running' via event (event-sourced architecture) - const result = await world.events.create(runId, { - eventType: 'run_started', - specVersion: SPEC_VERSION_CURRENT, - }); - // Use the run entity from the event response (no extra get call needed) - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` - ); - } - workflowRun = result.run; - } + // Enter chaos request context so World implementations can apply + // chaos behavior (e.g., routing to chaos server, injecting failures). + // Only enter the ALS when chaos config is present to avoid returning + // a truthy empty object from getRequestContext() in non-chaos cases. + const runHandler = () => + // Invoke user workflow within the propagated trace context and baggage + withTraceContext(traceContext, async () => { + // Set workflow context as baggage for automatic propagation + return await withWorkflowBaggage( + { workflowRunId: runId, workflowName }, + async () => { + const world = getWorld(); + return trace( + `WORKFLOW ${workflowName}`, + { links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('execute'), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - // At this point, the workflow is "running" and `startedAt` should - // definitely be set. - if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - workflowStartedAt = +workflowRun.startedAt; - - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); - - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } - ); + // TODO: validate `workflowName` exists before consuming message? - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowTracePropagated(!!traceContext), + }); - return; - } - - // Load all events into memory before running - const events = await getAllWorkflowRunEvents( - workflowRun.runId - ); - - // Check for any elapsed waits and create wait_completed events - const now = Date.now(); - - // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) - const completedWaitIds = new Set( - events - .filter((e) => e.eventType === 'wait_completed') - .map((e) => e.correlationId) - ); - - // Collect all waits that need completion - const waitsToComplete = events - .filter( - (e): e is typeof e & { correlationId: string } => - e.eventType === 'wait_created' && - e.correlationId !== undefined && - !completedWaitIds.has(e.correlationId) && - now >= (e.eventData.resumeAt as Date).getTime() - ) - .map((e) => ({ - eventType: 'wait_completed' as const, - specVersion: SPEC_VERSION_CURRENT, - correlationId: e.correlationId, - })); - - // Create all wait_completed events - for (const waitEvent of waitsToComplete) { - try { - const result = await world.events.create( - runId, - waitEvent - ); - // Add the event to the events array so the workflow can see it - events.push(result.event!); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.info( - 'Wait already completed, skipping', - { - workflowRunId: runId, - correlationId: waitEvent.correlationId, - } + return await withThrottleRetry(async () => { + let workflowStartedAt = -1; + let workflowRun = await world.runs.get(runId); + try { + if (workflowRun.status === 'pending') { + // Transition run to 'running' via event (event-sourced architecture) + const result = await world.events.create(runId, { + eventType: 'run_started', + specVersion: SPEC_VERSION_CURRENT, + }); + // Use the run entity from the event response (no extra get call needed) + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` ); - continue; } - throw err; + workflowRun = result.run; } - } - const result = await trace( - 'workflow.replay', - {}, - async (replaySpan) => { - replaySpan?.setAttributes({ - ...Attribute.WorkflowEventsCount(events.length), - }); - // Resolve the encryption key for this run's deployment - const rawKey = - await world.getEncryptionKeyForRun?.(workflowRun); - const encryptionKey = rawKey - ? await importKey(rawKey) - : undefined; - return await runWorkflow( - workflowCode, - workflowRun, - events, - encryptionKey + // At this point, the workflow is "running" and `startedAt` should + // definitely be set. + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` ); } - ); + workflowStartedAt = +workflowRun.startedAt; - // Complete the workflow run via event (event-sourced architecture) - try { - await world.events.create(runId, { - eventType: 'run_completed', - specVersion: SPEC_VERSION_CURRENT, - eventData: { - output: result, - }, + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), }); - } catch (err) { - if ( - WorkflowAPIError.is(err) && - (err.status === 409 || err.status === 410) - ) { - runtimeLogger.warn( - 'Tried completing workflow run, but run has already finished.', + + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', { workflowRunId: runId, - message: err.message, + status: workflowRun.status, } ); + + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. + return; - } else { - throw err; } - } - span?.setAttributes({ - ...Attribute.WorkflowRunStatus('completed'), - ...Attribute.WorkflowEventsCount(events.length), - }); - } catch (err) { - if (WorkflowSuspension.is(err)) { - const suspensionMessage = buildWorkflowSuspensionMessage( - runId, - err.stepCount, - err.hookCount, - err.waitCount + // Load all events into memory before running + const events = await getAllWorkflowRunEvents( + workflowRun.runId ); - if (suspensionMessage) { - runtimeLogger.debug(suspensionMessage); - } - const result = await handleSuspension({ - suspension: err, - world, - run: workflowRun, - span, - }); + // Check for any elapsed waits and create wait_completed events + const now = Date.now(); - if (result.timeoutSeconds !== undefined) { - return { timeoutSeconds: result.timeoutSeconds }; - } - } else { - // Retry server errors (5xx) with exponential backoff before failing the run - if ( - WorkflowAPIError.is(err) && - err.status !== undefined && - err.status >= 500 - ) { - const retryCount = serverErrorRetryCount ?? 0; - const delaySecondSteps = [5, 30, 120]; // 5s, 30s, 120s - if (retryCount < delaySecondSteps.length) { - runtimeLogger.warn( - 'Server error (5xx), re-enqueueing workflow with backoff', - { - workflowRunId: runId, - retryCount, - delaySeconds: delaySecondSteps[retryCount], - error: err.message, - } - ); - await queueMessage( - world, - getWorkflowQueueName(workflowName), - { - runId, - serverErrorRetryCount: retryCount + 1, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }, - { delaySeconds: delaySecondSteps[retryCount] } - ); - return; // Don't fail the run, retry later - } - // Fall through to run_failed after exhausting retries - } else if ( - WorkflowAPIError.is(err) && - err.status === 429 - ) { - // Throw to let withThrottleRetry handle it - throw err; - } - - // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError - // (for instance when the event log is corrupted, this is thrown by the event consumer). We could - // specially handle these if needed. + // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²) + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); - } + // Collect all waits that need completion + const waitsToComplete = events + .filter( + (e): e is typeof e & { correlationId: string } => + e.eventType === 'wait_created' && + e.correlationId !== undefined && + !completedWaitIds.has(e.correlationId) && + now >= (e.eventData.resumeAt as Date).getTime() + ) + .map((e) => ({ + eventType: 'wait_completed' as const, + specVersion: SPEC_VERSION_CURRENT, + correlationId: e.correlationId, + })); - const normalizedError = await normalizeUnknownError(err); - const errorName = - normalizedError.name || getErrorName(err); - const errorMessage = normalizedError.message; - let errorStack = - normalizedError.stack || getErrorStack(err); - - // Remap error stack using source maps to show original source locations - if (errorStack) { - const parsedName = parseWorkflowName(workflowName); - const filename = - parsedName?.moduleSpecifier || workflowName; - errorStack = remapErrorStack( - errorStack, - filename, - workflowCode - ); + // Create all wait_completed events + for (const waitEvent of waitsToComplete) { + try { + const result = await world.events.create( + runId, + waitEvent + ); + // Add the event to the events array so the workflow can see it + events.push(result.event!); + } catch (err) { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.info( + 'Wait already completed, skipping', + { + workflowRunId: runId, + correlationId: waitEvent.correlationId, + } + ); + continue; + } + throw err; + } } - runtimeLogger.error('Error while running workflow', { - workflowRunId: runId, - errorName, - errorStack, - }); + const result = await trace( + 'workflow.replay', + {}, + async (replaySpan) => { + replaySpan?.setAttributes({ + ...Attribute.WorkflowEventsCount(events.length), + }); + // Resolve the encryption key for this run's deployment + const rawKey = + await world.getEncryptionKeyForRun?.(workflowRun); + const encryptionKey = rawKey + ? await importKey(rawKey) + : undefined; + return await runWorkflow( + workflowCode, + workflowRun, + events, + encryptionKey + ); + } + ); - // Fail the workflow run via event (event-sourced architecture) + // Complete the workflow run via event (event-sourced architecture) try { await world.events.create(runId, { - eventType: 'run_failed', + eventType: 'run_completed', specVersion: SPEC_VERSION_CURRENT, eventData: { - error: { - message: errorMessage, - stack: errorStack, - }, - // TODO: include error codes when we define them + output: result, }, }); } catch (err) { @@ -411,17 +287,12 @@ export function workflowEntrypoint( (err.status === 409 || err.status === 410) ) { runtimeLogger.warn( - 'Tried failing workflow run, but run has already finished.', + 'Tried completing workflow run, but run has already finished.', { workflowRunId: runId, message: err.message, } ); - span?.setAttributes({ - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), - }); return; } else { throw err; @@ -429,19 +300,167 @@ export function workflowEntrypoint( } span?.setAttributes({ - ...Attribute.WorkflowRunStatus('failed'), - ...Attribute.WorkflowErrorName(errorName), - ...Attribute.WorkflowErrorMessage(errorMessage), - ...Attribute.ErrorType(errorName), + ...Attribute.WorkflowRunStatus('completed'), + ...Attribute.WorkflowEventsCount(events.length), }); + } catch (err) { + if (WorkflowSuspension.is(err)) { + const suspensionMessage = + buildWorkflowSuspensionMessage( + runId, + err.stepCount, + err.hookCount, + err.waitCount + ); + if (suspensionMessage) { + runtimeLogger.debug(suspensionMessage); + } + + const result = await handleSuspension({ + suspension: err, + world, + run: workflowRun, + span, + chaos, + chaosSeed, + }); + + if (result.timeoutSeconds !== undefined) { + return { timeoutSeconds: result.timeoutSeconds }; + } + } else { + // Retry server errors (5xx) with exponential backoff before failing the run + if ( + WorkflowAPIError.is(err) && + err.status !== undefined && + err.status >= 500 + ) { + const retryCount = serverErrorRetryCount ?? 0; + const delaySecondSteps = [5, 30, 120]; // 5s, 30s, 120s + if (retryCount < delaySecondSteps.length) { + runtimeLogger.warn( + 'Server error (5xx), re-enqueueing workflow with backoff', + { + workflowRunId: runId, + retryCount, + delaySeconds: delaySecondSteps[retryCount], + error: err.message, + } + ); + await queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId, + serverErrorRetryCount: retryCount + 1, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + chaos, + chaosSeed, + }, + { delaySeconds: delaySecondSteps[retryCount] } + ); + return; // Don't fail the run, retry later + } + // Fall through to run_failed after exhausting retries + } else if ( + WorkflowAPIError.is(err) && + err.status === 429 + ) { + // Throw to let withThrottleRetry handle it + throw err; + } + + // NOTE: this error could be an error thrown in user code, or could also be a WorkflowRuntimeError + // (for instance when the event log is corrupted, this is thrown by the event consumer). We could + // specially handle these if needed. + + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + const normalizedError = + await normalizeUnknownError(err); + const errorName = + normalizedError.name || getErrorName(err); + const errorMessage = normalizedError.message; + let errorStack = + normalizedError.stack || getErrorStack(err); + + // Remap error stack using source maps to show original source locations + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = + parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack( + errorStack, + filename, + workflowCode + ); + } + + runtimeLogger.error('Error while running workflow', { + workflowRunId: runId, + errorName, + errorStack, + }); + + // Fail the workflow run via event (event-sourced architecture) + try { + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: { + message: errorMessage, + stack: errorStack, + }, + // TODO: include error codes when we define them + }, + }); + } catch (err) { + if ( + WorkflowAPIError.is(err) && + (err.status === 409 || err.status === 410) + ) { + runtimeLogger.warn( + 'Tried failing workflow run, but run has already finished.', + { + workflowRunId: runId, + message: err.message, + } + ); + span?.setAttributes({ + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); + return; + } else { + throw err; + } + } + + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('failed'), + ...Attribute.WorkflowErrorName(errorName), + ...Attribute.WorkflowErrorMessage(errorMessage), + ...Attribute.ErrorType(errorName), + }); + } } - } - }); // End withThrottleRetry - } - ); // End trace - } - ); // End withWorkflowBaggage - }); // End withTraceContext + }); // End withThrottleRetry + } + ); // End trace + } + ); // End withWorkflowBaggage + }); // End withTraceContext + + if (chaos || chaosSeed) { + return await requestContext.run({ chaos, chaosSeed }, runHandler); + } + return await runHandler(); } ); diff --git a/packages/core/src/runtime/resume-hook.ts b/packages/core/src/runtime/resume-hook.ts index fedc1de3da..aadaaa10f5 100644 --- a/packages/core/src/runtime/resume-hook.ts +++ b/packages/core/src/runtime/resume-hook.ts @@ -177,6 +177,9 @@ export async function resumeHook( // attach the trace carrier from the workflow run traceCarrier: workflowRun.executionContext?.traceCarrier ?? undefined, + // propagate chaos config from the run's execution context + chaos: workflowRun.executionContext?.chaos ?? undefined, + chaosSeed: workflowRun.executionContext?.chaosSeed ?? undefined, } satisfies WorkflowInvokePayload, { deploymentId: workflowRun.deploymentId, diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index e8ba64908b..23f6685530 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,7 +1,11 @@ import { waitUntil } from '@vercel/functions'; import { WorkflowRuntimeError } from '@workflow/errors'; import type { WorkflowInvokePayload, World } from '@workflow/world'; -import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world'; +import { + isLegacySpecVersion, + requestContext, + SPEC_VERSION_CURRENT, +} from '@workflow/world'; import { monotonicFactory } from 'ulid'; import { importKey } from '../encryption.js'; import type { Serializable } from '../schemas.js'; @@ -79,141 +83,160 @@ export async function start( argsOrOptions?: TArgs | StartOptions, options?: StartOptions ) { - return await waitedUntil(() => { - // @ts-expect-error this field is added by our client transform - const workflowName = workflow?.workflowId; - - if (!workflowName) { - throw new WorkflowRuntimeError( - `'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`, - { slug: 'start-invalid-workflow-function' } - ); - } - - return trace(`workflow.start ${workflowName}`, async (span) => { - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowOperation('start'), - }); + // Read chaos config from environment (set by CI / test runner) + const chaos = process.env.WORKFLOW_CHAOS || undefined; + const chaosSeed = process.env.WORKFLOW_CHAOS_SEED || undefined; - let args: Serializable[] = []; - let opts: StartOptions = options ?? {}; - if (Array.isArray(argsOrOptions)) { - args = argsOrOptions as Serializable[]; - } else if (typeof argsOrOptions === 'object') { - opts = argsOrOptions; - } + const doStart = () => + waitedUntil(() => { + // @ts-expect-error this field is added by our client transform + const workflowName = workflow?.workflowId; - span?.setAttributes({ - ...Attribute.WorkflowArgumentsCount(args.length), - }); + if (!workflowName) { + throw new WorkflowRuntimeError( + `'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`, + { slug: 'start-invalid-workflow-function' } + ); + } - const world = opts?.world ?? getWorld(); - let deploymentId = opts.deploymentId ?? (await world.getDeploymentId()); + return trace(`workflow.start ${workflowName}`, async (span) => { + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowOperation('start'), + }); + + let args: Serializable[] = []; + let opts: StartOptions = options ?? {}; + if (Array.isArray(argsOrOptions)) { + args = argsOrOptions as Serializable[]; + } else if (typeof argsOrOptions === 'object') { + opts = argsOrOptions; + } - // When 'latest' is requested, resolve the actual latest deployment ID - // for the current deployment's environment (same production target or - // same git branch for preview deployments). - if (deploymentId === 'latest') { - if (!world.resolveLatestDeploymentId) { - throw new WorkflowRuntimeError( - "deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()" - ); + span?.setAttributes({ + ...Attribute.WorkflowArgumentsCount(args.length), + }); + + const world = opts?.world ?? getWorld(); + let deploymentId = opts.deploymentId ?? (await world.getDeploymentId()); + + // When 'latest' is requested, resolve the actual latest deployment ID + // for the current deployment's environment (same production target or + // same git branch for preview deployments). + if (deploymentId === 'latest') { + if (!world.resolveLatestDeploymentId) { + throw new WorkflowRuntimeError( + "deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()" + ); + } + deploymentId = await world.resolveLatestDeploymentId(); } - deploymentId = await world.resolveLatestDeploymentId(); - } - const ops: Promise[] = []; + const ops: Promise[] = []; - // Generate runId client-side so we have it before serialization - // (required for future E2E encryption where runId is part of the encryption context) - const runId = `wrun_${ulid()}`; + // Generate runId client-side so we have it before serialization + // (required for future E2E encryption where runId is part of the encryption context) + const runId = `wrun_${ulid()}`; - // Serialize current trace context to propagate across queue boundary - const traceCarrier = await serializeTraceCarrier(); + // Serialize current trace context to propagate across queue boundary + const traceCarrier = await serializeTraceCarrier(); - const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT; - const v1Compat = isLegacySpecVersion(specVersion); + const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT; + const v1Compat = isLegacySpecVersion(specVersion); - // Resolve encryption key for the new run. The runId has already been - // generated above (client-generated ULID) and will be used for both - // key derivation and the run_created event. The World implementation - // uses the runId for per-run HKDF key derivation. We pass the resolved - // deploymentId (not just the raw opts) so the World can use it for - // key resolution even when deploymentId was inferred from the environment - // rather than explicitly provided in opts (e.g., in e2e test runners). - const rawKey = await world.getEncryptionKeyForRun?.(runId, { - ...opts, - deploymentId, - }); - const encryptionKey = rawKey ? await importKey(rawKey) : undefined; - - // Create run via run_created event (event-sourced architecture) - // Pass client-generated runId - server will accept and use it - const workflowArguments = await dehydrateWorkflowArguments( - args, - runId, - encryptionKey, - ops, - globalThis, - v1Compat - ); - const result = await world.events.create( - runId, - { - eventType: 'run_created', - specVersion, - eventData: { - deploymentId: deploymentId, - workflowName: workflowName, - input: workflowArguments, - executionContext: { traceCarrier, workflowCoreVersion }, - }, - }, - { v1Compat } - ); + // Resolve encryption key for the new run. The runId has already been + // generated above (client-generated ULID) and will be used for both + // key derivation and the run_created event. The World implementation + // uses the runId for per-run HKDF key derivation. We pass the resolved + // deploymentId (not just the raw opts) so the World can use it for + // key resolution even when deploymentId was inferred from the environment + // rather than explicitly provided in opts (e.g., in e2e test runners). + const rawKey = await world.getEncryptionKeyForRun?.(runId, { + ...opts, + deploymentId, + }); + const encryptionKey = rawKey ? await importKey(rawKey) : undefined; - // Assert that the run was created - if (!result.run) { - throw new WorkflowRuntimeError( - "Missing 'run' in server response for 'run_created' event" + // Create run via run_created event (event-sourced architecture) + // Pass client-generated runId - server will accept and use it + const workflowArguments = await dehydrateWorkflowArguments( + args, + runId, + encryptionKey, + ops, + globalThis, + v1Compat ); - } - - // Verify server accepted our runId - if (!v1Compat && result.run.runId !== runId) { - throw new WorkflowRuntimeError( - `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + const result = await world.events.create( + runId, + { + eventType: 'run_created', + specVersion, + eventData: { + deploymentId: deploymentId, + workflowName: workflowName, + input: workflowArguments, + executionContext: { + traceCarrier, + workflowCoreVersion, + ...(chaos ? { chaos } : {}), + ...(chaosSeed ? { chaosSeed } : {}), + }, + }, + }, + { v1Compat } ); - } - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - span?.setAttributes({ - ...Attribute.WorkflowRunId(runId), - ...Attribute.WorkflowRunStatus(result.run.status), - ...Attribute.DeploymentId(deploymentId), - }); + // Assert that the run was created + if (!result.run) { + throw new WorkflowRuntimeError( + "Missing 'run' in server response for 'run_created' event" + ); + } - await world.queue( - getWorkflowQueueName(workflowName), - { - runId, - traceCarrier, - } satisfies WorkflowInvokePayload, - { - deploymentId, + // Verify server accepted our runId + if (!v1Compat && result.run.runId !== runId) { + throw new WorkflowRuntimeError( + `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}` + ); } - ); - return new Run(runId); + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) + ); + + span?.setAttributes({ + ...Attribute.WorkflowRunId(runId), + ...Attribute.WorkflowRunStatus(result.run.status), + ...Attribute.DeploymentId(deploymentId), + }); + + await world.queue( + getWorkflowQueueName(workflowName), + { + runId, + traceCarrier, + ...(chaos ? { chaos } : {}), + ...(chaosSeed ? { chaosSeed } : {}), + } satisfies WorkflowInvokePayload, + { + deploymentId, + } + ); + + return new Run(runId); + }); }); - }); + + // Only enter the request context ALS when chaos config is present. + // This ensures getRequestContext() returns undefined in non-chaos cases. + if (chaos || chaosSeed) { + return await requestContext.run({ chaos, chaosSeed }, doStart); + } + return await doStart(); } diff --git a/packages/core/src/runtime/step-handler.ts b/packages/core/src/runtime/step-handler.ts index 2938077641..a0769e755e 100644 --- a/packages/core/src/runtime/step-handler.ts +++ b/packages/core/src/runtime/step-handler.ts @@ -7,7 +7,11 @@ import { } from '@workflow/errors'; import { pluralize } from '@workflow/utils'; import { getPort } from '@workflow/utils/get-port'; -import { SPEC_VERSION_CURRENT, StepInvokePayloadSchema } from '@workflow/world'; +import { + requestContext, + SPEC_VERSION_CURRENT, + StepInvokePayloadSchema, +} from '@workflow/world'; import { importKey } from '../encryption.js'; import { runtimeLogger, stepLogger } from '../logger.js'; import { getStepFunction } from '../private.js'; @@ -62,363 +66,222 @@ const stepHandler = getWorldHandlers().createQueueHandler( stepId, traceCarrier: traceContext, requestedAt, + chaos, + chaosSeed, } = StepInvokePayloadSchema.parse(message_); const spanLinks = await linkToCurrentContext(); - // Execute step within the propagated trace context - return await withTraceContext(traceContext, async () => { - // Extract the step name from the topic name - const stepName = metadata.queueName.slice('__wkf_step_'.length); - const world = getWorld(); - const isVercel = process.env.VERCEL_URL !== undefined; - - // Resolve local async values concurrently before entering the trace span - const [port, spanKind] = await Promise.all([ - isVercel ? undefined : getPort(), - getSpanKind('CONSUMER'), - ]); - - return trace( - `STEP ${stepName}`, - { kind: spanKind, links: spanLinks }, - async (span) => { - span?.setAttributes({ - ...Attribute.StepName(stepName), - ...Attribute.StepAttempt(metadata.attempt), - // Standard OTEL messaging conventions - ...Attribute.MessagingSystem('vercel-queue'), - ...Attribute.MessagingDestinationName(metadata.queueName), - ...Attribute.MessagingMessageId(metadata.messageId), - ...Attribute.MessagingOperationType('process'), - ...getQueueOverhead({ requestedAt }), - }); - - const stepFn = getStepFunction(stepName); - if (!stepFn) { - throw new Error(`Step "${stepName}" not found`); - } - if (typeof stepFn !== 'function') { - throw new Error( - `Step "${stepName}" is not a function (got ${typeof stepFn})` - ); - } - - const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - - span?.setAttributes({ - ...Attribute.WorkflowName(workflowName), - ...Attribute.WorkflowRunId(workflowRunId), - ...Attribute.StepId(stepId), - ...Attribute.StepMaxRetries(maxRetries), - ...Attribute.StepTracePropagated(!!traceContext), - }); - - // step_started validates state and returns the step entity, so no separate - // world.steps.get() call is needed. The server checks: - // - Step not in terminal state (returns 409) - // - retryAfter timestamp reached (returns 425 with Retry-After header) - // - Workflow still active (returns 410 if completed) - let step; - try { - const startResult = await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - }) - ); - - if (!startResult.step) { - throw new WorkflowRuntimeError( - `step_started event for "${stepId}" did not return step entity` - ); - } - step = startResult.step; - } catch (err) { - if (WorkflowAPIError.is(err)) { - if (WorkflowAPIError.is(err) && err.status === 429) { - const retryRetryAfter = Math.max( - 1, - typeof err.retryAfter === 'number' ? err.retryAfter : 1 - ); - runtimeLogger.warn( - 'Throttled again on retry, deferring to queue', - { - retryAfterSeconds: retryRetryAfter, - } - ); - return { timeoutSeconds: retryRetryAfter }; - } - // 410 Gone: Workflow has already completed - if (err.status === 410) { - runtimeLogger.info( - `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` - ); - return; - } - // 409 Conflict: Step in terminal state (completed/failed/cancelled) - // Re-enqueue the workflow to continue processing - if (err.status === 409) { - runtimeLogger.debug( - 'Step in terminal state, re-enqueuing workflow', - { - stepName, - stepId, - workflowRunId, - error: err.message, - } - ); - span?.setAttributes({ - ...Attribute.StepSkipped(true), - // Use 'completed' as a representative terminal state for the skip reason - ...Attribute.StepSkipReason('completed'), - }); - // Add span event for step skip - span?.addEvent?.('step.skipped', { - 'skip.reason': 'terminal_state', - 'step.name': stepName, - 'step.id': stepId, - }); - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - return; - } + // Enter chaos request context so World implementations can apply + // chaos behavior (e.g., routing to chaos server, injecting failures). + // Only enter the ALS when chaos config is present. + const runStepHandler = () => + // Execute step within the propagated trace context + withTraceContext(traceContext, async () => { + // Extract the step name from the topic name + const stepName = metadata.queueName.slice('__wkf_step_'.length); + const world = getWorld(); + const isVercel = process.env.VERCEL_URL !== undefined; + + // Resolve local async values concurrently before entering the trace span + const [port, spanKind] = await Promise.all([ + isVercel ? undefined : getPort(), + getSpanKind('CONSUMER'), + ]); + + return trace( + `STEP ${stepName}`, + { kind: spanKind, links: spanLinks }, + async (span) => { + span?.setAttributes({ + ...Attribute.StepName(stepName), + ...Attribute.StepAttempt(metadata.attempt), + // Standard OTEL messaging conventions + ...Attribute.MessagingSystem('vercel-queue'), + ...Attribute.MessagingDestinationName(metadata.queueName), + ...Attribute.MessagingMessageId(metadata.messageId), + ...Attribute.MessagingOperationType('process'), + ...getQueueOverhead({ requestedAt }), + }); - // 425 Too Early: retryAfter timestamp not reached yet - // Return timeout to queue so it retries later - if (err.status === 425) { - // Parse retryAfter from error response meta - const retryAfterStr = (err as any).meta?.retryAfter; - const retryAfter = retryAfterStr - ? new Date(retryAfterStr) - : new Date(Date.now() + 1000); - const timeoutSeconds = Math.max( - 1, - Math.ceil((retryAfter.getTime() - Date.now()) / 1000) - ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - }); - // Add span event for delayed retry - span?.addEvent?.('step.delayed', { - 'delay.reason': 'retry_after_not_reached', - 'delay.timeout_seconds': timeoutSeconds, - 'delay.retry_after': retryAfter.toISOString(), - }); - runtimeLogger.debug( - 'Step retryAfter timestamp not yet reached', - { - stepName, - stepId, - retryAfter, - timeoutSeconds, - } - ); - return { timeoutSeconds }; - } + const stepFn = getStepFunction(stepName); + if (!stepFn) { + throw new Error(`Step "${stepName}" not found`); } - // Re-throw other errors - throw err; - } - - runtimeLogger.debug('Step execution details', { - stepName, - stepId: step.stepId, - status: step.status, - attempt: step.attempt, - }); - - span?.setAttributes({ - ...Attribute.StepStatus(step.status), - }); - - let result: unknown; - - // Check max retries AFTER step_started (attempt was just incremented) - // step.attempt tracks how many times step_started has been called. - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. - // The post-failure check uses >= to decide whether to retry after a failure. - if (step.attempt > maxRetries + 1) { - const retryCount = step.attempt - 1; - const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; - stepLogger.error('Step exceeded max retries', { - workflowRunId, - stepName, - retryCount, - }); - // Fail the step via event (event-sourced architecture) - try { - await world.events.create(workflowRunId, { - eventType: 'step_failed', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: errorMessage, - stack: step.error?.stack, - }, - }); - } catch (err) { - if (WorkflowAPIError.is(err) && err.status === 409) { - runtimeLogger.warn( - 'Tried failing step, but step has already finished.', - { - workflowRunId, - stepId, - stepName, - message: err.message, - } - ); - return; - } - throw err; + if (typeof stepFn !== 'function') { + throw new Error( + `Step "${stepName}" is not a function (got ${typeof stepFn})` + ); } - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), - }); + const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - // Re-invoke the workflow to handle the failed step - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), + span?.setAttributes({ + ...Attribute.WorkflowName(workflowName), + ...Attribute.WorkflowRunId(workflowRunId), + ...Attribute.StepId(stepId), + ...Attribute.StepMaxRetries(maxRetries), + ...Attribute.StepTracePropagated(!!traceContext), }); - return; - } - - try { - // step_started already validated the step is in valid state (pending/running) - // and returned the updated step entity with incremented attempt - // step.attempt is now the current attempt number (after increment) - const attempt = step.attempt; - - if (!step.startedAt) { - throw new WorkflowRuntimeError( - `Step "${stepId}" has no "startedAt" timestamp` + // step_started validates state and returns the step entity, so no separate + // world.steps.get() call is needed. The server checks: + // - Step not in terminal state (returns 409) + // - retryAfter timestamp reached (returns 425 with Retry-After header) + // - Workflow still active (returns 410 if completed) + let step; + try { + const startResult = await withServerErrorRetry(() => + world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + }) ); - } - // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) - const stepStartedAt = step.startedAt; - - // Hydrate the step input arguments, closure variables, and thisVal - // NOTE: This captures only the synchronous portion of hydration. Any async - // operations (e.g., stream loading) are added to `ops` and executed later - // via Promise.all(ops) - their timing is not included in this measurement. - const ops: Promise[] = []; - const rawKey = await world.getEncryptionKeyForRun?.(workflowRunId); - const encryptionKey = rawKey ? await importKey(rawKey) : undefined; - const hydratedInput = await trace( - 'step.hydrate', - {}, - async (hydrateSpan) => { - const startTime = Date.now(); - const result = await hydrateStepArguments( - step.input, - workflowRunId, - encryptionKey, - ops + + if (!startResult.step) { + throw new WorkflowRuntimeError( + `step_started event for "${stepId}" did not return step entity` ); - const durationMs = Date.now() - startTime; - hydrateSpan?.setAttributes({ - ...Attribute.StepArgumentsCount(result.args.length), - ...Attribute.QueueDeserializeTimeMs(durationMs), - }); - return result; } - ); + step = startResult.step; + } catch (err) { + if (WorkflowAPIError.is(err)) { + if (WorkflowAPIError.is(err) && err.status === 429) { + const retryRetryAfter = Math.max( + 1, + typeof err.retryAfter === 'number' ? err.retryAfter : 1 + ); + runtimeLogger.warn( + 'Throttled again on retry, deferring to queue', + { + retryAfterSeconds: retryRetryAfter, + } + ); + return { timeoutSeconds: retryRetryAfter }; + } + // 410 Gone: Workflow has already completed + if (err.status === 410) { + runtimeLogger.info( + `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` + ); + return; + } - const args = hydratedInput.args; - const thisVal = hydratedInput.thisVal ?? null; + // 409 Conflict: Step in terminal state (completed/failed/cancelled) + // Re-enqueue the workflow to continue processing + if (err.status === 409) { + runtimeLogger.debug( + 'Step in terminal state, re-enqueuing workflow', + { + stepName, + stepId, + workflowRunId, + error: err.message, + } + ); + span?.setAttributes({ + ...Attribute.StepSkipped(true), + // Use 'completed' as a representative terminal state for the skip reason + ...Attribute.StepSkipReason('completed'), + }); + // Add span event for step skip + span?.addEvent?.('step.skipped', { + 'skip.reason': 'terminal_state', + 'step.name': stepName, + 'step.id': stepId, + }); + await queueMessage( + world, + getWorkflowQueueName(workflowName), + { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + chaos, + chaosSeed, + } + ); + return; + } - // Execute the step function with tracing - const executionStartTime = Date.now(); - result = await trace('step.execute', {}, async () => { - return await contextStorage.run( - { - stepMetadata: { - stepName, - stepId, - stepStartedAt: new Date(+stepStartedAt), - attempt, - }, - workflowMetadata: { - workflowName, - workflowRunId, - workflowStartedAt: new Date(+workflowStartedAt), - // TODO: there should be a getUrl method on the world interface itself. This - // solution only works for vercel + local worlds. - url: isVercel - ? `https://${process.env.VERCEL_URL}` - : `http://localhost:${port ?? 3000}`, - }, - ops, - closureVars: hydratedInput.closureVars, - encryptionKey, - }, - () => stepFn.apply(thisVal, args) - ); + // 425 Too Early: retryAfter timestamp not reached yet + // Return timeout to queue so it retries later + if (err.status === 425) { + // Parse retryAfter from error response meta + const retryAfterStr = (err as any).meta?.retryAfter; + const retryAfter = retryAfterStr + ? new Date(retryAfterStr) + : new Date(Date.now() + 1000); + const timeoutSeconds = Math.max( + 1, + Math.ceil((retryAfter.getTime() - Date.now()) / 1000) + ); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + }); + // Add span event for delayed retry + span?.addEvent?.('step.delayed', { + 'delay.reason': 'retry_after_not_reached', + 'delay.timeout_seconds': timeoutSeconds, + 'delay.retry_after': retryAfter.toISOString(), + }); + runtimeLogger.debug( + 'Step retryAfter timestamp not yet reached', + { + stepName, + stepId, + retryAfter, + timeoutSeconds, + } + ); + return { timeoutSeconds }; + } + } + // Re-throw other errors + throw err; + } + + runtimeLogger.debug('Step execution details', { + stepName, + stepId: step.stepId, + status: step.status, + attempt: step.attempt, }); - const executionTimeMs = Date.now() - executionStartTime; span?.setAttributes({ - ...Attribute.QueueExecutionTimeMs(executionTimeMs), + ...Attribute.StepStatus(step.status), }); - // NOTE: None of the code from this point is guaranteed to run - // Since the step might fail or cause a function timeout and the process might be SIGKILL'd - // The workflow runtime must be resilient to the below code not executing on a failed step - result = await trace( - 'step.dehydrate', - {}, - async (dehydrateSpan) => { - const startTime = Date.now(); - const dehydrated = await dehydrateStepReturnValue( - result, - workflowRunId, - encryptionKey, - ops - ); - const durationMs = Date.now() - startTime; - dehydrateSpan?.setAttributes({ - ...Attribute.QueueSerializeTimeMs(durationMs), - ...Attribute.StepResultType(typeof dehydrated), - }); - return dehydrated; - } - ); - - waitUntil( - Promise.all(ops).catch((err) => { - // Ignore expected client disconnect errors (e.g., browser refresh during streaming) - const isAbortError = - err?.name === 'AbortError' || err?.name === 'ResponseAborted'; - if (!isAbortError) throw err; - }) - ); - - // Run step_completed and trace serialization concurrently; - // the trace carrier is used in the final queueMessage call below - let stepCompleted409 = false; - const [, traceCarrier] = await Promise.all([ - withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_completed', + let result: unknown; + + // Check max retries AFTER step_started (attempt was just incremented) + // step.attempt tracks how many times step_started has been called. + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + // Use > here (not >=) because this guards against re-invocation AFTER all attempts are used. + // The post-failure check uses >= to decide whether to retry after a failure. + if (step.attempt > maxRetries + 1) { + const retryCount = step.attempt - 1; + const errorMessage = `Step "${stepName}" exceeded max retries (${retryCount} ${pluralize('retry', 'retries', retryCount)})`; + stepLogger.error('Step exceeded max retries', { + workflowRunId, + stepName, + retryCount, + }); + // Fail the step via event (event-sourced architecture) + try { + await world.events.create(workflowRunId, { + eventType: 'step_failed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - result: result as Uint8Array, + error: errorMessage, + stack: step.error?.stack, }, - }) - ).catch((err) => { + }); + } catch (err) { if (WorkflowAPIError.is(err) && err.status === 409) { runtimeLogger.warn( - 'Tried completing step, but step has already finished.', + 'Tried failing step, but step has already finished.', { workflowRunId, stepId, @@ -426,165 +289,266 @@ const stepHandler = getWorldHandlers().createQueueHandler( message: err.message, } ); - stepCompleted409 = true; return; } throw err; - }), - serializeTraceCarrier(), - ]); - - if (stepCompleted409) { - return; - } + } - span?.setAttributes({ - ...Attribute.StepStatus('completed'), - ...Attribute.StepResultType(typeof result), - }); + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), + }); - // Queue the workflow continuation with the concurrently-resolved trace carrier - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier, - requestedAt: new Date(), - }); - return; - } catch (err: unknown) { - const normalizedError = await normalizeUnknownError(err); - const normalizedStack = - normalizedError.stack || getErrorStack(err) || ''; - - // Record exception for OTEL error tracking - if (err instanceof Error) { - span?.recordException?.(err); + // Re-invoke the workflow to handle the failed step + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + chaos, + chaosSeed, + }); + return; } - // Determine error category and retryability - const isFatal = FatalError.is(err); - const isRetryable = RetryableError.is(err); - const errorCategory = isFatal - ? 'fatal' - : isRetryable - ? 'retryable' - : 'transient'; + try { + // step_started already validated the step is in valid state (pending/running) + // and returned the updated step entity with incremented attempt - span?.setAttributes({ - ...Attribute.StepErrorName(getErrorName(err)), - ...Attribute.StepErrorMessage(normalizedError.message), - ...Attribute.ErrorType(getErrorName(err)), - ...Attribute.ErrorCategory(errorCategory), - ...Attribute.ErrorRetryable(!isFatal), - }); + // step.attempt is now the current attempt number (after increment) + const attempt = step.attempt; - if (WorkflowAPIError.is(err)) { - if (err.status === 410) { - // Workflow has already completed, so no-op - stepLogger.info( - 'Workflow run already completed, skipping step', - { - workflowRunId, - stepId, - message: err.message, - } + if (!step.startedAt) { + throw new WorkflowRuntimeError( + `Step "${stepId}" has no "startedAt" timestamp` ); - return; } + // Capture startedAt for use in async callback (TypeScript narrowing doesn't persist) + const stepStartedAt = step.startedAt; + + // Hydrate the step input arguments, closure variables, and thisVal + // NOTE: This captures only the synchronous portion of hydration. Any async + // operations (e.g., stream loading) are added to `ops` and executed later + // via Promise.all(ops) - their timing is not included in this measurement. + const ops: Promise[] = []; + const rawKey = + await world.getEncryptionKeyForRun?.(workflowRunId); + const encryptionKey = rawKey + ? await importKey(rawKey) + : undefined; + const hydratedInput = await trace( + 'step.hydrate', + {}, + async (hydrateSpan) => { + const startTime = Date.now(); + const result = await hydrateStepArguments( + step.input, + workflowRunId, + encryptionKey, + ops + ); + const durationMs = Date.now() - startTime; + hydrateSpan?.setAttributes({ + ...Attribute.StepArgumentsCount(result.args.length), + ...Attribute.QueueDeserializeTimeMs(durationMs), + }); + return result; + } + ); + + const args = hydratedInput.args; + const thisVal = hydratedInput.thisVal ?? null; - // Server errors (5xx) from workflow-server are treated as persistent - // infrastructure issues. The withServerErrorRetry wrapper already - // retried the call a few times; if we still have a 5xx here it's - // likely persistent. Re-throw so the queue can retry the job and - // re-invoke this handler. Note: by the time we reach this point, - // step_started has already run and incremented step.attempt, and a - // subsequent queue retry may increment attempts again depending on - // storage semantics, so these retries are not guaranteed to be - // "free" with respect to step attempts. - if (err.status !== undefined && err.status >= 500) { - runtimeLogger.warn( - 'Persistent server error (5xx) during step, deferring to queue retry', + // Execute the step function with tracing + const executionStartTime = Date.now(); + result = await trace('step.execute', {}, async () => { + return await contextStorage.run( { - status: err.status, - workflowRunId, - stepId, - error: err.message, - url: err.url, - } + stepMetadata: { + stepName, + stepId, + stepStartedAt: new Date(+stepStartedAt), + attempt, + }, + workflowMetadata: { + workflowName, + workflowRunId, + workflowStartedAt: new Date(+workflowStartedAt), + // TODO: there should be a getUrl method on the world interface itself. This + // solution only works for vercel + local worlds. + url: isVercel + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${port ?? 3000}`, + }, + ops, + closureVars: hydratedInput.closureVars, + encryptionKey, + }, + () => stepFn.apply(thisVal, args) ); - throw err; - } - } + }); + const executionTimeMs = Date.now() - executionStartTime; - if (isFatal) { - stepLogger.error( - 'Encountered FatalError while executing step, bubbling up to parent workflow', - { - workflowRunId, - stepName, - errorStack: normalizedStack, + span?.setAttributes({ + ...Attribute.QueueExecutionTimeMs(executionTimeMs), + }); + + // NOTE: None of the code from this point is guaranteed to run + // Since the step might fail or cause a function timeout and the process might be SIGKILL'd + // The workflow runtime must be resilient to the below code not executing on a failed step + result = await trace( + 'step.dehydrate', + {}, + async (dehydrateSpan) => { + const startTime = Date.now(); + const dehydrated = await dehydrateStepReturnValue( + result, + workflowRunId, + encryptionKey, + ops + ); + const durationMs = Date.now() - startTime; + dehydrateSpan?.setAttributes({ + ...Attribute.QueueSerializeTimeMs(durationMs), + ...Attribute.StepResultType(typeof dehydrated), + }); + return dehydrated; } ); - // Fail the step via event (event-sourced architecture) - try { - await withServerErrorRetry(() => + + waitUntil( + Promise.all(ops).catch((err) => { + // Ignore expected client disconnect errors (e.g., browser refresh during streaming) + const isAbortError = + err?.name === 'AbortError' || + err?.name === 'ResponseAborted'; + if (!isAbortError) throw err; + }) + ); + + // Run step_completed and trace serialization concurrently; + // the trace carrier is used in the final queueMessage call below + let stepCompleted409 = false; + const [, traceCarrier] = await Promise.all([ + withServerErrorRetry(() => world.events.create(workflowRunId, { - eventType: 'step_failed', + eventType: 'step_completed', specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: normalizedError.message, - stack: normalizedStack, + result: result as Uint8Array, }, }) - ); - } catch (stepFailErr) { - if ( - WorkflowAPIError.is(stepFailErr) && - stepFailErr.status === 409 - ) { - runtimeLogger.warn( - 'Tried failing step, but step has already finished.', + ).catch((err) => { + if (WorkflowAPIError.is(err) && err.status === 409) { + runtimeLogger.warn( + 'Tried completing step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: err.message, + } + ); + stepCompleted409 = true; + return; + } + throw err; + }), + serializeTraceCarrier(), + ]); + + if (stepCompleted409) { + return; + } + + span?.setAttributes({ + ...Attribute.StepStatus('completed'), + ...Attribute.StepResultType(typeof result), + }); + + // Queue the workflow continuation with the concurrently-resolved trace carrier + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier, + requestedAt: new Date(), + chaos, + chaosSeed, + }); + return; + } catch (err: unknown) { + const normalizedError = await normalizeUnknownError(err); + const normalizedStack = + normalizedError.stack || getErrorStack(err) || ''; + + // Record exception for OTEL error tracking + if (err instanceof Error) { + span?.recordException?.(err); + } + + // Determine error category and retryability + const isFatal = FatalError.is(err); + const isRetryable = RetryableError.is(err); + const errorCategory = isFatal + ? 'fatal' + : isRetryable + ? 'retryable' + : 'transient'; + + span?.setAttributes({ + ...Attribute.StepErrorName(getErrorName(err)), + ...Attribute.StepErrorMessage(normalizedError.message), + ...Attribute.ErrorType(getErrorName(err)), + ...Attribute.ErrorCategory(errorCategory), + ...Attribute.ErrorRetryable(!isFatal), + }); + + if (WorkflowAPIError.is(err)) { + if (err.status === 410) { + // Workflow has already completed, so no-op + stepLogger.info( + 'Workflow run already completed, skipping step', { workflowRunId, stepId, - stepName, - message: stepFailErr.message, + message: err.message, } ); return; } - throw stepFailErr; - } - - span?.setAttributes({ - ...Attribute.StepStatus('failed'), - ...Attribute.StepFatalError(true), - }); - } else { - const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; - // step.attempt was incremented by step_started, use it here - const currentAttempt = step.attempt; - span?.setAttributes({ - ...Attribute.StepAttempt(currentAttempt), - ...Attribute.StepMaxRetries(maxRetries), - }); + // Server errors (5xx) from workflow-server are treated as persistent + // infrastructure issues. The withServerErrorRetry wrapper already + // retried the call a few times; if we still have a 5xx here it's + // likely persistent. Re-throw so the queue can retry the job and + // re-invoke this handler. Note: by the time we reach this point, + // step_started has already run and incremented step.attempt, and a + // subsequent queue retry may increment attempts again depending on + // storage semantics, so these retries are not guaranteed to be + // "free" with respect to step attempts. + if (err.status !== undefined && err.status >= 500) { + runtimeLogger.warn( + 'Persistent server error (5xx) during step, deferring to queue retry', + { + status: err.status, + workflowRunId, + stepId, + error: err.message, + url: err.url, + } + ); + throw err; + } + } - // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 - if (currentAttempt >= maxRetries + 1) { - // Max retries reached - const retryCount = step.attempt - 1; + if (isFatal) { stepLogger.error( - 'Max retries reached, bubbling error to parent workflow', + 'Encountered FatalError while executing step, bubbling up to parent workflow', { workflowRunId, stepName, - attempt: step.attempt, - retryCount, errorStack: normalizedStack, } ); - const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${normalizedError.message}`; // Fail the step via event (event-sourced architecture) try { await withServerErrorRetry(() => @@ -593,7 +557,7 @@ const stepHandler = getWorldHandlers().createQueueHandler( specVersion: SPEC_VERSION_CURRENT, correlationId: stepId, eventData: { - error: errorMessage, + error: normalizedError.message, stack: normalizedStack, }, }) @@ -619,98 +583,169 @@ const stepHandler = getWorldHandlers().createQueueHandler( span?.setAttributes({ ...Attribute.StepStatus('failed'), - ...Attribute.StepRetryExhausted(true), + ...Attribute.StepFatalError(true), }); } else { - // Not at max retries yet - log as a retryable error - if (RetryableError.is(err)) { - stepLogger.warn( - 'Encountered RetryableError, step will be retried', + const maxRetries = + stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; + // step.attempt was incremented by step_started, use it here + const currentAttempt = step.attempt; + + span?.setAttributes({ + ...Attribute.StepAttempt(currentAttempt), + ...Attribute.StepMaxRetries(maxRetries), + }); + + // Note: maxRetries is the number of RETRIES after the first attempt, so total attempts = maxRetries + 1 + if (currentAttempt >= maxRetries + 1) { + // Max retries reached + const retryCount = step.attempt - 1; + stepLogger.error( + 'Max retries reached, bubbling error to parent workflow', { workflowRunId, stepName, - attempt: currentAttempt, - message: err.message, + attempt: step.attempt, + retryCount, + errorStack: normalizedStack, } ); - } else { - stepLogger.warn('Encountered Error, step will be retried', { - workflowRunId, - stepName, - attempt: currentAttempt, - errorStack: normalizedStack, + const errorMessage = `Step "${stepName}" failed after ${maxRetries} ${pluralize('retry', 'retries', maxRetries)}: ${normalizedError.message}`; + // Fail the step via event (event-sourced architecture) + try { + await withServerErrorRetry(() => + world.events.create(workflowRunId, { + eventType: 'step_failed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: errorMessage, + stack: normalizedStack, + }, + }) + ); + } catch (stepFailErr) { + if ( + WorkflowAPIError.is(stepFailErr) && + stepFailErr.status === 409 + ) { + runtimeLogger.warn( + 'Tried failing step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: stepFailErr.message, + } + ); + return; + } + throw stepFailErr; + } + + span?.setAttributes({ + ...Attribute.StepStatus('failed'), + ...Attribute.StepRetryExhausted(true), }); - } - // Set step to pending for retry via event (event-sourced architecture) - // step_retrying records the error and sets status to pending - try { - await withServerErrorRetry(() => - world.events.create(workflowRunId, { - eventType: 'step_retrying', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { - error: normalizedError.message, - stack: normalizedStack, - ...(RetryableError.is(err) && { - retryAfter: err.retryAfter, - }), - }, - }) - ); - } catch (stepRetryErr) { - if ( - WorkflowAPIError.is(stepRetryErr) && - stepRetryErr.status === 409 - ) { - runtimeLogger.warn( - 'Tried retrying step, but step has already finished.', + } else { + // Not at max retries yet - log as a retryable error + if (RetryableError.is(err)) { + stepLogger.warn( + 'Encountered RetryableError, step will be retried', { workflowRunId, - stepId, stepName, - message: stepRetryErr.message, + attempt: currentAttempt, + message: err.message, } ); - return; + } else { + stepLogger.warn('Encountered Error, step will be retried', { + workflowRunId, + stepName, + attempt: currentAttempt, + errorStack: normalizedStack, + }); + } + // Set step to pending for retry via event (event-sourced architecture) + // step_retrying records the error and sets status to pending + try { + await withServerErrorRetry(() => + world.events.create(workflowRunId, { + eventType: 'step_retrying', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { + error: normalizedError.message, + stack: normalizedStack, + ...(RetryableError.is(err) && { + retryAfter: err.retryAfter, + }), + }, + }) + ); + } catch (stepRetryErr) { + if ( + WorkflowAPIError.is(stepRetryErr) && + stepRetryErr.status === 409 + ) { + runtimeLogger.warn( + 'Tried retrying step, but step has already finished.', + { + workflowRunId, + stepId, + stepName, + message: stepRetryErr.message, + } + ); + return; + } + throw stepRetryErr; } - throw stepRetryErr; - } - const timeoutSeconds = Math.max( - 1, - RetryableError.is(err) - ? Math.ceil((+err.retryAfter.getTime() - Date.now()) / 1000) - : 1 - ); + const timeoutSeconds = Math.max( + 1, + RetryableError.is(err) + ? Math.ceil( + (+err.retryAfter.getTime() - Date.now()) / 1000 + ) + : 1 + ); - span?.setAttributes({ - ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), - ...Attribute.StepRetryWillRetry(true), - }); + span?.setAttributes({ + ...Attribute.StepRetryTimeoutSeconds(timeoutSeconds), + ...Attribute.StepRetryWillRetry(true), + }); - // Add span event for retry scheduling - span?.addEvent?.('retry.scheduled', { - 'retry.timeout_seconds': timeoutSeconds, - 'retry.attempt': currentAttempt, - 'retry.max_retries': maxRetries, - }); + // Add span event for retry scheduling + span?.addEvent?.('retry.scheduled', { + 'retry.timeout_seconds': timeoutSeconds, + 'retry.attempt': currentAttempt, + 'retry.max_retries': maxRetries, + }); - // It's a retryable error - so have the queue keep the message visible - // so that it gets retried. - return { timeoutSeconds }; + // It's a retryable error - so have the queue keep the message visible + // so that it gets retried. + return { timeoutSeconds }; + } } } + + await queueMessage(world, getWorkflowQueueName(workflowName), { + runId: workflowRunId, + traceCarrier: await serializeTraceCarrier(), + requestedAt: new Date(), + chaos, + chaosSeed, + }); } + ); + }); // End withTraceContext - await queueMessage(world, getWorkflowQueueName(workflowName), { - runId: workflowRunId, - traceCarrier: await serializeTraceCarrier(), - requestedAt: new Date(), - }); - } - ); - }); + if (chaos || chaosSeed) { + return await requestContext.run({ chaos, chaosSeed }, runStepHandler); + } + return await runStepHandler(); } ); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index c74870cc96..ebda188963 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -43,6 +43,10 @@ export interface SuspensionHandlerParams { world: World; run: WorkflowRun; span?: Span; + /** Chaos testing mode to propagate to step queue messages */ + chaos?: string; + /** Deterministic seed for reproducible chaos */ + chaosSeed?: string; } export interface SuspensionHandlerResult { @@ -63,6 +67,8 @@ export async function handleSuspension({ world, run, span, + chaos, + chaosSeed, }: SuspensionHandlerParams): Promise { const runId = run.runId; const workflowName = run.workflowName; @@ -260,6 +266,8 @@ export async function handleSuspension({ stepId: queueItem.correlationId, traceCarrier, requestedAt: new Date(), + chaos, + chaosSeed, }, { idempotencyKey: queueItem.correlationId, diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index ebcb5811b7..bc9021e257 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ -2,7 +2,11 @@ import os from 'node:os'; import { inspect } from 'node:util'; import { getVercelOidcToken } from '@vercel/oidc'; import { WorkflowAPIError } from '@workflow/errors'; -import { type StructuredError, StructuredErrorSchema } from '@workflow/world'; +import { + getRequestContext, + type StructuredError, + StructuredErrorSchema, +} from '@workflow/world'; import { decode, encode } from 'cbor-x'; import type { z } from 'zod'; import { getDispatcher } from './http-client.js'; @@ -31,6 +35,13 @@ import { version } from './version.js'; */ const WORKFLOW_SERVER_URL_OVERRIDE = ''; +/** + * Hard-coded chaos workflow-server URL. + * When chaos testing is active (via request context), requests are routed + * to this dedicated deployment instead of the production server. + */ +const CHAOS_WORKFLOW_SERVER_URL = 'https://chaos.vercel-workflow.com'; + export interface APIConfig { token?: string; headers?: RequestInit['headers']; @@ -157,8 +168,13 @@ export const getHttpUrl = ( config?: APIConfig ): { baseUrl: string; usingProxy: boolean } => { const projectConfig = config?.projectConfig; + // Chaos testing overrides the workflow-server URL when active + const ctx = getRequestContext(); + const chaosOverride = ctx?.chaos ? CHAOS_WORKFLOW_SERVER_URL : undefined; const defaultHost = - WORKFLOW_SERVER_URL_OVERRIDE || 'https://vercel-workflow.com'; + chaosOverride || + WORKFLOW_SERVER_URL_OVERRIDE || + 'https://vercel-workflow.com'; const customProxyUrl = process.env.WORKFLOW_VERCEL_BACKEND_URL; const defaultProxyUrl = 'https://api.vercel.com/v1/workflow'; // Use proxy when we have project config (for authentication via Vercel API) @@ -193,8 +209,18 @@ export const getHeaders = ( // Only set workflow-api-url header when using the proxy, since the proxy // forwards it to the workflow-server. When not using proxy, requests go // directly to the workflow-server so this header has no effect. - if (WORKFLOW_SERVER_URL_OVERRIDE && options.usingProxy) { - headers.set('x-vercel-workflow-api-url', WORKFLOW_SERVER_URL_OVERRIDE); + const ctx = getRequestContext(); + const chaosOverride = ctx?.chaos ? CHAOS_WORKFLOW_SERVER_URL : undefined; + const urlOverride = chaosOverride || WORKFLOW_SERVER_URL_OVERRIDE; + if (urlOverride && options.usingProxy) { + headers.set('x-vercel-workflow-api-url', urlOverride); + } + // Add chaos testing headers when chaos mode is active in the request context + if (ctx?.chaos) { + headers.set('X-Chaos', ctx.chaos); + } + if (ctx?.chaosSeed) { + headers.set('X-Chaos-Seed', ctx.chaosSeed); } return headers; }; diff --git a/packages/world/src/index.ts b/packages/world/src/index.ts index 7822032f8a..ed2368f8d8 100644 --- a/packages/world/src/index.ts +++ b/packages/world/src/index.ts @@ -18,6 +18,8 @@ export { ValidQueueName, WorkflowInvokePayloadSchema, } from './queue.js'; +export type { RequestContext } from './request-context.js'; +export { getRequestContext, requestContext } from './request-context.js'; export type * from './runs.js'; export { WorkflowRunBaseSchema, @@ -43,10 +45,10 @@ export { } from './spec-version.js'; export type * from './steps.js'; export { StepSchema, StepStatusSchema } from './steps.js'; -export type * from './waits.js'; -export { WaitSchema, WaitStatusSchema } from './waits.js'; export { DEFAULT_TIMESTAMP_THRESHOLD_MS, ulidToDate, validateUlidTimestamp, } from './ulid.js'; +export type * from './waits.js'; +export { WaitSchema, WaitStatusSchema } from './waits.js'; diff --git a/packages/world/src/queue.ts b/packages/world/src/queue.ts index 9690b6b446..bdabe59c71 100644 --- a/packages/world/src/queue.ts +++ b/packages/world/src/queue.ts @@ -27,6 +27,10 @@ export const WorkflowInvokePayloadSchema = z.object({ requestedAt: z.coerce.date().optional(), /** Number of times this message has been re-enqueued due to server errors (5xx) */ serverErrorRetryCount: z.number().int().optional(), + /** Chaos testing mode (e.g., "random-500", "random-429"). Propagated through the execution chain. */ + chaos: z.string().optional(), + /** Deterministic seed for reproducible chaos. Propagated through the execution chain. */ + chaosSeed: z.string().optional(), }); export const StepInvokePayloadSchema = z.object({ @@ -36,6 +40,10 @@ export const StepInvokePayloadSchema = z.object({ stepId: z.string(), traceCarrier: TraceCarrierSchema.optional(), requestedAt: z.coerce.date().optional(), + /** Chaos testing mode (e.g., "random-500", "random-429"). Propagated through the execution chain. */ + chaos: z.string().optional(), + /** Deterministic seed for reproducible chaos. Propagated through the execution chain. */ + chaosSeed: z.string().optional(), }); export type WorkflowInvokePayload = z.infer; diff --git a/packages/world/src/request-context.ts b/packages/world/src/request-context.ts new file mode 100644 index 0000000000..c6e7df5656 --- /dev/null +++ b/packages/world/src/request-context.ts @@ -0,0 +1,52 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; + +/** + * World-agnostic request context for cross-cutting concerns. + * + * This context is set by the core runtime (workflow/step queue handlers) + * and read by World implementations to apply per-request behavior. + * + * Each World implementation interprets the context in its own way: + * - `world-vercel`: Adds HTTP headers and routes to the chaos server + * - `world-local`: Could inject filesystem/queue failures + * - `world-postgres`: Could inject query failures + */ +export interface RequestContext { + /** + * Chaos testing mode identifier. + * + * When set, the World implementation should apply chaos behavior + * appropriate for its transport/storage mechanism. + * + * Examples: "random-500", "random-429", "slow-response:3000" + */ + chaos?: string; + + /** + * Deterministic seed for reproducible chaos behavior. + * + * When set, chaos failures should be deterministic for the given seed, + * making test failures reproducible in CI. + */ + chaosSeed?: string; +} + +/** + * AsyncLocalStorage instance for per-request context propagation. + * + * The core runtime enters this context when processing queue messages + * (both workflow and step handlers). World implementations read it + * to apply per-request behavior like chaos testing. + */ +export const requestContext = new AsyncLocalStorage(); + +/** + * Read the current request context, if any. + * + * Returns `undefined` when called outside of a `requestContext.run()` block. + * The runtime only enters `requestContext.run()` when chaos config is present, + * so a non-undefined return value indicates chaos testing is active. + */ +export function getRequestContext(): RequestContext | undefined { + return requestContext.getStore(); +}