diff --git a/src/bridge/message-bridge.ts b/src/bridge/message-bridge.ts index 1c952c4d..59c97234 100644 --- a/src/bridge/message-bridge.ts +++ b/src/bridge/message-bridge.ts @@ -236,6 +236,25 @@ export class MessageBridge { cardMessageId: string; turnId: string; }>(); + /** + * AskUserQuestion calls that fired between turns (no activeTurn in the + * executor at the time of the PreToolUse hook). The bridge displays them + * as standalone question cards and routes the user's next typed reply + * back to {@link PersistentClaudeExecutor.resolveQuestion}. + * + * Without this, the question text would only appear inside the coalesced + * "Agent activity" body and the user's reply would be treated as a fresh + * user turn — which then blocks for 6 minutes on the still-hanging hook. + * + * Single in-flight slot per chatId. If a second between-turn question + * fires while one is still pending, the later one wins — the older + * resolver hangs until its 6-minute timeout (rare in practice). + */ + private pendingBetweenTurnQuestions = new Map(); /** Callback for activity lifecycle events (task started/completed/failed). */ onActivityEvent?: (event: ActivityEventData) => void; @@ -473,6 +492,19 @@ export class MessageBridge { if (cont) { cont.abortController.abort(); } + // Between-turn question whose resolver is now dead — flush the + // question card to an error state and drop the bookkeeping so the + // user's next message isn't intercepted as the answer. + const q = this.pendingBetweenTurnQuestions.get(chatId); + if (q) { + this.pendingBetweenTurnQuestions.delete(chatId); + void this.finalizeBetweenTurnQuestionCard(q.cardMessageId, { + status: 'error', + userPrompt: 'Question', + responseText: '_Question canceled — agent session ended._', + toolCalls: [], + }); + } }); this.logger.info( { @@ -512,9 +544,190 @@ export class MessageBridge { // (card + stream loop + finalize). Errors are logged inside. 
void this.handleContinuationTurn(chatId, handle as ExecutionHandle); }); + exec.on('between-turn-question', (payload: { + toolUseId: string; + questions: PendingQuestion['questions']; + }) => { + void this.handleBetweenTurnQuestion(chatId, payload); + }); this.logger.debug({ chatId }, 'MessageBridge: attached executor subscriptions'); } + /** + * Surface a between-turn AskUserQuestion as its own card on the chat. + * Called from the `between-turn-question` executor event. The user's + * next typed reply for this chatId is intercepted in + * {@link handleMessage} and routed back via the executor's + * {@link PersistentClaudeExecutor.resolveQuestion}. + * + * Single in-flight slot per chatId — if one is already pending, the + * older one is abandoned (its resolver will hang to the SDK's 6-min + * timeout, then return empty answers). The older card is marked + * "superseded" so the user sees what happened. + */ + private async handleBetweenTurnQuestion( + chatId: string, + payload: { toolUseId: string; questions: PendingQuestion['questions'] }, + ): Promise { + if (!payload.questions || payload.questions.length === 0) { + this.logger.warn({ chatId, toolUseId: payload.toolUseId }, 'between-turn question with no parsed questions; skipping card'); + return; + } + const existing = this.pendingBetweenTurnQuestions.get(chatId); + if (existing) { + this.logger.warn( + { chatId, prevToolUseId: existing.toolUseId, newToolUseId: payload.toolUseId }, + 'MessageBridge: between-turn question superseded by newer one', + ); + void this.finalizeBetweenTurnQuestionCard(existing.cardMessageId, { + status: 'error', + userPrompt: 'Question', + responseText: '_Superseded by a newer question._', + toolCalls: [], + }); + this.pendingBetweenTurnQuestions.delete(chatId); + } + + // Show only the first question on the card (matches runOneTurn — the + // bridge surfaces one sub-question at a time, advancing the card on + // each typed reply). 
Multi-question case is logged below; the existing + // bridge code path doesn't support advancing between-turn sub-questions + // yet, so we route only the first answer and short-circuit the rest. + if (payload.questions.length > 1) { + this.logger.warn( + { chatId, toolUseId: payload.toolUseId, total: payload.questions.length }, + 'between-turn AskUserQuestion has multiple sub-questions; only the first will be displayed and routed', + ); + } + const displayQuestion: PendingQuestion = { + toolUseId: payload.toolUseId, + questions: [payload.questions[0]], + }; + + const card: CardState = { + status: 'waiting_for_input', + userPrompt: '(between-turn question)', + responseText: '', + toolCalls: [], + pendingQuestion: displayQuestion, + }; + + const send = this.sender.sendQuestionCard + ? this.sender.sendQuestionCard.bind(this.sender) + : this.sender.sendCard.bind(this.sender); + let cardMessageId: string | undefined; + try { + cardMessageId = await send(chatId, card); + } catch (err) { + this.logger.error({ err, chatId, toolUseId: payload.toolUseId }, 'MessageBridge: failed to send between-turn question card'); + return; + } + if (!cardMessageId) { + this.logger.warn({ chatId, toolUseId: payload.toolUseId }, 'MessageBridge: between-turn question card returned no messageId'); + return; + } + + this.pendingBetweenTurnQuestions.set(chatId, { + toolUseId: payload.toolUseId, + questions: payload.questions, + cardMessageId, + }); + this.logger.info( + { chatId, toolUseId: payload.toolUseId, cardMessageId }, + 'MessageBridge: between-turn question card opened', + ); + } + + /** + * Update the dedicated question card after the user answers (or after the + * executor is torn down). Uses updateQuestionCard if the sender supports + * it, else falls back to updateCard. + */ + private async finalizeBetweenTurnQuestionCard( + cardMessageId: string, + state: CardState, + ): Promise { + try { + const update = this.sender.updateQuestionCard + ? 
this.sender.updateQuestionCard.bind(this.sender)
+        : this.sender.updateCard.bind(this.sender);
+      await update(cardMessageId, state);
+    } catch (err) {
+      this.logger.warn({ err, cardMessageId }, 'MessageBridge: failed to update between-turn question card');
+    }
+  }
+
+  /**
+   * Treat the user's typed reply as the answer to a pending between-turn
+   * question and route it through {@link PersistentClaudeExecutor.resolveQuestion}
+   * so the AskUserQuestion PreToolUse hook unblocks. A reply made up solely of digits selects that 1-based option; any other text is sent verbatim.
+   * Returns true if the message was consumed (caller must NOT continue to
+   * executeQuery), false when no between-turn question is pending for the chat.
+   */
+  private async tryHandleBetweenTurnQuestionReply(msg: IncomingMessage): Promise<boolean> {
+    const { chatId, text, imageKey } = msg;
+    const pending = this.pendingBetweenTurnQuestions.get(chatId);
+    if (!pending) return false;
+
+    // Image-only reply isn't a valid answer; nudge the user.
+    if (imageKey && !text.trim()) {
+      await this.sender.sendText(chatId, '请用文字回复问题卡片中的选项编号或自定义答案。');
+      return true;
+    }
+
+    const trimmed = text.trim();
+    const firstQ = pending.questions[0];
+    let answerText: string;
+    const num = /^\d+$/.test(trimmed) ? Number(trimmed) : NaN;
+    if (Number.isFinite(num) && num >= 1 && num <= firstQ.options.length) {
+      answerText = firstQ.options[num - 1].label;
+    } else {
+      answerText = trimmed;
+    }
+
+    const answers: Record<string, string> = { [firstQ.header]: answerText };
+    // For multi-sub-question between-turn calls (rare; logged on arrival),
+    // synthesize empty answers for the rest so the hook still resolves, without clobbering an answer already stored under the same header.
+    for (const rest of pending.questions.slice(1)) {
+      if (!Object.prototype.hasOwnProperty.call(answers, rest.header)) answers[rest.header] = '';
+    }
+
+    const executor = this.persistentRegistry?.peek(chatId);
+    if (!executor) {
+      this.logger.warn(
+        { chatId, toolUseId: pending.toolUseId },
+        'MessageBridge: between-turn answer arrived but executor is gone; dropping',
+      );
+      this.pendingBetweenTurnQuestions.delete(chatId);
+      await this.finalizeBetweenTurnQuestionCard(pending.cardMessageId, {
+        status: 'error',
+        userPrompt: 'Question',
+        responseText: '_Question canceled — agent session ended._',
+        toolCalls: [],
+      });
+      return true;
+    }
+
+    this.pendingBetweenTurnQuestions.delete(chatId);
+    // Only report success (log + card) when the resolver call actually went through.
+    let delivered = true;
+    try {
+      executor.resolveQuestion(pending.toolUseId, answers);
+      this.logger.info({ chatId, toolUseId: pending.toolUseId, answer: answerText }, 'MessageBridge: resolved between-turn question');
+    } catch (err) {
+      delivered = false;
+      this.logger.error({ err, chatId, toolUseId: pending.toolUseId }, 'MessageBridge: resolveQuestion threw');
+    }
+
+    await this.finalizeBetweenTurnQuestionCard(pending.cardMessageId, {
+      status: delivered ? 'complete' : 'error',
+      userPrompt: 'Question',
+      responseText: delivered ? `> **Reply:** ${answerText}` : '_Could not deliver your answer to the agent._',
+      toolCalls: [],
+    });
+    return true;
+  }
+
   /**
    * Buffer a spontaneous message and (re)arm the coalesce timer. We extract
    * just the human-readable bits — assistant text and tool_use intent —
@@ -1011,6 +1224,18 @@ export class MessageBridge {
       return;
     }

+    // Between-turn AskUserQuestion reply — must run BEFORE the
+    // running-task / queue branches below, because no `runningTasks` entry
+    // exists for between-turn questions (they fire from the persistent
+    // executor outside of any user-initiated turn). If we let the message
+    // fall through, it would spawn a fresh turn that immediately blocks on
+    // the still-hanging hook for 6 minutes. See:
+    // [[bug_feishu_v2_mobile_action_buttons]] history and
+    // PersistentClaudeExecutor.askUserQuestionHook.
+ if (await this.tryHandleBetweenTurnQuestionReply(msg)) { + return; + } + // Check if there's a pending question waiting for an answer const task = this.runningTasks.get(chatId); if (task && task.pendingQuestion) { diff --git a/src/engines/claude/persistent-executor.ts b/src/engines/claude/persistent-executor.ts index 35515dce..bfd69021 100644 --- a/src/engines/claude/persistent-executor.ts +++ b/src/engines/claude/persistent-executor.ts @@ -255,6 +255,47 @@ export function classifyBurstSource(raw: unknown): BurstSource { return 'spontaneous'; } +/** + * Parse the `tool_input` payload of an AskUserQuestion PreToolUse hook into + * the bridge's PendingQuestion['questions'] shape. Mirrors stream-processor's + * `extractPendingQuestion` (kept separate so the persistent executor doesn't + * have to import from stream-processor). + * + * Exported for unit tests; not part of the public executor API. + */ +export function parseAskUserQuestionInput(input: unknown): Array<{ + question: string; + header: string; + options: Array<{ label: string; description: string }>; + multiSelect: boolean; +}> { + if (!input || typeof input !== 'object') return []; + const inp = input as Record; + const questions = inp.questions; + if (!Array.isArray(questions)) return []; + return questions.map((q: any) => ({ + question: String(q?.question || ''), + header: String(q?.header || ''), + options: Array.isArray(q?.options) + ? q.options.map((o: any) => ({ + label: String(o?.label || ''), + description: String(o?.description || ''), + })) + : [], + multiSelect: Boolean(q?.multiSelect), + })); +} + +/** + * Payload of the `between-turn-question` event. Structurally identical to + * {@link PendingQuestion} (src/types.ts); a separate name avoids a coupling + * import from the bridge types. 
+ */ +export interface BetweenTurnQuestionEvent { + toolUseId: string; + questions: ReturnType; +} + export class PersistentClaudeExecutor extends EventEmitter { private state: ExecutorState = 'starting'; private inputQueue: AsyncQueue; @@ -660,6 +701,18 @@ export class PersistentClaudeExecutor extends EventEmitter { // marks AskUserQuestion as requiresUserInteraction; in bypassPermissions // mode we intercept, pause until the bridge supplies the user's answers, // then return them as updatedInput so the SDK auto-allows. + // + // Between-turn fire: if the hook trips while no activeTurn is in flight + // (teammate / `/goal` / continuation-burst follow-up), we additionally + // emit `between-turn-question` so the bridge can mount a dedicated + // question card on the chat. Without this side-channel the question text + // only lands in the coalesced "Agent activity" card body and the user's + // typed reply gets treated as a fresh user turn (which then blocks for + // 6 minutes on this still-hanging hook). The resolver registration path + // below is unchanged — the bridge calls resolveQuestion() to feed answers + // back through the same `updatedInput` mechanism. The emitted event + // shape matches PendingQuestion (src/types.ts) so the bridge can use it + // verbatim in CardState. const askUserQuestionHook = async ( input: { hook_event_name: string; tool_name: string; tool_input: unknown; tool_use_id: string }, _toolUseId: string | undefined, @@ -669,6 +722,15 @@ export class PersistentClaudeExecutor extends EventEmitter { const id = input.tool_use_id; const answers = await new Promise>((resolve) => { this.pendingQuestionResolvers.set(id, resolve); + // Surface the question to the bridge if we're between turns (no + // listener owns the live stream — the agent_activity coalesce card + // would otherwise eat the question silently). Done AFTER setting + // the resolver so a fast bridge reply can't race past it. 
+ if (!this.activeTurn) { + const parsed = parseAskUserQuestionInput(toolInput); + log.info({ toolUseId: id, questionCount: parsed.length }, 'PersistentExecutor: between-turn AskUserQuestion'); + this.emit('between-turn-question', { toolUseId: id, questions: parsed }); + } const timeout = setTimeout(() => { if (this.pendingQuestionResolvers.delete(id)) { log.warn({ toolUseId: id }, 'AskUserQuestion hook timed out (6 min) — empty answers'); diff --git a/tests/between-turn-question.test.ts b/tests/between-turn-question.test.ts new file mode 100644 index 00000000..b0f1b859 --- /dev/null +++ b/tests/between-turn-question.test.ts @@ -0,0 +1,237 @@ +import { describe, it, expect } from 'vitest'; +import { + PersistentClaudeExecutor, + parseAskUserQuestionInput, +} from '../src/engines/claude/persistent-executor.js'; + +/** + * Step 1 of the between-turn AskUserQuestion fix. + * + * The hook fires from the SDK whenever Claude invokes AskUserQuestion. When + * it fires WHILE a user turn is in flight (the common case), the bridge sees + * `state.pendingQuestion` on the streaming CardState and renders a dedicated + * question card via the existing runOneTurn path. + * + * When the hook fires BETWEEN user turns (teammate / `/goal` / continuation + * follow-up), `activeTurn` is null and there is no streaming CardState to + * piggy-back on — the question text would otherwise only land in the + * coalesced "Agent activity" body, and the user's typed reply would be + * treated as a fresh user turn (which then blocks for 6 minutes on this + * still-hanging hook). PersistentClaudeExecutor must emit + * `between-turn-question` in that case so the bridge can mount a separate + * question card and route the reply via resolveQuestion(). + * + * These tests cover the executor side. The bridge wiring is covered by + * exercising MessageBridge in integration; see the manual test plan in the + * PR description. 
+ */ + +const mockLogger = { + debug: () => {}, info: () => {}, warn: () => {}, error: () => {}, +} as any; + +function makeExec(): PersistentClaudeExecutor { + return new PersistentClaudeExecutor({ + cwd: '/tmp', + logger: mockLogger, + idleTimeoutMs: 0, + }); +} + +describe('parseAskUserQuestionInput', () => { + it('parses a well-formed tool_input', () => { + const out = parseAskUserQuestionInput({ + questions: [ + { + question: 'Pick one', + header: 'choice', + options: [ + { label: 'A', description: 'Alpha' }, + { label: 'B', description: 'Beta' }, + ], + multiSelect: false, + }, + ], + }); + expect(out).toEqual([ + { + question: 'Pick one', + header: 'choice', + options: [ + { label: 'A', description: 'Alpha' }, + { label: 'B', description: 'Beta' }, + ], + multiSelect: false, + }, + ]); + }); + + it('coerces missing / malformed fields into safe defaults', () => { + const out = parseAskUserQuestionInput({ + questions: [ + { /* no fields */ }, + { question: 'q', header: 'h', options: 'not-an-array' as any }, + ], + }); + expect(out).toHaveLength(2); + expect(out[0]).toEqual({ question: '', header: '', options: [], multiSelect: false }); + expect(out[1].options).toEqual([]); + }); + + it('returns [] for non-object input', () => { + expect(parseAskUserQuestionInput(null)).toEqual([]); + expect(parseAskUserQuestionInput(undefined)).toEqual([]); + expect(parseAskUserQuestionInput('nope')).toEqual([]); + expect(parseAskUserQuestionInput({ questions: 'no' })).toEqual([]); + }); +}); + +describe('PersistentClaudeExecutor between-turn AskUserQuestion', () => { + // Build the executor and invoke its private buildHooks() to grab the + // AskUserQuestion PreToolUse hook directly. We don't need a real SDK + // stream — only the hook itself is on the unit-test path. 
+ function getHook(exec: PersistentClaudeExecutor): (input: any, toolUseId: string | undefined, ctx: { signal: AbortSignal }) => Promise> { + const hooks = (exec as any).buildHooks(); + const preToolUse = hooks.PreToolUse[0].hooks[0]; + return preToolUse; + } + + it('emits `between-turn-question` when no activeTurn is in flight', async () => { + const exec = makeExec(); + expect((exec as any).activeTurn).toBeNull(); + + const hook = getHook(exec); + const events: Array<{ toolUseId: string; questions: any[] }> = []; + exec.on('between-turn-question', (payload) => events.push(payload)); + + // Kick the hook and resolve it from "outside" via resolveQuestion. + const ac = new AbortController(); + const hookPromise = hook( + { + hook_event_name: 'PreToolUse', + tool_name: 'AskUserQuestion', + tool_use_id: 'toolu_btw_1', + tool_input: { + questions: [ + { + question: 'Continue?', + header: 'cont', + options: [ + { label: 'Yes', description: '' }, + { label: 'No', description: '' }, + ], + multiSelect: false, + }, + ], + }, + }, + undefined, + { signal: ac.signal }, + ); + + // The event should have fired synchronously inside the hook promise body. + expect(events).toHaveLength(1); + expect(events[0].toolUseId).toBe('toolu_btw_1'); + expect(events[0].questions).toHaveLength(1); + expect(events[0].questions[0].header).toBe('cont'); + expect(events[0].questions[0].options).toEqual([ + { label: 'Yes', description: '' }, + { label: 'No', description: '' }, + ]); + + // Feed an answer back through the executor's resolveQuestion API — + // this is what the bridge will do when the user replies in chat. 
+ exec.resolveQuestion('toolu_btw_1', { cont: 'Yes' }); + + const result = await hookPromise; + expect(result).toEqual({ + hookSpecificOutput: { + hookEventName: 'PreToolUse', + permissionDecision: 'allow', + updatedInput: { + questions: expect.any(Array), + answers: { cont: 'Yes' }, + }, + }, + }); + }); + + it('does NOT emit `between-turn-question` when an activeTurn is in flight', async () => { + const exec = makeExec(); + // Simulate an active turn (the bridge would have started one via + // nextTurn() — we don't need the real one for this assertion). + (exec as any).activeTurn = { id: 't1', queue: { finish: () => {} }, detached: false, completed: false }; + + const hook = getHook(exec); + const events: Array = []; + exec.on('between-turn-question', (p) => events.push(p)); + + const ac = new AbortController(); + const hookPromise = hook( + { + hook_event_name: 'PreToolUse', + tool_name: 'AskUserQuestion', + tool_use_id: 'toolu_in_turn_1', + tool_input: { questions: [{ question: 'In-turn q', header: 'h', options: [], multiSelect: false }] }, + }, + undefined, + { signal: ac.signal }, + ); + + expect(events).toHaveLength(0); + + // Still resolves through the normal path + exec.resolveQuestion('toolu_in_turn_1', { h: 'answer' }); + await hookPromise; + }); + + it('the resolver registration happens BEFORE the event fires (avoids race)', async () => { + const exec = makeExec(); + const hook = getHook(exec); + + let resolverPresentWhenEventFired = false; + exec.on('between-turn-question', (payload: { toolUseId: string }) => { + // If the bridge handler tried to resolve as soon as the event arrived, + // the resolver map must already have the id registered — otherwise + // the bridge would hit the sendAnswer fallback path. 
+ resolverPresentWhenEventFired = (exec as any).pendingQuestionResolvers.has(payload.toolUseId); + }); + + const ac = new AbortController(); + const p = hook( + { + hook_event_name: 'PreToolUse', + tool_name: 'AskUserQuestion', + tool_use_id: 'toolu_race_1', + tool_input: { questions: [{ question: 'q', header: 'h', options: [], multiSelect: false }] }, + }, + undefined, + { signal: ac.signal }, + ); + + expect(resolverPresentWhenEventFired).toBe(true); + exec.resolveQuestion('toolu_race_1', { h: 'x' }); + await p; + }); + + it('signal abort cancels the hook cleanly without leaking the resolver', async () => { + const exec = makeExec(); + const hook = getHook(exec); + const ac = new AbortController(); + const p = hook( + { + hook_event_name: 'PreToolUse', + tool_name: 'AskUserQuestion', + tool_use_id: 'toolu_abort_1', + tool_input: { questions: [{ question: 'q', header: 'h', options: [], multiSelect: false }] }, + }, + undefined, + { signal: ac.signal }, + ); + expect((exec as any).pendingQuestionResolvers.has('toolu_abort_1')).toBe(true); + ac.abort(); + const result = await p; + expect(result.hookSpecificOutput).toBeDefined(); + expect((exec as any).pendingQuestionResolvers.has('toolu_abort_1')).toBe(false); + }); +});