Skip to content

Commit aaf7f9a

Browse files
authored
fix(workflows): preserve completed node state across DAG multi-resume cycles (#1530)
* fix(workflows): preserve completed node state across DAG multi-resume cycles (#1520)

  Bug: getCompletedDagNodeOutputs only queried 'node_completed' events, but resumed runs emit 'node_skipped_prior_success' for already-completed nodes. On a second resume, previously completed nodes were re-executed because their skip events were invisible to the query.

  Fix:
  - Query both 'node_completed' and 'node_skipped_prior_success' event types in getCompletedDagNodeOutputs so multi-resume correctly identifies all previously completed nodes
  - Store original node_output in node_skipped_prior_success event data so the output is available for $nodeId.output substitution on next resume
  - Improve error messaging when all nodes fail: name the specific failed nodes and count skipped downstream nodes instead of the generic 'no successful nodes' message

  Closes #1520

* test: address review feedback — assert node_output, test failure message format

  - Assert node_output field in node_skipped_prior_success event test
  - Add test for empty-string fallback when node ID has undefined output
  - Add dedicated test verifying failure message names failing nodes
  - Update stale comment from 'no successful nodes' to new behavior
  - Add edge-case test for only node_skipped_prior_success rows (no node_completed)
1 parent 7bdf931 commit aaf7f9a

4 files changed

Lines changed: 145 additions & 9 deletions

File tree

packages/core/src/db/workflow-events.test.ts

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -208,6 +208,46 @@ describe('workflow-events', () => {
208208
]);
209209
});
210210

211+
test('returns outputs from node_skipped_prior_success events (multi-resume)', async () => {
212+
mockQuery.mockResolvedValueOnce(
213+
createQueryResult([
214+
{ step_name: 'node-a', data: { node_output: 'output A' } },
215+
{ step_name: 'node-b', data: { reason: 'prior_success', node_output: 'output B' } },
216+
])
217+
);
218+
219+
const result = await getCompletedDagNodeOutputs('run-resume');
220+
221+
expect(result.size).toBe(2);
222+
expect(result.get('node-a')).toBe('output A');
223+
expect(result.get('node-b')).toBe('output B');
224+
expect(mockQuery).toHaveBeenCalledWith(
225+
expect.stringContaining('node_skipped_prior_success'),
226+
['run-resume']
227+
);
228+
});
229+
230+
test('returns outputs when only node_skipped_prior_success rows exist (no node_completed)', async () => {
231+
mockQuery.mockResolvedValueOnce(
232+
createQueryResult([
233+
{
234+
step_name: 'node-x',
235+
data: { reason: 'prior_success', node_output: 'skipped output X' },
236+
},
237+
{
238+
step_name: 'node-y',
239+
data: { reason: 'prior_success', node_output: 'skipped output Y' },
240+
},
241+
])
242+
);
243+
244+
const result = await getCompletedDagNodeOutputs('run-all-skipped');
245+
246+
expect(result.size).toBe(2);
247+
expect(result.get('node-x')).toBe('skipped output X');
248+
expect(result.get('node-y')).toBe('skipped output Y');
249+
});
250+
211251
test('parses JSON string data (SQLite path)', async () => {
212252
mockQuery.mockResolvedValueOnce(
213253
createQueryResult([

packages/core/src/db/workflow-events.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ export async function getCompletedDagNodeOutputs(
127127
data: string | Record<string, unknown>;
128128
}>(
129129
`SELECT step_name, data FROM remote_agent_workflow_events
130-
WHERE workflow_run_id = $1 AND event_type = 'node_completed'
130+
WHERE workflow_run_id = $1 AND event_type IN ('node_completed', 'node_skipped_prior_success')
131131
ORDER BY created_at ASC`,
132132
[workflowRunId]
133133
);

packages/workflows/src/dag-executor.test.ts

Lines changed: 91 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1292,10 +1292,10 @@ describe('executeDagWorkflow -- bash nodes', () => {
12921292
);
12931293

12941294
// The workflow should complete (it handles failures) but the node failed
1295-
// The mock platform should have received a failure message about no successful nodes
1295+
// The mock platform should have received a failure message about the failed node
12961296
const sendMessage = platform.sendMessage as ReturnType<typeof mock>;
12971297
const messages = sendMessage.mock.calls.map((call: unknown[]) => call[1] as string);
1298-
const failMsg = messages.find((m: string) => m.includes('no successful nodes'));
1298+
const failMsg = messages.find((m: string) => m.includes('failed') && m.includes('fail'));
12991299
expect(failMsg).toBeDefined();
13001300
});
13011301

@@ -3239,6 +3239,54 @@ describe('executeDagWorkflow -- resume with priorCompletedNodes', () => {
32393239
(call[0] as { step_name: string }).step_name === 'step1'
32403240
);
32413241
expect(skippedEvent).toBeDefined();
3242+
expect(skippedEvent[0].data.node_output).toBe('prior output');
3243+
});
3244+
3245+
it('emits node_skipped_prior_success with empty output when node ID not in map', async () => {
3246+
const store = createMockStore();
3247+
const mockDeps = createMockDeps(store);
3248+
const platform = createMockPlatform();
3249+
const workflowRun = makeWorkflowRun('resume-empty-output');
3250+
3251+
// priorCompletedNodes has step1 but with undefined value to test the ?? '' fallback
3252+
const priorCompletedNodes = new Map<string, string>([
3253+
['step1', undefined as unknown as string],
3254+
]);
3255+
3256+
await executeDagWorkflow(
3257+
mockDeps,
3258+
platform,
3259+
'conv-resume',
3260+
testDir,
3261+
{
3262+
name: 'two-step',
3263+
nodes: [
3264+
{ id: 'step1', command: 'step1' },
3265+
{ id: 'step2', command: 'step2', depends_on: ['step1'] },
3266+
],
3267+
},
3268+
workflowRun,
3269+
'claude',
3270+
undefined,
3271+
join(testDir, 'artifacts'),
3272+
join(testDir, 'logs'),
3273+
'main',
3274+
'docs/',
3275+
minimalConfig,
3276+
undefined,
3277+
undefined,
3278+
priorCompletedNodes
3279+
);
3280+
3281+
const eventCalls = (store.createWorkflowEvent as ReturnType<typeof mock>).mock.calls;
3282+
const skippedEvent = eventCalls.find(
3283+
(call: unknown[]) =>
3284+
(call[0] as { event_type: string }).event_type === 'node_skipped_prior_success' &&
3285+
(call[0] as { step_name: string }).step_name === 'step1'
3286+
);
3287+
expect(skippedEvent).toBeDefined();
3288+
// The ?? '' fallback kicks in when the map value is undefined
3289+
expect(skippedEvent[0].data.node_output).toBe('');
32423290
});
32433291

32443292
it('runs all nodes when priorCompletedNodes is empty', async () => {
@@ -4988,7 +5036,7 @@ describe('executeDagWorkflow -- terminal node output selection', () => {
49885036

49895037
expect(store.failWorkflowRun).toHaveBeenCalled();
49905038
// The "unknown provider" detail surfaces on the node_failed event; the
4991-
// workflow-level fail message is a generic "no successful nodes" summary.
5039+
// workflow-level fail message names the failing node(s).
49925040
const eventCalls = (store.createWorkflowEvent as ReturnType<typeof mock>).mock.calls;
49935041
const nodeFailedEvents = eventCalls.filter(
49945042
(call: unknown[]) => (call[0] as Record<string, unknown>).event_type === 'node_failed'
@@ -5001,6 +5049,44 @@ describe('executeDagWorkflow -- terminal node output selection', () => {
50015049
expect(nodeFailedData.error).toContain("unknown provider 'claud'");
50025050
});
50035051

5052+
it('failure message names the failing node instead of generic summary', async () => {
5053+
const store = createMockStore();
5054+
const mockDeps = createMockDeps(store);
5055+
const platform = createMockPlatform();
5056+
const workflowRun = makeWorkflowRun();
5057+
5058+
await executeDagWorkflow(
5059+
mockDeps,
5060+
platform,
5061+
'conv-dag',
5062+
testDir,
5063+
{
5064+
name: 'fail-msg-test',
5065+
nodes: [
5066+
{
5067+
id: 'fail-node',
5068+
command: 'my-cmd',
5069+
provider: 'nonexistent',
5070+
},
5071+
],
5072+
},
5073+
workflowRun,
5074+
'claude',
5075+
undefined,
5076+
join(testDir, 'artifacts'),
5077+
join(testDir, 'logs'),
5078+
'main',
5079+
'docs/',
5080+
minimalConfig
5081+
);
5082+
5083+
expect(store.failWorkflowRun).toHaveBeenCalled();
5084+
const failCall = (store.failWorkflowRun as ReturnType<typeof mock>).mock.calls[0];
5085+
const failMsg = failCall[1] as string;
5086+
expect(failMsg).toContain('fail-node failed');
5087+
expect(failMsg).not.toContain('no successful nodes');
5088+
});
5089+
50045090
it('excludes intermediate nodes with dependents from terminal set (fan-in DAG)', async () => {
50055091
let callCount = 0;
50065092
mockSendQueryDag.mockImplementation(async function* () {
@@ -6545,7 +6631,7 @@ describe('executeDagWorkflow -- script nodes', () => {
65456631

65466632
const sendMessage = platform.sendMessage as ReturnType<typeof mock>;
65476633
const messages = sendMessage.mock.calls.map((call: unknown[]) => call[1] as string);
6548-
const failMsg = messages.find((m: string) => m.includes('no successful nodes'));
6634+
const failMsg = messages.find((m: string) => m.includes('failed') && m.includes('fail-script'));
65496635
expect(failMsg).toBeDefined();
65506636
});
65516637

@@ -6640,7 +6726,7 @@ describe('executeDagWorkflow -- script nodes', () => {
66406726
const sendMessage = platform.sendMessage as ReturnType<typeof mock>;
66416727
const messages = sendMessage.mock.calls.map((call: unknown[]) => call[1] as string);
66426728
// Workflow fails because the only node failed (timeout)
6643-
const failMsg = messages.find((m: string) => m.includes('no successful nodes'));
6729+
const failMsg = messages.find((m: string) => m.includes('failed') && m.includes('slow-script'));
66446730
expect(failMsg).toBeDefined();
66456731
}, 10000);
66466732

packages/workflows/src/dag-executor.ts

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2647,7 +2647,10 @@ export async function executeDagWorkflow(
26472647
workflow_run_id: workflowRun.id,
26482648
event_type: 'node_skipped_prior_success',
26492649
step_name: node.id,
2650-
data: { reason: 'prior_success' },
2650+
data: {
2651+
reason: 'prior_success',
2652+
node_output: priorCompletedNodes.get(node.id) ?? '',
2653+
},
26512654
})
26522655
.catch((err: Error) => {
26532656
getLog().error(
@@ -3146,9 +3149,16 @@ export async function executeDagWorkflow(
31463149

31473150
if (!anyCompleted) {
31483151
if (await skipIfStatusChanged('dag.skip_fail_status_changed')) return;
3152+
const failedNodes: string[] = [];
3153+
for (const [nodeId, o] of nodeOutputs) {
3154+
if (o.state === 'failed') failedNodes.push(nodeId);
3155+
}
31493156
const failMsg =
3150-
`DAG workflow '${workflow.name}' completed with no successful nodes. ` +
3151-
'Check node conditions, trigger rules, and upstream failures.';
3157+
failedNodes.length > 0
3158+
? `DAG workflow '${workflow.name}' failed: node${failedNodes.length > 1 ? 's' : ''} ${failedNodes.join(', ')} failed. ` +
3159+
`${nodeCounts.skipped} downstream node${nodeCounts.skipped !== 1 ? 's were' : ' was'} skipped.`
3160+
: `DAG workflow '${workflow.name}' completed with no successful nodes. ` +
3161+
'Check node conditions, trigger rules, and upstream failures.';
31523162
// Note: nodeCounts not stored for failed runs — failWorkflowRun only stores { error }.
31533163
// Frontend guards with isValidNodeCounts so missing node_counts is safe.
31543164
await deps.store.failWorkflowRun(workflowRun.id, failMsg).catch((dbErr: Error) => {

0 commit comments

Comments
 (0)