Skip to content
2 changes: 1 addition & 1 deletion docs/public/telemetry.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ Every event property passes through a strict whitelist scrubber — any key not
| `chroma_available` | `true` | Whether the vector-search backend was reachable for a search (false = fell back to full-text search) |
| `fallback_reason` | `none` | Why a search fell back from vector search: none / chroma_connection / chroma_error / chroma_not_initialized — a closed enum, never an error message |
| `invalid_output_class` | `idle` | Coarse class of an unusable compression output: xml / idle / prose / poisoned (`xml` = looked like the expected format but failed to parse) — never the output itself |
| `consecutive_invalid_outputs` | `3` | How many unusable outputs occurred in a row before recovery |
| `respawn_threshold` | `3` | The unusable-output burst threshold that triggered a session respawn |
| `respawn_triggered` | `true` | Whether the compression agent was restarted after repeated unusable output |
| `abort_reason` | `idle` | Why a compression session was aborted: idle / shutdown / overflow / restart_guard / quota / poisoned / none — a closed enum |
| `previous_shutdown` | `clean` | How the previous worker run ended, detected at startup: crash / clean / unknown |
Expand Down
2 changes: 1 addition & 1 deletion src/npx-cli/commands/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const COLLECTED_FIELDS = [
'chroma_available whether vector search was reachable for a search',
'fallback_reason none / chroma_connection / chroma_error / chroma_not_initialized',
'invalid_output_class xml / idle / prose / poisoned (never the output)',
'consecutive_invalid_outputs unusable outputs in a row before recovery',
'respawn_threshold unusable-output threshold that triggered a respawn',
'respawn_triggered whether the compression agent was restarted',
'abort_reason idle / shutdown / overflow / restart_guard / quota / poisoned / none',
'previous_shutdown crash / clean / unknown (detected at worker start)',
Expand Down
2 changes: 1 addition & 1 deletion src/services/telemetry/scrub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export const ALLOWED_PROPERTY_KEYS: Set<string> = new Set([
// idle | shutdown | overflow | restart_guard | quota | poisoned | none).
// Never model output, never raw abort strings.
'invalid_output_class',
'consecutive_invalid_outputs',
'respawn_threshold',
'respawn_triggered',
'abort_reason',
// Worker lifecycle health — previous_shutdown (crash | clean | unknown),
Expand Down
12 changes: 7 additions & 5 deletions src/services/worker-types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

import type { Response } from 'express';
import type { FailureWindow } from './worker/agents/respawn-policy.js';

export interface ConversationMessage {
role: 'user' | 'assistant';
Expand All @@ -26,12 +27,13 @@ export interface ActiveSession {
currentProvider: 'claude' | 'gemini' | 'openrouter' | null;
consecutiveRestarts: number;
/**
* Consecutive non-XML (idle/prose/poisoned) observer outputs. Reset to 0 on a
* valid parse. When it reaches the recovery threshold the SDK session is
* killed and respawned so a poisoned session can't wedge the pipeline at zero
* (plan-11, #2485).
* Rolling time-window of non-exempt invalid observer outputs. When `badCount`
* reaches the configured threshold within `windowMs`, the SDK session is
* killed and respawned (time-windowed burst, systemd/OTP model). Reset to a
* fresh window on a valid parse, session create, and respawn. Replaces the
* old unbounded consecutive counter (#3032).
*/
consecutiveInvalidOutputs: number;
invalidOutputWindow: FailureWindow;
forceInit?: boolean;
idleTimedOut?: boolean;
lastGeneratorActivity: number;
Expand Down
7 changes: 4 additions & 3 deletions src/services/worker/SessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { SessionMessageBuffer } from './SessionMessageBuffer.js';
import { getSdkProcessForSession, ensureSdkProcessExit } from '../../supervisor/process-registry.js';
import { getSupervisor } from '../../supervisor/index.js';
import { telemetryBuffer } from '../telemetry/buffer.js';
import { freshWindow } from './agents/respawn-policy.js';

export class SessionManager {
private dbManager: DatabaseManager;
Expand Down Expand Up @@ -123,7 +124,7 @@ export class SessionManager {
conversationHistory: [], // Initialize empty - will be populated by agents
currentProvider: null, // Will be set when generator starts
consecutiveRestarts: 0,
consecutiveInvalidOutputs: 0,
invalidOutputWindow: freshWindow(),
lastGeneratorActivity: Date.now(), // Initialize for stale detection (Issue #1099)
pendingAgentId: null, // Subagent identity carried from the most recent claimed message
pendingAgentType: null
Expand Down Expand Up @@ -259,15 +260,15 @@ export class SessionManager {
logger.warn('SESSION', 'Respawning poisoned SDK session, preserving pending messages', {
sessionId: sessionDbId,
preservedPending,
consecutiveInvalidOutputs: session.consecutiveInvalidOutputs,
badCount: session.invalidOutputWindow.badCount,
});

// Re-yield anything claimed-but-unconfirmed so the fresh generator picks it up.
await this.resetProcessingToPending(sessionDbId);

// Drop stale conversation context: the poisoned turns are what wedged it.
session.conversationHistory = [];
session.consecutiveInvalidOutputs = 0;
session.invalidOutputWindow = freshWindow();
session.memorySessionId = null; // force a fresh SDK session id on respawn

session.abortReason = 'poisoned';
Expand Down
52 changes: 18 additions & 34 deletions src/services/worker/agents/ResponseProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@ import type { WorkerRef, StorageResult } from './types.js';
import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js';
import { telemetryBuffer } from '../../telemetry/buffer.js';
import { instrument } from '../../telemetry/instrument.js';
import { evaluateRespawn, getRespawnPolicy, freshWindow, DEFAULT_RESPAWN_THRESHOLD } from './respawn-policy.js';

/**
* Consecutive non-XML observer outputs tolerated before we kill and respawn the
* SDK session (plan-11, #2485). Idle and prose both count; poisoned triggers an
* immediate respawn regardless of the count.
*/
export const INVALID_OUTPUT_RESPAWN_THRESHOLD = 3;
/** @deprecated default fallback; real value resolved via getRespawnPolicy(). */
export const INVALID_OUTPUT_RESPAWN_THRESHOLD = DEFAULT_RESPAWN_THRESHOLD;

export async function processAgentResponse(
text: string,
Expand Down Expand Up @@ -53,43 +50,33 @@ export async function processAgentResponse(
'claude';

if (!parsed.valid) {
// Classify the non-XML output so a dropped batch is VISIBLE, not silent
// (plan-11, #2485). Attach a preview for diagnostics.
const outputClass = classifyObserverOutput(text);
const preview = previewOutput(text);

session.consecutiveInvalidOutputs = (session.consecutiveInvalidOutputs ?? 0) + 1;
const policy = getRespawnPolicy();
const { window, shouldRespawn } = evaluateRespawn(
outputClass, session.invalidOutputWindow, policy, processingStartedAt,
);
session.invalidOutputWindow = window;

logger.warn('PARSER', `${agentName} returned non-XML ${outputClass} response — ignoring queued batch`, {
sessionId: session.sessionDbId,
outputClass,
preview,
consecutiveInvalidOutputs: session.consecutiveInvalidOutputs,
badCount: window.badCount,
threshold: policy.threshold,
windowMs: policy.windowMs,
});

// Recover from poison (plan-11, #2485): a poisoned closure string means the
// SDK session is wedged and will keep emitting garbage — respawn immediately.
// For idle/prose, only respawn after N consecutive invalid outputs so we
// don't churn the session on benign single-batch misses.
const mustRespawn =
outputClass === 'poisoned' ||
session.consecutiveInvalidOutputs >= INVALID_OUTPUT_RESPAWN_THRESHOLD;

if (mustRespawn) {
// Single instrumentation call: the local poison/respawn error line (full
// fidelity) and the scrubbed session_compressed rollup are one logical
// event. Respawn-gated telemetry ONLY (never per invalid output —
// volume). Closed enums and counts; the raw model output never leaves
// the box.
if (shouldRespawn) {
instrument(
'SESSION',
'error',
`${agentName} session poisoned — killing and respawning, pending messages preserved`,
{
sessionId: session.sessionDbId,
outputClass,
consecutiveInvalidOutputs: session.consecutiveInvalidOutputs,
threshold: INVALID_OUTPUT_RESPAWN_THRESHOLD,
threshold: policy.threshold,
windowMs: policy.windowMs,
},
{
event: 'session_compressed',
Expand All @@ -98,8 +85,9 @@ export async function processAgentResponse(
props: {
outcome: 'invalid_output',
invalid_output_class: outputClass,
consecutive_invalid_outputs: session.consecutiveInvalidOutputs,
respawn_triggered: true,
// Replaces the old `consecutive_invalid_outputs` dimension.
respawn_threshold: policy.threshold,
provider: providerName,
model: typeof modelId === 'string' && modelId ? modelId : 'unknown',
ide: session.platformSource,
Expand All @@ -111,17 +99,13 @@ export async function processAgentResponse(
return;
}

// Plain-text skip responses are intentionally ignored. Re-queueing them
// creates an observer loop where the same low-signal batch is retried
// until the restart guard fires or the provider quota is exhausted.
await sessionManager.confirmClaimedMessages(session.sessionDbId);
session.earliestPendingTimestamp = null;
return;
}

// Valid parse — clear the invalid-output counter so transient misses don't
// accumulate toward a respawn across a healthy session.
session.consecutiveInvalidOutputs = 0;
// Valid parse — reset the failure window so transient misses don't accumulate.
session.invalidOutputWindow = freshWindow();

if (!session.memorySessionId) {
logger.warn('SDK', 'memorySessionId not yet captured; deferring storage until next round', {
Expand Down
168 changes: 168 additions & 0 deletions src/services/worker/agents/respawn-policy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// src/services/worker/agents/respawn-policy.ts
import { join } from 'path';
import { logger } from '../../../utils/logger.js';
import type { ObserverOutputClass } from '../../../sdk/output-classifier.js';
import { SettingsDefaultsManager } from '../../../shared/SettingsDefaultsManager.js';

/**
 * The observer output classes an operator may list as exempt from the respawn
 * window. 'xml' is deliberately absent (evaluateRespawn always counts it), and
 * so is 'poisoned', which evaluateRespawn treats as an immediate respawn.
 */
export const EXEMPTABLE_CLASSES = ['idle', 'prose'] as const satisfies readonly ObserverOutputClass[];

/** Union of the literal members of EXEMPTABLE_CLASSES. */
export type ExemptableClass = (typeof EXEMPTABLE_CLASSES)[number];

// String-keyed lookup so arbitrary (untrusted) tokens can be tested directly.
const EXEMPTABLE_SET = new Set<string>(EXEMPTABLE_CLASSES);

/**
 * Type guard: true only when `x` is exactly one of EXEMPTABLE_CLASSES
 * (case-sensitive). This is the single place a plain string is narrowed to
 * ExemptableClass (parse-don't-validate).
 */
export function isExemptableClass(x: string): x is ExemptableClass {
  const recognized = EXEMPTABLE_SET.has(x);
  return recognized;
}

// Bounds + defaults (no-hardcoded-magic-numbers). Defaults mirror SettingsDefaults.
// Both bounds objects are inclusive: parseBoundedInt rejects only values
// strictly below `min` or strictly above `max`.
// Accepted range for the respawn threshold (count of bad outputs).
export const RESPAWN_THRESHOLD_BOUNDS = { min: 1, max: 100 } as const;
// Accepted range for the window length, in milliseconds (1 second .. 1 hour).
export const RESPAWN_WINDOW_MS_BOUNDS = { min: 1000, max: 3_600_000 } as const;
export const DEFAULT_RESPAWN_THRESHOLD = 3; // was INVALID_OUTPUT_RESPAWN_THRESHOLD (#2485)
export const DEFAULT_RESPAWN_WINDOW_MS = 60_000; // 1 min — trips ~1-bad/15s wedge in ~45s; lets blips decay
// Used by parseRespawnPolicy when the configured list yields no recognized class.
export const DEFAULT_EXEMPT_CLASSES: readonly ExemptableClass[] = ['idle'];

/** Resolved respawn configuration, as built by parseRespawnPolicy. */
export interface RespawnPolicy {
// Output classes evaluateRespawn passes over: they leave the window untouched.
readonly exemptClasses: ReadonlySet<ExemptableClass>;
// A respawn is signalled once badCount within one window reaches this value.
readonly threshold: number;
// Window length in milliseconds; when more than this has elapsed since
// windowStart, the next counted output starts a new window.
readonly windowMs: number;
}

/** Burst-tracking state passed into, and returned from, evaluateRespawn. */
export interface FailureWindow {
// The `now` value at which the current window was anchored (0 in a fresh window).
windowStart: number;
// Number of counted (non-exempt) bad outputs seen in the current window.
badCount: number;
}

/**
 * Create an empty failure window: nothing counted yet and a zero start marker.
 * A new object is returned on every call.
 */
export function freshWindow(): FailureWindow {
  // The zero start time is only a placeholder. evaluateRespawn re-anchors
  // windowStart whenever badCount is 0, so this value never decides expiry.
  const empty: FailureWindow = { windowStart: 0, badCount: 0 };
  return empty;
}

/**
 * Parse a settings string as a whole decimal number within inclusive bounds.
 *
 * The entire (whitespace-trimmed) value must be an optionally signed run of
 * decimal digits. A bare `parseInt` would silently accept a numeric prefix and
 * drop the rest — "3abc" → 3, "2.9" → 2, "1e2" → 1 — letting a mistyped
 * setting take effect as a different, in-range number with no warning.
 *
 * @param raw      Raw setting value.
 * @param bounds   Inclusive `min`/`max` the parsed value must fall within.
 * @param fallback Value returned when `raw` is malformed or out of range.
 * @param name     Setting name, used only in the warning message.
 * @returns The parsed integer, or `fallback` (after logging a warning).
 */
function parseBoundedInt(
  raw: string,
  bounds: { min: number; max: number },
  fallback: number,
  name: string,
): number {
  // String() keeps this tolerant of a non-string value reaching us at runtime
  // (e.g. a numeric JSON setting) — NOTE(review): confirm whether the settings
  // loader can produce one; the previous parseInt call coerced it as well.
  const text = String(raw).trim();
  const isWholeNumber = /^[+-]?\d+$/.test(text);
  const n = isWholeNumber ? parseInt(text, 10) : Number.NaN;
  if (Number.isNaN(n) || n < bounds.min || n > bounds.max) {
    logger.warn('SYSTEM', `Invalid ${name}, using default`, {
      value: raw, min: bounds.min, max: bounds.max, fallback,
    });
    return fallback;
  }
  return n;
}

/**
 * Build a RespawnPolicy from the three raw setting strings.
 *
 * @param exemptRaw    Comma-separated output-class names; blanks are skipped
 *                     and unrecognized names are logged and ignored.
 * @param thresholdRaw Respawn threshold, validated against RESPAWN_THRESHOLD_BOUNDS.
 * @param windowMsRaw  Window length in ms, validated against RESPAWN_WINDOW_MS_BOUNDS.
 * @returns The resolved policy. If no recognized class remains, the exempt set
 *          falls back to DEFAULT_EXEMPT_CLASSES.
 */
export function parseRespawnPolicy(
  exemptRaw: string,
  thresholdRaw: string,
  windowMsRaw: string,
): RespawnPolicy {
  const recognized = new Set<ExemptableClass>();
  for (const piece of exemptRaw.split(',')) {
    const name = piece.trim();
    if (!name) {
      continue;
    }
    if (!isExemptableClass(name)) {
      logger.warn('SYSTEM', `Unknown exempt output class "${name}" — ignored`, { value: name });
      continue;
    }
    recognized.add(name);
  }

  // The set is exposed as ReadonlySet (compile-time immutability) and is
  // intentionally not Object.freeze'd: freezing does not stop Set.add/delete,
  // which mutate internal slots, so it would only suggest a runtime guarantee
  // that does not exist (design §9).
  const exemptClasses = recognized.size === 0
    ? new Set<ExemptableClass>(DEFAULT_EXEMPT_CLASSES)
    : recognized;

  const threshold = parseBoundedInt(
    thresholdRaw,
    RESPAWN_THRESHOLD_BOUNDS,
    DEFAULT_RESPAWN_THRESHOLD,
    'CLAUDE_MEM_INVALID_OUTPUT_RESPAWN_THRESHOLD',
  );
  const windowMs = parseBoundedInt(
    windowMsRaw,
    RESPAWN_WINDOW_MS_BOUNDS,
    DEFAULT_RESPAWN_WINDOW_MS,
    'CLAUDE_MEM_INVALID_OUTPUT_WINDOW_MS',
  );

  return { exemptClasses, threshold, windowMs };
}

/**
 * Exhaustiveness guard for ObserverOutputClass switches. Always throws; the
 * `never` parameter makes the compiler flag any class left unhandled.
 */
function assertNeverClass(x: never): never {
  const label = String(x);
  throw new Error(`Unhandled ObserverOutputClass: ${label}`);
}

/**
 * Decide whether one observer output should trigger a session respawn, using a
 * time-windowed burst counter (in the spirit of systemd StartLimitBurst and
 * Erlang OTP intensity/period).
 *
 * - `poisoned` always respawns and returns a fresh window.
 * - A class that is both exemptable and listed in `policy.exemptClasses`
 *   returns the window unchanged.
 * - Every other class ('xml' included, since it is never exemptable) adds one
 *   to the count. The count restarts, anchored at `now`, when the window is
 *   empty or more than `policy.windowMs` has elapsed since `windowStart`.
 *
 * The function does not mutate `window`; `now` is a parameter so callers and
 * tests control the clock.
 *
 * @returns The window to store next and whether to respawn. On respawn the
 *          returned window is fresh.
 * @throws Error for a class outside the handled set.
 */
export function evaluateRespawn(
  cls: ObserverOutputClass,
  window: FailureWindow,
  policy: RespawnPolicy,
  now: number,
): { window: FailureWindow; shouldRespawn: boolean } {
  if (cls === 'poisoned') {
    return { window: freshWindow(), shouldRespawn: true };
  }
  if (cls !== 'idle' && cls !== 'prose' && cls !== 'xml') {
    return assertNeverClass(cls);
  }

  // Exempt outputs neither count nor reset anything. 'xml' can never pass the
  // isExemptableClass check, so an xml-looking but unparseable output is
  // always counted.
  if (isExemptableClass(cls) && policy.exemptClasses.has(cls)) {
    return { window, shouldRespawn: false };
  }

  const elapsed = now - window.windowStart;
  const restart = window.badCount === 0 || elapsed > policy.windowMs;
  const anchor = restart ? now : window.windowStart;
  const carried = restart ? 0 : window.badCount;
  const badCount = carried + 1;

  if (badCount >= policy.threshold) {
    return { window: freshWindow(), shouldRespawn: true };
  }
  return { window: { windowStart: anchor, badCount }, shouldRespawn: false };
}

// Module-level memo for getRespawnPolicy(); null means "not resolved yet".
// Reset by clearRespawnPolicyCache().
let cachedPolicy: RespawnPolicy | null = null;

/**
 * Path of `settings.json` inside the configured data directory.
 *
 * The directory is looked up on every call rather than captured at import
 * time (mirroring worker-utils.getWorkerSettingsPath), so CLAUDE_MEM_DATA_DIR
 * overrides used for test isolation or at runtime are honored.
 */
function respawnSettingsPath(): string {
  const dataDir = SettingsDefaultsManager.get('CLAUDE_MEM_DATA_DIR');
  return join(dataDir, 'settings.json');
}

/**
 * Return the respawn policy, resolving it on first use and reusing the cached
 * result afterwards.
 *
 * Resolution loads settings through SettingsDefaultsManager.loadFromFile at
 * respawnSettingsPath() (mirroring worker-utils' settings-backed timeout) and
 * passes the three CLAUDE_MEM_INVALID_OUTPUT_* values to parseRespawnPolicy.
 * Because the result is cached, call clearRespawnPolicyCache() in tests or
 * after a settings change to force a reload.
 */
export function getRespawnPolicy(): RespawnPolicy {
  if (cachedPolicy === null) {
    const settings = SettingsDefaultsManager.loadFromFile(respawnSettingsPath());
    cachedPolicy = parseRespawnPolicy(
      settings.CLAUDE_MEM_INVALID_OUTPUT_EXEMPT_CLASSES,
      settings.CLAUDE_MEM_INVALID_OUTPUT_RESPAWN_THRESHOLD,
      settings.CLAUDE_MEM_INVALID_OUTPUT_WINDOW_MS,
    );
  }
  return cachedPolicy;
}

/**
 * Drop the cached policy so the next getRespawnPolicy() call re-reads settings.
 * Intended for tests and for use after a settings change.
 */
export function clearRespawnPolicyCache(): void {
cachedPolicy = null;
}
6 changes: 6 additions & 0 deletions src/shared/SettingsDefaultsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ export interface SettingsDefaults {
CLAUDE_MEM_SERVER_BETA_URL: string;
CLAUDE_MEM_SERVER_BETA_API_KEY: string;
CLAUDE_MEM_SERVER_BETA_PROJECT_ID: string;
CLAUDE_MEM_INVALID_OUTPUT_EXEMPT_CLASSES: string;
CLAUDE_MEM_INVALID_OUTPUT_RESPAWN_THRESHOLD: string;
CLAUDE_MEM_INVALID_OUTPUT_WINDOW_MS: string;
}

export class SettingsDefaultsManager {
Expand Down Expand Up @@ -163,6 +166,9 @@ export class SettingsDefaultsManager {
CLAUDE_MEM_SERVER_BETA_URL: `http://127.0.0.1:${process.env.CLAUDE_MEM_SERVER_PORT ?? String(37877 + ((process.getuid?.() ?? 77) % 100))}`, // Default server-beta runtime URL — UID-derived for multi-account isolation
CLAUDE_MEM_SERVER_BETA_API_KEY: '', // Local hook API key, populated by installer when runtime=server-beta
CLAUDE_MEM_SERVER_BETA_PROJECT_ID: '', // Default Postgres project_id used by hooks when runtime=server-beta
CLAUDE_MEM_INVALID_OUTPUT_EXEMPT_CLASSES: 'idle', // #3032 — observer output classes exempt from the respawn window; 'idle' is benign ("nothing to say"). 'poisoned' is never exemptable.
CLAUDE_MEM_INVALID_OUTPUT_RESPAWN_THRESHOLD: '3', // N non-exempt bad outputs within the window before kill+respawn. Bounds 1..=100.
CLAUDE_MEM_INVALID_OUTPUT_WINDOW_MS: '60000', // Rolling burst window (systemd StartLimitIntervalSec / OTP period). Bounds 1000..=3_600_000.
};

static getAllDefaults(): SettingsDefaults {
Expand Down
Loading