Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-concurrent-wait-completed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Fix concurrent `wait_completed` race condition that caused duplicate events and `Unconsumed event` errors during replay
6 changes: 6 additions & 0 deletions .changeset/fix-wait-completed-event-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/core': patch
'workflow': patch
---

Re-fetch event log on `wait_completed` 409 conflict to ensure correct event ordering
15 changes: 13 additions & 2 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export function workflowEntrypoint(
}

// Load all events into memory before running
const events = await getAllWorkflowRunEvents(workflowRun.runId);
let events = await getAllWorkflowRunEvents(workflowRun.runId);

// Check for any elapsed waits and create wait_completed events
const now = Date.now();
Expand All @@ -245,7 +245,11 @@ export function workflowEntrypoint(
correlationId: e.correlationId,
}));

// Create all wait_completed events
// Create all wait_completed events.
// If any creation returns 409, a concurrent invocation already
// created the event. Re-fetch the full event log to get the
// authoritative ordering rather than appending locally.
let needsRefetch = false;
for (const waitEvent of waitsToComplete) {
try {
const result = await world.events.create(runId, waitEvent);
Expand All @@ -257,12 +261,19 @@ export function workflowEntrypoint(
workflowRunId: runId,
correlationId: waitEvent.correlationId,
});
needsRefetch = true;
continue;
}
throw err;
}
}

// Re-fetch the event log if a concurrent invocation created
// events we don't have locally, ensuring correct ordering.
if (needsRefetch) {
events = await getAllWorkflowRunEvents(workflowRun.runId);
}

// Resolve the encryption key for this run's deployment
const rawKey =
await world.getEncryptionKeyForRun?.(workflowRun);
Expand Down
98 changes: 98 additions & 0 deletions packages/world-local/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { writeJSON } from './fs.js';
import { createStorage } from './storage.js';
import {
completeWait,
createHook,
createRun,
createStep,
createWait,
disposeHook,
updateRun,
updateStep,
Expand Down Expand Up @@ -2544,6 +2546,102 @@ describe('Storage', () => {
});
});

describe('waits', () => {
  /** Creates a run, applies the `run_started` update, and returns its runId. */
  const startRun = async () => {
    const { runId } = await createRun(storage, {
      deploymentId: 'dep-1',
      workflowName: 'wf-1',
      input: new Uint8Array(),
    });
    await updateRun(storage, runId, 'run_started');
    return runId;
  };

  /** Creates a wait on `runId` with a fixed resume date of 2099-01-01. */
  const openWait = (runId: string, waitId: string) =>
    createWait(storage, runId, {
      waitId,
      resumeAt: new Date('2099-01-01'),
    });

  it('should create and complete a wait', async () => {
    const runId = await startRun();

    const created = await openWait(runId, 'wait_001');
    expect(created.status).toBe('waiting');

    const finished = await completeWait(storage, runId, 'wait_001');
    expect(finished.status).toBe('completed');
  });

  it('should reject duplicate wait_created for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_dup');

    // A second wait_created with the same id must fail.
    await expect(openWait(runId, 'wait_dup')).rejects.toThrow(
      /already exists/i
    );
  });

  it('should reject duplicate wait_completed for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_once');
    await completeWait(storage, runId, 'wait_once');

    // Completing the same wait a second time must fail.
    await expect(completeWait(storage, runId, 'wait_once')).rejects.toThrow(
      /already completed/i
    );
  });

  it('should reject concurrent wait_completed for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_race');

    // Start both completions before awaiting either so they race.
    const outcomes = await Promise.allSettled([
      completeWait(storage, runId, 'wait_race'),
      completeWait(storage, runId, 'wait_race'),
    ]);

    const winners = outcomes.filter(
      (outcome) => outcome.status === 'fulfilled'
    );
    const losers = outcomes.filter((outcome) => outcome.status === 'rejected');

    // Exactly one completion wins; the other is rejected with a 409.
    expect(winners).toHaveLength(1);
    expect(losers).toHaveLength(1);

    const failure = (losers[0] as PromiseRejectedResult).reason;
    expect(failure).toBeInstanceOf(WorkflowAPIError);
    expect((failure as WorkflowAPIError).status).toBe(409);
  });
});

describe('custom runId validation', () => {
const runCreatedEvent = {
eventType: 'run_created' as const,
Expand Down
23 changes: 16 additions & 7 deletions packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,23 @@ export function createEventsStorage(
wait
);
} else if (data.eventType === 'wait_completed') {
// wait_completed: Transitions wait to 'completed', rejects duplicates
// wait_completed: Transitions wait to 'completed', rejects duplicates.
// Uses writeExclusive on a lock file to atomically prevent concurrent
// invocations from both completing the same wait (TOCTOU race).
const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`;
const lockPath = taggedPath(
basedir,
'waits',
`${waitCompositeKey}.completed`,
tag
Comment on lines +725 to +729
);
const claimed = await writeExclusive(lockPath, '');
if (!claimed) {
throw new WorkflowAPIError(
`Wait "${data.correlationId}" already completed`,
{ status: 409 }
);
}
const existingWait = await readJSONWithFallback(
basedir,
'waits',
Expand All @@ -732,12 +747,6 @@ export function createEventsStorage(
status: 404,
});
}
Comment on lines 731 to 749
if (existingWait.status === 'completed') {
throw new WorkflowAPIError(
`Wait "${data.correlationId}" already completed`,
{ status: 409 }
);
}
wait = {
...existingWait,
status: 'completed',
Expand Down
43 changes: 43 additions & 0 deletions packages/world-local/src/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {
SerializedData,
Step,
Storage,
Wait,
WorkflowRun,
} from '@workflow/world';
import { SPEC_VERSION_CURRENT } from '@workflow/world';
Expand Down Expand Up @@ -139,3 +140,45 @@ export async function disposeHook(
correlationId: hookId,
});
}

/**
 * Registers a wait on a run by emitting a `wait_created` event.
 *
 * @param storage - Storage whose `events.create` receives the event.
 * @param runId - Id of the run the event is created for.
 * @param data - `waitId` is sent as the event's `correlationId`; `resumeAt`
 *   is forwarded inside `eventData`.
 * @returns The `wait` carried on the event-creation result.
 * @throws Error when the result carries no `wait`.
 */
export async function createWait(
  storage: Storage,
  runId: string,
  data: {
    waitId: string;
    resumeAt: Date;
  }
): Promise<Wait> {
  const { waitId, resumeAt } = data;
  const { wait } = await storage.events.create(runId, {
    eventType: 'wait_created',
    specVersion: SPEC_VERSION_CURRENT,
    correlationId: waitId,
    eventData: { resumeAt },
  });
  if (wait) {
    return wait;
  }
  throw new Error('Expected wait to be created');
}

/**
 * Marks a wait as finished by emitting a `wait_completed` event.
 *
 * @param storage - Storage whose `events.create` receives the event.
 * @param runId - Id of the run the event is created for.
 * @param waitId - Sent as the event's `correlationId`.
 * @returns The `wait` carried on the event-creation result.
 * @throws Error when the result carries no `wait`.
 */
export async function completeWait(
  storage: Storage,
  runId: string,
  waitId: string
): Promise<Wait> {
  const { wait } = await storage.events.create(runId, {
    eventType: 'wait_completed',
    specVersion: SPEC_VERSION_CURRENT,
    correlationId: waitId,
  });
  if (wait) {
    return wait;
  }
  throw new Error('Expected wait to be completed');
}
Loading