Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/commands/telegram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,11 @@ async function handleMessage(message: TelegramMessage): Promise<void> {
const result = await runUserMessage("telegram", prefixedPrompt);

if (result.exitCode !== 0) {
await sendMessage(config.token, chatId, `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId);
const isKilled = result.exitCode === 143 || result.exitCode === 137;
const errorMsg = isKilled
? `⏱ Request timed out (exit ${result.exitCode}: ${result.exitCode === 143 ? "SIGTERM" : "SIGKILL"}) — the subprocess took too long and was killed. Try again or split into smaller steps.`
: `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`;
await sendMessage(config.token, chatId, errorMsg, threadId);
} else {
const { cleanedText, reactionEmoji } = extractReactionDirective(result.stdout || "");
if (reactionEmoji) {
Expand Down
25 changes: 20 additions & 5 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ const DEFAULT_SETTINGS: Settings = {
forwardToTelegram: true,
},
telegram: { token: "", allowedUserIds: [] },
discord: { token: "", allowedUserIds: [], listenChannels: [] },
discord: { token: "", allowedUserIds: [] },
security: { level: "moderate", allowedTools: [], disallowedTools: [] },
web: { enabled: false, host: "127.0.0.1", port: 4632 },
stt: { baseUrl: "", model: "" },
timeouts: { telegram: 5, heartbeat: 15, job: 30, default: 5 },
};

export interface HeartbeatExcludeWindow {
Expand All @@ -53,7 +54,6 @@ export interface TelegramConfig {
export interface DiscordConfig {
token: string;
allowedUserIds: string[]; // Discord snowflake IDs exceed Number.MAX_SAFE_INTEGER
listenChannels: string[]; // Channel IDs where bot responds to all messages (no mention needed)
}

export type SecurityLevel =
Expand All @@ -68,6 +68,17 @@ export interface SecurityConfig {
disallowedTools: string[];
}

export interface TimeoutsConfig {
/** Max seconds for a telegram message subprocess. Default: 5 min. */
telegram: number;
/** Max minutes for a heartbeat subprocess. Default: 5 min. */
heartbeat: number;
/** Max minutes for a scheduled job subprocess. Default: 30 min. */
job: number;
/** Max minutes for all other subprocesses (bootstrap, trigger, etc). Default: 5 min. */
default: number;
}

export interface Settings {
model: string;
api: string;
Expand All @@ -80,6 +91,7 @@ export interface Settings {
security: SecurityConfig;
web: WebConfig;
stt: SttConfig;
timeouts: TimeoutsConfig;
}

export interface ModelConfig {
Expand Down Expand Up @@ -157,9 +169,6 @@ function parseSettings(raw: Record<string, any>, discordUserIds?: string[]): Set
: Array.isArray(raw.discord?.allowedUserIds)
? raw.discord.allowedUserIds.map(String)
: [],
listenChannels: Array.isArray(raw.discord?.listenChannels)
? raw.discord.listenChannels.map(String)
: [],
},
security: {
level,
Expand All @@ -179,6 +188,12 @@ function parseSettings(raw: Record<string, any>, discordUserIds?: string[]): Set
baseUrl: typeof raw.stt?.baseUrl === "string" ? raw.stt.baseUrl.trim() : "",
model: typeof raw.stt?.model === "string" ? raw.stt.model.trim() : "",
},
timeouts: {
telegram: Number.isFinite(raw.timeouts?.telegram) ? Number(raw.timeouts.telegram) : 5,
heartbeat: Number.isFinite(raw.timeouts?.heartbeat) ? Number(raw.timeouts.heartbeat) : 15,
job: Number.isFinite(raw.timeouts?.job) ? Number(raw.timeouts.job) : 30,
default: Number.isFinite(raw.timeouts?.default) ? Number(raw.timeouts.default) : 5,
},
};
}

Expand Down
128 changes: 119 additions & 9 deletions src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,64 @@ const LEGACY_PROJECT_CLAUDE_MD = join(process.cwd(), ".claude", "CLAUDE.md");
const CLAUDECLAW_BLOCK_START = "<!-- claudeclaw:managed:start -->";
const CLAUDECLAW_BLOCK_END = "<!-- claudeclaw:managed:end -->";

// Grace period between SIGTERM and SIGKILL when a subprocess times out.
const SIGKILL_GRACE_MS = 5_000;

export interface RunResult {
stdout: string;
stderr: string;
exitCode: number;
}

const RATE_LIMIT_PATTERN = /you.ve hit your limit|out of extra usage/i;
const RATE_LIMIT_PATTERN = /you(?:'|')ve hit your limit/i;
const RATE_LIMIT_RESET_PATTERN = /resets?\s+(\d{1,2})(?::(\d{2}))?\s*(am|pm)?\s*\(?\s*UTC\s*\)?/i;

// --- Rate limit state ---
let rateLimitResetAt: number = 0; // epoch ms; 0 = not rate-limited
let rateLimitNotified: boolean = false;

function parseRateLimitResetTime(text: string): number | null {
const match = text.match(RATE_LIMIT_RESET_PATTERN);
if (!match) return null;

let hours = Number(match[1]);
const minutes = match[2] ? Number(match[2]) : 0;
const ampm = match[3]?.toLowerCase();

if (ampm === "pm" && hours < 12) hours += 12;
if (ampm === "am" && hours === 12) hours = 0;

const now = new Date();
const reset = new Date(now);
reset.setUTCHours(hours, minutes, 0, 0);
// If the reset time is in the past, it means tomorrow
if (reset.getTime() <= now.getTime()) {
reset.setUTCDate(reset.getUTCDate() + 1);
}
return reset.getTime();
}

export function isRateLimited(): boolean {
if (rateLimitResetAt === 0) return false;
if (Date.now() >= rateLimitResetAt) {
rateLimitResetAt = 0;
rateLimitNotified = false;
return false;
}
return true;
}

export function getRateLimitResetAt(): number {
return rateLimitResetAt;
}

export function wasRateLimitNotified(): boolean {
return rateLimitNotified;
}

export function markRateLimitNotified(): void {
rateLimitNotified = true;
}

// Serial queue — prevents concurrent --resume on the same session
let queue: Promise<unknown> = Promise.resolve();
Expand Down Expand Up @@ -72,12 +123,36 @@ function buildChildEnv(baseEnv: Record<string, string>, model: string, api: stri
return childEnv;
}

/**
* Resolve the subprocess timeout (in ms) for a given invocation name.
* Values are read fresh from settings on every call, so hot-reload works
* automatically: edit settings.json and the next subprocess picks it up.
*
* Name mapping:
* "telegram" → settings.timeouts.telegram (default 5 min)
* "heartbeat" → settings.timeouts.heartbeat (default 5 min)
* anything else (jobs, bootstrap, trigger…) → settings.timeouts.job (default 30 min)
*/
function resolveTimeoutMs(name: string): number {
const t = getSettings().timeouts;
let minutes: number;
if (name === "telegram") {
minutes = t.telegram;
} else if (name === "heartbeat") {
minutes = t.heartbeat;
} else {
minutes = t.job;
}
return minutes * 60_000;
}

async function runClaudeOnce(
baseArgs: string[],
model: string,
api: string,
baseEnv: Record<string, string>
): Promise<{ rawStdout: string; stderr: string; exitCode: number }> {
baseEnv: Record<string, string>,
timeoutMs: number
): Promise<{ rawStdout: string; stderr: string; exitCode: number; timedOut: boolean }> {
const args = [...baseArgs];
const normalizedModel = model.trim().toLowerCase();
if (model.trim() && normalizedModel !== "glm") args.push("--model", model.trim());
Expand All @@ -88,16 +163,31 @@ async function runClaudeOnce(
env: buildChildEnv(baseEnv, model, api),
});

let timedOut = false;
let sigkillTimer: ReturnType<typeof setTimeout> | null = null;

const killTimer = setTimeout(() => {
timedOut = true;
try { proc.kill("SIGTERM"); } catch { /* already dead */ }
sigkillTimer = setTimeout(() => {
try { proc.kill("SIGKILL"); } catch { /* already dead */ }
}, SIGKILL_GRACE_MS);
}, timeoutMs);

const [rawStdout, stderr] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
]);
await proc.exited;

clearTimeout(killTimer);
if (sigkillTimer !== null) clearTimeout(sigkillTimer);

return {
rawStdout,
stderr,
exitCode: proc.exitCode ?? 1,
timedOut,
};
}

Expand Down Expand Up @@ -239,8 +329,11 @@ async function execClaude(name: string, prompt: string): Promise<RunResult> {
};
const securityArgs = buildSecurityArgs(security);

const timeoutMs = resolveTimeoutMs(name);
const timeoutMin = timeoutMs / 60_000;

console.log(
`[${new Date().toLocaleTimeString()}] Running: ${name} (${isNew ? "new session" : `resume ${existing.sessionId.slice(0, 8)}`}, security: ${security.level})`
`[${new Date().toLocaleTimeString()}] Running: ${name} (${isNew ? "new session" : `resume ${existing.sessionId.slice(0, 8)}`}, security: ${security.level}, timeout: ${timeoutMin}m)`
);

// New session: use json output to capture Claude's session_id
Expand Down Expand Up @@ -280,15 +373,23 @@ async function execClaude(name: string, prompt: string): Promise<RunResult> {
const { CLAUDECODE: _, ...cleanEnv } = process.env;
const baseEnv = { ...cleanEnv } as Record<string, string>;

let exec = await runClaudeOnce(args, primaryConfig.model, primaryConfig.api, baseEnv);
let exec = await runClaudeOnce(args, primaryConfig.model, primaryConfig.api, baseEnv, timeoutMs);

if (exec.timedOut) {
console.warn(
`[${new Date().toLocaleTimeString()}] TIMEOUT: ${name} subprocess killed after ${timeoutMin}m (SIGTERM+SIGKILL)`
);
}

const primaryRateLimit = extractRateLimitMessage(exec.rawStdout, exec.stderr);
let usedFallback = false;

if (primaryRateLimit && hasModelConfig(fallbackConfig) && !sameModelConfig(primaryConfig, fallbackConfig)) {
// Only retry with fallback on rate limit — not on timeout
if (!exec.timedOut && primaryRateLimit && hasModelConfig(fallbackConfig) && !sameModelConfig(primaryConfig, fallbackConfig)) {
console.warn(
`[${new Date().toLocaleTimeString()}] Claude limit reached; retrying with fallback${fallbackConfig.model ? ` (${fallbackConfig.model})` : ""}...`
);
exec = await runClaudeOnce(args, fallbackConfig.model, fallbackConfig.api, baseEnv);
exec = await runClaudeOnce(args, fallbackConfig.model, fallbackConfig.api, baseEnv, timeoutMs);
usedFallback = true;
}

Expand All @@ -301,10 +402,18 @@ async function execClaude(name: string, prompt: string): Promise<RunResult> {

if (rateLimitMessage) {
stdout = rateLimitMessage;
// Set global rate limit state so daemon can pause heartbeats/jobs
const resetTime = parseRateLimitResetTime(rateLimitMessage);
rateLimitResetAt = resetTime ?? (Date.now() + 60 * 60_000); // fallback: 1 hour
rateLimitNotified = false;
console.warn(
`[${new Date().toLocaleTimeString()}] Rate limit detected. Reset at: ${new Date(rateLimitResetAt).toISOString()}`
);
}

// For new sessions, parse the JSON to extract session_id and result text
if (!rateLimitMessage && isNew && exitCode === 0) {
// For new sessions, parse the JSON to extract session_id and result text.
// Skip if timed out — output won't be valid JSON.
if (!exec.timedOut && !rateLimitMessage && isNew && exitCode === 0) {
try {
const json = JSON.parse(rawStdout);
sessionId = json.session_id;
Expand All @@ -328,6 +437,7 @@ async function execClaude(name: string, prompt: string): Promise<RunResult> {
`Date: ${new Date().toISOString()}`,
`Session: ${sessionId} (${isNew ? "new" : "resumed"})`,
`Model config: ${usedFallback ? "fallback" : "primary"}`,
`Timeout: ${timeoutMin}m${exec.timedOut ? " [TIMED OUT]" : ""}`,
`Prompt: ${prompt}`,
`Exit code: ${result.exitCode}`,
"",
Expand Down