diff --git a/.changeset/fix-concurrent-wait-completed.md b/.changeset/fix-concurrent-wait-completed.md new file mode 100644 index 0000000000..deed1e0669 --- /dev/null +++ b/.changeset/fix-concurrent-wait-completed.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-local': patch +--- + +Fix concurrent `wait_completed` race condition that caused duplicate events and `Unconsumed event` errors during replay diff --git a/.changeset/fix-wait-completed-event-refetch.md b/.changeset/fix-wait-completed-event-refetch.md new file mode 100644 index 0000000000..23abaddfb8 --- /dev/null +++ b/.changeset/fix-wait-completed-event-refetch.md @@ -0,0 +1,6 @@ +--- +'@workflow/core': patch +'workflow': patch +--- + +Re-fetch event log on `wait_completed` 409 conflict to ensure correct event ordering diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index ecbbfc85b2..fcbb1e33f1 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -218,7 +218,7 @@ export function workflowEntrypoint( } // Load all events into memory before running - const events = await getAllWorkflowRunEvents(workflowRun.runId); + let events = await getAllWorkflowRunEvents(workflowRun.runId); // Check for any elapsed waits and create wait_completed events const now = Date.now(); @@ -245,7 +245,11 @@ export function workflowEntrypoint( correlationId: e.correlationId, })); - // Create all wait_completed events + // Create all wait_completed events. + // If any creation returns 409, a concurrent invocation already + // created the event. Re-fetch the full event log to get the + // authoritative ordering rather than appending locally. + let needsRefetch = false; for (const waitEvent of waitsToComplete) { try { const result = await world.events.create(runId, waitEvent); @@ -257,12 +261,19 @@ export function workflowEntrypoint( workflowRunId: runId, correlationId: waitEvent.correlationId, }); + needsRefetch = true; continue; } throw err; } } + // Re-fetch the event log if a concurrent invocation created + // events we don't have locally, ensuring correct ordering. + if (needsRefetch) { + events = await getAllWorkflowRunEvents(workflowRun.runId); + } + // Resolve the encryption key for this run's deployment const rawKey = await world.getEncryptionKeyForRun?.(workflowRun); diff --git a/packages/world-local/src/storage.test.ts b/packages/world-local/src/storage.test.ts index 641ad2c623..46db0205dd 100644 --- a/packages/world-local/src/storage.test.ts +++ b/packages/world-local/src/storage.test.ts @@ -12,9 +12,11 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { writeJSON } from './fs.js'; import { createStorage } from './storage.js'; import { + completeWait, createHook, createRun, createStep, + createWait, disposeHook, updateRun, updateStep, @@ -2666,6 +2668,102 @@ describe('Storage', () => { }); }); + describe('waits', () => { + it('should create and complete a wait', async () => { + const run = await createRun(storage, { + deploymentId: 'dep-1', + workflowName: 'wf-1', + input: new Uint8Array(), + }); + await updateRun(storage, run.runId, 'run_started'); + + const wait = await createWait(storage, run.runId, { + waitId: 'wait_001', + resumeAt: new Date('2099-01-01'), + }); + expect(wait.status).toBe('waiting'); + + const completed = await completeWait(storage, run.runId, 'wait_001'); + expect(completed.status).toBe('completed'); + }); + + it('should reject duplicate wait_created for the same correlationId', async () => { + const run = await createRun(storage, { + deploymentId: 'dep-1', + workflowName: 'wf-1', + input: new Uint8Array(), + }); + await updateRun(storage, run.runId, 'run_started'); + + await createWait(storage, run.runId, { + waitId: 'wait_dup', + resumeAt: new Date('2099-01-01'), + }); + + await expect( + createWait(storage, run.runId, { + waitId: 'wait_dup', + resumeAt: new Date('2099-01-01'), + }) + ).rejects.toThrow(/already exists/i); + }); + + it('should reject duplicate wait_completed for the same correlationId', async () => { + const run = await createRun(storage, { + deploymentId: 'dep-1', + workflowName: 'wf-1', + input: new Uint8Array(), + }); + await updateRun(storage, run.runId, 'run_started'); + + await createWait(storage, run.runId, { + waitId: 'wait_once', + resumeAt: new Date('2099-01-01'), + }); + + await completeWait(storage, run.runId, 'wait_once'); + + // Second completion should be rejected with 409 + await expect( + completeWait(storage, run.runId, 'wait_once') + ).rejects.toThrow(/already completed/i); + }); + + it('should reject concurrent wait_completed for the same correlationId', async () => { + const run = await createRun(storage, { + deploymentId: 'dep-1', + workflowName: 'wf-1', + input: new Uint8Array(), + }); + await updateRun(storage, run.runId, 'run_started'); + + await createWait(storage, run.runId, { + waitId: 'wait_race', + resumeAt: new Date('2099-01-01'), + }); + + // Simulate two concurrent completions racing + const results = await Promise.allSettled([ + completeWait(storage, run.runId, 'wait_race'), + completeWait(storage, run.runId, 'wait_race'), + ]); + + const fulfilled = results.filter((r) => r.status === 'fulfilled'); + const rejected = results.filter((r) => r.status === 'rejected'); + + // Exactly one should succeed, one should fail with 409 + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + expect((rejected[0] as PromiseRejectedResult).reason).toBeInstanceOf( + WorkflowAPIError + ); + expect( + ((rejected[0] as PromiseRejectedResult).reason as WorkflowAPIError) + .status + ).toBe(409); + }); + }); + describe('custom runId validation', () => { const runCreatedEvent = { eventType: 'run_created' as const, diff --git a/packages/world-local/src/storage/events-storage.ts b/packages/world-local/src/storage/events-storage.ts index 56d636aed0..03616cb24a 100644 --- a/packages/world-local/src/storage/events-storage.ts +++ b/packages/world-local/src/storage/events-storage.ts @@ -718,8 +718,23 @@ export function createEventsStorage( wait ); } else if (data.eventType === 'wait_completed') { - // wait_completed: Transitions wait to 'completed', rejects duplicates + // wait_completed: Transitions wait to 'completed', rejects duplicates. + // Uses writeExclusive on a lock file to atomically prevent concurrent + // invocations from both completing the same wait (TOCTOU race). const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`; + const lockPath = taggedPath( + basedir, + 'waits', + `${waitCompositeKey}.completed`, + tag + ); + const claimed = await writeExclusive(lockPath, ''); + if (!claimed) { + throw new WorkflowAPIError( + `Wait "${data.correlationId}" already completed`, + { status: 409 } + ); + } const existingWait = await readJSONWithFallback( basedir, 'waits', @@ -732,12 +747,6 @@ export function createEventsStorage( status: 404, }); } - if (existingWait.status === 'completed') { - throw new WorkflowAPIError( - `Wait "${data.correlationId}" already completed`, - { status: 409 } - ); - } wait = { ...existingWait, status: 'completed', diff --git a/packages/world-local/src/test-helpers.ts b/packages/world-local/src/test-helpers.ts index 684ee2753a..eab7d589a0 100644 --- a/packages/world-local/src/test-helpers.ts +++ b/packages/world-local/src/test-helpers.ts @@ -3,6 +3,7 @@ import type { SerializedData, Step, Storage, + Wait, WorkflowRun, } from '@workflow/world'; import { SPEC_VERSION_CURRENT } from '@workflow/world'; @@ -139,3 +140,45 @@ export async function disposeHook( correlationId: hookId, }); } + +/** + * Create a new wait through the wait_created event. + */ +export async function createWait( + storage: Storage, + runId: string, + data: { + waitId: string; + resumeAt: Date; + } +): Promise { + const result = await storage.events.create(runId, { + eventType: 'wait_created', + specVersion: SPEC_VERSION_CURRENT, + correlationId: data.waitId, + eventData: { resumeAt: data.resumeAt }, + }); + if (!result.wait) { + throw new Error('Expected wait to be created'); + } + return result.wait; +} + +/** + * Complete a wait through the wait_completed event. + */ +export async function completeWait( + storage: Storage, + runId: string, + waitId: string +): Promise { + const result = await storage.events.create(runId, { + eventType: 'wait_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: waitId, + }); + if (!result.wait) { + throw new Error('Expected wait to be completed'); + } + return result.wait; +}