Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,67 @@ jobs:
- name: Run Unit Tests
run: pnpm test --filter='!./docs' --filter='!./workbench/vitest'

chaos-e2e-vercel:
name: Chaos E2E (${{ matrix.chaos-mode }} / ${{ matrix.app.name }})
runs-on: ubuntu-latest
timeout-minutes: 45
if: ${{ !contains(github.event.pull_request.labels.*.name, 'workflow-server-test') }}
strategy:
fail-fast: false
matrix:
chaos-mode: [random-500, random-429]
app:
- name: "nextjs-turbopack"
project-id: "prj_yjkM7UdHliv8bfxZ1sMJQf1pMpdi"
project-slug: "example-nextjs-workflow-turbopack"
- name: "hono"
project-id: "prj_p0GIEsfl53L7IwVbosPvi9rPSOYW"
project-slug: "workbench-hono-workflow"
env:
TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }}
TURBO_TEAM: ${{ vars.TURBO_TEAM }}
WORKFLOW_PUBLIC_MANIFEST: '1'
WORKFLOW_CHAOS: ${{ matrix.chaos-mode }}
WORKFLOW_CHAOS_SEED: ${{ github.sha }}
steps:
- name: Checkout Repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}

- name: Setup environment
uses: ./.github/actions/setup-workflow-dev
with:
build-packages: 'false'

- name: Build CLI
run: pnpm turbo run build --filter='@workflow/cli'

- name: Waiting for the Vercel deployment
id: waitForDeployment
uses: ./.github/actions/wait-for-vercel-project
with:
team-id: "team_nO2mCG4W8IxPIeKoSsqwAxxB"
project-id: ${{ matrix.app.project-id }}
vercel-token: ${{ secrets.VERCEL_LABS_TOKEN }}
timeout: 1000
check-interval: 15
environment: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }}

- name: Run E2E Tests (Chaos)
run: pnpm run test:e2e --reporter=default
env:
NODE_OPTIONS: "--enable-source-maps"
DEPLOYMENT_URL: ${{ steps.waitForDeployment.outputs.deployment-url }}
VERCEL_DEPLOYMENT_ID: ${{ steps.waitForDeployment.outputs.deployment-id }}
APP_NAME: ${{ matrix.app.name }}
WORKFLOW_VERCEL_ENV: ${{ github.ref == 'refs/heads/main' && 'production' || 'preview' }}
WORKFLOW_VERCEL_AUTH_TOKEN: ${{ secrets.VERCEL_LABS_TOKEN }}
WORKFLOW_VERCEL_TEAM: "team_nO2mCG4W8IxPIeKoSsqwAxxB"
WORKFLOW_VERCEL_PROJECT: ${{ matrix.app.project-id }}
WORKFLOW_VERCEL_PROJECT_SLUG: ${{ matrix.app.project-slug }}
VERCEL_AUTOMATION_BYPASS_SECRET: ${{ secrets.VERCEL_AUTOMATION_BYPASS_SECRET }}

build-error-messages:
name: Node.js Module Build Errors Test
runs-on: ubuntu-latest
Expand Down
606 changes: 311 additions & 295 deletions packages/core/src/runtime.ts

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/core/src/runtime/resume-hook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ export async function resumeHook<T = any>(
// attach the trace carrier from the workflow run
traceCarrier:
workflowRun.executionContext?.traceCarrier ?? undefined,
// propagate chaos mode from the run's execution context
chaos: workflowRun.executionContext?.chaos ?? undefined,
} satisfies WorkflowInvokePayload,
{
deploymentId: workflowRun.deploymentId,
Expand Down
258 changes: 137 additions & 121 deletions packages/core/src/runtime/start.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { waitUntil } from '@vercel/functions';
import { WorkflowRuntimeError } from '@workflow/errors';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world';
import {
isLegacySpecVersion,
requestContext,
SPEC_VERSION_CURRENT,
} from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
import type { Serializable } from '../schemas.js';
Expand Down Expand Up @@ -79,141 +83,153 @@ export async function start<TArgs extends unknown[], TResult>(
argsOrOptions?: TArgs | StartOptions,
options?: StartOptions
) {
return await waitedUntil(() => {
// @ts-expect-error this field is added by our client transform
const workflowName = workflow?.workflowId;

if (!workflowName) {
throw new WorkflowRuntimeError(
`'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`,
{ slug: 'start-invalid-workflow-function' }
);
}

return trace(`workflow.start ${workflowName}`, async (span) => {
span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowOperation('start'),
});
// Read chaos config from environment (set by CI / test runner)
const chaos = process.env.WORKFLOW_CHAOS || undefined;
const chaosSeed = process.env.WORKFLOW_CHAOS_SEED || undefined;
const chaosCtx = chaos || chaosSeed ? { chaos, chaosSeed } : {};

let args: Serializable[] = [];
let opts: StartOptions = options ?? {};
if (Array.isArray(argsOrOptions)) {
args = argsOrOptions as Serializable[];
} else if (typeof argsOrOptions === 'object') {
opts = argsOrOptions;
}
return await requestContext.run(chaosCtx, () =>
waitedUntil(() => {
// @ts-expect-error this field is added by our client transform
const workflowName = workflow?.workflowId;

span?.setAttributes({
...Attribute.WorkflowArgumentsCount(args.length),
});
if (!workflowName) {
throw new WorkflowRuntimeError(
`'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`,
{ slug: 'start-invalid-workflow-function' }
);
}

const world = opts?.world ?? getWorld();
let deploymentId = opts.deploymentId ?? (await world.getDeploymentId());
return trace(`workflow.start ${workflowName}`, async (span) => {
span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowOperation('start'),
});

let args: Serializable[] = [];
let opts: StartOptions = options ?? {};
if (Array.isArray(argsOrOptions)) {
args = argsOrOptions as Serializable[];
} else if (typeof argsOrOptions === 'object') {
opts = argsOrOptions;
}

// When 'latest' is requested, resolve the actual latest deployment ID
// for the current deployment's environment (same production target or
// same git branch for preview deployments).
if (deploymentId === 'latest') {
if (!world.resolveLatestDeploymentId) {
throw new WorkflowRuntimeError(
"deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()"
);
span?.setAttributes({
...Attribute.WorkflowArgumentsCount(args.length),
});

const world = opts?.world ?? getWorld();
let deploymentId = opts.deploymentId ?? (await world.getDeploymentId());

// When 'latest' is requested, resolve the actual latest deployment ID
// for the current deployment's environment (same production target or
// same git branch for preview deployments).
if (deploymentId === 'latest') {
if (!world.resolveLatestDeploymentId) {
throw new WorkflowRuntimeError(
"deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()"
);
}
deploymentId = await world.resolveLatestDeploymentId();
}
deploymentId = await world.resolveLatestDeploymentId();
}

const ops: Promise<void>[] = [];
const ops: Promise<void>[] = [];

// Generate runId client-side so we have it before serialization
// (required for future E2E encryption where runId is part of the encryption context)
const runId = `wrun_${ulid()}`;
// Generate runId client-side so we have it before serialization
// (required for future E2E encryption where runId is part of the encryption context)
const runId = `wrun_${ulid()}`;

// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();
// Serialize current trace context to propagate across queue boundary
const traceCarrier = await serializeTraceCarrier();

const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
const v1Compat = isLegacySpecVersion(specVersion);
const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
const v1Compat = isLegacySpecVersion(specVersion);

// Resolve encryption key for the new run. The runId has already been
// generated above (client-generated ULID) and will be used for both
// key derivation and the run_created event. The World implementation
// uses the runId for per-run HKDF key derivation. We pass the resolved
// deploymentId (not just the raw opts) so the World can use it for
// key resolution even when deploymentId was inferred from the environment
// rather than explicitly provided in opts (e.g., in e2e test runners).
const rawKey = await world.getEncryptionKeyForRun?.(runId, {
...opts,
deploymentId,
});
const encryptionKey = rawKey ? await importKey(rawKey) : undefined;

// Create run via run_created event (event-sourced architecture)
// Pass client-generated runId - server will accept and use it
const workflowArguments = await dehydrateWorkflowArguments(
args,
runId,
encryptionKey,
ops,
globalThis,
v1Compat
);
const result = await world.events.create(
runId,
{
eventType: 'run_created',
specVersion,
eventData: {
deploymentId: deploymentId,
workflowName: workflowName,
input: workflowArguments,
executionContext: { traceCarrier, workflowCoreVersion },
},
},
{ v1Compat }
);
// Resolve encryption key for the new run. The runId has already been
// generated above (client-generated ULID) and will be used for both
// key derivation and the run_created event. The World implementation
// uses the runId for per-run HKDF key derivation. We pass the resolved
// deploymentId (not just the raw opts) so the World can use it for
// key resolution even when deploymentId was inferred from the environment
// rather than explicitly provided in opts (e.g., in e2e test runners).
const rawKey = await world.getEncryptionKeyForRun?.(runId, {
...opts,
deploymentId,
});
const encryptionKey = rawKey ? await importKey(rawKey) : undefined;

// Assert that the run was created
if (!result.run) {
throw new WorkflowRuntimeError(
"Missing 'run' in server response for 'run_created' event"
// Create run via run_created event (event-sourced architecture)
// Pass client-generated runId - server will accept and use it
const workflowArguments = await dehydrateWorkflowArguments(
args,
runId,
encryptionKey,
ops,
globalThis,
v1Compat
);
}

// Verify server accepted our runId
if (!v1Compat && result.run.runId !== runId) {
throw new WorkflowRuntimeError(
`Server returned different runId than requested: expected ${runId}, got ${result.run.runId}`
const result = await world.events.create(
runId,
{
eventType: 'run_created',
specVersion,
eventData: {
deploymentId: deploymentId,
workflowName: workflowName,
input: workflowArguments,
executionContext: {
traceCarrier,
workflowCoreVersion,
...(chaos ? { chaos } : {}),
},
},
},
{ v1Compat }
);
}

waitUntil(
Promise.all(ops).catch((err) => {
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
const isAbortError =
err?.name === 'AbortError' || err?.name === 'ResponseAborted';
if (!isAbortError) throw err;
})
);

span?.setAttributes({
...Attribute.WorkflowRunId(runId),
...Attribute.WorkflowRunStatus(result.run.status),
...Attribute.DeploymentId(deploymentId),
});
// Assert that the run was created
if (!result.run) {
throw new WorkflowRuntimeError(
"Missing 'run' in server response for 'run_created' event"
);
}

await world.queue(
getWorkflowQueueName(workflowName),
{
runId,
traceCarrier,
} satisfies WorkflowInvokePayload,
{
deploymentId,
// Verify server accepted our runId
if (!v1Compat && result.run.runId !== runId) {
throw new WorkflowRuntimeError(
`Server returned different runId than requested: expected ${runId}, got ${result.run.runId}`
);
}
);

return new Run<TResult>(runId);
});
});
waitUntil(
Promise.all(ops).catch((err) => {
// Ignore expected client disconnect errors (e.g., browser refresh during streaming)
const isAbortError =
err?.name === 'AbortError' || err?.name === 'ResponseAborted';
if (!isAbortError) throw err;
})
);

span?.setAttributes({
...Attribute.WorkflowRunId(runId),
...Attribute.WorkflowRunStatus(result.run.status),
...Attribute.DeploymentId(deploymentId),
});

await world.queue(
getWorkflowQueueName(workflowName),
{
runId,
traceCarrier,
...(chaos ? { chaos } : {}),
} satisfies WorkflowInvokePayload,
{
deploymentId,
}
);

return new Run<TResult>(runId);
});
})
);
}
Loading
Loading