Skip to content

Commit c4d9b0a

Browse files
authored
Merge pull request coleam00#962 from dynamous-community/archon/task-fix-issue-961
fix: prevent SIGTERM from orphaning CLI workflow runs
2 parents 5a14f33 + 161fbde commit c4d9b0a

4 files changed

Lines changed: 90 additions & 24 deletions

File tree

packages/cli/src/cli.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import { chatCommand } from './commands/chat';
6565
import { setupCommand } from './commands/setup';
6666
import { validateWorkflowsCommand, validateCommandsCommand } from './commands/validate';
6767
import { closeDatabase } from '@archon/core';
68+
import { createWorkflowStore } from '@archon/core/workflows/store-adapter';
6869
import { setLogLevel, createLogger } from '@archon/paths';
6970
import * as git from '@archon/git';
7071

@@ -214,6 +215,14 @@ async function main(): Promise<number> {
214215
setLogLevel('debug');
215216
}
216217

218+
// Mark workflow runs orphaned by previous process termination as failed
219+
void createWorkflowStore()
220+
.failOrphanedRuns()
221+
.catch((err: unknown) => {
222+
const e = err as Error;
223+
getLog().warn({ err: e, errorType: e.constructor.name }, 'cli.fail_orphans_failed');
224+
});
225+
217226
// Validate working directory exists
218227
let effectiveCwd = cwd;
219228
if (requiresGitRepo) {

packages/cli/src/commands/workflow.ts

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -518,20 +518,36 @@ export async function workflowRunCommand(
518518
);
519519

520520
// Register cleanup handlers for graceful termination
521-
const cleanup = async (signal: string): Promise<void> => {
521+
let terminating = false;
522+
const cleanup = (signal: string): void => {
523+
if (terminating) return;
524+
terminating = true;
522525
getLog().info({ conversationId: conversation.id, signal }, 'workflow.process_terminating');
523-
try {
524-
const activeRun = await workflowDb.getActiveWorkflowRun(conversation.id);
525-
if (activeRun) {
526-
await workflowDb.failWorkflowRun(activeRun.id, `Process terminated (${signal})`);
527-
}
528-
} catch (err) {
529-
getLog().error({ err: err as Error }, 'workflow.termination_cleanup_failed');
530-
}
531-
process.exit(1);
526+
workflowDb
527+
.getActiveWorkflowRun(conversation.id)
528+
.then(activeRun => {
529+
if (activeRun) {
530+
return workflowDb.failWorkflowRun(activeRun.id, `Process terminated (${signal})`);
531+
}
532+
return undefined;
533+
})
534+
.catch((err: unknown) => {
535+
const e = err as Error;
536+
getLog().error(
537+
{ err: e, errorType: e.constructor.name },
538+
'workflow.termination_cleanup_failed'
539+
);
540+
})
541+
.finally(() => {
542+
process.exit(1);
543+
});
532544
};
533-
process.on('SIGTERM', () => void cleanup('SIGTERM'));
534-
process.on('SIGINT', () => void cleanup('SIGINT'));
545+
process.once('SIGTERM', () => {
546+
cleanup('SIGTERM');
547+
});
548+
process.once('SIGINT', () => {
549+
cleanup('SIGINT');
550+
});
535551

536552
// Execute workflow with workingCwd (may be worktree path)
537553
const result = await executeWorkflow(
@@ -730,14 +746,13 @@ export async function workflowStatusCommand(json?: boolean, verbose?: boolean):
730746
if (nodes.length > 0) {
731747
console.log(' Nodes:');
732748
for (const node of nodes) {
733-
const icon =
734-
node.state === 'completed'
735-
? '✓'
736-
: node.state === 'failed'
737-
? '✗'
738-
: node.state === 'skipped'
739-
? '-'
740-
: '◌';
749+
const iconMap: Record<string, string> = {
750+
completed: '✓',
751+
failed: '✗',
752+
skipped: '-',
753+
running: '◌',
754+
};
755+
const icon = iconMap[node.state] ?? '◌';
741756
const duration =
742757
node.durationMs !== undefined ? ` (${formatDuration(node.durationMs)})` : '';
743758
const stateLabel = node.state === 'running' ? ' (running)' : '';

packages/core/src/db/workflows.test.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,42 @@ describe('workflows database', () => {
489489
expect(query).toContain('working_path = $2');
490490
expect(query).not.toContain('conversation_id');
491491
expect(query).toContain('ORDER BY started_at DESC');
492-
expect(params).toEqual(['feature-development', '/repo/path']);
492+
expect(params).toEqual(['feature-development', '/repo/path', 1]);
493+
});
494+
495+
test('returns a stale running run (no activity for >1 day)', async () => {
496+
const twoDaysAgo = new Date(Date.now() - 2 * 24 * 60 * 60 * 1000).toISOString();
497+
const staleRun = {
498+
...mockWorkflowRun,
499+
status: 'running' as const,
500+
working_path: '/repo/path',
501+
last_activity_at: twoDaysAgo,
502+
};
503+
mockQuery.mockResolvedValueOnce(createQueryResult([staleRun]));
504+
505+
const result = await findResumableRun('feature-development', '/repo/path');
506+
507+
expect(result).toEqual(staleRun);
508+
const [query, params] = mockQuery.mock.calls[0] as [string, unknown[]];
509+
expect(query).toContain("status = 'running'");
510+
expect(query).toContain('last_activity_at');
511+
expect(params).toEqual(['feature-development', '/repo/path', 1]);
512+
});
513+
514+
test('returns a running run with null last_activity_at (never recorded activity)', async () => {
515+
const staleRun = {
516+
...mockWorkflowRun,
517+
status: 'running' as const,
518+
working_path: '/repo/path',
519+
last_activity_at: null,
520+
};
521+
mockQuery.mockResolvedValueOnce(createQueryResult([staleRun]));
522+
523+
const result = await findResumableRun('feature-development', '/repo/path');
524+
525+
expect(result).toEqual(staleRun);
526+
const [query] = mockQuery.mock.calls[0] as [string, unknown[]];
527+
expect(query).toContain('last_activity_at IS NULL');
493528
});
494529

495530
test('returns null when no resumable run exists', async () => {

packages/core/src/db/workflows.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,21 +226,28 @@ export async function findResumableRun(
226226
workflowName: string,
227227
workingPath: string
228228
): Promise<WorkflowRun | null> {
229+
const dialect = getDialect();
229230
try {
230231
const result = await pool.query<WorkflowRun>(
231232
`SELECT * FROM remote_agent_workflow_runs
232233
WHERE workflow_name = $1
233234
AND working_path = $2
234-
AND status IN ('failed', 'paused')
235+
AND (
236+
status IN ('failed', 'paused')
237+
OR (status = 'running' AND (last_activity_at IS NULL OR last_activity_at < ${dialect.nowMinusDays(3)})) -- nowMinusDays(3): param index 3, value bound as $3 = 1 day
238+
)
235239
ORDER BY started_at DESC
236240
LIMIT 1`,
237-
[workflowName, workingPath]
241+
[workflowName, workingPath, 1]
238242
);
239243
const row = result.rows[0];
240244
return row ? normalizeWorkflowRun(row) : null;
241245
} catch (error) {
242246
const err = error as Error;
243-
getLog().warn({ err, workflowName, workingPath }, 'db.workflow_run_find_resumable_failed');
247+
getLog().error(
248+
{ err, errorType: err.constructor.name, workflowName, workingPath },
249+
'db.workflow_run_find_resumable_failed'
250+
);
244251
throw new Error(`Failed to find resumable run: ${err.message}`);
245252
}
246253
}

0 commit comments

Comments
 (0)