Skip to content

Commit 30b4243

Browse files
1872-svgclaude
andcommitted
feat: streaming output, /verbose mode, fork agent, and bugfixes
- Stream-json output with real-time Telegram message editing - /verbose command to show tool calls in streaming message - Sliding window display (last 8 tool lines, 15 text lines) to stay within 4096 limit - Newline separation between successive assistant message turns - Fork agent (haiku) for parallel queries when main agent is busy - /kill command to stop active agent - Fix duplicate messages from edit/sendMessage race condition - Fix verbose final message lost when edit fails silently - Fix verbose fallback duplicate on short text-only responses Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fbeb45e commit 30b4243

File tree

2 files changed

+512
-115
lines changed

2 files changed

+512
-115
lines changed

src/commands/telegram.ts

Lines changed: 212 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ensureProjectClaudeMd, run, runUserMessage } from "../runner";
1+
import { ensureProjectClaudeMd, run, runUserMessage, runFork, killActive, isMainBusy } from "../runner";
22
import { getSettings, loadSettings } from "../config";
33
import { resetSession } from "../sessions";
44
import { transcribeAudioToText } from "../whisper";
@@ -300,6 +300,128 @@ async function sendTyping(token: string, chatId: number, threadId?: number): Pro
300300
}).catch(() => {});
301301
}
302302

303+
// Chat IDs with verbose tool display enabled
304+
const verboseChats = new Set<number>();
305+
306+
/**
307+
* Build a streaming callback using editMessageText.
308+
* On first chunk: send a placeholder message to get message_id.
309+
* On subsequent chunks (throttled): edit that message with accumulated plain text.
310+
* In verbose mode, tool call/result lines appear above the text response.
311+
*/
312+
function makeStreamCallback(
313+
token: string,
314+
chatId: number,
315+
threadId: number | undefined,
316+
options: { intervalMs?: number; verbose?: boolean } = {}
317+
): { onChunk: (text: string) => void; onToolEvent: (line: string) => void; waitForStreamMsg: () => Promise<number | null> } {
318+
const { intervalMs = 500, verbose = false } = options;
319+
let textAcc = "";
320+
const toolLines: string[] = [];
321+
let lastSentAt = 0;
322+
let timer: ReturnType<typeof setTimeout> | null = null;
323+
let streamMsgId: number | null = null;
324+
let initPromise: Promise<void> | null = null;
325+
let finalized = false;
326+
327+
const getDisplay = () => {
328+
const MAX_TOOL_LINES = 8;
329+
const MAX_TEXT_LINES = 15;
330+
let toolPart: string;
331+
if (toolLines.length > MAX_TOOL_LINES) {
332+
const shown = toolLines.slice(-MAX_TOOL_LINES);
333+
toolPart = `[...${toolLines.length - MAX_TOOL_LINES} earlier]\n` + shown.join("\n");
334+
} else {
335+
toolPart = toolLines.join("\n");
336+
}
337+
let textPart = textAcc;
338+
const textLines = textPart.split("\n");
339+
if (textLines.length > MAX_TEXT_LINES) {
340+
textPart = `[...]\n` + textLines.slice(-MAX_TEXT_LINES).join("\n");
341+
}
342+
return toolPart + (textPart ? (toolPart ? "\n\n" : "") + textPart : "");
343+
};
344+
345+
const editStream = () => {
346+
if (!streamMsgId || finalized) return;
347+
let display: string;
348+
if (verbose) {
349+
display = getDisplay();
350+
} else {
351+
// Keep last N lines of text for streaming preview
352+
const lines = textAcc.split("\n");
353+
display = lines.length > 30 ? `[...]\n${lines.slice(-30).join("\n")}` : textAcc;
354+
}
355+
if (!display) return;
356+
callApi(token, "editMessageText", {
357+
chat_id: chatId,
358+
message_id: streamMsgId,
359+
text: display.slice(0, 4096),
360+
}).catch(() => {});
361+
};
362+
363+
const flush = async () => {
364+
const display = verbose ? getDisplay() : textAcc;
365+
if (!display) return;
366+
lastSentAt = Date.now();
367+
368+
if (!streamMsgId && !initPromise) {
369+
initPromise = (async () => {
370+
try {
371+
const res = await callApi<{ ok: boolean; result: { message_id: number } }>(
372+
token, "sendMessage", {
373+
chat_id: chatId,
374+
text: "⏳",
375+
...(threadId ? { message_thread_id: threadId } : {}),
376+
}
377+
);
378+
if (res.ok) {
379+
streamMsgId = res.result.message_id;
380+
editStream();
381+
}
382+
} catch {}
383+
})();
384+
await initPromise;
385+
} else {
386+
if (initPromise) await initPromise;
387+
editStream();
388+
}
389+
};
390+
391+
const onChunk = (text: string) => {
392+
textAcc += text;
393+
const now = Date.now();
394+
if (now - lastSentAt >= intervalMs) {
395+
if (timer) { clearTimeout(timer); timer = null; }
396+
flush();
397+
} else if (!timer) {
398+
timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt));
399+
}
400+
};
401+
402+
const onToolEvent = (line: string) => {
403+
if (!verbose) return;
404+
toolLines.push(line);
405+
// Use same throttle logic as onChunk to avoid spamming the API
406+
const now = Date.now();
407+
if (now - lastSentAt >= intervalMs) {
408+
if (timer) { clearTimeout(timer); timer = null; }
409+
flush();
410+
} else if (!timer) {
411+
timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt));
412+
}
413+
};
414+
415+
const waitForStreamMsg = async (): Promise<{ msgId: number | null; hadToolLines: boolean }> => {
416+
if (timer) { clearTimeout(timer); timer = null; }
417+
if (initPromise) await initPromise;
418+
finalized = true;
419+
return { msgId: streamMsgId, hadToolLines: toolLines.length > 0 };
420+
};
421+
422+
return { onChunk, onToolEvent, waitForStreamMsg };
423+
}
424+
303425
function extractReactionDirective(text: string): { cleanedText: string; reactionEmoji: string | null } {
304426
let reactionEmoji: string | null = null;
305427
const cleanedText = text
@@ -516,6 +638,47 @@ async function handleMessage(message: TelegramMessage): Promise<void> {
516638
return;
517639
}
518640

641+
if (command === "/kill") {
642+
const killed = killActive();
643+
await sendMessage(config.token, chatId, killed ? "Killed active agent." : "No active agent running.", threadId);
644+
return;
645+
}
646+
647+
if (command === "/verbose") {
648+
if (verboseChats.has(chatId)) {
649+
verboseChats.delete(chatId);
650+
await sendMessage(config.token, chatId, "Verbose mode off.", threadId);
651+
} else {
652+
verboseChats.add(chatId);
653+
await sendMessage(config.token, chatId, "Verbose mode on — tool calls will be shown.", threadId);
654+
}
655+
return;
656+
}
657+
658+
if (command === "/fork") {
659+
const forkPrompt = text.replace(/^\/fork\s*/i, "").trim();
660+
if (!forkPrompt) {
661+
await sendMessage(config.token, chatId, "Usage: /fork <prompt>", threadId);
662+
return;
663+
}
664+
const typingInterval = setInterval(() => sendTyping(config.token, chatId, threadId), 4000);
665+
try {
666+
await sendTyping(config.token, chatId, threadId);
667+
const senderLabel = message.from?.username ?? String(userId ?? "unknown");
668+
const result = await runFork(`[Telegram from ${senderLabel}]\nMessage: ${forkPrompt}`);
669+
if (result.exitCode !== 0) {
670+
await sendMessage(config.token, chatId, `Fork error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId);
671+
} else {
672+
await sendMessage(config.token, chatId, result.stdout || "(empty response)", threadId);
673+
}
674+
} catch (err) {
675+
await sendMessage(config.token, chatId, `Fork error: ${err instanceof Error ? err.message : String(err)}`, threadId);
676+
} finally {
677+
clearInterval(typingInterval);
678+
}
679+
return;
680+
}
681+
519682
// Secretary: detect reply to a bot alert message → treat as custom reply
520683
const replyToMsgId = message.reply_to_message?.message_id;
521684
if (replyToMsgId && text && botId && message.reply_to_message?.from?.id === botId) {
@@ -598,18 +761,63 @@ async function handleMessage(message: TelegramMessage): Promise<void> {
598761
);
599762
}
600763
const prefixedPrompt = promptParts.join("\n");
601-
const result = await runUserMessage("telegram", prefixedPrompt);
764+
const busy = isMainBusy();
765+
const verbose = verboseChats.has(chatId);
766+
let result;
767+
let streamMsgId: number | null = null;
768+
let hadToolLines = false;
769+
if (busy) {
770+
result = await runFork(prefixedPrompt);
771+
} else {
772+
const stream = makeStreamCallback(config.token, chatId, threadId, { verbose });
773+
result = await runUserMessage("telegram", prefixedPrompt, stream.onChunk, stream.onToolEvent);
774+
const streamResult = await stream.waitForStreamMsg();
775+
streamMsgId = streamResult.msgId;
776+
hadToolLines = streamResult.hadToolLines;
777+
}
602778

603779
if (result.exitCode !== 0) {
604-
await sendMessage(config.token, chatId, `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId);
780+
const errText = `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`;
781+
if (streamMsgId) {
782+
await callApi(config.token, "editMessageText", {
783+
chat_id: chatId, message_id: streamMsgId, text: errText,
784+
}).catch(() => sendMessage(config.token, chatId, errText, threadId));
785+
} else {
786+
await sendMessage(config.token, chatId, errText, threadId);
787+
}
605788
} else {
606789
const { cleanedText, reactionEmoji } = extractReactionDirective(result.stdout || "");
607790
if (reactionEmoji) {
608791
await sendReaction(config.token, chatId, message.message_id, reactionEmoji).catch((err) => {
609792
console.error(`[Telegram] Failed to send reaction for ${label}: ${err instanceof Error ? err.message : err}`);
610793
});
611794
}
612-
await sendMessage(config.token, chatId, cleanedText || "(empty response)", threadId);
795+
const finalText = cleanedText || "(empty response)";
796+
if (streamMsgId) {
797+
// Edit the streaming message with final formatted HTML.
798+
// editStream() already set the message to the correct plain text content,
799+
// so if all edits fail (e.g. "message is not modified"), do NOT send a new
800+
// message — the user already sees the correct content and a sendMessage
801+
// would create a duplicate.
802+
const html = markdownToTelegramHtml(normalizeTelegramText(finalText));
803+
await callApi(config.token, "editMessageText", {
804+
chat_id: chatId, message_id: streamMsgId,
805+
text: html.slice(0, 4096), parse_mode: "HTML",
806+
}).catch(() => callApi(config.token, "editMessageText", {
807+
chat_id: chatId, message_id: streamMsgId,
808+
text: finalText.slice(0, 4096),
809+
}).catch(() => {
810+
// If all edits fail and the stream message has tool output (verbose),
811+
// send the final response as a new message. But if there were no tool
812+
// lines, the stream message already shows the correct text — "not
813+
// modified" just means it's already right, so don't send a duplicate.
814+
if (verbose && hadToolLines) {
815+
return sendMessage(config.token, chatId, finalText, threadId);
816+
}
817+
}));
818+
} else {
819+
await sendMessage(config.token, chatId, finalText, threadId);
820+
}
613821
}
614822
} catch (err) {
615823
const errMsg = err instanceof Error ? err.message : String(err);

0 commit comments

Comments
 (0)