Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,28 @@ describe('handleExecutionDelay', () => {
});

describe('short wait — abort during in-process sleep (real timers)', () => {
it('on step abort during sleep sets RUNNING and returns (cancel / interrupt path)', async () => {
it('flushes state during short waits for live progress', async () => {
const params = makeParams();
const resumeAt = new Date(Date.now() + 2000).toISOString();
const stepRuntime = makeStepRuntime({
node: { stepType: 'wait' } as any,
stepExecution: {
status: ExecutionStatus.WAITING,
state: { resumeAt },
} as any,
});

const delayPromise = handleExecutionDelay(params, stepRuntime);
await Promise.resolve();
stepRuntime.abortController.abort();
await delayPromise;

expect(params.stepIoService.flush).toHaveBeenCalled();
expect(params.workflowLogger.flushEvents).toHaveBeenCalled();
expect(params.workflowTaskManager.scheduleResumeTask).not.toHaveBeenCalled();
});

it('does not reset workflow status to RUNNING when delay is aborted for cancellation', async () => {
const params = makeParams();
const resumeAt = new Date(Date.now() + 3000).toISOString();
const ac = new AbortController();
Expand All @@ -342,12 +363,13 @@ describe('handleExecutionDelay', () => {
} as any,
});

queueMicrotask(() => ac.abort());

await handleExecutionDelay(params, stepRuntime);
const delayPromise = handleExecutionDelay(params, stepRuntime);
await Promise.resolve();
ac.abort();
await delayPromise;

expect(params.workflowTaskManager.scheduleResumeTask).not.toHaveBeenCalled();
expect(params.workflowExecutionState.updateWorkflowExecution).toHaveBeenCalledWith({
expect(params.workflowExecutionState.updateWorkflowExecution).not.toHaveBeenCalledWith({
status: ExecutionStatus.RUNNING,
});
});
Expand All @@ -368,7 +390,7 @@ describe('handleExecutionDelay', () => {

await handleExecutionDelay(params, stepRuntime);

expect(params.stepIoService.flush).toHaveBeenCalled();
expect(params.stepIoService.flush).not.toHaveBeenCalled();
expect(params.workflowTaskManager.scheduleResumeTask).toHaveBeenCalledTimes(1);
const call = (params.workflowTaskManager.scheduleResumeTask as jest.Mock).mock.calls[0][0];
expect(call.workflowExecution).toEqual(expect.objectContaining({ id: 'exec-parent' }));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,24 @@ export async function handleExecutionDelay(
const resumeAt = new Date(resumeAtFromState);
const now = new Date();
const diff = resumeAt.getTime() - now.getTime();
await flushState(params);
params.workflowExecutionState.updateWorkflowExecution({
status: ExecutionStatus.WAITING,
});
if (diff < SHORT_DURATION_THRESHOLD) {
// Flush while workflow is still RUNNING so the persistence loop stays active and
// cancellation is not racing a freshly-persisted WAITING workflow status.
await flushState(params);
params.workflowExecutionState.updateWorkflowExecution({
status: ExecutionStatus.WAITING,
});
const timeout = diff > 0 ? diff : 0;

try {
await abortableTimeout(timeout, stepExecutionRuntime.abortController.signal);
} catch (error) {
if (error instanceof TimeoutAbortedError) {
if (stepExecutionRuntime.abortController.signal.aborted) {
return;
}
// Delay was interrupted for other reasons (e.g. on-failure continue).
// Reset status to RUNNING so the execution loop can continue.
params.workflowExecutionState.updateWorkflowExecution({
status: ExecutionStatus.RUNNING,
});
Expand All @@ -125,6 +132,9 @@ export async function handleExecutionDelay(
status: ExecutionStatus.RUNNING,
});
} else {
params.workflowExecutionState.updateWorkflowExecution({
status: ExecutionStatus.WAITING,
});
await params.workflowTaskManager.scheduleResumeTask({
workflowExecution: workflowExecution as EsWorkflowExecution,
resumeAt,
Expand Down
Loading