// runtime.ts
import { WorkflowAPIError, WorkflowRuntimeError } from '@workflow/errors';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import {
type Event,
SPEC_VERSION_CURRENT,
WorkflowInvokePayloadSchema,
type WorkflowRun,
} from '@workflow/world';
import { importKey } from './encryption.js';
import { WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
import {
getAllWorkflowRunEvents,
getQueueOverhead,
handleHealthCheckMessage,
parseHealthCheckPayload,
withHealthCheck,
} from './runtime/helpers.js';
import { handleSuspension } from './runtime/suspension-handler.js';
import { getWorld, getWorldHandlers } from './runtime/world.js';
import { remapErrorStack } from './source-map.js';
import * as Attribute from './telemetry/semantic-conventions.js';
import {
linkToCurrentContext,
trace,
withTraceContext,
withWorkflowBaggage,
} from './telemetry.js';
import { getErrorName, getErrorStack, normalizeUnknownError } from './types.js';
import { buildWorkflowSuspensionMessage } from './util.js';
import { runWorkflow } from './workflow.js';
export type { Event, WorkflowRun };
export { WorkflowSuspension } from './global.js';
export {
type HealthCheckEndpoint,
type HealthCheckOptions,
type HealthCheckResult,
healthCheck,
} from './runtime/helpers.js';
export {
getHookByToken,
resumeHook,
resumeWebhook,
} from './runtime/resume-hook.js';
export {
getRun,
Run,
type WorkflowReadableStreamOptions,
} from './runtime/run.js';
export {
cancelRun,
listStreams,
type ReadStreamOptions,
type RecreateRunOptions,
readStream,
recreateRunFromExisting,
reenqueueRun,
type StopSleepOptions,
type StopSleepResult,
wakeUpRun,
} from './runtime/runs.js';
export { type StartOptions, start } from './runtime/start.js';
export { stepEntrypoint } from './runtime/step-handler.js';
export {
createWorld,
getWorld,
getWorldHandlers,
setWorld,
} from './runtime/world.js';
/**
 * Function that creates a single route which handles any workflow execution
 * request and routes to the appropriate workflow function.
 *
 * For every queue message the returned handler:
 * 1. Answers health-check messages and returns early.
 * 2. Parses the invoke payload and derives the workflow name from the queue
 *    name (everything after the `__wkf_workflow_` prefix).
 * 3. Transitions a `pending` run to `running` via a `run_started` event.
 * 4. Loads the run's event log and creates `wait_completed` events for every
 *    wait whose `resumeAt` has elapsed.
 * 5. Replays the workflow with `runWorkflow()`. A `WorkflowSuspension` is
 *    handed to `handleSuspension()`; any other error is recorded as
 *    `run_failed`.
 * 6. Records `run_completed` with the workflow's return value.
 *
 * Errors that are not explicitly handled below are re-thrown to the queue
 * handler created by `createQueueHandler`.
 *
 * @param workflowCode - The workflow bundle code containing all the workflow
 * functions at the top level.
 * @returns A function that can be used as a Vercel API route.
 */
export function workflowEntrypoint(
  workflowCode: string
): (req: Request) => Promise<Response> {
  const handler = getWorldHandlers().createQueueHandler(
    '__wkf_workflow_',
    async (message_, metadata) => {
      // Check if this is a health check message
      // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes.
      // They only write a simple status response to a stream and do not expose sensitive data.
      // The stream name includes a unique correlationId that must be known by the caller.
      const healthCheck = parseHealthCheckPayload(message_);
      if (healthCheck) {
        await handleHealthCheckMessage(healthCheck, 'workflow');
        return;
      }

      const {
        runId,
        traceCarrier: traceContext,
        requestedAt,
      } = WorkflowInvokePayloadSchema.parse(message_);

      // Extract the workflow name from the topic name
      const workflowName = metadata.queueName.slice('__wkf_workflow_'.length);
      const spanLinks = await linkToCurrentContext();

      // Invoke user workflow within the propagated trace context and baggage
      return await withTraceContext(traceContext, async () => {
        // Set workflow context as baggage for automatic propagation
        return await withWorkflowBaggage(
          { workflowRunId: runId, workflowName },
          async () => {
            const world = getWorld();
            return trace(
              `WORKFLOW ${workflowName}`,
              { links: spanLinks },
              async (span) => {
                span?.setAttributes({
                  ...Attribute.WorkflowName(workflowName),
                  ...Attribute.WorkflowOperation('execute'),
                  // Standard OTEL messaging conventions
                  ...Attribute.MessagingSystem('vercel-queue'),
                  ...Attribute.MessagingDestinationName(metadata.queueName),
                  ...Attribute.MessagingMessageId(metadata.messageId),
                  ...Attribute.MessagingOperationType('process'),
                  ...getQueueOverhead({ requestedAt }),
                });

                // TODO: validate `workflowName` exists before consuming message?
                span?.setAttributes({
                  ...Attribute.WorkflowRunId(runId),
                  ...Attribute.WorkflowTracePropagated(!!traceContext),
                });

                let workflowRun = await world.runs.get(runId);

                // --- Infrastructure: prepare the run state ---
                // Network/server errors propagate to the queue handler for retry.
                // WorkflowRuntimeError (data integrity issues) are fatal and
                // produce run_failed since retrying won't fix them.
                try {
                  if (workflowRun.status === 'pending') {
                    // Transition run to 'running' via event (event-sourced architecture)
                    const result = await world.events.create(runId, {
                      eventType: 'run_started',
                      specVersion: SPEC_VERSION_CURRENT,
                    });
                    // Use the run entity from the event response (no extra get call needed)
                    if (!result.run) {
                      throw new WorkflowRuntimeError(
                        `Event creation for 'run_started' did not return the run entity for run "${runId}"`
                      );
                    }
                    workflowRun = result.run;
                  }

                  // At this point, the workflow is "running" and `startedAt` should
                  // definitely be set.
                  if (!workflowRun.startedAt) {
                    throw new WorkflowRuntimeError(
                      `Workflow run "${runId}" has no "startedAt" timestamp`
                    );
                  }
                } catch (err) {
                  if (err instanceof WorkflowRuntimeError) {
                    runtimeLogger.error(
                      'Fatal runtime error during workflow setup',
                      { workflowRunId: runId, error: err.message }
                    );
                    try {
                      await world.events.create(runId, {
                        eventType: 'run_failed',
                        specVersion: SPEC_VERSION_CURRENT,
                        eventData: {
                          error: {
                            message: err.message,
                            stack: err.stack,
                          },
                        },
                      });
                    } catch (failErr) {
                      if (
                        WorkflowAPIError.is(failErr) &&
                        (failErr.status === 409 || failErr.status === 410)
                      ) {
                        // Same handling as the other run_failed/run_completed
                        // paths below: log instead of swallowing silently.
                        runtimeLogger.warn(
                          'Tried failing workflow run, but run has already finished.',
                          {
                            workflowRunId: runId,
                            message: failErr.message,
                          }
                        );
                        return;
                      }
                      throw failErr;
                    }
                    return;
                  }
                  throw err;
                }

                // Unary plus converts `startedAt` to its numeric value.
                const workflowStartedAt = +workflowRun.startedAt;
                span?.setAttributes({
                  ...Attribute.WorkflowRunStatus(workflowRun.status),
                  ...Attribute.WorkflowStartedAt(workflowStartedAt),
                });

                if (workflowRun.status !== 'running') {
                  // Workflow has already completed or failed, so we can skip it
                  runtimeLogger.info(
                    'Workflow already completed or failed, skipping',
                    {
                      workflowRunId: runId,
                      status: workflowRun.status,
                    }
                  );
                  // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event
                  // inside the workflow context so the user can gracefully exit. this is SIGTERM
                  // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL
                  // so that we actually exit here without replaying the workflow at all, in the case
                  // the replaying the workflow is itself failing.
                  return;
                }

                // Load all events into memory before running
                let events = await getAllWorkflowRunEvents(workflowRun.runId);

                // Check for any elapsed waits and create wait_completed events
                const now = Date.now();

                // Pre-compute completed correlation IDs for O(n) lookup instead of O(n²)
                const completedWaitIds = new Set(
                  events
                    .filter((e) => e.eventType === 'wait_completed')
                    .map((e) => e.correlationId)
                );

                // Collect all waits that need completion
                const waitsToComplete = events
                  .filter(
                    (e): e is typeof e & { correlationId: string } =>
                      e.eventType === 'wait_created' &&
                      e.correlationId !== undefined &&
                      !completedWaitIds.has(e.correlationId) &&
                      now >= (e.eventData.resumeAt as Date).getTime()
                  )
                  .map((e) => ({
                    eventType: 'wait_completed' as const,
                    specVersion: SPEC_VERSION_CURRENT,
                    correlationId: e.correlationId,
                  }));

                // Create all wait_completed events.
                // If any creation returns 409, a concurrent invocation already
                // created the event. Re-fetch the full event log to get the
                // authoritative ordering rather than appending locally.
                // Events are created sequentially so that locally appended
                // events keep the order in which they were created.
                let needsRefetch = false;
                for (const waitEvent of waitsToComplete) {
                  try {
                    const result = await world.events.create(runId, waitEvent);
                    if (result.event) {
                      // Add the event to the events array so the workflow can see it
                      events.push(result.event);
                    } else {
                      // Never push `undefined` into the replay log: a crash on
                      // it inside runWorkflow() would be recorded as a
                      // user-code failure. Fall back to re-fetching the log.
                      runtimeLogger.warn(
                        'Wait completion did not return the created event, re-fetching event log',
                        {
                          workflowRunId: runId,
                          correlationId: waitEvent.correlationId,
                        }
                      );
                      needsRefetch = true;
                    }
                  } catch (err) {
                    if (WorkflowAPIError.is(err) && err.status === 409) {
                      runtimeLogger.info('Wait already completed, skipping', {
                        workflowRunId: runId,
                        correlationId: waitEvent.correlationId,
                      });
                      needsRefetch = true;
                      continue;
                    }
                    throw err;
                  }
                }

                // Re-fetch the event log if a concurrent invocation created
                // events we don't have locally, ensuring correct ordering.
                if (needsRefetch) {
                  events = await getAllWorkflowRunEvents(workflowRun.runId);
                }

                // Resolve the encryption key for this run's deployment.
                // `getEncryptionKeyForRun` is optional on the world, so the
                // key may be undefined.
                const rawKey =
                  await world.getEncryptionKeyForRun?.(workflowRun);
                const encryptionKey = rawKey
                  ? await importKey(rawKey)
                  : undefined;

                // --- User code execution ---
                // Only errors from runWorkflow() (user workflow code) should
                // produce run_failed. Infrastructure errors (network, server)
                // must propagate to the queue handler for automatic retry.
                let workflowResult: unknown;
                try {
                  workflowResult = await trace(
                    'workflow.replay',
                    {},
                    async (replaySpan) => {
                      replaySpan?.setAttributes({
                        ...Attribute.WorkflowEventsCount(events.length),
                      });
                      return await runWorkflow(
                        workflowCode,
                        workflowRun,
                        events,
                        encryptionKey
                      );
                    }
                  );
                } catch (err) {
                  // WorkflowSuspension is normal control flow — not an error
                  if (WorkflowSuspension.is(err)) {
                    const suspensionMessage = buildWorkflowSuspensionMessage(
                      runId,
                      err.stepCount,
                      err.hookCount,
                      err.waitCount
                    );
                    if (suspensionMessage) {
                      runtimeLogger.debug(suspensionMessage);
                    }

                    const result = await handleSuspension({
                      suspension: err,
                      world,
                      run: workflowRun,
                      span,
                    });

                    // Pass a timeout reported by the suspension handler back
                    // to the queue handler.
                    if (result.timeoutSeconds !== undefined) {
                      return { timeoutSeconds: result.timeoutSeconds };
                    }

                    // Suspension handled, no further work needed
                    return;
                  }

                  // This is a user code error or a WorkflowRuntimeError
                  // (e.g., corrupted event log). Fail the workflow run.

                  // Record exception for OTEL error tracking
                  if (err instanceof Error) {
                    span?.recordException?.(err);
                  }

                  const normalizedError = await normalizeUnknownError(err);
                  const errorName = normalizedError.name || getErrorName(err);
                  const errorMessage = normalizedError.message;
                  let errorStack = normalizedError.stack || getErrorStack(err);

                  // Remap error stack using source maps to show original source locations
                  if (errorStack) {
                    const parsedName = parseWorkflowName(workflowName);
                    const filename =
                      parsedName?.moduleSpecifier || workflowName;
                    errorStack = remapErrorStack(
                      errorStack,
                      filename,
                      workflowCode
                    );
                  }

                  runtimeLogger.error('Error while running workflow', {
                    workflowRunId: runId,
                    errorName,
                    errorStack,
                  });

                  // Fail the workflow run via event (event-sourced architecture)
                  try {
                    await world.events.create(runId, {
                      eventType: 'run_failed',
                      specVersion: SPEC_VERSION_CURRENT,
                      eventData: {
                        error: {
                          message: errorMessage,
                          stack: errorStack,
                        },
                        // TODO: include error codes when we define them
                      },
                    });
                  } catch (failErr) {
                    if (
                      WorkflowAPIError.is(failErr) &&
                      (failErr.status === 409 || failErr.status === 410)
                    ) {
                      runtimeLogger.warn(
                        'Tried failing workflow run, but run has already finished.',
                        {
                          workflowRunId: runId,
                          message: failErr.message,
                        }
                      );
                      // The error attributes are still recorded, but the run
                      // status attribute is left untouched because this
                      // invocation did not write the run_failed event.
                      span?.setAttributes({
                        ...Attribute.WorkflowErrorName(errorName),
                        ...Attribute.WorkflowErrorMessage(errorMessage),
                        ...Attribute.ErrorType(errorName),
                      });
                      return;
                    } else {
                      throw failErr;
                    }
                  }

                  span?.setAttributes({
                    ...Attribute.WorkflowRunStatus('failed'),
                    ...Attribute.WorkflowErrorName(errorName),
                    ...Attribute.WorkflowErrorMessage(errorMessage),
                    ...Attribute.ErrorType(errorName),
                  });
                  return;
                }

                // --- Infrastructure: complete the run ---
                // This is outside the user-code try/catch so that failures
                // here (e.g., network errors) propagate to the queue handler.
                try {
                  await world.events.create(runId, {
                    eventType: 'run_completed',
                    specVersion: SPEC_VERSION_CURRENT,
                    eventData: {
                      output: workflowResult,
                    },
                  });
                } catch (err) {
                  if (
                    WorkflowAPIError.is(err) &&
                    (err.status === 409 || err.status === 410)
                  ) {
                    runtimeLogger.warn(
                      'Tried completing workflow run, but run has already finished.',
                      {
                        workflowRunId: runId,
                        message: err.message,
                      }
                    );
                    return;
                  } else {
                    throw err;
                  }
                }

                span?.setAttributes({
                  ...Attribute.WorkflowRunStatus('completed'),
                  ...Attribute.WorkflowEventsCount(events.length),
                });
              }
            ); // End trace
          }
        ); // End withWorkflowBaggage
      }); // End withTraceContext
    }
  );

  // Wrap the queue handler with the health-check helper before exposing it.
  return withHealthCheck(handler);
}
/**
 * Intentional no-op placeholder.
 *
 * The client expects this export to be present, but it is not actually used,
 * so the function takes no arguments and does nothing.
 */
export function runStep(): void {
  return;
}