Skip to content

Commit 6ba5ffb

Browse files
AliceLJYclaude
andcommitted
feat: Phase 2 — reliability + cost dashboard (idempotency, circuit breaker, retry, cost dimensions)
Research-backed production reliability layer for multi-agent teams: - Idempotency Key: TaskItem + TeamMessage support dedup key, create()/send() return existing result on duplicate key instead of creating duplicates (distributed systems Tier 0) - Cost Dashboard MVP: CostEvent gains task_id field, CostSummary adds by_model/by_task dimensions, `costs show --by agent|task|model` CLI, cost_rate() per-minute trend (competitive whitespace — no competitor offers this) - Circuit Breaker: AgentHealth 3-state model (healthy→degraded→open) with quality_score, record_outcome() tracks consecutive failures, auto-opens at threshold, half-open probe after cooldown (distributed systems + swarm intelligence SOC) - Retry with Backoff: RetryConfig on AgentDef, spawn_with_retry() wrapper with exponential backoff, configurable max_retries/base/cap (distributed systems Tier 2) 38 new tests, 378 total passing, 0 regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 675c70c commit 6ba5ffb

9 files changed

Lines changed: 641 additions & 7 deletions

File tree

clawteam/cli/commands.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,7 @@ def cost_report(
10081008
provider: str = typer.Option("", "--provider", help="Provider name (e.g. anthropic)"),
10091009
model: str = typer.Option("", "--model", help="Model name"),
10101010
agent: Optional[str] = typer.Option(None, "--agent", "-a", help="Agent name (default: from env)"),
1011+
task_id: str = typer.Option("", "--task-id", help="Associated task ID"),
10111012
):
10121013
"""Report token usage and cost for an agent."""
10131014
from clawteam.identity import AgentIdentity
@@ -1023,6 +1024,7 @@ def cost_report(
10231024
input_tokens=input_tokens,
10241025
output_tokens=output_tokens,
10251026
cost_cents=cost_cents,
1027+
task_id=task_id,
10261028
)
10271029
data = _dump(event)
10281030

@@ -1049,6 +1051,7 @@ def _human(d):
10491051
def cost_show(
10501052
team: str = typer.Argument(..., help="Team name"),
10511053
agent: Optional[str] = typer.Option(None, "--agent", "-a", help="Filter by agent"),
1054+
by: Optional[str] = typer.Option(None, "--by", "-b", help="Breakdown dimension: agent, task, or model"),
10521055
):
10531056
"""Show cost summary and event history."""
10541057
from clawteam.team.costs import CostStore
@@ -1059,10 +1062,12 @@ def cost_show(
10591062
events = store.list_events(agent_name=agent or "")
10601063
config = TeamManager.get_team(team)
10611064
budget = config.budget_cents if config else 0.0
1065+
rate = store.cost_rate()
10621066

10631067
data = {
10641068
"summary": _dump(summary),
10651069
"budget_cents": budget,
1070+
"cost_rate_per_min": rate,
10661071
"events": [_dump(e) for e in events],
10671072
}
10681073

@@ -1077,11 +1082,21 @@ def _human(d):
10771082
console.print(f" Input tokens: {s.get('totalInputTokens', 0):,}")
10781083
console.print(f" Output tokens: {s.get('totalOutputTokens', 0):,}")
10791084
console.print(f" Events: {s.get('eventCount', 0)}")
1080-
by_agent = s.get("byAgent", {})
1081-
if by_agent:
1082-
console.print(" By agent:")
1083-
for a, c in sorted(by_agent.items()):
1084-
console.print(f" {a}: ${c / 100:.4f}")
1085+
if rate > 0:
1086+
console.print(f" Rate: ${rate / 100:.4f}/min")
1087+
1088+
# Dimension breakdown
1089+
dimension = by or "agent"
1090+
dimension_key = {
1091+
"agent": "byAgent",
1092+
"model": "byModel",
1093+
"task": "byTask",
1094+
}.get(dimension, "byAgent")
1095+
breakdown = s.get(dimension_key, {})
1096+
if breakdown:
1097+
console.print(f" By {dimension}:")
1098+
for k, c in sorted(breakdown.items()):
1099+
console.print(f" {k}: ${c / 100:.4f}")
10851100

10861101
evts = d["events"]
10871102
if evts:
@@ -1092,6 +1107,7 @@ def _human(d):
10921107
table.add_column("Out Tokens", justify="right")
10931108
table.add_column("Cost", justify="right")
10941109
table.add_column("Model", style="dim")
1110+
table.add_column("Task", style="dim")
10951111
for e in evts[-20:]: # show last 20
10961112
table.add_row(
10971113
(e.get("reportedAt") or "")[:19],
@@ -1100,6 +1116,7 @@ def _human(d):
11001116
f"{e.get('outputTokens', 0):,}",
11011117
f"${e.get('costCents', 0) / 100:.4f}",
11021118
e.get("model", ""),
1119+
e.get("taskId", ""),
11031120
)
11041121
console.print(table)
11051122

clawteam/spawn/__init__.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
from __future__ import annotations
44

5+
import logging
6+
import time
7+
58
from clawteam.spawn.base import SpawnBackend
69

10+
logger = logging.getLogger(__name__)
11+
712

813
def get_backend(name: str = "tmux") -> SpawnBackend:
914
"""Factory function to get a spawn backend by name."""
@@ -17,4 +22,31 @@ def get_backend(name: str = "tmux") -> SpawnBackend:
1722
raise ValueError(f"Unknown spawn backend: {name}. Available: subprocess, tmux")
1823

1924

20-
__all__ = ["SpawnBackend", "get_backend"]
25+
def spawn_with_retry(
26+
backend: SpawnBackend,
27+
max_retries: int = 3,
28+
backoff_base: float = 1.0,
29+
backoff_max: float = 30.0,
30+
**spawn_kwargs,
31+
) -> str:
32+
"""Wrap backend.spawn() with exponential backoff retry.
33+
34+
Returns the result of a successful spawn, or the last error message.
35+
"""
36+
last_result = ""
37+
for attempt in range(max_retries + 1):
38+
result = backend.spawn(**spawn_kwargs)
39+
if not result.startswith("Error"):
40+
return result
41+
last_result = result
42+
if attempt < max_retries:
43+
delay = min(backoff_base * (2 ** attempt), backoff_max)
44+
logger.warning(
45+
"Spawn attempt %d/%d failed: %s — retrying in %.1fs",
46+
attempt + 1, max_retries + 1, result, delay,
47+
)
48+
time.sleep(delay)
49+
return last_result
50+
51+
52+
__all__ = ["SpawnBackend", "get_backend", "spawn_with_retry"]

clawteam/spawn/registry.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,134 @@
55
import json
66
import subprocess
77
import time
8+
from enum import Enum
89
from pathlib import Path
910

11+
from pydantic import BaseModel, Field
12+
1013
from clawteam.fileutil import atomic_write_text, file_locked
1114
from clawteam.paths import ensure_within_root, validate_identifier
1215
from clawteam.team.models import get_data_dir
1316

17+
# ---------------------------------------------------------------------------
18+
# Circuit Breaker — agent health tracking
19+
# ---------------------------------------------------------------------------
20+
21+
class HealthState(str, Enum):
22+
healthy = "healthy"
23+
degraded = "degraded"
24+
open = "open"
25+
26+
27+
class AgentHealth(BaseModel):
28+
"""Health status for a spawned agent (circuit breaker pattern)."""
29+
30+
model_config = {"populate_by_name": True}
31+
32+
agent_name: str = Field(alias="agentName")
33+
state: HealthState = HealthState.healthy
34+
quality_score: float = Field(default=1.0, alias="qualityScore")
35+
consecutive_failures: int = Field(default=0, alias="consecutiveFailures")
36+
total_successes: int = Field(default=0, alias="totalSuccesses")
37+
total_failures: int = Field(default=0, alias="totalFailures")
38+
last_failure_at: float = Field(default=0.0, alias="lastFailureAt")
39+
cooldown_seconds: float = Field(default=60.0, alias="cooldownSeconds")
40+
41+
@property
42+
def is_accepting_tasks(self) -> bool:
43+
"""Return True if the agent can accept new tasks."""
44+
if self.state != HealthState.open:
45+
return True
46+
# Half-open: allow after cooldown
47+
if self.last_failure_at and (time.time() - self.last_failure_at) >= self.cooldown_seconds:
48+
return True
49+
return False
50+
51+
52+
DEFAULT_FAILURE_THRESHOLD = 3
53+
DEFAULT_COOLDOWN_SECONDS = 60.0
54+
55+
56+
def _health_path(team_name: str) -> Path:
57+
return ensure_within_root(
58+
get_data_dir() / "teams",
59+
validate_identifier(team_name, "team name"),
60+
"agent_health.json",
61+
)
62+
63+
64+
def _load_health(team_name: str) -> dict[str, dict]:
65+
path = _health_path(team_name)
66+
if path.exists():
67+
try:
68+
return json.loads(path.read_text())
69+
except (json.JSONDecodeError, OSError):
70+
return {}
71+
return {}
72+
73+
74+
def _save_health(team_name: str, data: dict[str, dict]) -> None:
75+
atomic_write_text(_health_path(team_name), json.dumps(data, indent=2))
76+
77+
78+
def get_agent_health(team_name: str, agent_name: str) -> AgentHealth:
79+
"""Return health status for an agent (creates default if not tracked)."""
80+
health_data = _load_health(team_name)
81+
if agent_name in health_data:
82+
return AgentHealth.model_validate(health_data[agent_name])
83+
return AgentHealth(agent_name=agent_name)
84+
85+
86+
def get_all_health(team_name: str) -> dict[str, AgentHealth]:
87+
"""Return health for all tracked agents."""
88+
health_data = _load_health(team_name)
89+
return {
90+
name: AgentHealth.model_validate(data)
91+
for name, data in health_data.items()
92+
}
93+
94+
95+
def record_outcome(
96+
team_name: str,
97+
agent_name: str,
98+
success: bool,
99+
failure_threshold: int = DEFAULT_FAILURE_THRESHOLD,
100+
cooldown_seconds: float = DEFAULT_COOLDOWN_SECONDS,
101+
) -> AgentHealth:
102+
"""Record a task outcome and update agent health state.
103+
104+
State transitions:
105+
- healthy → degraded: first failure
106+
- degraded → open: consecutive_failures >= threshold
107+
- open → healthy: success after cooldown (half-open probe)
108+
- any → healthy: success resets consecutive failures
109+
"""
110+
path = _health_path(team_name)
111+
with file_locked(path):
112+
health_data = _load_health(team_name)
113+
raw = health_data.get(agent_name, {"agentName": agent_name})
114+
health = AgentHealth.model_validate(raw)
115+
health.cooldown_seconds = cooldown_seconds
116+
117+
if success:
118+
health.consecutive_failures = 0
119+
health.total_successes += 1
120+
health.quality_score = min(1.0, health.quality_score + 0.1)
121+
health.state = HealthState.healthy
122+
else:
123+
health.consecutive_failures += 1
124+
health.total_failures += 1
125+
health.last_failure_at = time.time()
126+
health.quality_score = max(0.0, health.quality_score - 0.2)
127+
if health.consecutive_failures >= failure_threshold:
128+
health.state = HealthState.open
129+
elif health.consecutive_failures >= 1:
130+
health.state = HealthState.degraded
131+
132+
health_data[agent_name] = json.loads(health.model_dump_json(by_alias=True))
133+
_save_health(team_name, health_data)
134+
return health
135+
14136

15137
def _registry_path(team_name: str) -> Path:
16138
return ensure_within_root(

clawteam/store/file.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,14 @@ def create(
8383
blocks: list[str] | None = None,
8484
blocked_by: list[str] | None = None,
8585
metadata: dict[str, Any] | None = None,
86+
idempotency_key: str | None = None,
8687
) -> TaskItem:
88+
# Idempotency: return existing task if key matches
89+
if idempotency_key:
90+
existing = self._find_by_idempotency_key(idempotency_key)
91+
if existing is not None:
92+
return existing
93+
8794
task = TaskItem(
8895
subject=subject,
8996
description=description,
@@ -92,6 +99,7 @@ def create(
9299
blocks=blocks or [],
93100
blocked_by=blocked_by or [],
94101
metadata=metadata or {},
102+
idempotency_key=idempotency_key,
95103
)
96104
self._validate_blocked_by_unlocked(task.id, task.blocked_by)
97105
if task.blocked_by:
@@ -100,6 +108,19 @@ def create(
100108
self._save_unlocked(task)
101109
return task
102110

111+
def _find_by_idempotency_key(self, key: str) -> TaskItem | None:
112+
"""Return existing task with matching idempotency key, if any."""
113+
root = _tasks_root(self.team_name)
114+
for f in root.glob("task-*.json"):
115+
try:
116+
data = json.loads(f.read_text(encoding="utf-8"))
117+
task = TaskItem.model_validate(data)
118+
if task.idempotency_key == key:
119+
return task
120+
except Exception:
121+
continue
122+
return None
123+
103124
def get(self, task_id: str) -> TaskItem | None:
104125
return self._get_unlocked(task_id)
105126

0 commit comments

Comments
 (0)