Skip to content

Commit a56302e

Browse files
chernistry and claude committed
fix: agent lifecycle reliability — stale claim, stderr, heartbeat, worktree
Four root causes for "agent died without output" failures: 1. Stale claim used task.created_at instead of actual claim time — tasks created hours ago were immediately marked stale when freshly claimed. Added claimed_at field to Task model, set on claim, used for timeout. Default timeout increased from 10m to 15m. 2. Agent stderr redirected to /dev/null — auth failures, MCP config errors, and CLI crashes were invisible. Now captured in .stderr.log. 3. Heartbeat file touched after spawn instead of before — race window where running agent had no heartbeat and looked dead to watchdog. Now touched before adapter.spawn() call. 4. Worktree creation failure didn't block spawn — agent launched into nonexistent directory and crashed silently. Now raises SpawnError so retry logic handles it properly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 999112b commit a56302e

7 files changed

Lines changed: 46 additions & 27 deletions

File tree

src/bernstein/adapters/claude.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ def _launch_process(
412412
parent environment is inherited (legacy behaviour).
413413
"""
414414
log_file = log_path.open("w")
415+
stderr_path = log_path.with_suffix(".stderr.log")
416+
stderr_file = stderr_path.open("w")
415417
preexec_fn = self._get_preexec_fn()
416418
try:
417419
try:
@@ -420,7 +422,7 @@ def _launch_process(
420422
cwd=workdir,
421423
env=env,
422424
stdout=subprocess.PIPE,
423-
stderr=subprocess.DEVNULL,
425+
stderr=stderr_file,
424426
start_new_session=True,
425427
preexec_fn=preexec_fn,
426428
)
@@ -434,7 +436,7 @@ def _launch_process(
434436
[sys.executable, "-c", wrapper],
435437
stdin=claude_proc.stdout,
436438
stdout=log_file,
437-
stderr=subprocess.DEVNULL,
439+
stderr=stderr_file,
438440
start_new_session=True,
439441
cwd=workdir,
440442
env=env,
@@ -444,6 +446,7 @@ def _launch_process(
444446
raise
445447
finally:
446448
log_file.close()
449+
stderr_file.close()
447450

448451
# Allow claude_proc to receive SIGPIPE if wrapper dies
449452
if claude_proc.stdout:

src/bernstein/core/models.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ class Task:
261261
max_output_tokens: int | None = None # Escalated limit for model output
262262
meta_messages: list[str] = field(default_factory=list[str]) # Operational nudges/hints (T423)
263263
created_at: float = field(default_factory=time.time)
264+
claimed_at: float | None = None # Epoch timestamp when task was claimed by an agent
264265
completed_at: float | None = None # Epoch timestamp when task completed/failed
265266
closed_at: float | None = None # Epoch timestamp when task was verified and closed
266267
deadline: float | None = None # Epoch timestamp when task must be complete
@@ -349,6 +350,7 @@ def from_dict(cls, raw: dict[str, Any]) -> Task:
349350
max_output_tokens=raw.get("max_output_tokens"),
350351
meta_messages=list(raw.get("meta_messages", [])),
351352
created_at=raw.get("created_at", time.time()),
353+
claimed_at=raw.get("claimed_at"),
352354
completed_at=raw.get("completed_at"),
353355
closed_at=raw.get("closed_at"),
354356
deadline=raw.get("deadline"),
@@ -1055,6 +1057,7 @@ class OrchestratorConfig:
10551057
permission_mode: str | None = None # "bypass" | "plan" | "auto" | "default" — see permission_mode.py
10561058
agent_resource_limits: Any | None = None # ResourceLimits | None — OS-level limits for non-sandboxed spawns
10571059
shutdown_stagger_delay_s: float = 5.0 # Seconds between SHUTDOWN signals during drain
1060+
stale_claim_timeout_s: float = 900.0 # Seconds before a claimed task with no live agent is released
10581061

10591062
def __post_init__(self) -> None:
10601063
"""Parse nested workflow config if dict provided."""

src/bernstein/core/orchestrator.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ class Orchestrator:
246246
_MAX_PROCESSED_DONE: int = 500 # cap _processed_done_tasks set size
247247
_MANAGER_REVIEW_COMPLETION_THRESHOLD: int = 7 # trigger review after this many completions
248248
_MANAGER_REVIEW_STALL_S: float = 900.0 # trigger review after 15 min of no progress
249-
_STALE_CLAIM_TIMEOUT_S: float = 600.0 # 10 minutes — release claimed tasks older than this
249+
_STALE_CLAIM_TIMEOUT_S: float = 900.0 # default fallback; prefer config.stale_claim_timeout_s
250250

251251
def __init__(
252252
self,
@@ -2013,9 +2013,8 @@ def _release_stale_claims(self, claimed_tasks: list[Task]) -> int:
20132013
20142014
When an agent dies silently (no crash signal, no heartbeat timeout),
20152015
its claimed tasks stay in "claimed" forever. This method detects
2016-
tasks with no matching live agent that have exceeded
2017-
``_STALE_CLAIM_TIMEOUT_S`` and marks them failed so they can be
2018-
retried.
2016+
tasks with no matching live agent that have exceeded the stale claim
2017+
timeout and marks them failed so they can be retried.
20192018
20202019
Args:
20212020
claimed_tasks: Tasks with status "claimed" from the current tick.
@@ -2024,6 +2023,7 @@ def _release_stale_claims(self, claimed_tasks: list[Task]) -> int:
20242023
Number of tasks released.
20252024
"""
20262025
now = time.time()
2026+
timeout = self._config.stale_claim_timeout_s
20272027
released = 0
20282028
for task in claimed_tasks:
20292029
# Skip tasks that have a known live agent in this session
@@ -2033,9 +2033,12 @@ def _release_stale_claims(self, claimed_tasks: list[Task]) -> int:
20332033
if agent is not None and agent.status != "dead":
20342034
continue
20352035

2036-
# Use created_at as lower-bound proxy (task model has no claimed_at)
2037-
age_s = now - task.created_at
2038-
if age_s < self._STALE_CLAIM_TIMEOUT_S:
2036+
# Use claimed_at (when available) to measure actual time in claimed
2037+
# state. Fall back to created_at for legacy tasks that pre-date the
2038+
# claimed_at field — this is conservative (over-counts) but safe.
2039+
claim_epoch = task.claimed_at if task.claimed_at is not None else task.created_at
2040+
age_s = now - claim_epoch
2041+
if age_s < timeout:
20392042
continue
20402043

20412044
try:

src/bernstein/core/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ class TaskResponse(BaseModel):
366366
slack_context: dict[str, Any] | None = None
367367
metadata: dict[str, Any] = Field(default_factory=dict)
368368
created_at: float
369+
claimed_at: float | None = None
369370
deadline: float | None = None
370371
progress_log: list[ProgressEntry] = Field(default_factory=list)
371372
version: int = 1
@@ -849,6 +850,7 @@ def task_to_response(task: Task) -> TaskResponse:
849850
slack_context=task.slack_context,
850851
metadata=task.metadata,
851852
created_at=task.created_at,
853+
claimed_at=task.claimed_at,
852854
progress_log=list(cast("list[ProgressEntry]", task.progress_log)), # type: ignore[reportUnknownMemberType]
853855
version=task.version,
854856
parent_session_id=task.parent_session_id,

src/bernstein/core/spawner.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1269,13 +1269,10 @@ def _spawn_for_tasks_internal(self, tasks: list[Task], model_override: str | Non
12691269
self._worktree_paths[session_id] = spawn_cwd
12701270
self._worktree_roots[session_id] = worktree_repo_root
12711271
except WorktreeError as exc:
1272-
logger.warning(
1273-
"Cannot create workspace for agent %s. "
1274-
"Reason: %s. "
1275-
"Fix: run 'bernstein stop' then restart, or delete .sdd/worktrees/ manually",
1276-
session_id,
1277-
exc,
1278-
)
1272+
raise SpawnError(
1273+
f"Cannot create workspace for agent {session_id}: {exc}. "
1274+
"Fix: run 'bernstein stop' then restart, or delete .sdd/worktrees/ manually"
1275+
) from exc
12791276

12801277
# Build per-task MCP config: auto-detected servers merged with base config
12811278
effective_mcp = self._mcp_config
@@ -1375,6 +1372,16 @@ def _spawn_for_tasks_internal(self, tasks: list[Task], model_override: str | Non
13751372
_unattended_attempt = 0
13761373
result: SpawnResult | None = None
13771374

1375+
# Touch heartbeat file BEFORE spawn so the watchdog sees the agent as
1376+
# alive from the moment it starts — avoids a race window where the
1377+
# process is running but no heartbeat file exists yet.
1378+
try:
1379+
hb_dir = self._workdir / ".sdd" / "runtime" / "heartbeats"
1380+
hb_dir.mkdir(parents=True, exist_ok=True)
1381+
(hb_dir / session_id).touch()
1382+
except OSError:
1383+
pass
1384+
13781385
while True:
13791386
# Remote spawn already succeeded — skip the local adapter loop entirely
13801387
if remote_spawned:
@@ -1596,17 +1603,6 @@ def _spawn_for_tasks_internal(self, tasks: list[Task], model_override: str | Non
15961603
if result.log_path:
15971604
session.log_path = str(result.log_path)
15981605

1599-
# Touch heartbeat file immediately so the agent starts with a
1600-
# fresh timestamp — prevents idle recycling from killing agents
1601-
# that take a long time before emitting their first stream-json
1602-
# event (e.g. Claude Code thinking for 2+ minutes).
1603-
try:
1604-
hb_dir = self._workdir / ".sdd" / "runtime" / "heartbeats"
1605-
hb_dir.mkdir(parents=True, exist_ok=True)
1606-
(hb_dir / session_id).touch()
1607-
except OSError:
1608-
pass
1609-
16101606
if session.status != "working":
16111607
transition_agent(
16121608
session,

src/bernstein/core/task_store.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TaskRecord(TypedDict):
7474
risk_level: str
7575
slack_context: dict[str, Any] | None
7676
version: int
77+
claimed_at: float | None
7778
completed_at: float | None
7879
closed_at: float | None
7980
claimed_by_session: str | None
@@ -434,6 +435,7 @@ def recover_stale_claimed_tasks(self) -> int:
434435
for task in list(self._by_status.get(stale_status, {}).values()):
435436
self._index_remove(task)
436437
task.status = TaskStatus.OPEN
438+
task.claimed_at = None
437439
task.claimed_by_session = None
438440
self._index_add(task)
439441
reset_count += 1
@@ -604,6 +606,7 @@ def _task_to_record(self, task: Task) -> TaskRecord:
604606
"risk_level": task.risk_level,
605607
"slack_context": task.slack_context,
606608
"version": task.version,
609+
"claimed_at": task.claimed_at,
607610
"completed_at": task.completed_at,
608611
"closed_at": task.closed_at,
609612
"claimed_by_session": task.claimed_by_session,
@@ -984,6 +987,7 @@ async def claim_next(
984987
return None
985988
self._index_remove(task)
986989
transition_task(task, TaskStatus.CLAIMED, actor="task_store", reason="claim_next")
990+
task.claimed_at = time.time()
987991
task.claimed_by_session = claimed_by_session
988992
task.version += 1
989993
self._index_add(task)
@@ -1042,6 +1046,7 @@ async def claim_by_id(
10421046
raise ValueError(overlap_msg)
10431047
self._index_remove(task)
10441048
transition_task(task, TaskStatus.CLAIMED, actor="task_store", reason="claim_by_id")
1049+
task.claimed_at = time.time()
10451050
task.claimed_by_session = claimed_by_session
10461051
task.version += 1
10471052
self._index_add(task)
@@ -1087,6 +1092,7 @@ async def claim_batch(
10871092
continue
10881093
self._index_remove(task)
10891094
transition_task(task, TaskStatus.CLAIMED, actor="task_store", reason=f"claim_batch by {agent_id}")
1095+
task.claimed_at = time.time()
10901096
task.assigned_agent = agent_id
10911097
task.claimed_by_session = claimed_by_session
10921098
task.version += 1
@@ -1595,6 +1601,7 @@ async def force_claim(self, task_id: str) -> Task:
15951601
transition_task(task, TaskStatus.OPEN, actor="task_store", reason="force_claim")
15961602
self._index_add(task)
15971603
task.priority = 0
1604+
task.claimed_at = None # Clear claim timestamp on force-claim
15981605
task.claimed_by_session = None # Clear ownership on force-claim
15991606
task.version += 1
16001607
await self._append_jsonl(self._task_to_record(task))

tests/unit/test_orchestrator.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import json
6+
import subprocess
67
import time
78
from pathlib import Path
89
from types import SimpleNamespace
@@ -533,6 +534,10 @@ def test_starving_role_gets_slot_when_capacity_is_tight(self, tmp_path: Path) ->
533534
"""When max_agents capacity is near-full, a starving role gets the last slot
534535
instead of a role that already has an agent and is still under its per-role cap.
535536
"""
537+
# Worktree creation requires a git repo with at least one commit.
538+
subprocess.run(["git", "init", str(tmp_path)], capture_output=True, check=True)
539+
subprocess.run(["git", "-C", str(tmp_path), "commit", "--allow-empty", "-m", "init"], capture_output=True, check=True)
540+
536541
backend_task = _make_task(id="T-be", role="backend", priority=2)
537542
qa_task = _make_task(id="T-qa", role="qa", priority=2)
538543
all_tasks = [_task_as_dict(backend_task), _task_as_dict(qa_task)]

0 commit comments

Comments
 (0)