Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions src/bridge/message-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,25 @@ export class MessageBridge {
cardMessageId: string;
turnId: string;
}>();
/**
* AskUserQuestion calls that fired between turns (no activeTurn in the
* executor at the time of the PreToolUse hook). The bridge displays them
* as standalone question cards and routes the user's next typed reply
* back to {@link PersistentClaudeExecutor.resolveQuestion}.
*
* Without this, the question text would only appear inside the coalesced
* "Agent activity" body and the user's reply would be treated as a fresh
* user turn — which then blocks for 6 minutes on the still-hanging hook.
*
* Single in-flight slot per chatId. If a second between-turn question
* fires while one is still pending, the later one wins — the older
* resolver hangs until its 6-minute timeout (rare in practice).
*/
private pendingBetweenTurnQuestions = new Map<string, {
toolUseId: string;
questions: PendingQuestion['questions'];
cardMessageId: string;
}>();
/** Callback for activity lifecycle events (task started/completed/failed). */
onActivityEvent?: (event: ActivityEventData) => void;

Expand Down Expand Up @@ -473,6 +492,19 @@ export class MessageBridge {
if (cont) {
cont.abortController.abort();
}
// Between-turn question whose resolver is now dead — flush the
// question card to an error state and drop the bookkeeping so the
// user's next message isn't intercepted as the answer.
const q = this.pendingBetweenTurnQuestions.get(chatId);
if (q) {
this.pendingBetweenTurnQuestions.delete(chatId);
void this.finalizeBetweenTurnQuestionCard(q.cardMessageId, {
status: 'error',
userPrompt: 'Question',
responseText: '_Question canceled — agent session ended._',
toolCalls: [],
});
}
});
this.logger.info(
{
Expand Down Expand Up @@ -512,9 +544,190 @@ export class MessageBridge {
// (card + stream loop + finalize). Errors are logged inside.
void this.handleContinuationTurn(chatId, handle as ExecutionHandle);
});
exec.on('between-turn-question', (payload: {
toolUseId: string;
questions: PendingQuestion['questions'];
}) => {
void this.handleBetweenTurnQuestion(chatId, payload);
});
this.logger.debug({ chatId }, 'MessageBridge: attached executor subscriptions');
}

/**
* Surface a between-turn AskUserQuestion as its own card on the chat.
* Called from the `between-turn-question` executor event. The user's
* next typed reply for this chatId is intercepted in
* {@link handleMessage} and routed back via the executor's
* {@link PersistentClaudeExecutor.resolveQuestion}.
*
* Single in-flight slot per chatId — if one is already pending, the
* older one is abandoned (its resolver will hang to the SDK's 6-min
* timeout, then return empty answers). The older card is marked
* "superseded" so the user sees what happened.
*/
private async handleBetweenTurnQuestion(
chatId: string,
payload: { toolUseId: string; questions: PendingQuestion['questions'] },
): Promise<void> {
if (!payload.questions || payload.questions.length === 0) {
this.logger.warn({ chatId, toolUseId: payload.toolUseId }, 'between-turn question with no parsed questions; skipping card');
return;
}
const existing = this.pendingBetweenTurnQuestions.get(chatId);
if (existing) {
this.logger.warn(
{ chatId, prevToolUseId: existing.toolUseId, newToolUseId: payload.toolUseId },
'MessageBridge: between-turn question superseded by newer one',
);
void this.finalizeBetweenTurnQuestionCard(existing.cardMessageId, {
status: 'error',
userPrompt: 'Question',
responseText: '_Superseded by a newer question._',
toolCalls: [],
});
this.pendingBetweenTurnQuestions.delete(chatId);
}

// Show only the first question on the card (matches runOneTurn — the
// bridge surfaces one sub-question at a time, advancing the card on
// each typed reply). Multi-question case is logged below; the existing
// bridge code path doesn't support advancing between-turn sub-questions
// yet, so we route only the first answer and short-circuit the rest.
if (payload.questions.length > 1) {
this.logger.warn(
{ chatId, toolUseId: payload.toolUseId, total: payload.questions.length },
'between-turn AskUserQuestion has multiple sub-questions; only the first will be displayed and routed',
);
}
const displayQuestion: PendingQuestion = {
toolUseId: payload.toolUseId,
questions: [payload.questions[0]],
};

const card: CardState = {
status: 'waiting_for_input',
userPrompt: '(between-turn question)',
responseText: '',
toolCalls: [],
pendingQuestion: displayQuestion,
};

const send = this.sender.sendQuestionCard
? this.sender.sendQuestionCard.bind(this.sender)
: this.sender.sendCard.bind(this.sender);
let cardMessageId: string | undefined;
try {
cardMessageId = await send(chatId, card);
} catch (err) {
this.logger.error({ err, chatId, toolUseId: payload.toolUseId }, 'MessageBridge: failed to send between-turn question card');
return;
}
if (!cardMessageId) {
this.logger.warn({ chatId, toolUseId: payload.toolUseId }, 'MessageBridge: between-turn question card returned no messageId');
return;
}

this.pendingBetweenTurnQuestions.set(chatId, {
toolUseId: payload.toolUseId,
questions: payload.questions,
cardMessageId,
});
this.logger.info(
{ chatId, toolUseId: payload.toolUseId, cardMessageId },
'MessageBridge: between-turn question card opened',
);
}

/**
* Update the dedicated question card after the user answers (or after the
* executor is torn down). Uses updateQuestionCard if the sender supports
* it, else falls back to updateCard.
*/
private async finalizeBetweenTurnQuestionCard(
cardMessageId: string,
state: CardState,
): Promise<void> {
try {
const update = this.sender.updateQuestionCard
? this.sender.updateQuestionCard.bind(this.sender)
: this.sender.updateCard.bind(this.sender);
await update(cardMessageId, state);
} catch (err) {
this.logger.warn({ err, cardMessageId }, 'MessageBridge: failed to update between-turn question card');
}
}

/**
* Treat the user's typed reply as the answer to a pending between-turn
* question. Routes through {@link PersistentClaudeExecutor.resolveQuestion}
* so the AskUserQuestion PreToolUse hook unblocks and the SDK proceeds.
* Returns true if the message was consumed as an answer (caller should
* NOT continue to executeQuery).
*/
private async tryHandleBetweenTurnQuestionReply(msg: IncomingMessage): Promise<boolean> {
const { chatId, text, imageKey } = msg;
const pending = this.pendingBetweenTurnQuestions.get(chatId);
if (!pending) return false;

// Image-only reply isn't a valid answer; nudge the user.
if (imageKey && !text.trim()) {
await this.sender.sendText(chatId, '请用文字回复问题卡片中的选项编号或自定义答案。');
return true;
}

const trimmed = text.trim();
const firstQ = pending.questions[0];
let answerText: string;
const num = parseInt(trimmed, 10);
if (Number.isFinite(num) && num >= 1 && num <= firstQ.options.length) {
answerText = firstQ.options[num - 1].label;
} else {
answerText = trimmed;
}

const answers: Record<string, string> = { [firstQ.header]: answerText };
// For multi-sub-question between-turn calls (rare; logged on arrival),
// synthesize empty answers for the rest so the hook still resolves.
for (let i = 1; i < pending.questions.length; i++) {
answers[pending.questions[i].header] = '';
}

const executor = this.persistentRegistry?.peek(chatId);
if (!executor) {
this.logger.warn(
{ chatId, toolUseId: pending.toolUseId },
'MessageBridge: between-turn answer arrived but executor is gone; dropping',
);
this.pendingBetweenTurnQuestions.delete(chatId);
await this.finalizeBetweenTurnQuestionCard(pending.cardMessageId, {
status: 'error',
userPrompt: 'Question',
responseText: '_Question canceled — agent session ended._',
toolCalls: [],
});
return true;
}

this.pendingBetweenTurnQuestions.delete(chatId);
try {
executor.resolveQuestion(pending.toolUseId, answers);
} catch (err) {
this.logger.error({ err, chatId, toolUseId: pending.toolUseId }, 'MessageBridge: resolveQuestion threw');
}
this.logger.info(
{ chatId, toolUseId: pending.toolUseId, answer: answerText },
'MessageBridge: resolved between-turn question',
);

await this.finalizeBetweenTurnQuestionCard(pending.cardMessageId, {
status: 'complete',
userPrompt: 'Question',
responseText: `> **Reply:** ${answerText}`,
toolCalls: [],
});
return true;
}

/**
* Buffer a spontaneous message and (re)arm the coalesce timer. We extract
* just the human-readable bits — assistant text and tool_use intent —
Expand Down Expand Up @@ -1011,6 +1224,18 @@ export class MessageBridge {
return;
}

// Between-turn AskUserQuestion reply — must run BEFORE the
// running-task / queue branches below, because no `runningTasks` entry
// exists for between-turn questions (they fire from the persistent
// executor outside of any user-initiated turn). If we let the message
// fall through, it would spawn a fresh turn that immediately blocks on
// the still-hanging hook for 6 minutes. See:
// [[bug_feishu_v2_mobile_action_buttons]] history and
// PersistentClaudeExecutor.askUserQuestionHook.
if (await this.tryHandleBetweenTurnQuestionReply(msg)) {
return;
}

// Check if there's a pending question waiting for an answer
const task = this.runningTasks.get(chatId);
if (task && task.pendingQuestion) {
Expand Down
62 changes: 62 additions & 0 deletions src/engines/claude/persistent-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,47 @@ export function classifyBurstSource(raw: unknown): BurstSource {
return 'spontaneous';
}

/**
* Parse the `tool_input` payload of an AskUserQuestion PreToolUse hook into
* the bridge's PendingQuestion['questions'] shape. Mirrors stream-processor's
* `extractPendingQuestion` (kept separate so the persistent executor doesn't
* have to import from stream-processor).
*
* Exported for unit tests; not part of the public executor API.
*/
export function parseAskUserQuestionInput(input: unknown): Array<{
question: string;
header: string;
options: Array<{ label: string; description: string }>;
multiSelect: boolean;
}> {
if (!input || typeof input !== 'object') return [];
const inp = input as Record<string, unknown>;
const questions = inp.questions;
if (!Array.isArray(questions)) return [];
return questions.map((q: any) => ({
question: String(q?.question || ''),
header: String(q?.header || ''),
options: Array.isArray(q?.options)
? q.options.map((o: any) => ({
label: String(o?.label || ''),
description: String(o?.description || ''),
}))
: [],
multiSelect: Boolean(q?.multiSelect),
}));
}

/**
* Payload of the `between-turn-question` event. Structurally identical to
* {@link PendingQuestion} (src/types.ts); a separate name avoids a coupling
* import from the bridge types.
*/
export interface BetweenTurnQuestionEvent {
toolUseId: string;
questions: ReturnType<typeof parseAskUserQuestionInput>;
}

export class PersistentClaudeExecutor extends EventEmitter {
private state: ExecutorState = 'starting';
private inputQueue: AsyncQueue<SDKUserMessage>;
Expand Down Expand Up @@ -660,6 +701,18 @@ export class PersistentClaudeExecutor extends EventEmitter {
// marks AskUserQuestion as requiresUserInteraction; in bypassPermissions
// mode we intercept, pause until the bridge supplies the user's answers,
// then return them as updatedInput so the SDK auto-allows.
//
// Between-turn fire: if the hook trips while no activeTurn is in flight
// (teammate / `/goal` / continuation-burst follow-up), we additionally
// emit `between-turn-question` so the bridge can mount a dedicated
// question card on the chat. Without this side-channel the question text
// only lands in the coalesced "Agent activity" card body and the user's
// typed reply gets treated as a fresh user turn (which then blocks for
// 6 minutes on this still-hanging hook). The resolver registration path
// below is unchanged — the bridge calls resolveQuestion() to feed answers
// back through the same `updatedInput` mechanism. The emitted event
// shape matches PendingQuestion (src/types.ts) so the bridge can use it
// verbatim in CardState.
const askUserQuestionHook = async (
input: { hook_event_name: string; tool_name: string; tool_input: unknown; tool_use_id: string },
_toolUseId: string | undefined,
Expand All @@ -669,6 +722,15 @@ export class PersistentClaudeExecutor extends EventEmitter {
const id = input.tool_use_id;
const answers = await new Promise<Record<string, string>>((resolve) => {
this.pendingQuestionResolvers.set(id, resolve);
// Surface the question to the bridge if we're between turns (no
// listener owns the live stream — the agent_activity coalesce card
// would otherwise eat the question silently). Done AFTER setting
// the resolver so a fast bridge reply can't race past it.
if (!this.activeTurn) {
const parsed = parseAskUserQuestionInput(toolInput);
log.info({ toolUseId: id, questionCount: parsed.length }, 'PersistentExecutor: between-turn AskUserQuestion');
this.emit('between-turn-question', { toolUseId: id, questions: parsed });
}
const timeout = setTimeout(() => {
if (this.pendingQuestionResolvers.delete(id)) {
log.warn({ toolUseId: id }, 'AskUserQuestion hook timed out (6 min) — empty answers');
Expand Down
Loading
Loading