diff --git a/.agents/shared/references/lead-proxy.md b/.agents/shared/references/lead-proxy.md index 2ffbbe047..bbfbc05af 100644 --- a/.agents/shared/references/lead-proxy.md +++ b/.agents/shared/references/lead-proxy.md @@ -10,7 +10,8 @@ Start a background poll for the report file with `create_worker.py await`. It reads `finish_report_path` from the task file's frontmatter, blocks until that file appears, prints its contents, and exits 0; on timeout it exits non-zero (code 124). Run it with Bash's `run_in_background: true` so it returns the -instant the report lands. +instant the report lands. Pass `--name <worker-name>` so the poll also watches +the memory watchdog's shed ledger. `await` is a generic poll-until-file primitive; the gate cycle below is this flow's *use* of it. Non-interactive callers that launch a tightly-scoped agent @@ -20,6 +21,7 @@ and wait for one finish report use the same `await` (or the synchronous ```bash # Run with Bash run_in_background: true uv run .agents/skills/launch-task/scripts/create_worker.py await \ + --name <worker-name> \ --task-file ``` @@ -29,6 +31,13 @@ plus a body. If await exits non-zero (timeout) without printing a report, do *not* immediately treat it as a terminal failure -- see "Diagnose worker liveness" below. +If await exits with code 75, the worker's own agent was **paused by the memory +watchdog** to relieve memory pressure: it will not report until revived. This is +not a worker bug -- revive it with `mngr start <worker-name> --restart` (a plain +`mngr message` or `mngr start` does not relaunch a shed agent), then re-send its +task. On restart the worker is told it was paused, so it can re-check state +before continuing.
+ ## Diagnose worker liveness before invoking failure flow If the timeout trips without a report appearing, the worker may still be diff --git a/.agents/skills/launch-task/SKILL.md b/.agents/skills/launch-task/SKILL.md index 332cddf00..6094175aa 100644 --- a/.agents/skills/launch-task/SKILL.md +++ b/.agents/skills/launch-task/SKILL.md @@ -100,11 +100,15 @@ Poll with `create_worker.py await` as a background task (`run_in_background: true`) and continue with whatever else you were doing. It reads `finish_report_path` from the task file (`runtime/launch-task/$NAME/reports/report.md`), blocks until the worker pushes -back, then prints the report. +back, then prints the report. Pass `--name $NAME` so the poll also watches the +memory watchdog's shed ledger: if the worker's own agent is paused for memory +pressure (so it will never report until revived), the poll surfaces that +promptly and actionably instead of waiting out the full timeout. ```bash # Run with Bash run_in_background: true uv run .agents/skills/launch-task/scripts/create_worker.py await \ + --name $NAME \ --task-file runtime/launch-task/$NAME/task.md ``` diff --git a/.agents/skills/launch-task/references/dead-worker-recovery.md b/.agents/skills/launch-task/references/dead-worker-recovery.md index 7695e1990..66702eeb9 100644 --- a/.agents/skills/launch-task/references/dead-worker-recovery.md +++ b/.agents/skills/launch-task/references/dead-worker-recovery.md @@ -2,6 +2,28 @@ When a worker (sub-agent created via `launch-task`) is in `STOPPED` state -- claude session died mid-iteration, but the worktree (and any uncommitted work in it) is still intact -- the default path is to restart it, not to manually salvage. `mngr start` only re-creates the tmux session and re-execs claude in the existing worktree; it does not touch git state, so uncommitted changes survive the restart. +## First: was the worker shed for memory pressure? 
+ +A worker can die because the **memory watchdog** shed it -- the container was running out of memory and the watchdog killed the most-expendable work first. Check the shed ledger before reviving, because reviving into ongoing memory pressure just gets the worker killed again: + +```bash +# Did the watchdog shed this worker? (look for your worker's name) +# Absolute path: the ledger is shared at /mngr/code/runtime/, but your cwd is +# your own worktree, so a relative `runtime/...` would miss it. +grep '"agent_name": *"<worker-name>"' /mngr/code/runtime/memory_watchdog/events/shed/events.jsonl + +# Is the container still under pressure right now? +cat /mngr/code/runtime/memory_watchdog/status.json # is_under_pressure, used_fraction +``` + +Revival guidelines when a worker was shed: + +- **If pressure is still elevated** (`is_under_pressure` is true, or `used_fraction` is near the threshold): do NOT revive. Surface the situation to the user and let them decide -- a worker revived now will likely just be shed again and deepen the crunch. +- **If pressure has cleared**: revive at most once with `mngr start <worker-name> --restart`, then re-establish your report poll. A shed agent needs `--restart` -- a plain `mngr start` or `mngr message` will not relaunch it. On restart it is told it was paused, so it can re-check state before continuing; re-send its task. +- **If the same worker has already been shed twice** (two `process_shed` lines naming it): stop. Do not keep reviving. Surface to the user with the ledger details -- something about this worker's footprint is incompatible with the current memory budget. + +If the worker was *not* in the ledger, it died for some other reason (e.g. a claude crash); proceed with the normal restart path below, where a plain `mngr start` suffices. + ## Default: restart the worker and resume 1.
Bring claude back up in the existing worktree: diff --git a/.agents/skills/launch-task/scripts/create_worker.py b/.agents/skills/launch-task/scripts/create_worker.py index d84a0d40e..fe8eaee64 100755 --- a/.agents/skills/launch-task/scripts/create_worker.py +++ b/.agents/skills/launch-task/scripts/create_worker.py @@ -114,6 +114,11 @@ # matching coreutils ``timeout``'s convention so the prose's mental model # carries over. _AWAIT_TIMEOUT_RC = 124 +# Distinct exit code for an await that stopped early because the worker's own +# agent was shed by the memory watchdog (so it will never report until revived). +# Separate from the timeout code so the lead can tell "paused for memory" apart +# from "still running, just slow". +_AWAIT_SHED_RC = 75 def _normalize_dir(value: str) -> str: @@ -224,6 +229,20 @@ def _read_finish_report_path(task_file: Path) -> Path: return Path(value) +def _worker_name_from_task_file(task_file: Path) -> str: + """Best-effort worker (mngr agent) name, from the task file's directory. + + Every flow stages the task at ``runtime///task.md`` where + ```` is the worker's mngr agent name (launch passes the same ```` + as both the directory and ``mngr create ``). So the parent directory + name is the worker name -- which is what lets ``await`` watch the shed ledger + for this worker even when the caller did not pass ``--name`` explicitly. If + the derived name is wrong it simply never matches a ledger record (no false + positive), so this is safe as a default. + """ + return task_file.resolve().parent.name + + class Runner: """Indirection over ``subprocess.run`` so tests can intercept commands. @@ -357,6 +376,11 @@ def launch( template, "--label", f"workspace={workspace}", + # Marks this as an agent-created (worker) agent so the memory + # watchdog classifies it at tier 7 -- shed before user-created + # agents (tier 5) under memory pressure. 
+ "--label", + "agent_created=true", ], check=True, ) @@ -382,6 +406,61 @@ def launch( return 0 +def _resolve_shed_ledger_path() -> Path | None: + """Locate the memory watchdog's shed ledger via the watchdog's own path + module, so await resolves the exact file the watchdog writes and the revival + hook reads -- no second copy of the layout to drift. Returns None if the + watchdog package can't be imported, in which case the shed check is skipped + (await falls back to plain timeout behaviour). + """ + try: + watchdog_src = ( + Path(__file__).resolve().parents[4] / "libs" / "memory_watchdog" / "src" + ) + if watchdog_src.is_dir() and str(watchdog_src) not in sys.path: + sys.path.insert(0, str(watchdog_src)) + from memory_watchdog.paths import shed_ledger_path + + return shed_ledger_path() + except ImportError: + return None + + +def _worker_has_pending_shed(ledger_path: Path, worker_name: str) -> bool: + """Whether the worker's own agent is currently shed and not yet revived. + + Uses the same "pending" notion as the revival hook: a ``process_shed`` record + whose ``agent_name`` is the worker (the watchdog stamps ``agent_name`` only + when it sheds an agent's main process -- tier 5/7 -- not a mere subprocess), + newer than the latest ``notice_delivered`` marker for that worker (which the + revival hook writes when the worker restarts). So a shed that has already been + followed by a revival does not count, while a shed not yet revived does -- + regardless of whether this await was started before or after the shed (the + realistic case is a re-run poll started *after* the worker died). 
+ """ + try: + text = ledger_path.read_text(encoding="utf-8") + except OSError: + return False + last_delivered = "" + shed_timestamps: list[str] = [] + for line in text.splitlines(): + if not line.strip(): + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + if record.get("agent_name") != worker_name: + continue + record_type = record.get("type") + if record_type == "notice_delivered": + last_delivered = max(last_delivered, str(record.get("up_to_timestamp", ""))) + elif record_type == "process_shed": + shed_timestamps.append(str(record.get("timestamp", ""))) + return any(timestamp > last_delivered for timestamp in shed_timestamps) + + def await_report( report_path: Path, timeout_seconds: float, @@ -389,6 +468,8 @@ def await_report( sleeper: Callable[[float], None] = time.sleep, clock: Callable[[], float] = time.monotonic, out: TextIO | None = None, + worker_name: str | None = None, + shed_ledger_path: Path | None = None, ) -> int: """Block until ``report_path`` exists, then print its contents. @@ -397,6 +478,15 @@ def await_report( on stderr so the caller diagnoses worker liveness per lead-proxy.md rather than treating the timeout as a terminal failure. + If ``worker_name`` and ``shed_ledger_path`` are supplied, each poll also + checks whether the worker's own agent was shed by the memory watchdog. A shed + worker will never report until it is revived, so rather than wait out the full + timeout we surface an actionable message and return ``_AWAIT_SHED_RC`` -- this + is what turns the lead's silent "poll died / timed out" into "your worker was + paused for memory; revive it". The report file is still checked first each + loop, so a report that landed before the shed (or a worker revived and + reporting) still wins. + ``sleeper``/``clock`` are injected so tests can drive the poll loop without real time. The file is checked before the first sleep, so a report already present returns immediately. 
@@ -407,6 +497,23 @@ def await_report( if report_path.is_file(): stream.write(report_path.read_text(encoding="utf-8")) return 0 + if ( + worker_name is not None + and shed_ledger_path is not None + and _worker_has_pending_shed(shed_ledger_path, worker_name) + ): + print( + f"create_worker: worker '{worker_name}' was stopped by the memory " + "watchdog to relieve memory pressure -- its agent process was shed " + "and its background tasks (including its own report poll) were " + "cancelled, so it will NOT report until it is revived. Revive it " + f"with: mngr start {worker_name} --restart (a plain `mngr message` " + "or `mngr start` will not relaunch a shed agent), then re-send its " + "task. On restart it is told it was paused, so it can re-check " + "state before continuing.", + file=sys.stderr, + ) + return _AWAIT_SHED_RC if clock() >= deadline: print( f"create_worker: timed out after {timeout_seconds:g}s waiting for " @@ -601,10 +708,20 @@ def _run_await(args: argparse.Namespace) -> int: # file; let the ValueError raise for a full traceback rather than swallowing # it into a terse exit-2 message (matches ``launch``'s handling above). report_path = _read_finish_report_path(args.task_file) + # Watch the watchdog's shed ledger so a worker paused for memory pressure + # surfaces promptly (and actionably) instead of as a silent 30-minute + # timeout. The worker name defaults to the task file's directory name (the + # runtime/// convention) so this works even when the caller did + # not pass --name explicitly -- which a lead following the skill easily + # forgets. --name overrides when given. 
+ worker_name = args.name or _worker_name_from_task_file(args.task_file) + shed_ledger = _resolve_shed_ledger_path() return await_report( report_path=report_path, timeout_seconds=args.timeout, poll_interval_seconds=args.poll_interval, + worker_name=worker_name, + shed_ledger_path=shed_ledger, ) @@ -677,6 +794,13 @@ def main(argv: Sequence[str] | None = None, runner: Runner | None = None) -> int help="Same task file as launch; its frontmatter `finish_report_path` " "names the file to wait for.", ) + await_parser.add_argument( + "--name", + default=None, + help="Worker name. When given, await also watches the memory watchdog's " + "shed ledger so a worker paused for memory pressure is surfaced promptly " + "(and actionably) instead of as a silent timeout.", + ) await_parser.add_argument( "--timeout", default=_DEFAULT_TIMEOUT, diff --git a/.agents/skills/launch-task/scripts/create_worker_test.py b/.agents/skills/launch-task/scripts/create_worker_test.py index a43644349..c98b5c7e5 100644 --- a/.agents/skills/launch-task/scripts/create_worker_test.py +++ b/.agents/skills/launch-task/scripts/create_worker_test.py @@ -105,7 +105,17 @@ def test_happy_path_no_artifacts(tmp_path: Path) -> None: assert rc == 0 argvs = [c.argv for c in runner.calls] assert argvs == [ - ["mngr", "create", "demo-worker", "-t", "worker", "--label", "workspace=ws-1"], + [ + "mngr", + "create", + "demo-worker", + "-t", + "worker", + "--label", + "workspace=ws-1", + "--label", + "agent_created=true", + ], [ "mngr", "rsync", @@ -451,7 +461,17 @@ def test_common_transcript_flushed_before_message_send(tmp_path: Path) -> None: argvs = [c.argv for c in runner.calls] expected_script = str(state_dir / "commands" / "common_transcript.sh") assert argvs == [ - ["mngr", "create", "demo-worker", "-t", "worker", "--label", "workspace=ws-1"], + [ + "mngr", + "create", + "demo-worker", + "-t", + "worker", + "--label", + "workspace=ws-1", + "--label", + "agent_created=true", + ], [ "mngr", "rsync", @@ -667,6 
+687,105 @@ def test_await_times_out_when_report_never_appears( assert "timed out" in capsys.readouterr().err +def test_worker_name_is_derived_from_the_task_file_directory(tmp_path: Path) -> None: + """await derives the worker name from runtime///task.md so the + shed-ledger watch works even when the caller forgot --name (a lead following + the skill easily does).""" + task = tmp_path / "runtime" / "launch-task" / "mem-probe" / "task.md" + task.parent.mkdir(parents=True) + task.write_text("---\nfinish_report_path: x/report.md\n---\n\nbody\n") + assert create_worker_mod._worker_name_from_task_file(task) == "mem-probe" + + +def _write_shed_ledger(ledger_path: Path, records: list[dict[str, Any]]) -> None: + ledger_path.parent.mkdir(parents=True, exist_ok=True) + ledger_path.write_text("\n".join(json.dumps(r) for r in records) + "\n") + + +def test_await_returns_shed_code_when_worker_has_pending_shed( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + """A worker shed not yet followed by a revival stops the poll early with the + shed code and an actionable, revive-with-`--restart` message -- even on a poll + re-run started after the shed (the realistic lead-recovery case).""" + report = tmp_path / "reports" / "report.md" + report.parent.mkdir(parents=True) + ledger = tmp_path / "events" / "shed" / "events.jsonl" + _write_shed_ledger( + ledger, + [ + { + "type": "process_shed", + "tier_rank": 7, + "label": "demo", + "agent_name": "demo", + "timestamp": "2026-01-01T00:00:10.000000000Z", + } + ], + ) + out = io.StringIO() + + rc = create_worker_mod.await_report( + report_path=report, + timeout_seconds=1800, + poll_interval_seconds=5, + sleeper=_no_sleep, + clock=lambda: 0.0, + out=out, + worker_name="demo", + shed_ledger_path=ledger, + ) + + assert rc == create_worker_mod._AWAIT_SHED_RC + assert out.getvalue() == "" + err = capsys.readouterr().err + assert "memory" in err + assert "mngr start demo --restart" in err + + +def 
test_await_ignores_a_shed_already_followed_by_a_revival( + tmp_path: Path, capsys: pytest.CaptureFixture[str] +) -> None: + """A shed that has already been delivered/revived (a notice_delivered marker + covers it) is not pending, so await waits normally rather than crying shed -- + this is what stops a revived, healthy worker from looking paused.""" + report = tmp_path / "reports" / "report.md" + report.parent.mkdir(parents=True) + ledger = tmp_path / "events" / "shed" / "events.jsonl" + _write_shed_ledger( + ledger, + [ + { + "type": "process_shed", + "tier_rank": 7, + "label": "demo", + "agent_name": "demo", + "timestamp": "2026-01-01T00:00:10.000000000Z", + }, + { + "type": "notice_delivered", + "agent_name": "demo", + "up_to_timestamp": "2026-01-01T00:00:10.000000000Z", + }, + ], + ) + out = io.StringIO() + + rc = create_worker_mod.await_report( + report_path=report, + timeout_seconds=30, + poll_interval_seconds=5, + sleeper=_no_sleep, + clock=_FakeClock(step=20), + out=out, + worker_name="demo", + shed_ledger_path=ledger, + ) + + assert rc == create_worker_mod._AWAIT_TIMEOUT_RC + assert "timed out" in capsys.readouterr().err + + def test_read_finish_report_path_returns_field(tmp_path: Path) -> None: """_read_finish_report_path pulls the path out of the task frontmatter.""" task = tmp_path / "task.md" diff --git a/.claude/settings.json b/.claude/settings.json index 6dd960a5f..2804b0ff9 100644 --- a/.claude/settings.json +++ b/.claude/settings.json @@ -18,6 +18,10 @@ { "type": "command", "command": "${MNGR_AGENT_WORK_DIR:-.}/scripts/ensure_tk_on_path.sh" + }, + { + "type": "command", + "command": "python3 ${MNGR_AGENT_WORK_DIR:-.}/scripts/claude_shed_notice_hook.py" } ] } diff --git a/.mngr/settings.toml b/.mngr/settings.toml index f272e1e9f..8011aef80 100644 --- a/.mngr/settings.toml +++ b/.mngr/settings.toml @@ -33,6 +33,18 @@ host_env__extend = [ # subdirs too. 
/mngr/code/ is the container WORKDIR (set by the Dockerfile); # a /code -> /mngr/code symlink is kept in the image as a safety net. "TICKETS_DIR=/mngr/code/runtime/tickets", + # The memory watchdog writes its shed ledger + status file under runtime/ + # (so they ride the runtime-backup branch). The watchdog (writer) and the + # system interface (status reader) run under the system-services agent, + # whose work dir IS /mngr/code, so they resolve the path correctly from the + # work-dir fallback. But the per-agent revival SessionStart hook resolves + # the same path relative to ITS agent's MNGR_AGENT_WORK_DIR -- which for a + # worker is its own worktree (/mngr/worktree/<name>), not /mngr/code. Without + # this pin, a shed worker's hook reads a nonexistent worktree-local ledger and + # never tells the revived agent it was paused. Pinning the absolute path here (same + # rationale as TICKETS_DIR above) makes the writer and every agent's reader + # resolve one shared ledger, regardless of work dir. + "MEMORY_WATCHDOG_RUNTIME_DIR=/mngr/code/runtime/memory_watchdog", # Workaround for an Apple M5 lima-VZ guest CPU mismatch: the Debian 12 # guest kernel advertises SVE / SVE2 in /proc/cpuinfo, but lima-VZ on M5 # traps the `cntb` SVE instruction emitted by OpenSSL's diff --git a/CLAUDE.md b/CLAUDE.md index 4f7851d8b..5e29743d6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -418,6 +418,8 @@ If you get a failure when trying to commit the first time, just try committing a If something unexpected happens -- errors, confusing state, things not working as documented -- use the `dealing-with-the-unexpected` skill for guidance. +A background memory-watchdog SIGKILLs ("sheds") memory-heavy processes -- most-expendable first (an agent's build/test/tool subprocesses before the agent itself) -- under sustained memory pressure.
If a command of yours dies with exit 137 and you did not kill it, confirm by checking the shed ledger at `/mngr/code/runtime/memory_watchdog/events/shed/events.jsonl` for a record naming it (or the memory banner in the UI). If it was shed, do NOT blindly re-run a memory-heavy command -- it will likely be shed again; find a lower-memory approach (smaller batches, streaming, releasing data you no longer need) and only retry if you can. + # claude -p If ever building AI-powered services and wanting to use `claude -p`, make sure to unset the MAIN_CLAUDE_SESSION_ID for the process. This prevents conversation rendering issues. \ No newline at end of file diff --git a/apps/system_interface/frontend/src/style.css b/apps/system_interface/frontend/src/style.css index 16c70beff..c5a525b65 100644 --- a/apps/system_interface/frontend/src/style.css +++ b/apps/system_interface/frontend/src/style.css @@ -58,6 +58,127 @@ body { pointer-events: none; } +/* ── Memory-pressure banner ── + A calm, low-key strip shown only while the watchdog reports sustained memory + pressure. Deliberately not alarming (no red): amber-tinted, thin, informative. */ +.memory-pressure-banner { + flex: 0 0 auto; + padding: 8px 16px; + font-size: 13px; + line-height: 1.5; + color: var(--color-text-primary); + background: color-mix(in srgb, #d9a441 16%, var(--color-bg, #ffffff)); + border-bottom: 1px solid color-mix(in srgb, #d9a441 40%, transparent); +} + +.memory-pressure-banner__title { + font-weight: 600; +} + +.memory-pressure-banner__detail { + opacity: 0.85; +} + +.memory-pressure-banner__main { + display: flex; + flex-wrap: wrap; + align-items: baseline; + gap: 4px 8px; +} + +/* The collapsed-state count doubles as the disclosure control. The chevron is + the affordance -- no underline (which collided awkwardly with the chevron); + hover just lifts a faint chip behind it. 
*/ +.memory-pressure-banner__toggle { + display: inline-flex; + align-items: center; + gap: 5px; + margin: 0; + padding: 1px 6px; + background: none; + border: none; + border-radius: 5px; + font: inherit; + font-weight: 500; + color: var(--color-text-primary); + cursor: pointer; + opacity: 0.85; + transition: + opacity 0.12s ease, + background-color 0.12s ease; +} + +.memory-pressure-banner__toggle:hover { + opacity: 1; + background: color-mix(in srgb, #d9a441 16%, transparent); +} + +/* Drop the mouse-click focus ring, but keep a visible ring for keyboard users. */ +.memory-pressure-banner__toggle:focus { + outline: none; +} + +.memory-pressure-banner__toggle:focus-visible { + outline: 2px solid color-mix(in srgb, #d9a441 70%, transparent); + outline-offset: 2px; +} + +.memory-pressure-banner__chevron { + display: inline-block; + font-size: 14px; + line-height: 1; + transition: transform 0.15s ease; +} + +.memory-pressure-banner__chevron[data-expanded="true"] { + transform: rotate(90deg); +} + +/* Scroll container for the expanded details: capped so a long list scrolls + inside the banner rather than growing it without bound (which, combined with + min-h-0 on the workspace pane, keeps the chat input on screen). 
*/ +.memory-pressure-banner__detail-panel { + margin: 8px 0 2px; + max-width: 720px; + max-height: 38vh; + overflow-y: auto; +} + +.memory-pressure-banner__table { + border-collapse: collapse; + width: 100%; + font-size: 12.5px; +} + +.memory-pressure-banner__table th, +.memory-pressure-banner__table td { + text-align: left; + padding: 4px 16px 4px 0; + white-space: nowrap; +} + +.memory-pressure-banner__table th { + font-weight: 600; + opacity: 0.7; + border-bottom: 1px solid color-mix(in srgb, #d9a441 35%, transparent); +} + +.memory-pressure-banner__table tbody tr + tr td { + border-top: 1px solid color-mix(in srgb, #d9a441 18%, transparent); +} + +.memory-pressure-banner__cell-process { + font-family: var(--font-mono, ui-monospace, "SF Mono", Menlo, monospace); + font-weight: 500; +} + +.memory-pressure-banner__col-freed { + text-align: right; + padding-right: 0; + font-variant-numeric: tabular-nums; + opacity: 0.85; +} + /* ── Title bar ── */ .app-header { padding: 14px 32px; @@ -2221,6 +2342,19 @@ body { text-overflow: ellipsis; } +/* "Reconnecting…" is a degraded, not-working state: mute the dot and stop the + activity pulse so it reads as "waiting to reconnect", not "agent is busy". 
*/ +.agent-activity-indicator[data-state="reconnecting"] { + color: var(--color-text-secondary); + opacity: 0.8; +} + +.agent-activity-indicator[data-state="reconnecting"] .agent-activity-indicator__dot { + background: var(--color-text-secondary); + animation: none; + opacity: 0.6; +} + .claude-login-overlay { position: absolute; inset: 0; diff --git a/apps/system_interface/frontend/src/views/ActivityIndicator.test.ts b/apps/system_interface/frontend/src/views/ActivityIndicator.test.ts index 4e93601c6..9ea41357f 100644 --- a/apps/system_interface/frontend/src/views/ActivityIndicator.test.ts +++ b/apps/system_interface/frontend/src/views/ActivityIndicator.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "vitest"; import type { TranscriptEvent } from "../models/Response"; -import { isWorkingActivityState, labelForActivityState } from "./ActivityIndicator"; +import { activityIndicatorContent, isWorkingActivityState, labelForActivityState } from "./ActivityIndicator"; function userMsg(ts: string): TranscriptEvent { return { timestamp: ts, type: "user_message", event_id: `u-${ts}`, source: "test", role: "user", content: "hi" }; @@ -56,6 +56,33 @@ describe("labelForActivityState — fixed-label states", () => { }); }); +describe("activityIndicatorContent — connection gating", () => { + it("shows 'Reconnecting…' while disconnected, ignoring any stale activity state", () => { + // A cached THINKING must not be rendered as activity once the socket is + // down -- the agent may have finished or been killed and we'd never hear. 
+ expect(activityIndicatorContent(false, "THINKING", [])).toEqual({ + label: "Reconnecting…", + dataState: "reconnecting", + }); + expect(activityIndicatorContent(false, "IDLE", [])).toEqual({ + label: "Reconnecting…", + dataState: "reconnecting", + }); + }); + + it("collapses when connected and the agent is idle / untracked", () => { + expect(activityIndicatorContent(true, "IDLE", [])).toBe(null); + expect(activityIndicatorContent(true, null, [])).toBe(null); + }); + + it("shows the live activity label when connected and working", () => { + expect(activityIndicatorContent(true, "THINKING", [userMsg("2026-04-28T01:00:00Z")])).toEqual({ + label: "Thinking…", + dataState: "THINKING", + }); + }); +}); + describe("labelForActivityState — TOOL_RUNNING transcript enrichment", () => { it("labels Read with the file basename", () => { const events = [ diff --git a/apps/system_interface/frontend/src/views/ActivityIndicator.ts b/apps/system_interface/frontend/src/views/ActivityIndicator.ts index 1196eb7e5..5bba36e8d 100644 --- a/apps/system_interface/frontend/src/views/ActivityIndicator.ts +++ b/apps/system_interface/frontend/src/views/ActivityIndicator.ts @@ -37,6 +37,7 @@ import m from "mithril"; import type { ToolCall, TranscriptEvent } from "../models/Response"; import { getEffectiveActivityState } from "../models/PendingMessages"; +import { isConnected } from "../models/AgentManager"; // Note: Agent / Task are intentionally NOT in this map. labelForToolCall // short-circuits with the "Delegating to sub-agent…" label for those tools @@ -202,6 +203,34 @@ export function labelForActivityState(state: string | null | undefined, events: return null; } +/** + * What the activity strip should render, given the live connection status and + * the agent's server-derived activity state. + * + * The agent's ``activity_state`` arrives only over the agents-updated + * websocket. 
While that socket is disconnected we receive no updates, so a + * cached "Thinking…"/"Running tool…" is stale and untrustworthy -- e.g. the + * agent may have finished, or (as when the memory watchdog sheds it) been + * killed outright -- and would otherwise stay pinned indefinitely. So while + * disconnected we show a calm "Reconnecting…" instead of asserting activity. + * The server pushes a fresh state snapshot the instant the socket reconnects, + * so the true state is restored within one reconnect cycle. + * + * Returns null when the strip should collapse (connected + no active turn). + */ +export function activityIndicatorContent( + connected: boolean, + state: string | null | undefined, + events: TranscriptEvent[], +): { label: string; dataState: string } | null { + if (!connected) { + return { label: "Reconnecting…", dataState: "reconnecting" }; + } + const label = labelForActivityState(state, events); + if (label === null) return null; + return { label, dataState: state ?? "" }; +} + interface ActivityIndicatorAttrs { agentId: string; events: TranscriptEvent[]; @@ -210,13 +239,17 @@ interface ActivityIndicatorAttrs { export function ActivityIndicator(): m.Component { return { view(vnode) { - const state = getEffectiveActivityState(vnode.attrs.agentId); - const label = labelForActivityState(state, vnode.attrs.events); - if (label === null) return null; - return m("div.agent-activity-indicator", { "data-state": state, role: "status", "aria-live": "polite" }, [ - m("span.agent-activity-indicator__dot"), - m("span.agent-activity-indicator__label", label), - ]); + const content = activityIndicatorContent( + isConnected(), + getEffectiveActivityState(vnode.attrs.agentId), + vnode.attrs.events, + ); + if (content === null) return null; + return m( + "div.agent-activity-indicator", + { "data-state": content.dataState, role: "status", "aria-live": "polite" }, + [m("span.agent-activity-indicator__dot"), m("span.agent-activity-indicator__label", content.label)], + ); 
}, }; } diff --git a/apps/system_interface/frontend/src/views/App.ts b/apps/system_interface/frontend/src/views/App.ts index 15fbc76a4..cd9d77064 100644 --- a/apps/system_interface/frontend/src/views/App.ts +++ b/apps/system_interface/frontend/src/views/App.ts @@ -1,19 +1,30 @@ import m from "mithril"; import { DockviewWorkspace } from "./DockviewWorkspace"; import { ClaudeLoginModal } from "./ClaudeLoginModal"; +import { MemoryPressureBanner } from "./MemoryPressureBanner"; import { isLoginModalOpen, closeLoginModal } from "../models/ClaudeAuth"; export function App(): m.Component { return { view() { - return m("div", { class: "app-layout flex", style: "height: calc(100vh - var(--minds-titlebar-height, 0px))" }, [ - m("div", { class: "minds-titlebar-spacer" }), - m("div", { class: "app-main flex flex-1 min-w-80" }, [m(DockviewWorkspace)]), - // Claude auth is mind-global, so the login modal is a single - // app-level instance driven by global auth state -- not one per - // ChatPanel. It opens when any agent surfaces an auth-error. - isLoginModalOpen() ? m(ClaudeLoginModal, { onDismiss: closeLoginModal }) : null, - ]); + return m( + "div", + { class: "app-layout flex flex-col", style: "height: calc(100vh - var(--minds-titlebar-height, 0px))" }, + [ + m("div", { class: "minds-titlebar-spacer" }), + // Full-width pressure strip above the workspace; renders nothing (zero + // layout impact) unless the watchdog reports sustained memory pressure. + m(MemoryPressureBanner), + // min-h-0 lets this flex child shrink below its content height, so an + // expanded pressure banner above it reduces the workspace height + // instead of pushing the chat input off the bottom of the viewport. + m("div", { class: "app-main flex flex-1 min-w-80 min-h-0 overflow-hidden" }, [m(DockviewWorkspace)]), + // Claude auth is mind-global, so the login modal is a single + // app-level instance driven by global auth state -- not one per + // ChatPanel. 
It opens when any agent surfaces an auth-error. + isLoginModalOpen() ? m(ClaudeLoginModal, { onDismiss: closeLoginModal }) : null, + ], + ); }, }; } diff --git a/apps/system_interface/frontend/src/views/MemoryPressureBanner.test.ts b/apps/system_interface/frontend/src/views/MemoryPressureBanner.test.ts new file mode 100644 index 000000000..0f642e0b6 --- /dev/null +++ b/apps/system_interface/frontend/src/views/MemoryPressureBanner.test.ts @@ -0,0 +1,80 @@ +import { describe, expect, it } from "vitest"; +import { + creatorLabel, + humanizeKb, + pausedSummary, + processName, + tierLabel, + totalPausedCount, +} from "./MemoryPressureBanner"; + +function item( + label: string, + tier_rank: number, + count: number, + reclaimed_kb: number, + owning_agent_name: string | null = null, +) { + return { label, tier_rank, count, reclaimed_kb, owning_agent_name }; +} + +describe("humanizeKb", () => { + it("renders KB below 1 MB", () => { + expect(humanizeKb(512)).toBe("512 KB"); + }); + it("renders whole MB between 1 MB and 1 GB", () => { + expect(humanizeKb(2048)).toBe("2 MB"); + expect(humanizeKb(500_000)).toBe("488 MB"); + }); + it("renders GB with one decimal at/above 1 GB", () => { + expect(humanizeKb(3_817_068)).toBe("3.6 GB"); + }); +}); + +describe("tierLabel", () => { + it("maps the sheddable tiers to friendly names", () => { + expect(tierLabel(8)).toBe("Agent subprocess"); + expect(tierLabel(7)).toBe("Worker agent"); + expect(tierLabel(6)).toBe("Background service"); + expect(tierLabel(5)).toBe("Agent"); + }); + it("falls back for an unknown rank", () => { + expect(tierLabel(1)).toBe("Background process"); + }); +}); + +describe("paused count + summary", () => { + it("sums counts across items", () => { + expect(totalPausedCount([item("python3", 8, 1, 100), item("sleep", 8, 3, 30)])).toBe(4); + }); + it("returns null when nothing was paused (toggle hidden)", () => { + expect(pausedSummary([])).toBe(null); + }); + it("uses singular for one and plural for many", () => { + 
expect(pausedSummary([item("python3", 8, 1, 100)])).toBe("1 background task paused"); + expect(pausedSummary([item("python3", 8, 2, 100)])).toBe("2 background tasks paused"); + }); +}); + +describe("processName", () => { + it("shows the command alone for a single process", () => { + expect(processName(item("python3 hog.py", 8, 1, 3_131_972))).toBe("python3 hog.py"); + }); + it("appends a multiplier when several of one kind were paused", () => { + expect(processName(item("sleep", 8, 3, 3324))).toBe("sleep ×3"); + }); +}); + +describe("creatorLabel", () => { + it("names the owning agent for a subprocess", () => { + expect(creatorLabel(item("python3 hog.py", 8, 1, 3_131_972, "hogtest"))).toBe("hogtest"); + }); + it("falls back to a friendly kind when a subprocess has no owner", () => { + expect(creatorLabel(item("python3", 8, 1, 1000))).toBe("Agent"); + }); + it("describes the kind for non-subprocess tiers", () => { + expect(creatorLabel(item("alice", 5, 1, 90000, "alice"))).toBe("Agent"); + expect(creatorLabel(item("worker", 7, 1, 90000, "worker"))).toBe("Worker agent"); + expect(creatorLabel(item("web", 6, 1, 90000))).toBe("Background service"); + }); +}); diff --git a/apps/system_interface/frontend/src/views/MemoryPressureBanner.ts b/apps/system_interface/frontend/src/views/MemoryPressureBanner.ts new file mode 100644 index 000000000..d03040d9c --- /dev/null +++ b/apps/system_interface/frontend/src/views/MemoryPressureBanner.ts @@ -0,0 +1,182 @@ +import m from "mithril"; +import { apiUrl } from "../base-path"; + +// A calm, non-alarming strip that appears only while the workspace is under +// sustained memory pressure. By default it shows the reassuring message plus a +// count of how much background work was paused; a chevron expands an itemized +// list (what was paused, which kind of work, how much memory it freed). It +// disappears on its own once pressure clears. 
+ +interface RecentShedItem { + label: string; + tier_rank: number; + count: number; + reclaimed_kb: number; + owning_agent_name?: string | null; +} + +interface MemoryStatus { + is_under_pressure: boolean; + used_fraction: number; + recently_shed: RecentShedItem[]; + blocked_services: string[]; +} + +const POLL_INTERVAL_MS = 5000; + +let currentStatus: MemoryStatus | null = null; + +async function pollMemoryStatus(): Promise { + try { + const status = await m.request({ + method: "GET", + url: apiUrl("/api/memory-status"), + }); + currentStatus = status; + } catch { + // The status endpoint is best-effort; on any failure leave the banner as + // it was rather than flapping it. A genuinely-down interface has its own + // recovery path. + return; + } + m.redraw(); +} + +// Friendly names for the watchdog's shed tiers (only the sheddable tiers 5-8 +// ever appear here). Keeps the expanded list readable without exposing the +// internal tier vocabulary. +const TIER_LABELS: Record = { + 5: "Agent", + 6: "Background service", + 7: "Worker agent", + 8: "Agent subprocess", +}; + +export function tierLabel(tierRank: number): string { + return TIER_LABELS[tierRank] ?? "Background process"; +} + +export function humanizeKb(kb: number): string { + if (kb >= 1024 * 1024) return `${(kb / 1024 / 1024).toFixed(1)} GB`; + if (kb >= 1024) return `${Math.round(kb / 1024)} MB`; + return `${kb} KB`; +} + +export function totalPausedCount(items: RecentShedItem[]): number { + return items.reduce((sum, item) => sum + item.count, 0); +} + +// Collapsed-state summary: a plain count of how many things were paused, or +// null when nothing was (the toggle is then hidden). +export function pausedSummary(items: RecentShedItem[]): string | null { + const total = totalPausedCount(items); + if (total <= 0) return null; + return `${total} background ${total === 1 ? 
"task" : "tasks"} paused`; +} + +// The "Process" cell: the command, with a multiplier when several of the same +// kind were paused ("sleep ×3"). +export function processName(item: RecentShedItem): string { + return item.count > 1 ? `${item.label} ×${item.count}` : item.label; +} + +// The "Creator" cell: who the paused work belonged to. A subprocess is +// attributed to its agent by name ("hogtest"); everything else falls back to a +// friendly description of what kind of thing it was. +export function creatorLabel(item: RecentShedItem): string { + if (item.tier_rank === 8) { + return item.owning_agent_name ?? "Agent"; + } + return tierLabel(item.tier_rank); +} + +export function MemoryPressureBanner(): m.Component { + let timerId: number | undefined; + let expanded = false; + + return { + oncreate() { + void pollMemoryStatus(); + timerId = window.setInterval(() => void pollMemoryStatus(), POLL_INTERVAL_MS); + }, + onremove() { + if (timerId !== undefined) { + window.clearInterval(timerId); + } + }, + view() { + if (!currentStatus || !currentStatus.is_under_pressure) { + return null; + } + const items = currentStatus.recently_shed; + const blocked = currentStatus.blocked_services; + const summary = pausedSummary(items); + const hasDetails = items.length > 0 || blocked.length > 0; + + return m("div", { class: "memory-pressure-banner", role: "status" }, [ + m("div", { class: "memory-pressure-banner__main" }, [ + m("span", { class: "memory-pressure-banner__title" }, "The workspace is low on memory."), + m( + "span", + { class: "memory-pressure-banner__detail" }, + " Background work may be paused to keep things responsive; your conversations and data are safe.", + ), + hasDetails + ? m( + "button", + { + class: "memory-pressure-banner__toggle", + type: "button", + "aria-expanded": expanded ? "true" : "false", + onclick: () => { + expanded = !expanded; + }, + }, + [ + m("span", summary ?? 
"Details"), + m( + "span", + { + class: "memory-pressure-banner__chevron", + "data-expanded": expanded ? "true" : "false", + "aria-hidden": "true", + }, + "›", + ), + ], + ) + : null, + ]), + expanded && hasDetails + ? m("div", { class: "memory-pressure-banner__detail-panel" }, [ + m("table", { class: "memory-pressure-banner__table" }, [ + m("thead", [ + m("tr", [ + m("th", "Process"), + m("th", "Creator"), + m("th", { class: "memory-pressure-banner__col-freed" }, "Freed"), + ]), + ]), + m("tbody", [ + ...items.map((item) => + m("tr", [ + m("td", { class: "memory-pressure-banner__cell-process" }, processName(item)), + m("td", creatorLabel(item)), + m("td", { class: "memory-pressure-banner__col-freed" }, humanizeKb(item.reclaimed_kb)), + ]), + ), + ...blocked.map((service) => + m("tr", [ + m("td", { class: "memory-pressure-banner__cell-process" }, service), + m("td", "System service"), + m("td", { class: "memory-pressure-banner__col-freed" }, "—"), + ]), + ), + ]), + ]), + ]) + : null, + ]); + }, + }; +} diff --git a/apps/system_interface/imbue/system_interface/agent_manager.py b/apps/system_interface/imbue/system_interface/agent_manager.py index f5372f376..b716e0600 100644 --- a/apps/system_interface/imbue/system_interface/agent_manager.py +++ b/apps/system_interface/imbue/system_interface/agent_manager.py @@ -116,6 +116,10 @@ def _build_chat_create_command( "none", "--template", "chat", + # Tags this as a user-created agent so the memory watchdog protects it at + # tier 5 (the same label the worktree-create path already sets). + "--label", + "user_created=true", "--no-connect", ] # Inherit workspace and project labels from the primary agent. 
diff --git a/apps/system_interface/imbue/system_interface/agent_manager_test.py b/apps/system_interface/imbue/system_interface/agent_manager_test.py index 4d2ecae16..1b7d8eff0 100644 --- a/apps/system_interface/imbue/system_interface/agent_manager_test.py +++ b/apps/system_interface/imbue/system_interface/agent_manager_test.py @@ -932,6 +932,17 @@ def test_chat_create_argv_accepted_by_live_cli() -> None: assert_mngr_argv_valid(argv) +def test_chat_create_tags_user_created() -> None: + argv = _build_chat_create_command( + mngr_binary="mngr", + name="demo", + agent_id="agent-123", + primary_labels={"workspace": "ws"}, + ) + labels = [argv[i + 1] for i, arg in enumerate(argv) if arg == "--label"] + assert "user_created=true" in labels + + def test_observe_argv_accepted_by_live_cli() -> None: argv = _build_observe_command_argv("mngr") assert_mngr_argv_valid(argv) diff --git a/apps/system_interface/imbue/system_interface/models.py b/apps/system_interface/imbue/system_interface/models.py index 18345d341..e5dc17548 100644 --- a/apps/system_interface/imbue/system_interface/models.py +++ b/apps/system_interface/imbue/system_interface/models.py @@ -113,6 +113,31 @@ class StartAgentResponse(FrozenModel): status: str = Field(description="Result of the start operation") +class RecentShedItem(FrozenModel): + """One aggregated line of what the memory watchdog shed recently.""" + + label: str = Field(description="What was shed (service, agent, or command name)") + tier_rank: int = Field(description="1..8 priority rank of the shed processes") + count: int = Field(description="How many processes with this label were shed") + reclaimed_kb: int = Field(description="Total resident memory reclaimed, in kibibytes") + owning_agent_name: str | None = Field( + default=None, description="Agent these processes belonged to, if any" + ) + + +class MemoryStatusResponse(FrozenModel): + """Response from /api/memory-status -- a projection of the watchdog status file.""" + + is_under_pressure: bool 
= Field(description="Whether the pressure banner should show") + used_fraction: float = Field(description="Fraction of total memory in use, 0..1") + recently_shed: list[RecentShedItem] = Field( + default_factory=list, description="What the watchdog shed recently" + ) + blocked_services: list[str] = Field( + default_factory=list, description="Services the service manager has paused under pressure" + ) + + class ClaudeAuthStatusResponse(FrozenModel): """Response from /api/claude-auth/status.""" diff --git a/apps/system_interface/imbue/system_interface/server.py b/apps/system_interface/imbue/system_interface/server.py index cefc39321..f006765cc 100644 --- a/apps/system_interface/imbue/system_interface/server.py +++ b/apps/system_interface/imbue/system_interface/server.py @@ -16,6 +16,7 @@ from flask import send_from_directory from flask_sock import Sock from loguru import logger as _loguru_logger +from memory_watchdog.ledger import status_path as memory_watchdog_status_path from simple_websocket import ConnectionClosed from werkzeug.exceptions import HTTPException @@ -51,7 +52,9 @@ from imbue.system_interface.models import DestroyAgentResponse from imbue.system_interface.models import ErrorResponse from imbue.system_interface.models import InterruptAgentResponse +from imbue.system_interface.models import MemoryStatusResponse from imbue.system_interface.models import RandomNameResponse +from imbue.system_interface.models import RecentShedItem from imbue.system_interface.models import SendMessageRequest from imbue.system_interface.models import SendMessageResponse from imbue.system_interface.models import StartAgentResponse @@ -544,6 +547,65 @@ def _random_name_endpoint() -> Response: return _json_response(RandomNameResponse(name=name).model_dump()) +def _read_memory_status() -> MemoryStatusResponse: + """Project the watchdog's status file into the banner's response shape. 
+ + Returns a healthy (no-pressure) status when the file is missing (the + watchdog has not started yet) or unreadable, so the banner stays hidden + rather than erroring. The status-file path comes from the watchdog's own + ``status_path`` helper so producer and reader can't drift. The fields are + read leniently (not validated against the writer's model) so a future status + schema addition can't break the banner. + """ + file_path = memory_watchdog_status_path() + healthy = MemoryStatusResponse( + is_under_pressure=False, + used_fraction=0.0, + recently_shed=[], + blocked_services=[], + ) + if not file_path.exists(): + return healthy + # Reading and projecting are guarded together: besides IO / JSON-syntax + # errors, a status file that parses to a non-dict (e.g. ``null`` or a list) + # or carries a non-numeric value where a number is expected would otherwise + # raise AttributeError / TypeError / ValueError out of the comprehension and + # surface as an HTTP 500. The docstring promises a malformed or + # future-schema status file leaves the banner hidden, so all of those fall + # back to the healthy response (logged as a warning each time, never silent). 
+ try: + raw = json.loads(file_path.read_text()) + recently_shed = [ + RecentShedItem( + label=str(item.get("label", "")), + tier_rank=int(item.get("tier_rank", 0)), + count=int(item.get("count", 0)), + reclaimed_kb=int(item.get("reclaimed_kb", 0)), + owning_agent_name=( + str(item["owning_agent_name"]) + if item.get("owning_agent_name") is not None + else None + ), + ) + for item in raw.get("recently_shed", []) + if isinstance(item, dict) + ] + return MemoryStatusResponse( + is_under_pressure=bool(raw.get("is_under_pressure", False)), + used_fraction=float(raw.get("used_fraction", 0.0)), + recently_shed=recently_shed, + blocked_services=[str(s) for s in raw.get("blocked_services", [])], + ) + except (OSError, json.JSONDecodeError, AttributeError, TypeError, ValueError) as e: + _loguru_logger.warning("Failed to read memory watchdog status file: {}", e) + return healthy + + +def _memory_status_endpoint() -> Response: + """Serve the current memory-pressure status for the UI banner.""" + return _json_response(_read_memory_status().model_dump()) + + def _create_worktree_agent() -> Response: """Create a new worktree agent.""" agent_manager: AgentManager = get_state().agent_manager @@ -972,6 +1034,7 @@ def create_application( application.add_url_rule("/api/agents/create-worktree", view_func=_create_worktree_agent, methods=["POST"]) application.add_url_rule("/api/agents/create-chat", view_func=_create_chat_agent, methods=["POST"]) application.add_url_rule("/api/random-name", view_func=_random_name_endpoint, methods=["GET"]) + application.add_url_rule("/api/memory-status", view_func=_memory_status_endpoint, methods=["GET"]) application.add_url_rule("/api/agents//events", view_func=_get_events, methods=["GET"]) application.add_url_rule("/api/agents//stream", view_func=_stream_events, methods=["GET"]) application.add_url_rule("/api/agents//message", view_func=_send_message_endpoint, methods=["POST"]) diff --git a/apps/system_interface/imbue/system_interface/server_test.py 
b/apps/system_interface/imbue/system_interface/server_test.py index 5b5bc3603..b90056a22 100644 --- a/apps/system_interface/imbue/system_interface/server_test.py +++ b/apps/system_interface/imbue/system_interface/server_test.py @@ -458,6 +458,73 @@ def test_random_name_endpoint(client: FlaskClient) -> None: assert len(data["name"]) > 0 +def test_memory_status_healthy_when_no_status_file( + client: FlaskClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """With no watchdog status file, the endpoint reports no pressure.""" + monkeypatch.setenv("MNGR_AGENT_WORK_DIR", str(tmp_path)) + response = client.get("/api/memory-status") + assert response.status_code == 200 + data = response.get_json() + assert data["is_under_pressure"] is False + assert data["recently_shed"] == [] + + +def test_memory_status_reflects_pressure_file( + client: FlaskClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """The endpoint projects the watchdog status file into the banner shape.""" + monkeypatch.setenv("MNGR_AGENT_WORK_DIR", str(tmp_path)) + status_path = tmp_path / "runtime" / "memory_watchdog" / "status.json" + status_path.parent.mkdir(parents=True) + status_path.write_text( + json.dumps( + { + "is_under_pressure": True, + "used_fraction": 0.93, + "recently_shed": [ + { + "label": "python3 hog.py", + "tier_rank": 8, + "count": 2, + "reclaimed_kb": 500000, + "owning_agent_name": "alice", + } + ], + "blocked_services": ["web"], + } + ) + ) + response = client.get("/api/memory-status") + assert response.status_code == 200 + data = response.get_json() + assert data["is_under_pressure"] is True + assert data["recently_shed"][0]["label"] == "python3 hog.py" + assert data["recently_shed"][0]["owning_agent_name"] == "alice" + assert data["blocked_services"] == ["web"] + + +def test_memory_status_healthy_when_status_file_malformed( + client: FlaskClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A status file that parses to a non-dict 
must not 500 the endpoint. + + The endpoint promises a malformed/future-schema status file leaves the banner + hidden rather than erroring. A top-level ``null`` (valid JSON, not a dict) + would otherwise raise AttributeError out of the projection, so this guards the + healthy fallback for that case. + """ + monkeypatch.setenv("MNGR_AGENT_WORK_DIR", str(tmp_path)) + status_path = tmp_path / "runtime" / "memory_watchdog" / "status.json" + status_path.parent.mkdir(parents=True) + status_path.write_text("null") + response = client.get("/api/memory-status") + assert response.status_code == 200 + data = response.get_json() + assert data["is_under_pressure"] is False + assert data["recently_shed"] == [] + + def test_create_chat_agent_without_work_dir(monkeypatch: pytest.MonkeyPatch) -> None: """Creating a chat agent without a primary agent work dir returns 400.""" monkeypatch.delenv("MNGR_AGENT_WORK_DIR", raising=False) diff --git a/apps/system_interface/pyproject.toml b/apps/system_interface/pyproject.toml index 831604827..72b23a628 100644 --- a/apps/system_interface/pyproject.toml +++ b/apps/system_interface/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "imbue-common", "imbue-mngr", "imbue-mngr-claude", + "memory-watchdog", "pexpect>=4.9", "pluggy>=1.5", "pydantic-settings>=2.13.1", diff --git a/blueprint/oom-graceful-degradation/plan-oom-graceful-degradation.md b/blueprint/oom-graceful-degradation/plan-oom-graceful-degradation.md new file mode 100644 index 000000000..f24c1b1eb --- /dev/null +++ b/blueprint/oom-graceful-degradation/plan-oom-graceful-degradation.md @@ -0,0 +1,49 @@ +# OOM prioritization and graceful degradation + +## Overview + +- Today, when a container built from this repo runs out of memory, victim selection is at the kernel's whim — often the biggest process (a claude agent, a pytest run, Chromium) dies silently, and nothing records it, notifies anyone, or guides recovery. 
In the worst case the kernel takes out tmux, the system interface, or the tunnel, and the user loses their window into the system entirely. +- The design assigns every process in the container to a priority tier via one shared classifier, consumed by two mechanisms: an `oom_score_adj` tagger (steers the kernel's own OOM killer where the kernel honors it — runc/lima) and a shedder watchdog (works in every mode, including gVisor where in-container `oom_score_adj` cannot steer the host's victim selection and a hard OOM kills the whole sandbox). +- The shedder polls `/proc/meminfo` and, under sustained pressure, kills whole tiers from the bottom up — most-expendable work dies first, the user's window in (system_interface, cloudflared, ttyd) and the recovery machinery are never shed. +- Everything is recorded: a shed-event ledger plus a continuously updated status file under `runtime/` (backed up, and the raw data behind the UI banner, agent notices, and revival decisions). +- Recovery is a closed loop: bootstrap and the watchdog supervise each other, the watchdog also covers telegram/ttyd, bootstrap gets a crash-loop circuit breaker, and shed agents revive on the next user message with an injected notice. Whole-container death is already auto-recovered by the minds desktop client's host-restart tier (verified by exploring `vendor/mngr/apps/minds`; that outer layer only fires when the UI is unreachable, so the two mechanisms are complementary, not redundant). 
+ +### Tier table (most protected first) + +| Tier | Members | adj | Sheddable | +|---|---|---|---| +| 1 | tmux server, sshd, container entrypoint | 0 | never | +| 2 | system_interface, cloudflared, ttyd | 0 | never | +| 3 | bootstrap, watchdog | 0 | never | +| 4 | runtime-backup, host-backup | 0 | never | +| 5 | user-created agents (`user_created=true` label; unlabeled agents default here) | + | last resort | +| 6 | telegram-bot, web, app-watcher, and all agent-added services.toml services | ++ | yes | +| 7 | agent-created agents (workers etc.) | +++ | yes | +| 8 | agent children: builds, tests, Chromium, pollers (no exemptions) | ++++ | shed first | + +- Protected tiers stay at `oom_score_adj` 0 and expendable tiers get increasingly positive values — negative values would require `CAP_SYS_RESOURCE`, which Docker's default cap set does not grant; positive-only tagging achieves the same relative ordering without extra capabilities. + +## Expected behavior + +- Under normal memory conditions nothing is visible: the tagger keeps `oom_score_adj` values current as processes come and go, and the status file reports healthy. +- Under sustained pressure (defaults to be tuned: ~90% usage held for ~10s, exposed as constants), the shedder kills tier 8 in its entirety, re-evaluates, and escalates tier by tier — 8, then 7, then 6, then 5. Tiers 1-4 are never shed. +- Every kill and every pause is appended to the shed-event ledger; the status file (current usage, threshold, last-shed summary) is rewritten every poll as the read API for the banner, for agents checking pressure before heavy work, and for revival decisions. +- A shed user-created agent stays dead until the user next messages it (which is what revives agents today). On revival, a hook injects the queued notice — it was killed to relieve memory pressure, and its background tasks (e.g. polling loops) were cancelled and not restarted — then clears the pending notice. 
+- A shed worker is discovered by its parent through the existing launch-task flow (report-poll timeout, then liveness diagnosis); no new notification mechanism, and nothing impersonates the worker-report contract. The dead-worker-recovery reference gains a step: check the ledger, and follow the revival guidelines — do not revive while pressure is elevated (surface to the user instead), revive at most once after pressure clears, and a twice-shed worker always escalates to the user. +- A config-driven auto-revive list (default empty) names agents the watchdog re-messages once pressure clears; everything else stays down until explicitly revived. The intelligent "watcher agent" that decides what to bring back is deferred follow-up work. +- A service that fails rapidly N times in a row trips bootstrap's circuit breaker: restarts pause for a cooldown, the service is marked blocked in the ledger, and it resumes after the cooldown. +- system_interface shows a calm, non-alarming banner during sustained pressure or recent shedding, listing what was shed and which services are paused; it disappears when pressure clears. +- Supervision is mutual: bootstrap restarts the watchdog via the existing services.toml `on-failure` policy; the watchdog restarts bootstrap, telegram-bot, and ttyd if their processes die (preserving ttyd's terminal-survives-bootstrap-failure property). +- Under runc (lima), a kernel OOM kill that beats the shedder still follows tier order thanks to the adj tags. Under gVisor, the shedder is the only ordered mechanism; if it loses the race, the sandbox dies and the minds host-restart tier brings the container back (existing behavior, outside this plan). +- Headless deployments (no minds desktop client attached — e.g. telegram-only) have no outer container-restart layer; that gap is explicitly deferred. 
+ +## Changes + +- New watchdog service, registered in services.toml with `restart = "on-failure"`: process classifier (tier assignment from process ancestry, tmux session/window mapping, and agent labels), `oom_score_adj` sweep, pressure monitor, whole-tier shedder, ledger + status file writer, supervision of bootstrap/telegram-bot/ttyd, and auto-revive list processing. +- bootstrap: restart backoff and crash-loop circuit breaker, with blocked-service state written to the ledger. +- system_interface: memory-pressure banner fed from the watchdog's status file and ledger. +- Agent creation paths: add the `user_created=true` label to UI chat creates and the bootstrap initial chat (UI worktree creates already set it); workers and other agent-created paths get no label and classify as tier 7; unlabeled agents default protectively to tier 5. +- Claude Code hook: on agent revival, inject any pending shed notice from the ledger as context, then clear it. +- `.agents/shared/references/dead-worker-recovery.md`: add the ledger check and the revival guidelines. +- A documented manual OOM drill (run a memory hog, observe shed order, banner, notices, recovery); unit tests cover classifier, shedder, and breaker logic — real-container testing is performed by the user. +- Explicitly out of scope: `--restart` docker args (minds' host-restart tier covers container death), recovery-page memory-pressure probe in vendor/mngr (avoids further outer-app/container coupling), headless container-death recovery, per-service tier configuration, and any auto-restart of shed background work. 
diff --git a/libs/bootstrap/src/bootstrap/manager.py b/libs/bootstrap/src/bootstrap/manager.py index b69fb2f25..53eabcff6 100644 --- a/libs/bootstrap/src/bootstrap/manager.py +++ b/libs/bootstrap/src/bootstrap/manager.py @@ -260,6 +260,10 @@ def _build_create_chat_command(host_name: str, labels: dict[str, str]) -> list[s "chat", "--message", "/welcome", + # Tags the initial chat as a user-created agent so the memory watchdog + # protects it at tier 5 (matching the New Chat / New Agent paths). + "--label", + "user_created=true", "--no-connect", "--format", "json", diff --git a/libs/memory_watchdog/OOM_DRILL.md b/libs/memory_watchdog/OOM_DRILL.md new file mode 100644 index 000000000..fb44d1688 --- /dev/null +++ b/libs/memory_watchdog/OOM_DRILL.md @@ -0,0 +1,91 @@ +# Manual out-of-memory drill + +The watchdog's shed ordering, banner, notices, and recovery loop can only be +*fully* verified by actually exhausting memory inside a running container and +watching tiers die in order. That is environment-dependent (and destructive), so +it is a documented manual drill rather than a CI test. Run it once per deployment +mode you care about (docker/gVisor, lima/runc, vps). + +The pure logic underneath (classification, shed selection, the blocked-services +ledger logic, status/ledger IO) is covered by the unit tests next to the source +and runs in CI. + +## Prerequisites + +- A live workspace container with supervisord up and the watchdog running + (`supervisorctl status memory-watchdog` shows `RUNNING`). +- A way to watch memory: `watch -n1 free -m` in a terminal. +- The system interface open in the UI. + +## Procedure + +1. 
**Baseline.** Confirm the watchdog is publishing status: + + ```bash + cat runtime/memory_watchdog/status.json # is_under_pressure should be false + ``` + + Confirm tagging is happening: pick an agent's pytest/chromium child PID and + check it has a high oom_score_adj: + + ```bash + cat /proc/$AGENT_CHILD_PID/oom_score_adj # expect ~900 for an agent child + cat /proc/$PROTECTED_PID/oom_score_adj # expect 0 (protected) + ``` + +2. **Create the conditions.** In a *worker* or chat agent, start a deliberately + large but bounded memory hog as a child process (a build, a big pytest run, or + a synthetic allocator). A simple synthetic hog: + + ```bash + python3 -c "x=bytearray(1); + import time + chunks=[] + while True: + chunks.append(bytearray(50_000_000)) # 50 MB at a time + time.sleep(0.2)" + ``` + + Watch `free -m` climb toward the container limit. + +3. **Observe shedding.** As usage crosses ~90% for ~10s, the watchdog should kill + the hog (tier 8, agent child) first. Verify: + + ```bash + tail -n 20 runtime/memory_watchdog/events/shed/events.jsonl + ``` + + You should see `process_shed` lines, most-expendable tier (8) first. The agent process + itself, the system interface, bootstrap, and backups must NOT appear. + +4. **Observe the banner.** The system interface should show the calm + memory-pressure strip naming what was shed. It should clear on its own once + usage subsides. + +5. **Observe the notice.** If you push the drill hard enough that a *worker + agent* (tier 7) gets shed, revive it with `mngr start --restart` (a + shed agent needs `--restart`; a plain `mngr start`/`mngr message` will not + relaunch it), then message it. Its first turn should carry the injected notice + that it was stopped for memory and its background tasks were not restarted. + Verify the ledger gained a `notice_delivered` line for it. + +6. **Observe recovery.** supervisord owns liveness, including the watchdog's + own -- the watchdog supervises nothing itself. 
Kill the watchdog process to + confirm supervisord brings it straight back: + + ```bash + supervisorctl status memory-watchdog # note the current pid / uptime + pkill -f "memory-watchdog" + supervisorctl status memory-watchdog # RUNNING again within a second or two, new pid + ``` + +7. **Crash-loop visibility (reserved -- not yet observable).** supervisord now + owns restarts (`autorestart` + `startretries`), so nothing writes + `service_blocked` records today and the banner's paused-service line stays + empty. There is no drill step for it until a `supervisorctl`-driven poller + repopulates that signal; see the "Crash-loop visibility (reserved)" section + of `README.md`. + +## Cleanup + +Kill any leftover synthetic hog and confirm `is_under_pressure` returns to false. diff --git a/libs/memory_watchdog/README.md b/libs/memory_watchdog/README.md new file mode 100644 index 000000000..1ab0f81a9 --- /dev/null +++ b/libs/memory_watchdog/README.md @@ -0,0 +1,90 @@ +# memory_watchdog + +Background service that keeps the container's memory usage survivable and makes +out-of-memory situations degrade gracefully instead of at the kernel's whim. + +Every few seconds it: + +1. Snapshots the process tree (`/proc`), the tmux panes, and the host's agent + labels, then classifies every process into one of eight OOM-priority tiers + (see `data_types.Tier`). +2. Writes each process's `oom_score_adj` to match its tier, so that under the + runc runtime (lima) the kernel's own OOM killer picks the most expendable + work first. Under gVisor the kernel ignores this, which is why step 3 exists. +3. If memory usage stays above the shed threshold for long enough, sheds + individual processes -- most-expendable tier first and largest process first + within a tier, stopping as soon as the projected reclaim clears the relief + threshold (and never killing a process below the minimum-RSS floor). 
So agent + build/test/browser subprocesses go first, then worker agents, then auxiliary + services, and the user's own agents only as a last resort. Infrastructure, the + UI, the recovery machinery, and the backups (tiers 1-4) are never shed. + +The watchdog only *decides what to shed*; it does not supervise other processes. +Liveness is owned by supervisord (see `supervisord.conf`): supervisord restarts +this watchdog if it dies, and restarts any service the watchdog sheds. + +## Outputs + +- **Shed ledger** (`runtime/memory_watchdog/events/shed/events.jsonl`): + append-only record of every kill. Backed up via the runtime-backup branch. + Consumed by the revival-notice hook and the dead-worker-recovery guidance. +- **Status file** (`runtime/memory_watchdog/status.json`): current usage, + threshold, whether the banner should show, what was shed recently, and which + services are blocked. The system interface reads this to render its + memory-pressure banner. (`blocked_services` is currently always empty -- it is + reserved for a future `supervisorctl`-driven crash-loop signal; see below.) + +## Tiers + +| Tier | Rank | Members | Shed | +|---|---|---|---| +| INFRASTRUCTURE | 1 | tmux server, sshd, entrypoint, pane shells, supervisord | never | +| USER_INTERFACE | 2 | system_interface, cloudflared, ttyd | never | +| RECOVERY | 3 | this watchdog | never | +| DURABILITY | 4 | runtime-backup, host-backup | never | +| USER_AGENT | 5 | user-created agents (and unlabeled agents) | last resort | +| AUXILIARY_SERVICE | 6 | web, app-watcher, agent-added services | yes | +| WORKER_AGENT | 7 | agent-created agents (workers) | yes | +| AGENT_CHILD | 8 | an agent's build/test/browser subprocesses | first | + +## How services are classified + +Background services run as `[program:*]` children of supervisord, which itself +runs in the `bootstrap` tmux pane. So a service is not its own tmux window -- +it is a process in supervisord's subtree, identified by its command line (e.g. 
+`uv run web-server`, `system-interface`, `bash scripts/run_ttyd.sh`). The +classifier matches each supervisord child's command to a tier; anything it does +not recognize defaults to AUXILIARY_SERVICE (tier 6), so an agent-added program +is shed before worker agents but after the recognized infrastructure. Agent +sessions (separate `mngr-` tmux sessions) are tiered by their agent label +(user-created vs worker), independent of supervisord. + +## Crash-loop visibility (reserved) + +Under the previous service manager, the bootstrap restart loop tripped a +crash-loop breaker and recorded `blocked`/`unblocked` services to the ledger so +the banner could surface a thrashing service. supervisord now owns restarts +(`autorestart` + `startretries`), so nothing writes those records today and +`blocked_services` stays empty. The ledger's block/unblock writers remain in +place so a future poller (reading `supervisorctl status` for BACKOFF/FATAL +programs) can repopulate the banner without re-plumbing. + +## Paths + +`memory_watchdog.paths` is the single source of truth for the on-disk layout: +`shed_ledger_path()` / `status_path()` (re-exported through +`memory_watchdog.ledger` for existing callers, and imported by the system +interface as the status reader). The base resolves relative to +`MNGR_AGENT_WORK_DIR` (the repo root) and is overridable via +`MEMORY_WATCHDOG_RUNTIME_DIR`. `paths` is deliberately dependency-free (stdlib +only), so even the SessionStart notice hook +(`scripts/claude_shed_notice_hook.py`) -- which runs in a plain claude +environment that cannot import the package's heavier modules -- imports the same +helper (by putting the package's `src` dir on `sys.path`) rather than +duplicating the layout, so producer and every reader can never resolve a +different file. + +## CLI + +- `memory-watchdog` -- run the watchdog loop (started and supervised by + supervisord via the `[program:memory-watchdog]` entry in `supervisord.conf`). 
diff --git a/libs/memory_watchdog/pyproject.toml b/libs/memory_watchdog/pyproject.toml new file mode 100644 index 000000000..f291be660 --- /dev/null +++ b/libs/memory_watchdog/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "memory-watchdog" +version = "0.1.0" +description = "Classifies the container's process tree into OOM-priority tiers, tags oom_score_adj, and sheds load under memory pressure" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "imbue-common", + "loguru>=0.7.0", +] + +[project.scripts] +memory-watchdog = "memory_watchdog.watchdog:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/memory_watchdog"] diff --git a/libs/memory_watchdog/src/memory_watchdog/__init__.py b/libs/memory_watchdog/src/memory_watchdog/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/libs/memory_watchdog/src/memory_watchdog/classifier.py b/libs/memory_watchdog/src/memory_watchdog/classifier.py new file mode 100644 index 000000000..aecd8266c --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/classifier.py @@ -0,0 +1,371 @@ +from collections import defaultdict +from collections.abc import Mapping, Sequence +from typing import Final + +from imbue.imbue_common.pure import pure + +from memory_watchdog.data_types import ( + ProcessClassification, + ProcessInfo, + Tier, + TmuxPane, +) + +# Background services run as [program:*] children of supervisord (see +# supervisord.conf), not as their own tmux windows. A service is identified by a +# distinctive token in the command line of supervisord's direct child: for the +# `bash -c "... && realserver"` wrappers that token is the realserver name; for +# `uv run X` services it is X. The first matching token wins, so the tokens are +# chosen to be unambiguous across services. 
A supervisord child whose command +# matches nothing defaults to AUXILIARY_SERVICE (tier 6) -- so an agent-added +# program is shed before worker agents but after the recognized infrastructure. +_SERVICE_TIER_RULES: Final[tuple[tuple[str, str, Tier], ...]] = ( + ("system-interface", "system_interface", Tier.USER_INTERFACE), + ("cloudflare-tunnel", "cloudflared", Tier.USER_INTERFACE), + ("run_ttyd", "terminal", Tier.USER_INTERFACE), + ("memory-watchdog", "memory-watchdog", Tier.RECOVERY), + ("runtime-backup", "runtime-backup", Tier.DURABILITY), + ("host-backup", "host-backup", Tier.DURABILITY), + ("web-server", "web", Tier.AUXILIARY_SERVICE), + ("app-watcher", "app-watcher", Tier.AUXILIARY_SERVICE), + ("deferred_install", "deferred-install", Tier.AUXILIARY_SERVICE), +) + +_SUPERVISORD_COMMAND_BASENAME: Final[str] = "supervisord" + +# Depth (relative to an agent session's pane shell) at and below which a process +# is treated as an agent-spawned child (builds, tests, browsers) rather than the +# agent itself. The pane shell is depth 0, the claude process it launches is +# depth 1, and the tool subprocesses claude spawns are depth 2+. +_AGENT_CHILD_MIN_DEPTH: Final[int] = 2 + +# Per-agent coordination/observability helper processes that live inside an +# agent's subtree but are NOT expendable work. mngr runs a background-task loop +# for every agent, which spawns transcript streamers that feed the UI; a lead +# additionally runs a worker-report poll (create_worker.py await). These are tiny +# (KB-MB) yet load-bearing: shedding one frees nothing meaningful but blinds the +# UI or severs lead<->worker coordination (the lead's poll is what tells it a +# worker was paused). So they are classified as never-shed infrastructure rather +# than as expendable agent children, regardless of their depth. Matched by a +# distinctive token anywhere in the command line. 
+_AGENT_INFRA_HELPER_TOKENS: Final[tuple[str, ...]] = ( + "claude_background_tasks.sh", + "stream_transcript.sh", + "common_transcript.sh", + "create_worker.py", +) + + +@pure +def _is_agent_infra_helper(command_line: str) -> bool: + """Whether a process inside an agent's subtree is mngr coordination/observability + machinery that must never be shed (see ``_AGENT_INFRA_HELPER_TOKENS``).""" + return any(token in command_line for token in _AGENT_INFRA_HELPER_TOKENS) + + +@pure +def _command_basename(command_line: str) -> str: + first_token = command_line.split(" ", 1)[0] if command_line else "" + return first_token.rsplit("/", 1)[-1] + + +@pure +def _is_supervisord(command_line: str) -> bool: + """Whether this process is supervisord (the root of the service subtree). + + supervisord may be exec'd directly (argv[0] = ``.../supervisord``) or run + through the interpreter (``/usr/bin/python3 /usr/bin/supervisord ...``, which + is how the container's image launches it). So match when the basename of + either of the first two argv tokens is exactly ``supervisord``, rather than + only checking argv[0] -- otherwise the interpreter form is missed and every + service falls through to the protected infrastructure tier and is never shed. + Only the first two tokens are considered so a later argument or config/log + path (``supervisord.conf``) cannot be mistaken for the process itself. + """ + tokens = command_line.split() + return any( + token.rsplit("/", 1)[-1] == _SUPERVISORD_COMMAND_BASENAME + for token in tokens[:2] + ) + + +@pure +def _short_command_label(command_line: str, fallback: str) -> str: + """A compact label for a process, for the ledger and banner.""" + basename = _command_basename(command_line) + return basename or fallback + + +# Interpreters/launchers whose own name ("python3", "uv", "node") says little +# about what is actually running -- for these we look past the launcher to the +# first real target (a script path or subcommand) so the label is specific. 
+_COMMAND_RUNNERS: Final[frozenset[str]] = frozenset( + { + "python", + "python3", + "node", + "nodejs", + "bash", + "sh", + "uv", + "uvx", + "npx", + "ruby", + "perl", + "env", + "sudo", + } +) +# Runner sub-tokens to skip while reaching for the real target (e.g. the "run" +# in "uv run pytest", the "-m" in "python3 -m pytest"). +_RUNNER_SKIP_TOKENS: Final[frozenset[str]] = frozenset({"run", "exec", "-m"}) + + +@pure +def _describe_command(command_line: str, fallback: str) -> str: + """A specific label for a subprocess, e.g. "python3 hog.py", "pytest". + + A bare interpreter name ("python3") is uninformative, so when the command is + a known runner we append the basename of the first real target token (the + script or subcommand) -- "python3 /tmp/hog.py" -> "python3 hog.py", + "uv run pytest" -> "uv pytest", "python3 -m pytest" -> "python3 pytest". A + non-runner command keeps its own basename ("/usr/bin/pytest" -> "pytest"). + """ + tokens = command_line.split() + if not tokens: + return fallback or "process" + runner = tokens[0].rsplit("/", 1)[-1] + if runner not in _COMMAND_RUNNERS: + return runner or fallback or "process" + for token in tokens[1:]: + if token in _RUNNER_SKIP_TOKENS: + continue + if token.startswith("-"): + continue + return f"{runner} {token.rsplit('/', 1)[-1]}" + return runner + + +@pure +def _service_tier_and_label(command_line: str) -> tuple[Tier, str]: + """Tier + label for one supervisord child, matched by its command line.""" + for token, label, tier in _SERVICE_TIER_RULES: + if token in command_line: + return tier, label + return Tier.AUXILIARY_SERVICE, _short_command_label(command_line, "service") + + +@pure +def _agent_name_from_session(session_name: str, mngr_prefix: str) -> str | None: + """Return the agent name for an agent session, or None if it is not one. + + Agent sessions are named ``<mngr_prefix><agent_name>``. The services + session is excluded by the caller, so any other prefixed session is an + agent.
+ """ + if mngr_prefix and session_name.startswith(mngr_prefix): + return session_name[len(mngr_prefix) :] + return None + + +@pure +def _agent_tier( + agent_name: str, + user_created_agent_names: frozenset[str], + agent_created_agent_names: frozenset[str], +) -> Tier: + """Tier for an agent session's main process. + + User-created agents are tier 5. Agents explicitly created by other agents + (workers) are tier 7. An agent we have no label for defaults protectively to + tier 5 -- we would rather shed it last than risk shedding a user's agent + early. + """ + if agent_name in user_created_agent_names: + return Tier.USER_AGENT + if agent_name in agent_created_agent_names: + return Tier.WORKER_AGENT + return Tier.USER_AGENT + + +@pure +def _build_children_by_parent(processes: Sequence[ProcessInfo]) -> dict[int, list[int]]: + children_by_parent: dict[int, list[int]] = defaultdict(list) + for process in processes: + children_by_parent[process.parent_pid].append(process.pid) + return children_by_parent + + +@pure +def _walk_subtree_depths( + root_pid: int, + children_by_parent: Mapping[int, Sequence[int]], + already_assigned: frozenset[int], +) -> list[tuple[int, int]]: + """Return (pid, depth) for every process in root_pid's subtree. + + The root is depth 0. Processes already assigned to another pane's subtree + are skipped (and their descendants pruned) so each process lands in exactly + one tier. 
+ """ + discovered: list[tuple[int, int]] = [] + frontier: list[tuple[int, int]] = [(root_pid, 0)] + visited: set[int] = set() + while frontier: + pid, depth = frontier.pop() + if pid in visited or pid in already_assigned: + continue + visited.add(pid) + discovered.append((pid, depth)) + for child_pid in children_by_parent.get(pid, ()): + frontier.append((child_pid, depth + 1)) + return discovered + + +@pure +def classify_processes( + processes: Sequence[ProcessInfo], + panes: Sequence[TmuxPane], + services_session_name: str, + mngr_prefix: str, + user_created_agent_names: frozenset[str], + agent_created_agent_names: frozenset[str], +) -> list[ProcessClassification]: + """Assign every process an OOM-priority tier. + + Three passes: + + 1. Services. Each supervisord child roots one service's subtree, tiered by + matching its command line; supervisord itself is infrastructure. This is + how background services are tiered now that they are supervisord children + rather than individual ``svc-`` tmux windows. + 2. Panes. Agent sessions map to their agent's tier, with that agent's tool + subprocesses (depth >= 2) dropped to AGENT_CHILD. The services session's + remaining processes -- the supervisord launch chain and the services + agent's own idle shell -- are infrastructure we never shed. + 3. Leftovers. Processes under no pane (the tmux server, sshd, the container + entrypoint, anything else) are treated as INFRASTRUCTURE. + """ + process_by_pid: dict[int, ProcessInfo] = {p.pid: p for p in processes} + children_by_parent = _build_children_by_parent(processes) + + classifications: list[ProcessClassification] = [] + assigned_pids: set[int] = set() + + # Pass 1: supervisord-managed services. supervisord (and its launch chain, + # classified as infrastructure in pass 2) is never shed; each direct child + # roots a service subtree tiered by command line. 
+ for process in processes: + if not _is_supervisord(process.command_line): + continue + assigned_pids.add(process.pid) + classifications.append( + ProcessClassification( + pid=process.pid, + resident_kb=process.resident_kb, + tier=Tier.INFRASTRUCTURE, + label=_SUPERVISORD_COMMAND_BASENAME, + ) + ) + for child_pid in children_by_parent.get(process.pid, ()): + child = process_by_pid.get(child_pid) + if child is None: + continue + tier, label = _service_tier_and_label(child.command_line) + for pid, _depth in _walk_subtree_depths( + child_pid, children_by_parent, frozenset(assigned_pids) + ): + descendant = process_by_pid.get(pid) + if descendant is None: + continue + assigned_pids.add(pid) + classifications.append( + ProcessClassification( + pid=pid, + resident_kb=descendant.resident_kb, + tier=tier, + label=label, + ) + ) + + # Pass 2: panes. Assign each pane's subtree; a process already claimed (by + # pass 1 or an earlier pane) is not reclassified. + for pane in panes: + if pane.pane_pid not in process_by_pid: + continue + is_services_session = pane.session_name == services_session_name + agent_name = ( + None + if is_services_session + else _agent_name_from_session(pane.session_name, mngr_prefix) + ) + + subtree = _walk_subtree_depths( + pane.pane_pid, children_by_parent, frozenset(assigned_pids) + ) + for pid, depth in subtree: + process = process_by_pid.get(pid) + if process is None: + continue + owning_agent: str | None = None + if is_services_session: + # The supervisord launch chain (shell, uv) and the services + # agent's own idle shell. Never shed: killing any of these tears + # down supervisord or the services session itself. + tier = Tier.INFRASTRUCTURE + label = pane.window_name + elif depth == 0: + # An agent session's pane shell. Never shed it: killing it drops + # the session instead of leaving it idle for revive-on-message. 
+ tier = Tier.INFRASTRUCTURE + label = pane.session_name + elif agent_name is not None: + base_tier = _agent_tier( + agent_name, user_created_agent_names, agent_created_agent_names + ) + owning_agent = agent_name + if _is_agent_infra_helper(process.command_line): + # Per-agent coordination/observability machinery (the + # background-task loop, transcript streamers, a lead's + # worker-report poll). Never shed -- it frees ~nothing but + # blinds the UI or severs lead<->worker coordination -- so it + # rides with the protected infrastructure tier regardless of + # its depth in the agent's subtree. + tier = Tier.INFRASTRUCTURE + label = _describe_command(process.command_line, agent_name) + elif depth >= _AGENT_CHILD_MIN_DEPTH: + tier = Tier.AGENT_CHILD + label = _describe_command(process.command_line, agent_name) + else: + tier = base_tier + label = agent_name + else: + # A non-services session whose name lacks the agent prefix, so + # we cannot resolve an agent for it; protect it like a user agent. + tier = Tier.USER_AGENT + label = pane.session_name + assigned_pids.add(pid) + classifications.append( + ProcessClassification( + pid=pid, + resident_kb=process.resident_kb, + tier=tier, + label=label, + owning_agent_name=owning_agent, + ) + ) + + # Pass 3: everything outside any pane subtree is infrastructure we never shed. 
+ for process in processes: + if process.pid in assigned_pids: + continue + classifications.append( + ProcessClassification( + pid=process.pid, + resident_kb=process.resident_kb, + tier=Tier.INFRASTRUCTURE, + label=_short_command_label(process.command_line, "system"), + ) + ) + + return classifications diff --git a/libs/memory_watchdog/src/memory_watchdog/classifier_test.py b/libs/memory_watchdog/src/memory_watchdog/classifier_test.py new file mode 100644 index 000000000..8c8356798 --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/classifier_test.py @@ -0,0 +1,348 @@ +from memory_watchdog.classifier import classify_processes +from memory_watchdog.data_types import ProcessInfo, Tier, TmuxPane + +_SERVICES_SESSION = "mngr-services" +_PREFIX = "mngr-" + +_SYSTEM_INTERFACE_CMD = ( + 'bash -c "python3 scripts/forward_port.py --url http://localhost:8000 ' + '--name system_interface && system-interface"' +) + + +def _tier_by_pid(classifications) -> dict[int, Tier]: + return {c.pid: c.tier for c in classifications} + + +def _label_by_pid(classifications) -> dict[int, str]: + return {c.pid: c.label for c in classifications} + + +def _owning_by_pid(classifications) -> dict[int, str | None]: + return {c.pid: c.owning_agent_name for c in classifications} + + +def _build_standard_tree() -> tuple[list[ProcessInfo], list[TmuxPane]]: + """A representative supervisord-era container. + + The services session has one ``bootstrap`` window whose pane runs the shell + that exec'd supervisord; every background service is a supervisord child (not + its own tmux window). The session also has the services agent's idle window. + Two agent sessions (a user agent with a tool subprocess, and a worker) round + it out. + """ + processes = [ + # Infrastructure not under any pane. 
+ ProcessInfo(pid=1, parent_pid=0, resident_kb=1000, command_line="/sbin/init"), + ProcessInfo(pid=10, parent_pid=1, resident_kb=2000, command_line="tmux"), + ProcessInfo( + pid=11, parent_pid=1, resident_kb=1500, command_line="/usr/sbin/sshd" + ), + # Services session, "bootstrap" window: shell -> supervisord -> services. + ProcessInfo(pid=100, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=102, + parent_pid=100, + resident_kb=4000, + command_line="supervisord -n -c supervisord.conf", + ), + # supervisord children (one per service), some with a grandchild. + ProcessInfo( + pid=110, + parent_pid=102, + resident_kb=2000, + command_line=_SYSTEM_INTERFACE_CMD, + ), + ProcessInfo( + pid=111, parent_pid=110, resident_kb=80000, command_line="system-interface" + ), + ProcessInfo( + pid=120, parent_pid=102, resident_kb=20000, command_line="uv run web-server" + ), + ProcessInfo( + pid=130, + parent_pid=102, + resident_kb=15000, + command_line="uv run memory-watchdog", + ), + ProcessInfo( + pid=140, + parent_pid=102, + resident_kb=12000, + command_line="uv run host-backup", + ), + ProcessInfo( + pid=150, + parent_pid=102, + resident_kb=3000, + command_line="bash scripts/run_ttyd.sh", + ), + ProcessInfo( + pid=160, parent_pid=102, resident_kb=30000, command_line="my-dashboard" + ), + # Services session, the services agent's own idle window. + ProcessInfo(pid=170, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=171, parent_pid=170, resident_kb=120000, command_line="node claude" + ), + # User agent session: claude + a tool subprocess running pytest. 
+ ProcessInfo(pid=200, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=201, parent_pid=200, resident_kb=300000, command_line="node claude" + ), + ProcessInfo( + pid=202, parent_pid=201, resident_kb=8000, command_line="bash -c pytest" + ), + ProcessInfo( + pid=203, parent_pid=202, resident_kb=500000, command_line="/usr/bin/pytest" + ), + # Worker agent session: just claude. + ProcessInfo(pid=300, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=301, parent_pid=300, resident_kb=250000, command_line="node claude" + ), + ] + panes = [ + TmuxPane(session_name=_SERVICES_SESSION, window_name="bootstrap", pane_pid=100), + TmuxPane(session_name=_SERVICES_SESSION, window_name="0", pane_pid=170), + TmuxPane(session_name="mngr-alice", window_name="0", pane_pid=200), + TmuxPane(session_name="mngr-worker7", window_name="0", pane_pid=300), + ] + return processes, panes + + +def _classify(processes, panes): + return classify_processes( + processes=processes, + panes=panes, + services_session_name=_SERVICES_SESSION, + mngr_prefix=_PREFIX, + user_created_agent_names=frozenset({"alice"}), + agent_created_agent_names=frozenset({"worker7"}), + ) + + +def test_services_are_tiered_by_supervisord_child_command() -> None: + processes, panes = _build_standard_tree() + tier_by_pid = _tier_by_pid(_classify(processes, panes)) + + # supervisord itself and the shell that launched it are never shed. + assert tier_by_pid[100] == Tier.INFRASTRUCTURE + assert tier_by_pid[102] == Tier.INFRASTRUCTURE + # Each service is tiered by its command line; a service's whole subtree + # (e.g. the system_interface bash wrapper plus the server) shares the tier. 
+ assert tier_by_pid[110] == Tier.USER_INTERFACE + assert tier_by_pid[111] == Tier.USER_INTERFACE + assert tier_by_pid[120] == Tier.AUXILIARY_SERVICE # web + assert tier_by_pid[130] == Tier.RECOVERY # memory-watchdog + assert tier_by_pid[140] == Tier.DURABILITY # host-backup + assert tier_by_pid[150] == Tier.USER_INTERFACE # ttyd / terminal + # An unrecognized supervisord child (agent-added) defaults to auxiliary. + assert tier_by_pid[160] == Tier.AUXILIARY_SERVICE + + +def test_services_tiered_when_supervisord_launched_via_interpreter() -> None: + # The container image launches supervisord as `python3 /usr/bin/supervisord`, + # so argv[0] is the interpreter rather than supervisord. Detection must still + # find it and tier the service children; otherwise every service falls + # through to the protected infrastructure tier and is never shed. + processes, panes = _build_standard_tree() + processes = [ + p.model_copy( + update={ + "command_line": "/usr/bin/python3 /usr/bin/supervisord -n -c supervisord.conf" + } + ) + if p.pid == 102 + else p + for p in processes + ] + tier_by_pid = _tier_by_pid(_classify(processes, panes)) + assert tier_by_pid[102] == Tier.INFRASTRUCTURE + assert tier_by_pid[120] == Tier.AUXILIARY_SERVICE # web + assert tier_by_pid[130] == Tier.RECOVERY # memory-watchdog + assert tier_by_pid[111] == Tier.USER_INTERFACE # system_interface + + +def test_service_label_is_the_service_name_not_the_window() -> None: + processes, panes = _build_standard_tree() + label_by_pid = _label_by_pid(_classify(processes, panes)) + assert label_by_pid[111] == "system_interface" + assert label_by_pid[120] == "web" + assert label_by_pid[130] == "memory-watchdog" + assert label_by_pid[150] == "terminal" + + +def test_services_agent_idle_window_is_protected() -> None: + processes, panes = _build_standard_tree() + tier_by_pid = _tier_by_pid(_classify(processes, panes)) + # The services agent's own idle shell and claude must never be shed -- they + # keep the services 
session alive. + assert tier_by_pid[170] == Tier.INFRASTRUCTURE + assert tier_by_pid[171] == Tier.INFRASTRUCTURE + + +def test_agents_and_their_children() -> None: + processes, panes = _build_standard_tree() + tier_by_pid = _tier_by_pid(_classify(processes, panes)) + # Agent pane shells are spared so the session survives shedding. + assert tier_by_pid[200] == Tier.INFRASTRUCTURE + assert tier_by_pid[300] == Tier.INFRASTRUCTURE + # The user's agent is tier 5; its tool subprocesses are tier 8. + assert tier_by_pid[201] == Tier.USER_AGENT + assert tier_by_pid[202] == Tier.AGENT_CHILD + assert tier_by_pid[203] == Tier.AGENT_CHILD + # The worker agent is tier 7. + assert tier_by_pid[301] == Tier.WORKER_AGENT + + +def test_agent_subprocess_carries_owning_agent_and_specific_label() -> None: + processes, panes = _build_standard_tree() + classifications = _classify(processes, panes) + owning = _owning_by_pid(classifications) + label = _label_by_pid(classifications) + # An agent tool subprocess is attributed to its agent... + assert owning[203] == "alice" # /usr/bin/pytest + assert label[203] == "pytest" + # ...and an interpreter command gets a label past the interpreter name. + assert label[202] == "bash pytest" # was "bash -c pytest" + # The agent's own process (tier 5/7) is attributed too. + assert owning[201] == "alice" # alice claude (tier 5) + assert owning[301] == "worker7" # worker claude (tier 7) + # Services and infrastructure carry no owning agent. + assert owning[120] is None # web service + assert owning[10] is None # tmux + + +def test_interpreter_subprocess_label_names_the_script() -> None: + # The exact scenario behind the request: a "python3 /tmp/hog.py" subprocess + # should read as "python3 hog.py", attributed to the agent that spawned it, + # not just "python3". 
+ processes = [ + ProcessInfo(pid=10, parent_pid=1, resident_kb=2000, command_line="tmux"), + ProcessInfo(pid=200, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=201, parent_pid=200, resident_kb=300000, command_line="node claude" + ), + ProcessInfo( + pid=202, parent_pid=201, resident_kb=500, command_line="bash /tmp/runhog.sh" + ), + ProcessInfo( + pid=203, + parent_pid=202, + resident_kb=2400000, + command_line="python3 /tmp/hog.py", + ), + ] + panes = [TmuxPane(session_name="mngr-alice", window_name="0", pane_pid=200)] + classifications = classify_processes( + processes=processes, + panes=panes, + services_session_name=_SERVICES_SESSION, + mngr_prefix=_PREFIX, + user_created_agent_names=frozenset({"alice"}), + agent_created_agent_names=frozenset(), + ) + label = _label_by_pid(classifications) + owning = _owning_by_pid(classifications) + assert _tier_by_pid(classifications)[203] == Tier.AGENT_CHILD + assert label[203] == "python3 hog.py" + assert owning[203] == "alice" + + +def test_agent_coordination_and_observability_helpers_are_never_shed() -> None: + # An agent's mngr machinery -- the background-task loop, the transcript + # streamers it spawns, and a lead's worker-report poll -- shares the agent's + # subtree but must never be shed: doing so frees ~nothing yet blinds the UI or + # severs lead<->worker coordination. They are classified as infrastructure, + # while the agent's actual work subprocess stays an expendable agent child. + processes = [ + ProcessInfo(pid=10, parent_pid=1, resident_kb=2000, command_line="tmux"), + ProcessInfo(pid=200, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=201, parent_pid=200, resident_kb=300000, command_line="node claude" + ), + # Background-task loop (depth 1, sibling of claude) + the streamers it + # spawns (depth 2). All mngr observability machinery. 
+ ProcessInfo( + pid=210, + parent_pid=200, + resident_kb=2700, + command_line="bash /mngr/agents/agent-x/commands/claude_background_tasks.sh mngr-alice agent", + ), + ProcessInfo( + pid=211, + parent_pid=210, + resident_kb=3400, + command_line="bash /mngr/agents/agent-x/commands/stream_transcript.sh", + ), + ProcessInfo( + pid=212, + parent_pid=210, + resident_kb=2400, + command_line="bash /mngr/agents/agent-x/commands/common_transcript.sh", + ), + # The lead's worker-report poll (depth 2, child of claude). + ProcessInfo( + pid=213, + parent_pid=201, + resident_kb=11000, + command_line="uv run .agents/skills/launch-task/scripts/create_worker.py await --name w", + ), + # An actual work subprocess (depth 2/3) -- expendable. + ProcessInfo( + pid=220, parent_pid=201, resident_kb=8000, command_line="bash -c pytest" + ), + ProcessInfo( + pid=221, parent_pid=220, resident_kb=500000, command_line="/usr/bin/pytest" + ), + ] + panes = [TmuxPane(session_name="mngr-alice", window_name="0", pane_pid=200)] + classifications = classify_processes( + processes=processes, + panes=panes, + services_session_name=_SERVICES_SESSION, + mngr_prefix=_PREFIX, + user_created_agent_names=frozenset({"alice"}), + agent_created_agent_names=frozenset(), + ) + tier_by_pid = _tier_by_pid(classifications) + # The agent itself and its real work subprocesses keep their tiers. + assert tier_by_pid[201] == Tier.USER_AGENT + assert tier_by_pid[220] == Tier.AGENT_CHILD + assert tier_by_pid[221] == Tier.AGENT_CHILD + # The coordination/observability helpers are never-shed infrastructure. 
+ assert tier_by_pid[210] == Tier.INFRASTRUCTURE # background-task loop + assert tier_by_pid[211] == Tier.INFRASTRUCTURE # transcript streamer + assert tier_by_pid[212] == Tier.INFRASTRUCTURE # common transcript + assert tier_by_pid[213] == Tier.INFRASTRUCTURE # lead's worker-report poll + + +def test_infrastructure_outside_any_pane() -> None: + processes, panes = _build_standard_tree() + tier_by_pid = _tier_by_pid(_classify(processes, panes)) + assert tier_by_pid[1] == Tier.INFRASTRUCTURE + assert tier_by_pid[10] == Tier.INFRASTRUCTURE + assert tier_by_pid[11] == Tier.INFRASTRUCTURE + + +def test_unlabeled_agent_defaults_to_user_agent_protective() -> None: + processes = [ + ProcessInfo(pid=10, parent_pid=1, resident_kb=2000, command_line="tmux"), + ProcessInfo(pid=200, parent_pid=10, resident_kb=500, command_line="bash"), + ProcessInfo( + pid=201, parent_pid=200, resident_kb=300000, command_line="node claude" + ), + ] + panes = [TmuxPane(session_name="mngr-mystery", window_name="0", pane_pid=200)] + classifications = classify_processes( + processes=processes, + panes=panes, + services_session_name=_SERVICES_SESSION, + mngr_prefix=_PREFIX, + user_created_agent_names=frozenset(), + agent_created_agent_names=frozenset(), + ) + # No label either way -> protect it at tier 5 rather than risk shedding a + # user's agent early. 
+ assert _tier_by_pid(classifications)[201] == Tier.USER_AGENT diff --git a/libs/memory_watchdog/src/memory_watchdog/data_types.py b/libs/memory_watchdog/src/memory_watchdog/data_types.py new file mode 100644 index 000000000..f87e74351 --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/data_types.py @@ -0,0 +1,202 @@ +from datetime import datetime, timezone +from enum import auto +from typing import Final + +from imbue.imbue_common.enums import UpperCaseStrEnum +from imbue.imbue_common.frozen_model import FrozenModel +from imbue.imbue_common.logging import format_nanosecond_iso_timestamp +from pydantic import Field + +# The on-disk timestamp format for ledger records and the status file: +# nanosecond-precision ISO 8601 in UTC. Defined once here so producers and the +# strptime parser in watchdog._prune_recent_records cannot drift apart. +ISO_TIMESTAMP_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%S.%f000Z" + + +def now_iso_timestamp() -> str: + """Current UTC time as a nanosecond-precision ISO 8601 string. + + Delegates to imbue_common.format_nanosecond_iso_timestamp (the same helper + mngr uses for event/discovery timestamps) so the watchdog does not reimplement + the format. + """ + return format_nanosecond_iso_timestamp(datetime.now(timezone.utc)) + + +class Tier(UpperCaseStrEnum): + """A process's OOM-priority tier, from most protected to most expendable.""" + + # Tier 1: the tmux server, sshd, and the container entrypoint. Losing any of + # these loses the user's ability to reach or recover the container. + INFRASTRUCTURE = auto() + # Tier 2: the user's window into the system -- the web UI, the tunnel that + # carries it, and the terminal. + USER_INTERFACE = auto() + # Tier 3: the recovery machinery -- this watchdog. (The service manager, + # supervisord, and the bootstrap launcher are tier-1 infrastructure.) + RECOVERY = auto() + # Tier 4: durability -- the runtime and host backup services. 
+ DURABILITY = auto() + # Tier 5: agents the user created (the initial chat plus any New Agent / + # New Chat ones). The shedder's last resort. + USER_AGENT = auto() + # Tier 6: auxiliary services -- telegram, web, app-watcher, and any service + # an agent added to supervisord.conf. + AUXILIARY_SERVICE = auto() + # Tier 7: agents created by other agents (workers and the like). + WORKER_AGENT = auto() + # Tier 8: the build/test/browser subprocesses an agent spawns. Shed first. + AGENT_CHILD = auto() + + +# Ordinal rank per tier (1 = most protected). Drives shed ordering and is the +# only place the 1..8 numbering is defined. +TIER_RANK_BY_TIER: Final[dict[Tier, int]] = { + Tier.INFRASTRUCTURE: 1, + Tier.USER_INTERFACE: 2, + Tier.RECOVERY: 3, + Tier.DURABILITY: 4, + Tier.USER_AGENT: 5, + Tier.AUXILIARY_SERVICE: 6, + Tier.WORKER_AGENT: 7, + Tier.AGENT_CHILD: 8, +} + +# oom_score_adj written to /proc/<pid>/oom_score_adj per tier. Protected tiers +# (1-4) stay at 0; expendable tiers get increasingly positive values so the +# kernel's own OOM killer (where it honors per-process adjustment, i.e. runc) +# picks them first. Negative values would require CAP_SYS_RESOURCE, which the +# default Docker capability set does not grant, so positive-only tagging is used +# to achieve the same relative ordering without extra privileges. +OOM_SCORE_ADJ_BY_TIER: Final[dict[Tier, int]] = { + Tier.INFRASTRUCTURE: 0, + Tier.USER_INTERFACE: 0, + Tier.RECOVERY: 0, + Tier.DURABILITY: 0, + Tier.USER_AGENT: 200, + Tier.AUXILIARY_SERVICE: 400, + Tier.WORKER_AGENT: 600, + Tier.AGENT_CHILD: 900, +} + +# Tiers the shedder may kill, ordered most-expendable first. Tiers 1-4 are never +# shed. USER_AGENT (5) is the last resort.
+SHEDDABLE_TIERS_IN_SHED_ORDER: Final[tuple[Tier, ...]] = ( + Tier.AGENT_CHILD, + Tier.WORKER_AGENT, + Tier.AUXILIARY_SERVICE, + Tier.USER_AGENT, +) + + +class ProcessInfo(FrozenModel): + """A single process as observed in /proc at one instant.""" + + pid: int = Field(description="Process id") + parent_pid: int = Field(description="Parent process id (PPid from /proc)") + resident_kb: int = Field(description="Resident set size in kibibytes") + command_line: str = Field( + description="Full argv joined by spaces, or comm when argv is empty" + ) + + +class TmuxPane(FrozenModel): + """A tmux pane: the shell process that roots one window's process subtree.""" + + session_name: str = Field(description="tmux session the pane belongs to") + window_name: str = Field( + description="tmux window name (e.g. bootstrap, 0)" + ) + pane_pid: int = Field(description="PID of the pane's root shell process") + + +class ProcessClassification(FrozenModel): + """The tier assigned to one process, with the reason for traceability.""" + + pid: int = Field(description="Process id") + resident_kb: int = Field(description="Resident set size in kibibytes") + tier: Tier = Field(description="Assigned OOM-priority tier") + # Human-readable label for what this process is (service name, agent name, or + # a fallback). Used in the shed ledger so the user can see what was killed. + label: str = Field(description="What this process is, for the ledger and banner") + # The agent whose session this process lives under, set for an agent's own + # process (tier 5/7) and for its subprocesses (tier 8) alike. Distinct from + # ShedRecord.agent_name (which only marks a shed *agent* main process, for the + # revival notice): this is for attribution/display -- "whose subprocess was + # this". None for services and infrastructure. 
class MemoryPressure(FrozenModel):
    """One snapshot of how much of the container's memory is in use."""

    total_kb: int = Field(description="MemTotal in kibibytes")
    available_kb: int = Field(description="MemAvailable in kibibytes")

    @property
    def used_fraction(self) -> float:
        """Share of total memory that is not available, as a fraction.

        Reports 0.0 when the total is zero or negative, so a missing or
        unparsed reading never divides by zero.
        """
        total = self.total_kb
        if total > 0:
            return 1.0 - (self.available_kb / total)
        return 0.0
+ owning_agent_name: str | None = Field( + default=None, description="Agent whose session this process belonged to, if any" + ) + + +class RecentShedSummary(FrozenModel): + """An aggregated line for the UI banner: how many of one label were shed.""" + + label: str = Field(description="Process label that was shed") + tier_rank: int = Field(description="1..8 rank of the tier") + count: int = Field( + description="How many processes with this label were shed recently" + ) + reclaimed_kb: int = Field( + description="Total resident memory reclaimed for this label" + ) + owning_agent_name: str | None = Field( + default=None, description="Agent these processes belonged to, if any" + ) + + +class MemoryStatus(FrozenModel): + """The watchdog's continuously published status, read by the UI banner.""" + + timestamp: str = Field(description="When this status was written (UTC ISO 8601)") + used_fraction: float = Field(description="Fraction of total memory in use, 0..1") + total_kb: int = Field(description="MemTotal in kibibytes") + available_kb: int = Field(description="MemAvailable in kibibytes") + pressure_threshold_fraction: float = Field( + description="Used-fraction at which shedding arms" + ) + is_under_pressure: bool = Field(description="Whether the banner should be shown") + recently_shed: tuple[RecentShedSummary, ...] = Field( + description="What was shed recently" + ) + blocked_services: tuple[str, ...] 
= Field( + description="Services bootstrap has paused under pressure" + ) diff --git a/libs/memory_watchdog/src/memory_watchdog/ledger.py b/libs/memory_watchdog/src/memory_watchdog/ledger.py new file mode 100644 index 000000000..75082744b --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/ledger.py @@ -0,0 +1,126 @@ +import json +import os +import tempfile +from collections.abc import Sequence +from typing import Final + +from loguru import logger + +from memory_watchdog.data_types import MemoryStatus, ShedRecord, now_iso_timestamp + +# The ledger is the append-only history; the status file is the current-state +# read API for the UI banner and for pressure checks. Their on-disk locations +# come from memory_watchdog.paths -- the single dependency-free source of truth +# shared with the system interface and the revival hook (re-exported here so +# existing ``from memory_watchdog.ledger import ...`` callers keep working). +from memory_watchdog.paths import shed_ledger_path, status_path + +# Record-type tags written into the ledger's "type" field. 
def read_currently_blocked_services() -> list[str]:
    """Compute which services are presently paused, from the append-only ledger.

    The ledger is replayed top to bottom: a ``service_blocked`` line marks a
    service paused, and a later ``service_unblocked`` line for the same service
    clears it. Lines of any other record type are ignored.

    The read is best-effort. A missing ledger yields an empty list, an
    unreadable one is logged and also yields an empty list, and any line that
    is blank, not valid JSON, not a JSON object, or lacking a string
    ``service`` field is skipped rather than aborting the replay.

    Returns:
        The names of the services currently in the blocked state, sorted.
    """
    ledger_path = shed_ledger_path()
    try:
        # errors="replace" keeps one torn or corrupt line from making the whole
        # ledger undecodable; the damaged line then fails JSON parsing below.
        ledger_text = ledger_path.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        # No ledger yet simply means nothing has ever been blocked.
        return []
    except OSError as e:
        logger.warning("Failed to read shed ledger for blocked services: {}", e)
        return []
    blocked: set[str] = set()
    for line in ledger_text.splitlines():
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        # A line can be valid JSON without being an object (``null``, ``[]``,
        # a bare number); such a value has no ``.get`` and is not a record.
        if not isinstance(record, dict):
            continue
        service = record.get("service")
        if not isinstance(service, str):
            continue
        record_type = record.get("type")
        if record_type == _RECORD_TYPE_SERVICE_BLOCKED:
            blocked.add(service)
        elif record_type == _RECORD_TYPE_SERVICE_UNBLOCKED:
            blocked.discard(service)
    return sorted(blocked)
write_status, +) + + +@pytest.fixture(autouse=True) +def _redirect_runtime_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("MEMORY_WATCHDOG_RUNTIME_DIR", str(tmp_path / "memory_watchdog")) + + +def test_append_shed_records_writes_one_jsonl_line_each() -> None: + records = [ + ShedRecord( + timestamp="2026-06-12T10:00:00.000000000Z", + tier=Tier.AGENT_CHILD, + tier_rank=8, + label="pytest", + pid=42, + resident_kb=500000, + agent_name=None, + ), + ShedRecord( + timestamp="2026-06-12T10:00:01.000000000Z", + tier=Tier.AGENT_CHILD, + tier_rank=8, + label="python3 build.py", + pid=43, + resident_kb=250000, + agent_name=None, + owning_agent_name="worker7", + ), + ] + append_shed_records(records) + lines = shed_ledger_path().read_text().splitlines() + assert len(lines) == 2 + first = json.loads(lines[0]) + assert first["type"] == "process_shed" + assert first["label"] == "pytest" + assert first["agent_name"] is None + # The owning agent is persisted so the durable ledger is not lossier than + # the live status file (a subprocess shed carries its owning agent even + # though agent_name -- which drives the revival notice -- stays None). 
+ assert first["owning_agent_name"] is None + second = json.loads(lines[1]) + assert second["agent_name"] is None + assert second["owning_agent_name"] == "worker7" + + +def test_blocked_services_reflect_block_then_unblock() -> None: + record_service_blocked("web", "crash-looped") + record_service_blocked("telegram", "crash-looped") + assert read_currently_blocked_services() == ["telegram", "web"] + record_service_unblocked("web") + assert read_currently_blocked_services() == ["telegram"] + + +def test_blocked_services_empty_when_no_ledger() -> None: + assert read_currently_blocked_services() == [] + + +def test_write_status_round_trips() -> None: + status = MemoryStatus( + timestamp="2026-06-12T10:00:00.000000000Z", + used_fraction=0.93, + total_kb=4_000_000, + available_kb=280_000, + pressure_threshold_fraction=0.90, + is_under_pressure=True, + recently_shed=( + RecentShedSummary( + label="pytest", tier_rank=8, count=2, reclaimed_kb=500000 + ), + ), + blocked_services=("web",), + ) + write_status(status) + written = json.loads(status_path().read_text()) + assert written["is_under_pressure"] is True + assert written["recently_shed"][0]["label"] == "pytest" + assert written["blocked_services"] == ["web"] diff --git a/libs/memory_watchdog/src/memory_watchdog/paths.py b/libs/memory_watchdog/src/memory_watchdog/paths.py new file mode 100644 index 000000000..7cdbd8dca --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/paths.py @@ -0,0 +1,43 @@ +"""On-disk layout of the watchdog's ledger and status file -- the single source +of truth for where those files live. + +The watchdog (writer), the system interface (status reader), and the revival +SessionStart hook (ledger reader) all resolve the same paths through this +module, so the layout can't drift between producer and consumers. + +This module deliberately imports nothing beyond the standard library. 
_RUNTIME_DIR_ENV_VAR: Final[str] = "MEMORY_WATCHDOG_RUNTIME_DIR"
_RUNTIME_SUBDIR: Final[Path] = Path("runtime") / "memory_watchdog"


def watchdog_runtime_dir() -> Path:
    """Resolve the directory that holds the watchdog's ledger and status file.

    Resolution order: a non-empty ``MEMORY_WATCHDOG_RUNTIME_DIR`` is used
    verbatim; otherwise ``runtime/memory_watchdog`` is appended to a non-empty
    ``MNGR_AGENT_WORK_DIR``; otherwise it is appended to the current directory.
    """
    explicit_dir = os.environ.get(_RUNTIME_DIR_ENV_VAR, "")
    if explicit_dir:
        return Path(explicit_dir)
    agent_work_dir = os.environ.get("MNGR_AGENT_WORK_DIR", "")
    if agent_work_dir:
        return Path(agent_work_dir) / _RUNTIME_SUBDIR
    return Path.cwd() / _RUNTIME_SUBDIR


def shed_ledger_path() -> Path:
    """Location of the append-only shed ledger inside the runtime directory."""
    return watchdog_runtime_dir().joinpath("events", "shed", "events.jsonl")


def status_path() -> Path:
    """Location of the current-status file inside the runtime directory."""
    return watchdog_runtime_dir().joinpath("status.json")
@pure
def select_processes_to_shed(
    classifications: Sequence[ProcessClassification],
    available_kb: int,
    total_kb: int,
    relief_threshold: float,
    min_resident_kb: int,
) -> list[ProcessClassification]:
    """Pick the processes to shed, in kill order, taking no more than needed.

    Only processes in a sheddable tier whose resident set is at least
    `min_resident_kb` are eligible; anything smaller frees too little to be
    worth the collateral. Eligible processes are ranked by tier shed-priority
    (most expendable tier first) and, inside a tier, by resident size with the
    largest first, so a more-protected process is never chosen ahead of a
    less-protected one.

    The ranked list is then walked while keeping a running total of the
    resident memory already selected. Before each process is taken, the used
    fraction expected after reclaiming that total is projected; as soon as the
    projection is below `relief_threshold` the walk stops. The projection uses
    resident sizes rather than a fresh memory reading because the kernel
    reclaims a killed process's pages asynchronously, so an immediate re-read
    would still show the pre-kill usage and lead to over-shedding.

    Returns the selected processes in the order they should be shed; the list
    is empty when usage is already below the threshold or nothing is eligible.
    """
    eligible = sorted(
        (
            process
            for process in classifications
            if process.tier in _SHED_ORDER_INDEX
            and process.resident_kb >= min_resident_kb
        ),
        key=lambda process: (_SHED_ORDER_INDEX[process.tier], -process.resident_kb),
    )
    selected: list[ProcessClassification] = []
    reclaimed_kb = 0
    for process in eligible:
        projected = _projected_used_fraction(available_kb, reclaimed_kb, total_kb)
        if projected < relief_threshold:
            break
        selected.append(process)
        reclaimed_kb += process.resident_kb
    return selected
def shed_processes(targets: Sequence[ProcessClassification]) -> list[ShedRecord]:
    """SIGKILL each chosen process and return a ledger record per kill.

    Three kinds of target are skipped defensively and produce no record:

    * a non-positive pid, because kill(2) does not treat it as a single
      process (0 is the caller's process group, -1 is every process the caller
      may signal, any other negative value is a process group);
    * the watchdog's own pid;
    * any process in the watchdog's own process group.

    A target whose kill is not permitted is also left out of the result. A
    target that has already exited is still recorded, since the goal state is
    reached either way.

    Args:
        targets: The processes to shed, in the order to shed them.

    Returns:
        One ShedRecord per process that was killed or was already gone.
        ``agent_name`` is set only for an agent's own process (USER_AGENT or
        WORKER_AGENT tier); ``owning_agent_name`` is copied from the target.
    """
    own_pid = os.getpid()
    own_group = os.getpgrp()
    records: list[ShedRecord] = []
    for target in targets:
        if target.pid <= 0:
            # Never hand a group/broadcast pid to os.kill.
            logger.warning(
                "Refusing to shed non-positive pid {} ({})", target.pid, target.label
            )
            continue
        if target.pid == own_pid:
            continue
        try:
            if os.getpgid(target.pid) == own_group:
                continue
        except (ProcessLookupError, PermissionError):
            # Group unknown (process gone or not inspectable): fall through and
            # let the kill attempt decide.
            pass
        if not _kill_process(target.pid):
            continue
        is_agent_process = target.tier in (Tier.USER_AGENT, Tier.WORKER_AGENT)
        records.append(
            ShedRecord(
                timestamp=now_iso_timestamp(),
                tier=target.tier,
                tier_rank=TIER_RANK_BY_TIER[target.tier],
                label=target.label,
                pid=target.pid,
                resident_kb=target.resident_kb,
                agent_name=target.label if is_agent_process else None,
                owning_agent_name=target.owning_agent_name,
            )
        )
    return records
+_FLOOR = 10 * 1024 + + +def _classification( + pid: int, + tier: Tier, + resident_kb: int, + label: str, + owning_agent_name: str | None = None, +) -> ProcessClassification: + return ProcessClassification( + pid=pid, + resident_kb=resident_kb, + tier=tier, + label=label, + owning_agent_name=owning_agent_name, + ) + + +def _pids(chosen: list[ProcessClassification]) -> list[int]: + return [c.pid for c in chosen] + + +def test_sheds_only_the_hog_and_stops_when_it_frees_enough() -> None: + # An 800MB agent-child hog under acute pressure (50MB available of 1000MB). + # Shedding it alone projects usage down to 15%, so the user's agent (tier 5) + # must NOT be selected -- the over-shed regression guard. + classifications = [ + _classification(1, Tier.AGENT_CHILD, 800_000, "hog"), + _classification(2, Tier.USER_AGENT, 100_000, "claude"), + ] + chosen = select_processes_to_shed( + classifications, + available_kb=50_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert _pids(chosen) == [1] + + +def test_spares_small_helpers_sharing_the_hogs_tier() -> None: + # The collateral-damage regression: a big agent-child hog plus a swarm of + # tiny tier-8 helpers (transcript streamer, a lead's report poll, a sleep). + # Only the hog is shed; the helpers, the worker's own claude, and the lead's + # agent all survive -- shedding the hog alone already clears relief, and the + # sub-floor helpers could not have helped anyway. 
+ classifications = [ + _classification(1, Tier.AGENT_CHILD, 800_000, "python3 hog.py", "worker"), + _classification( + 2, Tier.AGENT_CHILD, 3_000, "bash stream_transcript.sh", "worker" + ), + _classification( + 3, Tier.AGENT_CHILD, 11_000, "python3 create_worker.py", "lead" + ), + _classification(4, Tier.AGENT_CHILD, 1_100, "sleep", "worker"), + _classification(5, Tier.WORKER_AGENT, 300_000, "worker", "worker"), + _classification(6, Tier.USER_AGENT, 250_000, "lead", "lead"), + ] + chosen = select_processes_to_shed( + classifications, + available_kb=50_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert _pids(chosen) == [1] + + +def test_escalates_across_tiers_but_stops_before_user_agent() -> None: + # Tier 8 and 7 are small; only after shedding both does projected usage clear + # relief, so escalation reaches the worker tier but still stops before the + # user agent (tier 5). + classifications = [ + _classification(1, Tier.AGENT_CHILD, 100_000, "child"), + _classification(2, Tier.WORKER_AGENT, 120_000, "worker"), + _classification(3, Tier.USER_AGENT, 500_000, "claude"), + ] + chosen = select_processes_to_shed( + classifications, + available_kb=50_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert _pids(chosen) == [1, 2] + + +def test_orders_by_tier_then_largest_resident_first() -> None: + # Pressure so acute that even shedding everything cannot reach relief, so all + # candidates are selected -- letting us assert the ORDER: lower tier first + # (agent-child before worker), largest resident set first within a tier. 
+ classifications = [ + _classification(1, Tier.AGENT_CHILD, 30_000, "small-child"), + _classification(2, Tier.AGENT_CHILD, 50_000, "big-child"), + _classification(3, Tier.WORKER_AGENT, 70_000, "worker"), + ] + chosen = select_processes_to_shed( + classifications, + available_kb=10_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert _pids(chosen) == [2, 1, 3] + + +def test_sheds_nothing_when_already_relieved() -> None: + classifications = [_classification(1, Tier.AGENT_CHILD, 50_000, "child")] + chosen = select_processes_to_shed( + classifications, + available_kb=300_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert chosen == [] + + +def test_user_agent_is_last_resort() -> None: + # Everything cheaper is absent and the user agent is the sole holder; it is + # selected only because nothing else can relieve the pressure. + classifications = [_classification(1, Tier.USER_AGENT, 800_000, "claude")] + chosen = select_processes_to_shed( + classifications, + available_kb=50_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert _pids(chosen) == [1] + + +def test_negligible_processes_are_never_shed() -> None: + # Under acute pressure but the only processes are sub-floor helpers: shedding + # any of them frees too little to matter, so nothing is shed (no pointless + # collateral). The next poll / the kernel OOM killer is the backstop. 
+ classifications = [ + _classification(1, Tier.AGENT_CHILD, 2_000, "sleep"), + _classification(2, Tier.AGENT_CHILD, 3_000, "bash stream_transcript.sh"), + ] + chosen = select_processes_to_shed( + classifications, + available_kb=20_000, + total_kb=1_000_000, + relief_threshold=_RELIEF, + min_resident_kb=_FLOOR, + ) + assert chosen == [] + + +def test_summarize_recent_sheds_aggregates_by_label() -> None: + records = [ + ShedRecord( + timestamp="t", + tier=Tier.AGENT_CHILD, + tier_rank=8, + label="pytest", + pid=1, + resident_kb=100, + agent_name=None, + ), + ShedRecord( + timestamp="t", + tier=Tier.AGENT_CHILD, + tier_rank=8, + label="pytest", + pid=2, + resident_kb=300, + agent_name=None, + ), + ShedRecord( + timestamp="t", + tier=Tier.WORKER_AGENT, + tier_rank=7, + label="worker", + pid=3, + resident_kb=50, + agent_name="worker", + ), + ] + summaries = summarize_recent_sheds(records) + summary_by_label = {s.label: s for s in summaries} + assert summary_by_label["pytest"].count == 2 + assert summary_by_label["pytest"].reclaimed_kb == 400 + assert summary_by_label["worker"].count == 1 + # Largest reclaimer comes first. + assert summaries[0].label == "pytest" + + +def test_summarize_groups_by_owning_agent() -> None: + # The same command run by two agents must stay on separate lines so each can + # name its owning agent, rather than collapsing into one ambiguous "python3". 
def _wait_until_dead(process: subprocess.Popen, timeout_seconds: float) -> bool:
    """Wait for the process to exit, giving up after `timeout_seconds`.

    Returns True if it exited within the timeout and False otherwise. The wait
    is a single blocking call, not a polling loop.
    """
    try:
        process.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        exited = False
    else:
        exited = True
    return exited
def _read_process_info(pid: int) -> ProcessInfo | None:
    """Read one process's parent, RSS, and command line from /proc.

    The parent pid and resident size come from ``/proc/<pid>/status`` (the
    ``PPid:`` and ``VmRSS:`` fields); either stays 0 when its field is absent.
    The command line is ``/proc/<pid>/cmdline`` with NUL separators turned
    into spaces. When that is empty or unreadable, the process name from the
    ``Name:`` field of the status file is used instead, so the result matches
    ProcessInfo's documented "comm when argv is empty" contract.

    Returns None if the status file cannot be read -- the process vanished
    mid-read (a normal race) or its /proc entry is unreadable.
    """
    process_dir = _PROC_DIR / str(pid)
    try:
        # The Name: field is set by the process and need not be valid UTF-8
        # (presumably passed through as raw bytes -- see proc(5)); decoding
        # leniently keeps one oddly named process from aborting the snapshot.
        status_text = (process_dir / "status").read_text(errors="replace")
    except OSError:
        return None
    process_name = ""
    parent_pid = 0
    resident_kb = 0
    for line in status_text.splitlines():
        field, _, value = line.partition(":")
        if field == "Name":
            process_name = value.strip()
        elif field == "PPid":
            parent_pid = int(value.strip())
        elif field == "VmRSS":
            # Format: "VmRSS:\t 12345 kB"
            resident_kb = int(value.split()[0])
    try:
        raw_cmdline = (process_dir / "cmdline").read_bytes()
    except OSError:
        raw_cmdline = b""
    command_line = raw_cmdline.replace(b"\x00", b" ").decode("utf-8", "replace").strip()
    return ProcessInfo(
        pid=pid,
        parent_pid=parent_pid,
        resident_kb=resident_kb,
        # Fall back to comm when argv is empty.
        command_line=command_line or process_name,
    )
panes.append( + TmuxPane( + session_name=session_name, + window_name=window_name, + pane_pid=int(pane_pid_str), + ) + ) + return panes + + +def read_memory_pressure() -> MemoryPressure: + """Read MemTotal / MemAvailable from /proc/meminfo.""" + total_kb = 0 + available_kb = 0 + for line in _MEMINFO_PATH.read_text().splitlines(): + if line.startswith("MemTotal:"): + total_kb = int(line.split(":", 1)[1].strip().split()[0]) + elif line.startswith("MemAvailable:"): + available_kb = int(line.split(":", 1)[1].strip().split()[0]) + return MemoryPressure(total_kb=total_kb, available_kb=available_kb) + + +def read_agent_label_sets() -> tuple[frozenset[str], frozenset[str]]: + """Scan the host's agent records for user-created vs agent-created names. + + Reads ``$MNGR_HOST_DIR/agents/*/data.json``; each record carries the agent + ``name`` and a ``labels`` dict. An agent with ``user_created=true`` is + user-created (tier 5); one with ``agent_created=true`` is a worker (tier 7). + Returns (user_created_names, agent_created_names). 
+ """ + host_dir = os.environ.get("MNGR_HOST_DIR", "") + if not host_dir: + return frozenset(), frozenset() + agents_dir = Path(host_dir) / "agents" + if not agents_dir.is_dir(): + return frozenset(), frozenset() + user_created: set[str] = set() + agent_created: set[str] = set() + for agent_dir in agents_dir.iterdir(): + data_path = agent_dir / "data.json" + if not data_path.exists(): + continue + try: + data = json.loads(data_path.read_text()) + except (json.JSONDecodeError, OSError) as e: + logger.warning("Failed to read agent record {}: {}", data_path, e) + continue + name = data.get("name") + labels = data.get("labels") + if not isinstance(name, str) or not isinstance(labels, dict): + continue + if str(labels.get("user_created", "")).lower() == "true": + user_created.add(name) + if str(labels.get("agent_created", "")).lower() == "true": + agent_created.add(name) + return frozenset(user_created), frozenset(agent_created) diff --git a/libs/memory_watchdog/src/memory_watchdog/tagger.py b/libs/memory_watchdog/src/memory_watchdog/tagger.py new file mode 100644 index 000000000..39e56a0c9 --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/tagger.py @@ -0,0 +1,51 @@ +from collections.abc import Sequence +from pathlib import Path +from typing import Final + +from loguru import logger + +from memory_watchdog.data_types import OOM_SCORE_ADJ_BY_TIER, ProcessClassification + +_PROC_DIR: Final[Path] = Path("/proc") + + +def _oom_score_adj_path(pid: int) -> Path: + return _PROC_DIR / str(pid) / "oom_score_adj" + + +def _read_current_adj(pid: int) -> int | None: + try: + return int(_oom_score_adj_path(pid).read_text().strip()) + except (OSError, ValueError): + return None + + +def apply_oom_score_adjustments( + classifications: Sequence[ProcessClassification], +) -> int: + """Write each process's tier-derived oom_score_adj into /proc. + + Only writes when the current value differs, to avoid needless churn. 
Returns + the number of processes whose adjustment was changed. Per-process failures + (the process exited, or /proc is not writable for it) are skipped -- on + gVisor the kernel ignores in-container oom_score_adj entirely, so failures + here are expected and not worth more than a debug line. + """ + changed_count = 0 + for classification in classifications: + desired_adj = OOM_SCORE_ADJ_BY_TIER[classification.tier] + current_adj = _read_current_adj(classification.pid) + if current_adj == desired_adj: + continue + try: + _oom_score_adj_path(classification.pid).write_text(f"{desired_adj}\n") + except OSError as e: + logger.debug( + "Skipped oom_score_adj write for pid {} ({}): {}", + classification.pid, + classification.label, + e, + ) + continue + changed_count = changed_count + 1 + return changed_count diff --git a/libs/memory_watchdog/src/memory_watchdog/watchdog.py b/libs/memory_watchdog/src/memory_watchdog/watchdog.py new file mode 100644 index 000000000..86963a9d0 --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/watchdog.py @@ -0,0 +1,258 @@ +"""Memory watchdog service. + +Each poll it snapshots the container's process tree, classifies every process +into an OOM-priority tier, and keeps each process's oom_score_adj in line with +its tier. Under sustained memory pressure it sheds individual processes, most +expendable tier first and largest process first within a tier, stopping as soon +as the projected reclaim is enough (agent build/test/browser subprocesses first, +the user's own agents only as a last resort), recording every kill to a ledger +and publishing a status file the UI banner reads. + +Liveness of the watchdog itself, and of every other background service, is owned +by supervisord (see supervisord.conf): supervisord restarts this process if it +dies, and restarts any service it sheds. The watchdog does not supervise other +processes -- it only decides which to shed under pressure. 
+""" + +import os +import signal +import threading +from collections.abc import Sequence +from datetime import datetime, timezone +from typing import Final + +from imbue.imbue_common.logging import format_nanosecond_iso_timestamp +from imbue.imbue_common.pure import pure +from loguru import logger + +from memory_watchdog.classifier import classify_processes +from memory_watchdog.data_types import ( + ISO_TIMESTAMP_FORMAT, + MemoryPressure, + MemoryStatus, + ProcessClassification, + ShedRecord, +) +from memory_watchdog.ledger import ( + append_shed_records, + read_currently_blocked_services, + write_status, +) +from memory_watchdog.shedder import ( + select_processes_to_shed, + shed_processes, + summarize_recent_sheds, +) +from memory_watchdog.system_probe import ( + read_agent_label_sets, + read_all_processes, + read_memory_pressure, + read_tmux_panes, +) +from memory_watchdog.tagger import apply_oom_score_adjustments + +# How often we re-snapshot and re-tag. This is deliberately short, but NOT to +# "beat" the kernel OOM killer -- that fires synchronously the instant an +# allocation can't be satisfied, so nothing can preempt it. The interval instead +# governs how fresh each process's oom_score_adj is: under runc (lima), +# oom_score_adj is the real lever, so a process that spawns and grows between +# polls is untagged when the kernel picks a victim. A short interval keeps the +# tags current; a poll is cheap (one /proc walk, one tmux list-panes, a few small +# file reads), so the cost is negligible. +POLL_INTERVAL_SECONDS: Final[float] = 3.0 +# Used-fraction at which the shedder arms, and how long it must stay there before +# the first kill. Hysteresis: pressure must be sustained, not a momentary spike. +SHED_PRESSURE_THRESHOLD: Final[float] = 0.90 +SHED_SUSTAINED_SECONDS: Final[float] = 10.0 +# The shedder stops escalating once usage drops back below this (lower than the +# arm threshold so it does not immediately re-arm on noise). 
+SHED_RELIEF_THRESHOLD: Final[float] = 0.80 +# Processes below this resident size are never shed: killing one frees too little +# to relieve pressure, so it would be pure collateral. This protects the small +# but functionally critical helpers that share the agent-child tier with real +# memory hogs -- transcript streamers, background-task loops, a lead's +# worker-report poll, bare `sleep`s -- which a whole-tier shed used to take down +# alongside the one process actually holding the memory. +MIN_SHEDDABLE_RSS_KB: Final[int] = 10 * 1024 +# The banner shows while usage is above this, or while anything was shed within +# the recent window. +BANNER_PRESSURE_THRESHOLD: Final[float] = 0.80 +RECENT_SHED_WINDOW_SECONDS: Final[float] = 120.0 + +_MNGR_PREFIX_DEFAULT: Final[str] = "mngr-" + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +def _iso(moment: datetime) -> str: + return format_nanosecond_iso_timestamp(moment) + + +def _services_session_name(mngr_prefix: str) -> str: + """The tmux session that runs supervisord and the background services. + + The watchdog runs as a supervisord child of the services agent (in a minds + workspace, the constant-named ``system-services`` agent), so the services + session is that agent's own tmux session, named ``{mngr_prefix}{agent_name}``. + The agent name is exported as MNGR_AGENT_NAME in the agent environment that + supervisord -- and therefore this process -- inherits, so we read it directly + rather than reconstructing it by walking the process tree. + + If MNGR_AGENT_NAME is somehow unset, the result matches no real session, so + the services session's processes are simply classified like any other + agent's (the pane shell stays infrastructure and the main process defaults to + the protected user-agent tier -- never shed early). We log once so the + misconfiguration is visible. 
+ """ + agent_name = os.environ.get("MNGR_AGENT_NAME", "") + if not agent_name: + logger.warning( + "MNGR_AGENT_NAME is not set; cannot identify the services session by name. " + "Its processes will be treated like any other agent's (still protected)." + ) + return f"{mngr_prefix}{agent_name}" + + +@pure +def _build_status( + pressure: MemoryPressure, + recent_records: Sequence[ShedRecord], + blocked_services: Sequence[str], + now_iso: str, +) -> MemoryStatus: + is_under_pressure = ( + pressure.used_fraction >= BANNER_PRESSURE_THRESHOLD or len(recent_records) > 0 + ) + return MemoryStatus( + timestamp=now_iso, + used_fraction=pressure.used_fraction, + total_kb=pressure.total_kb, + available_kb=pressure.available_kb, + pressure_threshold_fraction=SHED_PRESSURE_THRESHOLD, + is_under_pressure=is_under_pressure, + recently_shed=tuple(summarize_recent_sheds(recent_records)), + blocked_services=tuple(blocked_services), + ) + + +def _shed_until_relieved( + classifications: Sequence[ProcessClassification], + pressure: MemoryPressure, +) -> list[ShedRecord]: + """Shed individual processes, most expendable and largest first, stopping + once the projected reclaim is enough. + + Which processes to shed is decided up front by ``select_processes_to_shed``, + which orders candidates by tier shed-priority then resident size and projects + how much each would free, instead of re-reading /proc between kills. That + avoids the over-shed bug where the kernel had not yet reclaimed a just-killed + process's pages, so an immediate re-read still showed high usage and the + shedder escalated into the user's own agents even though a cheaper kill had + already freed enough. Selecting per process (rather than whole tiers) also + means a single large hog is shed on its own, sparing the small helpers that + share its tier. The next poll re-reads real usage and sheds again if the + estimate fell short. 
+ """ + targets = select_processes_to_shed( + classifications, + pressure.available_kb, + pressure.total_kb, + SHED_RELIEF_THRESHOLD, + MIN_SHEDDABLE_RSS_KB, + ) + records = shed_processes(targets) + if records: + logger.warning("Shed {} process(es) to relieve memory pressure", len(records)) + append_shed_records(records) + return records + + +def _prune_recent_records( + recent_records: Sequence[ShedRecord], now: datetime +) -> list[ShedRecord]: + """Drop shed records older than the recent window (used for the banner).""" + cutoff = now.timestamp() - RECENT_SHED_WINDOW_SECONDS + kept: list[ShedRecord] = [] + for record in recent_records: + try: + record_time = datetime.strptime( + record.timestamp, ISO_TIMESTAMP_FORMAT + ).replace(tzinfo=timezone.utc) + except ValueError: + continue + if record_time.timestamp() >= cutoff: + kept.append(record) + return kept + + +def main() -> None: + mngr_prefix = os.environ.get("MNGR_PREFIX", _MNGR_PREFIX_DEFAULT) + # The services session is fixed for the life of the process (it comes from + # the inherited agent environment, which never changes), so resolve it once. + services_session_name = _services_session_name(mngr_prefix) + logger.info( + "Started memory watchdog (agent prefix: {}, services session: {})", + mngr_prefix, + services_session_name, + ) + + stop_event = threading.Event() + + def _handle_signal(signum: int, frame: object) -> None: + stop_event.set() + + signal.signal(signal.SIGTERM, _handle_signal) + signal.signal(signal.SIGINT, _handle_signal) + + # Loop state. + pressure_over_threshold_since: float | None = None + recent_records: list[ShedRecord] = [] + + while not stop_event.is_set(): + now = _now() + + # Snapshot the system and classify every process. 
+ processes = read_all_processes() + panes = read_tmux_panes() + user_created_names, agent_created_names = read_agent_label_sets() + classifications = classify_processes( + processes=processes, + panes=panes, + services_session_name=services_session_name, + mngr_prefix=mngr_prefix, + user_created_agent_names=user_created_names, + agent_created_agent_names=agent_created_names, + ) + + # Keep oom_score_adj in sync (the kernel-level last resort under runc). + apply_oom_score_adjustments(classifications) + + # Decide whether sustained pressure warrants shedding. + pressure = read_memory_pressure() + if pressure.used_fraction >= SHED_PRESSURE_THRESHOLD: + if pressure_over_threshold_since is None: + pressure_over_threshold_since = now.timestamp() + sustained_for = now.timestamp() - pressure_over_threshold_since + if sustained_for >= SHED_SUSTAINED_SECONDS: + new_records = _shed_until_relieved(classifications, pressure) + recent_records.extend(new_records) + pressure_over_threshold_since = None + else: + pressure_over_threshold_since = None + + # Publish the status the UI banner reads. 
+ recent_records = _prune_recent_records(recent_records, now) + blocked_services = read_currently_blocked_services() + write_status( + _build_status(pressure, recent_records, blocked_services, _iso(now)) + ) + + stop_event.wait(POLL_INTERVAL_SECONDS) + + logger.info("Memory watchdog stopped") + + +if __name__ == "__main__": + main() diff --git a/libs/memory_watchdog/src/memory_watchdog/watchdog_test.py b/libs/memory_watchdog/src/memory_watchdog/watchdog_test.py new file mode 100644 index 000000000..fcb46ef35 --- /dev/null +++ b/libs/memory_watchdog/src/memory_watchdog/watchdog_test.py @@ -0,0 +1,58 @@ +import pytest + +from memory_watchdog.data_types import ( + MemoryPressure, + ShedRecord, + Tier, +) +from memory_watchdog.watchdog import _build_status, _services_session_name + + +def _pressure(used_fraction: float) -> MemoryPressure: + total = 1_000_000 + return MemoryPressure( + total_kb=total, available_kb=int(total * (1.0 - used_fraction)) + ) + + +def test_status_under_pressure_when_usage_high() -> None: + status = _build_status(_pressure(0.95), (), (), "2026-06-12T10:00:00.000000000Z") + assert status.is_under_pressure is True + + +def test_status_under_pressure_when_recent_sheds_even_if_usage_low() -> None: + record = ShedRecord( + timestamp="2026-06-12T10:00:00.000000000Z", + tier=Tier.AGENT_CHILD, + tier_rank=8, + label="pytest", + pid=1, + resident_kb=1000, + agent_name=None, + ) + status = _build_status( + _pressure(0.10), (record,), (), "2026-06-12T10:00:00.000000000Z" + ) + assert status.is_under_pressure is True + assert status.recently_shed[0].label == "pytest" + + +def test_status_not_under_pressure_when_calm() -> None: + status = _build_status(_pressure(0.10), (), (), "2026-06-12T10:00:00.000000000Z") + assert status.is_under_pressure is False + + +def test_services_session_name_from_agent_env(monkeypatch: pytest.MonkeyPatch) -> None: + # The services session is the services agent's own tmux session: + # {mngr_prefix}{agent_name}. 
+ monkeypatch.setenv("MNGR_AGENT_NAME", "system-services") + assert _services_session_name("mngr-") == "mngr-system-services" + + +def test_services_session_name_when_agent_name_missing( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # With no agent name, the result is just the prefix -- it matches no real + # session, so the services session is never mistaken for a sheddable one. + monkeypatch.delenv("MNGR_AGENT_NAME", raising=False) + assert _services_session_name("mngr-") == "mngr-" diff --git a/libs/memory_watchdog/test_memory_watchdog_ratchets.py b/libs/memory_watchdog/test_memory_watchdog_ratchets.py new file mode 100644 index 000000000..769a34145 --- /dev/null +++ b/libs/memory_watchdog/test_memory_watchdog_ratchets.py @@ -0,0 +1,74 @@ +from pathlib import Path + +from imbue.imbue_common.ratchet_testing import standard_ratchet_checks as rc +from inline_snapshot import snapshot + +_DIR = Path(__file__).parent + + +# --- Code safety --- + + +def test_prevent_todos() -> None: + rc.check_todos(_DIR, snapshot(0)) + + +def test_prevent_exec_usage() -> None: + rc.check_exec(_DIR, snapshot(0)) + + +def test_prevent_eval_usage() -> None: + rc.check_eval(_DIR, snapshot(0)) + + +def test_prevent_while_true() -> None: + rc.check_while_true(_DIR, snapshot(0)) + + +def test_prevent_time_sleep() -> None: + rc.check_time_sleep(_DIR, snapshot(0)) + + +def test_prevent_global_keyword() -> None: + rc.check_global_keyword(_DIR, snapshot(0)) + + +def test_prevent_bare_print() -> None: + rc.check_bare_print(_DIR, snapshot(0)) + + +# --- Exception handling --- + + +def test_prevent_bare_except() -> None: + rc.check_bare_except(_DIR, snapshot(0)) + + +def test_prevent_broad_exception_catch() -> None: + rc.check_broad_exception_catch(_DIR, snapshot(0)) + + +def test_prevent_builtin_exception_raises() -> None: + rc.check_builtin_exception_raises(_DIR, snapshot(0)) + + +# --- Import style --- + + +def test_prevent_inline_imports() -> None: + rc.check_inline_imports(_DIR, 
snapshot(0)) + + +def test_prevent_relative_imports() -> None: + rc.check_relative_imports(_DIR, snapshot(0)) + + +# --- Banned libraries and patterns --- + + +def test_prevent_asyncio_import() -> None: + rc.check_asyncio_import(_DIR, snapshot(0)) + + +def test_prevent_dataclasses_import() -> None: + rc.check_dataclasses_import(_DIR, snapshot(0)) diff --git a/pyproject.toml b/pyproject.toml index 91d3ef988..bc38f220a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "web-server", "runtime-backup", "host-backup", + "memory-watchdog", "tk-command-parsing", "imbue-common", "tomlkit>=0.12", @@ -51,7 +52,7 @@ dev = [ package = false [tool.uv.workspace] -members = ["libs/telegram_bot", "libs/bootstrap", "libs/cloudflare_tunnel", "libs/app_watcher", "libs/web_server", "libs/runtime_backup", "libs/host_backup", "libs/mngr_cli_contract", "libs/tk_command_parsing", "apps/system_interface"] +members = ["libs/telegram_bot", "libs/bootstrap", "libs/cloudflare_tunnel", "libs/app_watcher", "libs/web_server", "libs/runtime_backup", "libs/host_backup", "libs/memory_watchdog", "libs/mngr_cli_contract", "libs/tk_command_parsing", "apps/system_interface"] [tool.uv.sources] telegram-bot = { workspace = true } @@ -61,6 +62,7 @@ app-watcher = { workspace = true } web-server = { workspace = true } runtime-backup = { workspace = true } host-backup = { workspace = true } +memory-watchdog = { workspace = true } mngr-cli-contract = { workspace = true } tk-command-parsing = { workspace = true } imbue-common = { path = "vendor/mngr/libs/imbue_common", editable = true } diff --git a/scripts/claude_shed_notice_hook.py b/scripts/claude_shed_notice_hook.py new file mode 100755 index 000000000..26f4ca7bf --- /dev/null +++ b/scripts/claude_shed_notice_hook.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +"""SessionStart hook: tell a revived agent it was stopped for memory pressure. 
+ +When the memory watchdog sheds an agent's own process (tier 5/7), it records the +kill in the shed ledger. The agent stays down until the user next messages it, +which restarts the claude process and fires this hook. The hook looks for shed +records naming this agent that have not yet been delivered, prints a notice +(SessionStart stdout becomes session context), and appends a delivery marker so +the same notice is not injected again. + +Self-contained (stdlib only) so it runs in the agent's plain claude environment +without importing the memory_watchdog package. +""" + +import json +import os +import sys +from datetime import datetime, timezone +from pathlib import Path + +# This hook runs under a bare `python3` (see .claude/settings.json), so none of +# memory_watchdog's third-party deps (loguru, pydantic) are importable. The +# ledger *layout*, however, lives in memory_watchdog.paths, which is deliberately +# stdlib-only -- so we put that package's source dir on sys.path and import the +# shared path helper rather than duplicating the layout and risking drift. The +# MEMORY_WATCHDOG_RUNTIME_DIR override and work-dir base are thus honored +# identically by the writer and this reader. 
+sys.path.insert( + 0, str(Path(__file__).resolve().parents[1] / "libs" / "memory_watchdog" / "src") +) + +from memory_watchdog.paths import shed_ledger_path + +_RECORD_TYPE_PROCESS_SHED = "process_shed" +_RECORD_TYPE_NOTICE_DELIVERED = "notice_delivered" + + +def _read_ledger_records(ledger_path: Path) -> list[dict]: + if not ledger_path.exists(): + return [] + records: list[dict] = [] + for line in ledger_path.read_text().splitlines(): + if not line.strip(): + continue + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + continue + return records + + +def _latest_delivered_timestamp(records: list[dict], agent_name: str) -> str: + """Highest up_to_timestamp already delivered to this agent (or empty).""" + delivered = [ + str(r.get("up_to_timestamp", "")) + for r in records + if r.get("type") == _RECORD_TYPE_NOTICE_DELIVERED + and r.get("agent_name") == agent_name + ] + return max(delivered) if delivered else "" + + +def _pending_shed_timestamps( + records: list[dict], agent_name: str, after_timestamp: str +) -> list[str]: + """Timestamps of this agent's own shed records newer than the last delivery.""" + pending: list[str] = [] + for record in records: + if record.get("type") != _RECORD_TYPE_PROCESS_SHED: + continue + if record.get("agent_name") != agent_name: + continue + timestamp = str(record.get("timestamp", "")) + if timestamp and timestamp > after_timestamp: + pending.append(timestamp) + return pending + + +def _append_delivery_marker( + ledger_path: Path, agent_name: str, up_to_timestamp: str +) -> None: + marker = { + "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f000Z"), + "type": _RECORD_TYPE_NOTICE_DELIVERED, + "agent_name": agent_name, + "up_to_timestamp": up_to_timestamp, + } + ledger_path.parent.mkdir(parents=True, exist_ok=True) + with open(ledger_path, "a") as ledger_file: + ledger_file.write(json.dumps(marker) + "\n") + + +def main() -> None: + agent_name = os.environ.get("MNGR_AGENT_NAME", "") + if not 
agent_name: + return + ledger_path = shed_ledger_path() + records = _read_ledger_records(ledger_path) + if not records: + return + last_delivered = _latest_delivered_timestamp(records, agent_name) + pending = _pending_shed_timestamps(records, agent_name, last_delivered) + if not pending: + return + + print( + "Note: you were previously stopped to relieve a memory-pressure " + "(out-of-memory) situation in this workspace. Any background tasks you " + "had running -- for example polling loops waiting on another agent or an " + "external event -- were cancelled and were NOT automatically restarted. " + "If you were in the middle of multi-step work, re-check the current state " + "before continuing rather than assuming your last action completed. If you " + "were running a memory-intensive task, do not simply re-run it as-is -- it " + "will likely be stopped again; first find a way to do it using less memory " + "(smaller batches, streaming instead of loading everything at once, " + "releasing data you no longer need), and only retry if you can." + ) + _append_delivery_marker(ledger_path, agent_name, max(pending)) + + +if __name__ == "__main__": + main() diff --git a/supervisord.conf b/supervisord.conf index 8ea06de1f..ded645c76 100644 --- a/supervisord.conf +++ b/supervisord.conf @@ -145,6 +145,27 @@ stderr_logfile_maxbytes=10MB stdout_logfile_backups=3 stderr_logfile_backups=3 +# Classifies the process tree into OOM-priority tiers, keeps each process's +# oom_score_adj in sync, and sheds individual processes (most-expendable tier +# first, largest first within a tier) under sustained memory pressure so the +# container degrades gracefully instead of at the kernel's whim. supervisord +# restarts it if it dies. See +# libs/memory_watchdog/README.md. 
+[program:memory-watchdog] +command=uv run memory-watchdog +directory=/mngr/code +autostart=true +autorestart=true +startretries=1000000 +stopasgroup=true +killasgroup=true +stdout_logfile=/var/log/supervisor/memory-watchdog-stdout.log +stderr_logfile=/var/log/supervisor/memory-watchdog-stderr.log +stdout_logfile_maxbytes=10MB +stderr_logfile_maxbytes=10MB +stdout_logfile_backups=3 +stderr_logfile_backups=3 + # Terminal-over-web (ttyd). Registers its port via forward_port.py, then execs # ttyd. Previously an extra_window; now a supervised service like the rest. [program:terminal] diff --git a/uv.lock b/uv.lock index a72e966b8..71e5df505 100644 --- a/uv.lock +++ b/uv.lock @@ -13,6 +13,7 @@ members = [ "cloudflare-tunnel", "forever-claude-template", "host-backup", + "memory-watchdog", "mngr-cli-contract", "runtime-backup", "system-interface", @@ -818,6 +819,7 @@ dependencies = [ { name = "host-backup" }, { name = "imbue-common" }, { name = "litellm" }, + { name = "memory-watchdog" }, { name = "playwright" }, { name = "runtime-backup" }, { name = "telegram-bot" }, @@ -849,6 +851,7 @@ requires-dist = [ { name = "host-backup", editable = "libs/host_backup" }, { name = "imbue-common", editable = "vendor/mngr/libs/imbue_common" }, { name = "litellm", specifier = ">=1.88.1" }, + { name = "memory-watchdog", editable = "libs/memory_watchdog" }, { name = "playwright", specifier = "==1.58.0" }, { name = "runtime-backup", editable = "libs/runtime_backup" }, { name = "telegram-bot", editable = "libs/telegram_bot" }, @@ -1718,6 +1721,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, ] +[[package]] +name = "memory-watchdog" +version = "0.1.0" +source = { editable = "libs/memory_watchdog" } +dependencies = [ + { name = "imbue-common" 
}, + { name = "loguru" }, +] + +[package.metadata] +requires-dist = [ + { name = "imbue-common", editable = "vendor/mngr/libs/imbue_common" }, + { name = "loguru", specifier = ">=0.7.0" }, +] + [[package]] name = "mngr-cli-contract" version = "0.1.0" @@ -2882,6 +2900,7 @@ dependencies = [ { name = "imbue-common" }, { name = "imbue-mngr" }, { name = "imbue-mngr-claude" }, + { name = "memory-watchdog" }, { name = "pexpect" }, { name = "pluggy" }, { name = "pydantic-settings" }, @@ -2915,6 +2934,7 @@ requires-dist = [ { name = "imbue-common", editable = "vendor/mngr/libs/imbue_common" }, { name = "imbue-mngr", editable = "vendor/mngr/libs/mngr" }, { name = "imbue-mngr-claude", editable = "vendor/mngr/libs/mngr_claude" }, + { name = "memory-watchdog", editable = "libs/memory_watchdog" }, { name = "pexpect", specifier = ">=4.9" }, { name = "pluggy", specifier = ">=1.5" }, { name = "pydantic-settings", specifier = ">=2.13.1" },