diff --git a/src/cli-main.ts b/src/cli-main.ts index 325d0ca5..28ba831f 100644 --- a/src/cli-main.ts +++ b/src/cli-main.ts @@ -1317,6 +1317,8 @@ export async function main(): Promise { authTokenEnv: getArg('--auth-token-env'), tlsCert: getArg('--tls-cert'), tlsKey: getArg('--tls-key'), + asyncMode: mcpArgs.includes('--async'), + longPollTimeout: getArg('--poll-timeout') ? Number(getArg('--poll-timeout')) : undefined, }; const { startMcpServer } = await import('./mcp-server'); diff --git a/src/cli.ts b/src/cli.ts index d0bb2fff..5beb4114 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -42,6 +42,7 @@ export class CLI { parseInt(value, 10) ) .option('--mcp-auth-token ', 'Bearer token for MCP HTTP server authentication') + .option('--mcp-async', 'Enable async job mode for MCP (start_job/get_job pattern)') .option( '-c, --check ', 'Specify check type (can be used multiple times)', @@ -240,6 +241,7 @@ export class CLI { mcp: Boolean(options.mcp), mcpPort: options.mcpPort, mcpAuthToken: options.mcpAuthToken, + mcpAsync: Boolean(options.mcpAsync), tui: Boolean(options.tui), keepWorkspace: Boolean(options.keepWorkspace), workspacePath: options.workspacePath, diff --git a/src/mcp-job-manager.ts b/src/mcp-job-manager.ts new file mode 100644 index 00000000..dfb26b79 --- /dev/null +++ b/src/mcp-job-manager.ts @@ -0,0 +1,378 @@ +/** + * MCP Job Manager — async job pattern for long-running MCP tool calls. + * + * Claude Code and other MCP clients have timeout issues with long-running tools. + * This module provides a `start_job` / `get_job` pattern: the tool call returns + * immediately with a job_id, and the client polls `get_job` until done. + * + * This is a thin wrapper over the existing TaskStore — jobs are persisted as + * regular Visor tasks (visible via `visor tasks list`, `visor tasks show`). + */ + +import crypto from 'crypto'; +import { logger } from './logger'; +import type { TaskStore } from './agent-protocol/task-store'; +import type { AgentTask, TaskState } from './agent-protocol/types'; + +/** Job status values exposed to MCP clients. */ +export type JobStatus = 'queued' | 'running' | 'completed' | 'failed' | 'expired'; + +/** Response shape returned by both start_job and get_job. */ +export interface JobResponse { + job_id: string; + status: JobStatus; + done: boolean; + progress: { + percent: number | null; + step: string; + message: string; + }; + polling: { + recommended_next_action: 'get_job' | 'none'; + recommended_delay_seconds: number; + }; + result: any; + error: { code: string; message: string; retryable: boolean } | null; + user_message: string; + next_instruction_for_model: string; +} + +/** Map TaskState → JobStatus for the MCP response. */ +function taskStateToJobStatus(state: TaskState): JobStatus { + switch (state) { + case 'submitted': + return 'queued'; + case 'working': + case 'input_required': + case 'auth_required': + return 'running'; + case 'completed': + return 'completed'; + case 'failed': + case 'canceled': + case 'rejected': + return 'failed'; + default: + return 'running'; + } +} + +/** + * Build the standardized MCP job response from an AgentTask. + */ +function buildResponse(task: AgentTask): JobResponse { + const status = taskStateToJobStatus(task.status.state); + const done = status === 'completed' || status === 'failed'; + + const polling = done + ? { recommended_next_action: 'none' as const, recommended_delay_seconds: 0 } + : { recommended_next_action: 'get_job' as const, recommended_delay_seconds: 0 }; + + // Extract result text from task status message or artifacts + let result: any = null; + if (status === 'completed') { + // The completed status message contains the AI response text + const statusText = task.status.message?.parts?.[0]?.text; + if (statusText) { + result = statusText; + } + // Also check artifacts for richer output + if (task.artifacts.length > 0) { + const artifactTexts = task.artifacts + .flatMap(a => a.parts) + .filter(p => p.text) + .map(p => p.text); + if (artifactTexts.length > 0 && !result) { + result = artifactTexts.join('\n'); + } + } + } + + // Extract error from failed status message + let error: JobResponse['error'] = null; + if (status === 'failed' && task.status.message) { + const errorText = task.status.message.parts?.[0]?.text || 'Unknown error'; + error = { + code: 'EXECUTION_ERROR', + message: errorText, + retryable: true, + }; + } + + // Build progress + let progress: JobResponse['progress']; + switch (status) { + case 'queued': + progress = { percent: 0, step: 'queued', message: 'Job accepted and queued' }; + break; + case 'running': + progress = { percent: null, step: 'running', message: 'Job is executing' }; + break; + case 'completed': + progress = { percent: 100, step: 'completed', message: 'Job finished successfully' }; + break; + case 'failed': + progress = { percent: null, step: 'failed', message: 'The job failed before completion' }; + break; + default: + progress = { percent: null, step: status, message: `Job is ${status}` }; + } + + // Instructions for the model + let userMessage: string; + let nextInstruction: string; + switch (status) { + case 'queued': + userMessage = 'The job has been queued.'; + nextInstruction = + 'Call get_job with this job_id. It will wait up to 59 seconds for the result. If it returns with done still false, call get_job again.'; + break; + case 'running': + userMessage = 'The job is still running.'; + nextInstruction = + 'Call get_job again with this job_id. It will wait up to 59 seconds for the result. If it returns with done still false, call get_job again.'; + break; + case 'completed': + userMessage = 'The result is ready.'; + nextInstruction = 'Use the result to answer the user.'; + break; + case 'failed': + userMessage = 'The job failed.'; + nextInstruction = + 'Explain the failure briefly. If the user still wants the result, start a new job.'; + break; + default: + userMessage = `Job status: ${status}`; + nextInstruction = 'Call get_job again after 10 seconds.'; + } + + return { + job_id: task.id.slice(0, 8), + status, + done, + progress, + polling, + result, + error, + user_message: userMessage, + next_instruction_for_model: nextInstruction, + }; +} + +/** Build an "expired/not found" response. */ +function buildExpiredResponse(jobId: string): JobResponse { + return { + job_id: jobId, + status: 'expired', + done: true, + progress: { percent: null, step: 'expired', message: 'This job is no longer available' }, + polling: { recommended_next_action: 'none', recommended_delay_seconds: 0 }, + result: null, + error: { code: 'JOB_EXPIRED', message: 'The job is no longer available', retryable: true }, + user_message: 'The previous job expired.', + next_instruction_for_model: 'If the user still wants the result, start a new job.', + }; +} + +/** + * Manages async jobs for MCP tool calls, backed by the existing TaskStore. + * + * Jobs created here are visible via `visor tasks list` and `visor tasks show`. + */ +export class JobManager { + private longPollTimeoutMs: number; + + constructor( + private taskStore: TaskStore, + opts?: { longPollTimeoutMs?: number } + ) { + this.longPollTimeoutMs = opts?.longPollTimeoutMs ?? 59_000; + } + + /** + * Start a new async job. Creates a task, runs the handler in the background, + * and returns immediately with the task ID. + */ + startJob( + handler: () => Promise, + opts: { + messageText: string; + workflowId?: string; + configPath?: string; + metadata?: Record; + } + ): JobResponse { + // Create task via trackExecution pattern (inline, since we need non-blocking) + const { getInstanceId } = require('./utils/instance-id'); + + const requestMessage = { + message_id: crypto.randomUUID(), + role: 'user' as const, + parts: [{ text: opts.messageText }], + }; + + const task = this.taskStore.createTask({ + contextId: crypto.randomUUID(), + requestMessage, + workflowId: opts.workflowId, + requestMetadata: { + source: 'mcp', + instance_id: getInstanceId(), + async_job: true, + ...opts.metadata, + }, + }); + + // Transition to working + this.taskStore.updateTaskState(task.id, 'working'); + this.taskStore.claimTask(task.id, getInstanceId()); + + logger.info( + `[MCP-AsyncJob] Job ${task.id.slice(0, 8)} started (workflow=${opts.workflowId || '-'})` + ); + + // Heartbeat timer + const heartbeatTimer = setInterval(() => { + try { + this.taskStore.heartbeat(task.id); + } catch { + // best-effort + } + }, 60_000); + if (heartbeatTimer.unref) heartbeatTimer.unref(); + + // Run handler in background + handler() + .then(result => { + clearInterval(heartbeatTimer); + + // Extract response text from the result (same logic as track-execution.ts) + let responseText = 'Execution completed'; + try { + // If result is an MCP content response, extract the text + const content = result?.content; + if (Array.isArray(content)) { + const texts = content.filter((c: any) => c.type === 'text').map((c: any) => c.text); + if (texts.length > 0) responseText = texts.join('\n'); + } else if (typeof result === 'string') { + responseText = result; + } else { + // Try engine result format + const history = result?.reviewSummary?.history; + if (history) { + const entries = Object.values(history); + for (let i = entries.length - 1; i >= 0; i--) { + const outputs = entries[i] as any[]; + if (!Array.isArray(outputs)) continue; + for (let j = outputs.length - 1; j >= 0; j--) { + const text = outputs[j]?.text; + if (typeof text === 'string' && text.trim().length > 0) { + responseText = text.trim(); + break; + } + } + if (responseText !== 'Execution completed') break; + } + } + } + } catch { + // ignore extraction errors + } + + const completedMsg = { + message_id: crypto.randomUUID(), + role: 'agent' as const, + parts: [{ text: responseText }], + }; + try { + this.taskStore.updateTaskState(task.id, 'completed', completedMsg); + logger.info(`[MCP-AsyncJob] Job ${task.id.slice(0, 8)} completed`); + } catch (stateErr) { + logger.warn( + `[MCP-AsyncJob] Job ${task.id.slice(0, 8)} completed but state transition failed: ${stateErr}` + ); + } + }) + .catch(err => { + clearInterval(heartbeatTimer); + const errorText = err instanceof Error ? err.message : String(err); + const failMsg = { + message_id: crypto.randomUUID(), + role: 'agent' as const, + parts: [{ text: errorText }], + }; + try { + this.taskStore.updateTaskState(task.id, 'failed', failMsg); + logger.info(`[MCP-AsyncJob] Job ${task.id.slice(0, 8)} failed: ${errorText}`); + } catch (stateErr) { + logger.warn( + `[MCP-AsyncJob] Job ${task.id.slice(0, 8)} failed but could not update task state: ${stateErr}` + ); + } + }); + + return buildResponse(task); + } + + /** + * Get the current state of a job by ID (supports both short and full IDs). + * + * Uses long polling: if the job is still running, waits up to 59 seconds + * for it to finish before responding. Returns immediately if the job is + * already in a terminal state (completed/failed). + */ + async getJob(jobId: string): Promise { + const resolvedTask = this.resolveTask(jobId); + if (!resolvedTask) { + return buildExpiredResponse(jobId); + } + + // If already done, return immediately + const initialStatus = taskStateToJobStatus(resolvedTask.status.state); + if (initialStatus === 'completed' || initialStatus === 'failed') { + return buildResponse(resolvedTask); + } + + // Long poll: check every 500ms for up to the configured timeout + const POLL_INTERVAL_MS = 500; + const MAX_WAIT_MS = this.longPollTimeoutMs; + const deadline = Date.now() + MAX_WAIT_MS; + + while (Date.now() < deadline) { + await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS)); + + const task = this.resolveTask(jobId); + if (!task) { + return buildExpiredResponse(jobId); + } + + const status = taskStateToJobStatus(task.status.state); + if (status === 'completed' || status === 'failed') { + return buildResponse(task); + } + } + + // Timeout — return current state (still running) + const finalTask = this.resolveTask(jobId); + if (!finalTask) { + return buildExpiredResponse(jobId); + } + return buildResponse(finalTask); + } + + /** + * Resolve a task by full UUID or short ID prefix. + */ + private resolveTask(jobId: string): import('./agent-protocol/types').AgentTask | null { + // Try direct lookup first (full UUID) + let task = this.taskStore.getTask(jobId); + + // If not found and looks like a short ID, search by prefix + if (!task && jobId.length < 36) { + const { tasks } = this.taskStore.listTasks({ limit: 200 }); + task = tasks.find(t => t.id.startsWith(jobId)) || null; + } + + return task; + } +} diff --git a/src/mcp-server.ts b/src/mcp-server.ts index adf31775..26e4ed0a 100644 --- a/src/mcp-server.ts +++ b/src/mcp-server.ts @@ -89,6 +89,15 @@ export interface McpServerOptions { /** Path to TLS private key PEM file. */ tlsKey?: string; + + /** Enable async job mode (start_job/get_job instead of blocking tool). */ + asyncMode?: boolean; + + /** Long poll timeout in seconds for get_job (default: 59). */ + longPollTimeout?: number; + + /** TaskStore for async job mode persistence. If not provided, one will be created. */ + taskStore?: import('./agent-protocol/task-store').TaskStore; } /** @@ -509,6 +518,105 @@ export async function executeFixedWorkflow( } } +/** + * Get or create a TaskStore for async job mode. + * If one is provided in options, use it. Otherwise create a SQLite-backed store. + */ +async function getOrCreateTaskStore( + options: McpServerOptions +): Promise { + if (options.taskStore) return options.taskStore; + const { SqliteTaskStore } = await import('./agent-protocol/task-store'); + const store = new SqliteTaskStore(); + await store.initialize(); + return store; +} + +/** + * Register async job tools (start_job / get_job) on an MCP server instance. + * Used by both standalone and createHttpMcpServer when asyncMode is enabled. + * + * Requires a TaskStore for persistence — jobs are stored as regular Visor tasks, + * visible via `visor tasks list` and `visor tasks show`. + */ +async function registerAsyncJobTools( + server: McpServer, + resolvedWorkflowPath: string | undefined, + toolName: string, + taskStore: import('./agent-protocol/task-store').TaskStore, + longPollTimeoutMs?: number +): Promise { + const { JobManager } = await import('./mcp-job-manager'); + const jobManager = new JobManager(taskStore, { longPollTimeoutMs }); + + const startJobName = + toolName === 'run_workflow' || toolName === 'send_message' ? 'start_job' : `start_${toolName}`; + const getJobName = 'get_job'; + + const workflowId = resolvedWorkflowPath + ? path.basename(resolvedWorkflowPath, path.extname(resolvedWorkflowPath)) + : undefined; + + // Build start_job schema based on whether we have a fixed workflow + if (resolvedWorkflowPath) { + (server as any).tool( + startJobName, + 'Start a long-running job. Returns immediately with a job_id. ' + + 'You MUST then call get_job with this job_id. get_job uses long polling (waits up to 59s). ' + + 'If done is still false, call get_job again.', + { + message: FixedWorkflowSchema.shape.message, + checks: FixedWorkflowSchema.shape.checks, + format: FixedWorkflowSchema.shape.format, + }, + async (args: any) => { + const response = jobManager.startJob( + async () => executeFixedWorkflow(args as FixedWorkflowArgs, resolvedWorkflowPath!), + { + messageText: args.message || `Run workflow: ${workflowId}`, + workflowId, + } + ); + return { content: [{ type: 'text' as const, text: JSON.stringify(response, null, 2) }] }; + } + ); + } else { + (server as any).tool( + startJobName, + 'Start a long-running job. Returns immediately with a job_id. ' + + 'You MUST then call get_job with this job_id. get_job uses long polling (waits up to 59s). ' + + 'If done is still false, call get_job again.', + { + workflow: RunWorkflowSchema.shape.workflow, + message: RunWorkflowSchema.shape.message, + checks: RunWorkflowSchema.shape.checks, + format: RunWorkflowSchema.shape.format, + }, + async (args: any) => { + const response = jobManager.startJob(async () => executeWorkflow(args as RunWorkflowArgs), { + messageText: args.message || `Run workflow: ${args.workflow}`, + workflowId: args.workflow, + }); + return { content: [{ type: 'text' as const, text: JSON.stringify(response, null, 2) }] }; + } + ); + } + + // Register get_job tool + (server as any).tool( + getJobName, + 'Check the status of a running job using long polling. Waits up to 59 seconds for the job to finish. ' + + 'Returns immediately if the job is already done. If done is still false after the wait, call again.', + { + job_id: z.string().describe('The job ID returned by start_job.'), + }, + async (args: { job_id: string }) => { + const response = await jobManager.getJob(args.job_id); + return { content: [{ type: 'text' as const, text: JSON.stringify(response, null, 2) }] }; + } + ); +} + /** * Validate a Bearer token from an HTTP request using timing-safe comparison. */ @@ -575,7 +683,11 @@ export async function createHttpMcpServer(options: McpServerOptions): Promise { - return this.handleMessage(args.message, args.session_id); + if (this.options.asyncMode) { + // Async job mode: register start_job and get_job instead of blocking tool + // Requires a TaskStore — jobs are stored as regular Visor tasks + if (!this.taskStore) { + const { SqliteTaskStore } = await import('../agent-protocol/task-store'); + this.taskStore = new SqliteTaskStore(); + await this.taskStore.initialize(); } - ); + + const { JobManager } = await import('../mcp-job-manager'); + const longPollMs = this.options.longPollTimeout + ? this.options.longPollTimeout * 1000 + : undefined; + const jobManager = new JobManager(this.taskStore, { longPollTimeoutMs: longPollMs }); + + const startJobName = toolName === 'send_message' ? 'start_job' : `start_${toolName}`; + const allChecks = Object.keys(this.cfg.checks || {}); + + (mcpServer as any).tool( + startJobName, + 'Start a long-running job. Returns immediately with a job_id. ' + + 'You MUST then call get_job with this job_id. get_job uses long polling (waits up to 59s). ' + + 'If done is still false, call get_job again.', + { + message: z.string().describe('The message to send to the assistant.'), + session_id: z + .string() + .optional() + .describe( + 'Optional conversation session ID for maintaining context across messages. ' + + 'If omitted, a new session is created.' + ), + }, + async (args: { message: string; session_id?: string }) => { + const response = jobManager.startJob( + async () => this.handleMessage(args.message, args.session_id), + { + messageText: args.message, + workflowId: allChecks.join(','), + } + ); + return { content: [{ type: 'text' as const, text: JSON.stringify(response, null, 2) }] }; + } + ); + + (mcpServer as any).tool( + 'get_job', + 'Check the status of a running job using long polling. Waits up to 59 seconds for the job to finish. ' + + 'Returns immediately if the job is already done. If done is still false after the wait, call again.', + { + job_id: z.string().describe('The job ID returned by start_job.'), + }, + async (args: { job_id: string }) => { + const response = await jobManager.getJob(args.job_id); + return { content: [{ type: 'text' as const, text: JSON.stringify(response, null, 2) }] }; + } + ); + + console.error(`Visor MCP frontend started in async job mode`); + } else { + (mcpServer as any).tool( + toolName, + toolDescription, + { + message: z.string().describe('The message to send to the assistant.'), + session_id: z + .string() + .optional() + .describe( + 'Optional conversation session ID for maintaining context across messages. ' + + 'If omitted, a new session is created. Re-use the same session_id for follow-up messages.' + ), + }, + async (args: { message: string; session_id?: string }) => { + return this.handleMessage(args.message, args.session_id); + } + ); + } const { transports } = this; diff --git a/src/runners/runner-factory.ts b/src/runners/runner-factory.ts index cc69d6fa..636fc777 100644 --- a/src/runners/runner-factory.ts +++ b/src/runners/runner-factory.ts @@ -150,6 +150,12 @@ export async function createRunner( tlsKey: mcpAny.tls_key || process.env.VISOR_MCP_TLS_KEY, toolName: mcpAny.tool_name || process.env.VISOR_MCP_TOOL_NAME, toolDescription: mcpAny.tool_description || process.env.VISOR_MCP_TOOL_DESCRIPTION, + asyncMode: options.mcpAsync || mcpAny.async_mode || process.env.VISOR_MCP_ASYNC === 'true', + longPollTimeout: + mcpAny.long_poll_timeout || + (process.env.VISOR_MCP_POLL_TIMEOUT + ? parseInt(process.env.VISOR_MCP_POLL_TIMEOUT) + : undefined), }); } diff --git a/src/types/cli.ts b/src/types/cli.ts index 217d2029..2b56be81 100644 --- a/src/types/cli.ts +++ b/src/types/cli.ts @@ -77,6 +77,8 @@ export interface CliOptions { mcpPort?: number; /** Bearer token for MCP HTTP server authentication */ mcpAuthToken?: string; + /** Enable async job mode for MCP server (start_job/get_job pattern) */ + mcpAsync?: boolean; /** Enable interactive TUI (chat + logs tabs) */ tui?: boolean; /** Keep workspace folders after execution (for debugging) */ diff --git a/tests/unit/mcp-job-manager.test.ts b/tests/unit/mcp-job-manager.test.ts new file mode 100644 index 00000000..df116c72 --- /dev/null +++ b/tests/unit/mcp-job-manager.test.ts @@ -0,0 +1,306 @@ +import { JobManager } from '../../src/mcp-job-manager'; +import type { TaskStore } from '../../src/agent-protocol/task-store'; +import type { + AgentTask, + TaskState, + AgentMessage, + AgentArtifact, +} from '../../src/agent-protocol/types'; + +/** + * In-memory mock TaskStore for testing the JobManager without SQLite. + */ +function createMockTaskStore(): TaskStore { + const tasks = new Map(); + + return { + async initialize() {}, + async shutdown() {}, + + createTask(params) { + const id = require('crypto').randomUUID(); + const task: AgentTask = { + id, + context_id: params.contextId, + status: { + state: 'submitted', + timestamp: new Date().toISOString(), + }, + artifacts: [], + history: [], + metadata: params.requestMetadata, + workflow_id: params.workflowId, + }; + tasks.set(id, task); + return task; + }, + + getTask(taskId: string) { + return tasks.get(taskId) || null; + }, + + listTasks(filter) { + const all = Array.from(tasks.values()); + return { tasks: all.slice(0, filter.limit || 50), total: all.length }; + }, + + updateTaskState(taskId: string, newState: TaskState, statusMessage?: AgentMessage) { + const task = tasks.get(taskId); + if (!task) throw new Error(`Task ${taskId} not found`); + task.status = { + state: newState, + message: statusMessage, + timestamp: new Date().toISOString(), + }; + }, + + claimTask() {}, + heartbeat() {}, + addArtifact(taskId: string, artifact: AgentArtifact) { + const task = tasks.get(taskId); + if (task) task.artifacts.push(artifact); + }, + appendHistory() {}, + setRunId() {}, + updateMetadata() {}, + claimNextSubmitted() { + return null; + }, + reclaimStaleTasks() { + return []; + }, + releaseClaim() {}, + failStaleTasks() { + return 0; + }, + failStaleTasksByAge() { + return 0; + }, + purgeOldTasks() { + return 0; + }, + deleteExpiredTasks() { + return 0; + }, + deleteTask() {}, + } as TaskStore; +} + +describe('JobManager', () => { + let manager: JobManager; + let taskStore: TaskStore; + + beforeEach(() => { + taskStore = createMockTaskStore(); + manager = new JobManager(taskStore); + }); + + it('should start a job and return running response immediately', () => { + const response = manager.startJob( + async () => { + await new Promise(resolve => setTimeout(resolve, 5000)); + return { data: 'result' }; + }, + { messageText: 'test job' } + ); + + expect(response.job_id).toHaveLength(8); + expect(response.done).toBe(false); + expect(response.status).toBe('running'); + expect(response.polling.recommended_next_action).toBe('get_job'); + expect(response.polling.recommended_delay_seconds).toBe(0); + expect(response.result).toBeNull(); + expect(response.error).toBeNull(); + expect(response.next_instruction_for_model).toContain('get_job'); + }); + + it('should long-poll and return result when job completes during wait', async () => { + // Job completes near-instantly — get_job should return within the first poll interval (500ms) + const response = manager.startJob( + async () => ({ content: [{ type: 'text', text: 'All good' }] }), + { messageText: 'test job' } + ); + + const start = Date.now(); + const status = await manager.getJob(response.job_id); + const elapsed = Date.now() - start; + + expect(status.status).toBe('completed'); + expect(status.done).toBe(true); + expect(status.result).toBe('All good'); + expect(status.progress.percent).toBe(100); + expect(status.polling.recommended_next_action).toBe('none'); + // Should resolve within the first poll interval (~500ms), not wait the full timeout + expect(elapsed).toBeLessThan(2000); + }); + + it('should long-poll and return immediately for already-completed jobs', async () => { + const response = manager.startJob(async () => ({ content: [{ type: 'text', text: 'done' }] }), { + messageText: 'test', + }); + + // Wait for completion first + await new Promise(resolve => setTimeout(resolve, 50)); + + const start = Date.now(); + const status = await manager.getJob(response.job_id); + const elapsed = Date.now() - start; + + expect(status.status).toBe('completed'); + expect(elapsed).toBeLessThan(100); // should be near-instant + }); + + it('should handle job failure via long poll', async () => { + const response = manager.startJob( + async () => { + throw new Error('Something went wrong'); + }, + { messageText: 'failing job' } + ); + + const status = await manager.getJob(response.job_id); + expect(status.status).toBe('failed'); + expect(status.done).toBe(true); + expect(status.error).toBeTruthy(); + expect(status.error!.message).toBe('Something went wrong'); + expect(status.error!.retryable).toBe(true); + expect(status.result).toBeNull(); + }); + + it('should return expired for unknown job IDs immediately', async () => { + const start = Date.now(); + const status = await manager.getJob('nonexistent'); + const elapsed = Date.now() - start; + + expect(status.status).toBe('expired'); + expect(status.done).toBe(true); + expect(status.error!.code).toBe('JOB_EXPIRED'); + expect(elapsed).toBeLessThan(100); // no waiting for unknown jobs + }); + + it('should return 8-char job IDs (UUID prefix)', () => { + const response = manager.startJob(async () => 'test', { messageText: 'test' }); + expect(response.job_id).toMatch(/^[0-9a-f]{8}$/); + }); + + it('should look up jobs by short ID prefix', async () => { + const response = manager.startJob( + async () => ({ content: [{ type: 'text', text: 'found it' }] }), + { messageText: 'test' } + ); + + const status = await manager.getJob(response.job_id); + expect(status.status).toBe('completed'); + expect(status.result).toBe('found it'); + }); + + it('should store jobs as tasks visible to TaskStore', async () => { + manager.startJob(async () => 'result', { messageText: 'visible task' }); + + await new Promise(resolve => setTimeout(resolve, 50)); + + const { tasks } = taskStore.listTasks({ limit: 10 }); + expect(tasks.length).toBe(1); + expect(tasks[0].metadata?.source).toBe('mcp'); + expect(tasks[0].metadata?.async_job).toBe(true); + }); + + it('should pass workflowId to the task', () => { + manager.startJob(async () => 'result', { + messageText: 'test', + workflowId: 'code-review', + }); + + const { tasks } = taskStore.listTasks({ limit: 10 }); + expect(tasks[0].workflow_id).toBe('code-review'); + }); + + it('should extract result from plain string return', async () => { + const response = manager.startJob(async () => 'plain string result', { + messageText: 'test', + }); + + const status = await manager.getJob(response.job_id); + expect(status.status).toBe('completed'); + expect(status.result).toBe('plain string result'); + }); + + it('should handle non-Error thrown values', async () => { + const response = manager.startJob( + async () => { + throw 'string error message'; + }, + { messageText: 'test' } + ); + + const status = await manager.getJob(response.job_id); + expect(status.status).toBe('failed'); + expect(status.error!.message).toBe('string error message'); + }); + + it('should allow concurrent getJob calls on the same job', async () => { + const response = manager.startJob( + async () => { + await new Promise(resolve => setTimeout(resolve, 200)); + return { content: [{ type: 'text', text: 'concurrent result' }] }; + }, + { messageText: 'test' } + ); + + // Both should resolve with the same result + const [status1, status2] = await Promise.all([ + manager.getJob(response.job_id), + manager.getJob(response.job_id), + ]); + + expect(status1.status).toBe('completed'); + expect(status2.status).toBe('completed'); + expect(status1.result).toBe('concurrent result'); + expect(status2.result).toBe('concurrent result'); + }); + + it('should include all response fields in completed status', async () => { + const response = manager.startJob( + async () => ({ content: [{ type: 'text', text: 'full response' }] }), + { messageText: 'test' } + ); + + const status = await manager.getJob(response.job_id); + expect(status).toEqual( + expect.objectContaining({ + job_id: expect.stringMatching(/^[0-9a-f]{8}$/), + status: 'completed', + done: true, + progress: { percent: 100, step: 'completed', message: 'Job finished successfully' }, + polling: { recommended_next_action: 'none', recommended_delay_seconds: 0 }, + result: 'full response', + error: null, + user_message: 'The result is ready.', + next_instruction_for_model: 'Use the result to answer the user.', + }) + ); + }); + + it('should respect custom long poll timeout', async () => { + const TIMEOUT_MS = 1000; + const shortManager = new JobManager(taskStore, { longPollTimeoutMs: TIMEOUT_MS }); + const response = shortManager.startJob( + async () => { + // Job takes much longer than the poll timeout + await new Promise(resolve => setTimeout(resolve, 10_000)); + return 'late'; + }, + { messageText: 'slow job' } + ); + + const start = Date.now(); + const status = await shortManager.getJob(response.job_id); + const elapsed = Date.now() - start; + + // Should timeout near the configured 1000ms, with tolerance for poll interval (500ms) + expect(status.done).toBe(false); + expect(status.status).toBe('running'); + expect(elapsed).toBeGreaterThanOrEqual(TIMEOUT_MS); + expect(elapsed).toBeLessThan(TIMEOUT_MS + 600); // at most one extra poll interval + }); +});