-
Notifications
You must be signed in to change notification settings - Fork 216
Expand file tree
/
Copy pathstart.ts
More file actions
248 lines (217 loc) · 8.6 KB
/
start.ts
File metadata and controls
248 lines (217 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import { waitUntil } from '@vercel/functions';
import { WorkflowRuntimeError } from '@workflow/errors';
import type { WorkflowInvokePayload, World } from '@workflow/world';
import { isLegacySpecVersion, SPEC_VERSION_CURRENT } from '@workflow/world';
import { monotonicFactory } from 'ulid';
import { importKey } from '../encryption.js';
import type { Serializable } from '../schemas.js';
import { dehydrateWorkflowArguments } from '../serialization.js';
import * as Attribute from '../telemetry/semantic-conventions.js';
import { serializeTraceCarrier, trace } from '../telemetry.js';
import { waitedUntil } from '../util.js';
import { version as workflowCoreVersion } from '../version.js';
import { getWorkflowQueueName } from './helpers.js';
import { Run } from './run.js';
import { getWorld } from './world.js';
/**
 * ULID generator for client-side runId generation.
 *
 * Created once at module load via `monotonicFactory()`; `start` calls it to
 * build the `wrun_`-prefixed run ID before the run is created.
 */
const ulid = monotonicFactory();
/**
 * Options accepted by every `start` call, regardless of whether an explicit
 * deployment ID is supplied.
 */
export interface StartOptionsBase {
/**
 * The world to use for the workflow run creation,
 * by default the world is inferred from the environment variables
 * (`start` falls back to `getWorld()` when this is omitted).
 */
world?: World;
/**
 * The spec version to use for the workflow run. Defaults to the latest version
 * (`SPEC_VERSION_CURRENT`).
 */
specVersion?: number;
}
/**
 * Options variant that carries an explicit `deploymentId`.
 *
 * Passing this variant selects the `start` overloads whose argument and
 * result types are `unknown`.
 */
export interface StartOptionsWithDeploymentId extends StartOptionsBase {
/**
 * The deployment ID to use for the workflow run.
 *
 * Set to `'latest'` to automatically resolve the most recent deployment
 * for the current environment (same production target or git branch).
 * Requires a World that implements `resolveLatestDeploymentId()`;
 * otherwise `start` throws a `WorkflowRuntimeError`.
 *
 * @deprecated This property should not be set in user code under normal circumstances.
 * It is automatically inferred from environment variables when deploying to Vercel.
 * Only set this if you are doing something advanced and know what you are doing.
 *
 * **Note:** When `deploymentId` is provided, the argument and return types become `unknown`
 * since there is no guarantee the types will be consistent across deployments.
 */
deploymentId: 'latest' | (string & {});
}
/**
 * Options variant without an explicit deployment ID.
 *
 * `start` then obtains the deployment ID from `world.getDeploymentId()`, and
 * the overloads taking this variant preserve the workflow's argument and
 * result types.
 */
export interface StartOptionsWithoutDeploymentId extends StartOptionsBase {
/** Must be omitted (or `undefined`) for this variant. */
deploymentId?: undefined;
}
/**
 * Options for starting a workflow run: either the variant that carries an
 * explicit `deploymentId` or the variant that omits it.
 */
export type StartOptions =
| StartOptionsWithDeploymentId
| StartOptionsWithoutDeploymentId;
/**
 * Represents an imported workflow function: an async function taking `TArgs`
 * and resolving to `TResult`.
 */
export type WorkflowFunction<TArgs extends unknown[], TResult> = (
...args: TArgs
) => Promise<TResult>;
/**
 * Represents the generated metadata of a workflow function.
 *
 * `start` reads `workflowId` as the workflow name and rejects values where it
 * is missing.
 */
export type WorkflowMetadata = { workflowId: string };
/**
 * Starts a workflow run.
 *
 * Creates the run through a `run_created` event on the resolved World and then
 * enqueues the workflow invocation on the workflow's queue.
 *
 * @param workflow - The imported workflow function to start. It must carry the
 *   `workflowId` field added by the client transform.
 * @param args - The arguments to pass to the workflow (optional). When the
 *   workflow takes no arguments, the options object may be passed in this
 *   position instead.
 * @param options - The options for the workflow run (optional).
 * @returns A `Run` handle for the newly started workflow invocation.
 * @throws WorkflowRuntimeError if `workflow` has no `workflowId`, if
 *   `deploymentId` is `'latest'` but the World does not implement
 *   `resolveLatestDeploymentId()`, if the `run_created` response contains no
 *   run, or (for non-legacy spec versions) if the server returns a different
 *   run ID than the one requested.
 */
// Overloads with deploymentId - args and return type become unknown
export function start(
  workflow: WorkflowFunction<unknown[], unknown> | WorkflowMetadata,
  args: unknown[],
  options: StartOptionsWithDeploymentId
): Promise<Run<unknown>>;
export function start(
  workflow: WorkflowFunction<unknown[], unknown> | WorkflowMetadata,
  options: StartOptionsWithDeploymentId
): Promise<Run<unknown>>;
// Overloads without deploymentId - preserve type inference
export function start<TArgs extends unknown[], TResult>(
  workflow: WorkflowFunction<TArgs, TResult> | WorkflowMetadata,
  args: TArgs,
  options?: StartOptionsWithoutDeploymentId
): Promise<Run<TResult>>;
export function start<TResult>(
  workflow: WorkflowFunction<[], TResult> | WorkflowMetadata,
  options?: StartOptionsWithoutDeploymentId
): Promise<Run<TResult>>;
export async function start<TArgs extends unknown[], TResult>(
  workflow: WorkflowFunction<TArgs, TResult> | WorkflowMetadata,
  argsOrOptions?: TArgs | StartOptions,
  options?: StartOptions
) {
  return await waitedUntil(() => {
    // @ts-expect-error this field is added by our client transform
    const workflowName = workflow?.workflowId;

    if (!workflowName) {
      throw new WorkflowRuntimeError(
        `'start' received an invalid workflow function. Ensure the Workflow Development Kit is configured correctly and the function includes a 'use workflow' directive.`,
        { slug: 'start-invalid-workflow-function' }
      );
    }

    return trace(`workflow.start ${workflowName}`, async (span) => {
      span?.setAttributes({
        ...Attribute.WorkflowName(workflowName),
        ...Attribute.WorkflowOperation('start'),
      });

      // The second positional parameter is either the argument array or,
      // for the two-parameter overloads, the options object.
      let args: Serializable[] = [];
      let opts: StartOptions = options ?? {};
      if (Array.isArray(argsOrOptions)) {
        args = argsOrOptions as Serializable[];
      } else if (typeof argsOrOptions === 'object' && argsOrOptions !== null) {
        // `typeof null === 'object'`, so null must be excluded explicitly;
        // otherwise an untyped caller passing null would replace `opts` with
        // null and crash on the property reads below.
        opts = argsOrOptions;
      }

      span?.setAttributes({
        ...Attribute.WorkflowArgumentsCount(args.length),
      });

      const world = opts.world ?? getWorld();
      let deploymentId = opts.deploymentId ?? (await world.getDeploymentId());

      // When 'latest' is requested, resolve the actual latest deployment ID
      // for the current deployment's environment (same production target or
      // same git branch for preview deployments).
      if (deploymentId === 'latest') {
        if (!world.resolveLatestDeploymentId) {
          throw new WorkflowRuntimeError(
            "deploymentId 'latest' requires a World that implements resolveLatestDeploymentId()"
          );
        }
        deploymentId = await world.resolveLatestDeploymentId();
      }

      // Background operations collected during argument serialization; they
      // are handed to `waitUntil` once the run has been created.
      const ops: Promise<void>[] = [];

      // Generate runId client-side so we have it before serialization
      // (required for future E2E encryption where runId is part of the encryption context)
      const runId = `wrun_${ulid()}`;

      // Serialize current trace context to propagate across queue boundary
      const traceCarrier = await serializeTraceCarrier();

      const specVersion = opts.specVersion ?? SPEC_VERSION_CURRENT;
      const v1Compat = isLegacySpecVersion(specVersion);

      // Resolve encryption key for the new run. The runId has already been
      // generated above (client-generated ULID) and will be used for both
      // key derivation and the run_created event. The World implementation
      // uses the runId for per-run HKDF key derivation. We pass the resolved
      // deploymentId (not just the raw opts) so the World can use it for
      // key resolution even when deploymentId was inferred from the environment
      // rather than explicitly provided in opts (e.g., in e2e test runners).
      const rawKey = await world.getEncryptionKeyForRun?.(runId, {
        ...opts,
        deploymentId,
      });
      const encryptionKey = rawKey ? await importKey(rawKey) : undefined;

      // Create run via run_created event (event-sourced architecture)
      // Pass client-generated runId - server will accept and use it
      const workflowArguments = await dehydrateWorkflowArguments(
        args,
        runId,
        encryptionKey,
        ops,
        globalThis,
        v1Compat
      );

      const result = await world.events.create(
        runId,
        {
          eventType: 'run_created',
          specVersion,
          eventData: {
            deploymentId: deploymentId,
            workflowName: workflowName,
            input: workflowArguments,
            executionContext: { traceCarrier, workflowCoreVersion },
          },
        },
        { v1Compat }
      );

      // Assert that the run was created
      if (!result.run) {
        throw new WorkflowRuntimeError(
          "Missing 'run' in server response for 'run_created' event"
        );
      }

      // Verify server accepted our runId (skipped for legacy spec versions)
      if (!v1Compat && result.run.runId !== runId) {
        throw new WorkflowRuntimeError(
          `Server returned different runId than requested: expected ${runId}, got ${result.run.runId}`
        );
      }

      waitUntil(
        Promise.all(ops).catch((err) => {
          // Ignore expected client disconnect errors (e.g., browser refresh during streaming)
          const isAbortError =
            err?.name === 'AbortError' || err?.name === 'ResponseAborted';
          if (!isAbortError) throw err;
        })
      );

      span?.setAttributes({
        ...Attribute.WorkflowRunId(runId),
        ...Attribute.WorkflowRunStatus(result.run.status),
        ...Attribute.DeploymentId(deploymentId),
      });

      // Enqueue the invocation only after the run_created event succeeded.
      await world.queue(
        getWorkflowQueueName(workflowName),
        {
          runId,
          traceCarrier,
        } satisfies WorkflowInvokePayload,
        {
          deploymentId,
        }
      );

      return new Run<TResult>(runId);
    });
  });
}