diff --git a/external_plugins/telegram/.claude-plugin/plugin.json b/external_plugins/telegram/.claude-plugin/plugin.json index 9e3c96a2f..e1edd215a 100644 --- a/external_plugins/telegram/.claude-plugin/plugin.json +++ b/external_plugins/telegram/.claude-plugin/plugin.json @@ -1,7 +1,7 @@ { "name": "telegram", "description": "Telegram channel for Claude Code \u2014 messaging bridge with built-in access control. Manage pairing, allowlists, and policy via /telegram:access.", - "version": "0.0.5", + "version": "0.0.6", "keywords": [ "telegram", "messaging", diff --git a/external_plugins/telegram/server.ts b/external_plugins/telegram/server.ts index 6a07e35b2..126be5084 100644 --- a/external_plugins/telegram/server.ts +++ b/external_plugins/telegram/server.ts @@ -19,7 +19,7 @@ import { z } from 'zod' import { Bot, GrammyError, InlineKeyboard, InputFile, type Context } from 'grammy' import type { ReactionTypeEmoji } from 'grammy/types' import { randomBytes } from 'crypto' -import { readFileSync, writeFileSync, mkdirSync, readdirSync, rmSync, statSync, renameSync, realpathSync, chmodSync } from 'fs' +import { readFileSync, writeFileSync, mkdirSync, readdirSync, rmSync, statSync, renameSync, realpathSync, chmodSync, openSync, closeSync } from 'fs' import { homedir } from 'os' import { join, extname, sep } from 'path' @@ -51,22 +51,50 @@ if (!TOKEN) { process.exit(1) } const INBOX_DIR = join(STATE_DIR, 'inbox') -const PID_FILE = join(STATE_DIR, 'bot.pid') +const LOCK_FILE = join(STATE_DIR, 'poll.lock') -// Telegram allows exactly one getUpdates consumer per token. If a previous -// session crashed (SIGKILL, terminal closed) its server.ts grandchild can -// survive as an orphan and hold the slot forever, so every new session sees -// 409 Conflict. Kill any stale holder before we start polling. +// Telegram allows exactly one getUpdates consumer per token. Use an atomic +// exclusive-create lock to decide who polls. If the lock file already exists +// and the holder is alive, this instance yields (runs in follower mode with +// outbound tools only) instead of killing the holder — preserving the active +// MCP pipe. True orphans (dead PID) are reclaimed by removing the stale lock. mkdirSync(STATE_DIR, { recursive: true, mode: 0o700 }) -try { - const stale = parseInt(readFileSync(PID_FILE, 'utf8'), 10) - if (stale > 1 && stale !== process.pid) { - process.kill(stale, 0) - process.stderr.write(`telegram channel: replacing stale poller pid=${stale}\n`) - process.kill(stale, 'SIGTERM') + +let isPollingLeader = false + +function acquirePollLock(): boolean { + try { + const fd = openSync(LOCK_FILE, 'wx') // O_CREAT | O_EXCL — atomic + writeFileSync(fd, String(process.pid)) + closeSync(fd) + return true + } catch { + // Lock file exists — check if holder is alive + try { + const holder = parseInt(readFileSync(LOCK_FILE, 'utf8'), 10) + if (holder > 1 && holder !== process.pid) { + process.kill(holder, 0) // throws if dead + // Holder is alive — yield, don't kill + process.stderr.write( + `telegram channel: active poller pid=${holder}, entering follower mode (outbound tools only)\n`, + ) + return false + } + } catch {} + // Holder is dead or PID unreadable — reclaim stale lock + try { rmSync(LOCK_FILE) } catch {} + try { + const fd = openSync(LOCK_FILE, 'wx') + writeFileSync(fd, String(process.pid)) + closeSync(fd) + return true + } catch { + return false + } } -} catch {} -writeFileSync(PID_FILE, String(process.pid)) +} + +isPollingLeader = acquirePollLock() // Last-resort safety net — without these the process dies silently on any // unhandled promise rejection. With them it logs and keeps serving tools. @@ -638,7 +666,7 @@ function shutdown(): void { shuttingDown = true process.stderr.write('telegram channel: shutting down\n') try { - if (parseInt(readFileSync(PID_FILE, 'utf8'), 10) === process.pid) rmSync(PID_FILE) + if (parseInt(readFileSync(LOCK_FILE, 'utf8'), 10) === process.pid) rmSync(LOCK_FILE) } catch {} // bot.stop() signals the poll loop to end; the current getUpdates request // may take up to its long-poll timeout to return. Force-exit after 2s. @@ -985,6 +1013,19 @@ bot.catch(err => { process.stderr.write(`telegram channel: handler error (polling continues): ${err.error}\n`) }) +// Follower mode: skip polling, keep MCP tools active for outbound calls +// (reply, edit_message, react, download_attachment all use bot.api.* which +// does not require a polling loop). The leader handles inbound delivery. +if (!isPollingLeader) { + process.stderr.write('telegram channel: follower mode — outbound tools active, no polling\n') + // Fetch bot info so outbound tools work (bot.api needs the token, not polling) + void bot.api.getMe().then(info => { + botUsername = info.username + process.stderr.write(`telegram channel: follower connected as @${info.username}\n`) + }).catch(err => { + process.stderr.write(`telegram channel: follower getMe failed: ${err}\n`) + }) +} else { // 409 Conflict = another getUpdates consumer is still active (zombie from a // previous session, or a second Claude Code instance). Retry with backoff // until the slot frees up instead of crashing on the first rejection. @@ -1033,3 +1074,4 @@ void (async () => { } } })() +}