Skip to content

Commit 2ef5730

Browse files
AliceLJYxzq-xuwho96claude
authored
feat: cherry-pick upstream #85 #101 #102 — runtime injection + waiter fix + Windows compat (#54)
* fix(waiter): complete immediately when team has zero tasks TaskWaiter.wait() would loop forever when no tasks existed because the completion condition `total > 0 and completed == total` was never true for total=0. Remove the `total > 0` guard so the vacuous case (0 == 0) returns "completed" immediately, matching expected CLI/CI behavior. Also adds 19 comprehensive tests for TaskWaiter covering zero-task completion, normal completion, timeout, signal handling, progress deduplication, message draining, and dead-agent recovery. Made-with: Cursor * feat(runtime): inject inbox messages into running tmux sessions (upstream #85) * fix(tests): remove unused timedelta import * fix(watcher): send runtime warnings to stderr in json mode * fix: Windows compat — replace Path.rename() with os.replace() (upstream #102) Path.rename() fails on Windows when target exists. os.replace() is cross-platform safe. Applied to 5 files that still used .rename(). Also trimmed test_cli_commands.py to keep only runtime injection + lifecycle tests that are compatible with our fork. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: xzq.xu <zhiqiang.xu@nodeskai.com> Co-authored-by: who96 <825265100@qq.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 23e452b commit 2ef5730

15 files changed

Lines changed: 1557 additions & 9 deletions

clawteam/cli/commands.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,130 @@ def inbox_watch(
789789
watcher.watch()
790790

791791

792+
# ============================================================================
793+
# Runtime Commands
794+
# ============================================================================
795+
796+
runtime_app = typer.Typer(help="Tmux-only runtime routing and live injection")
797+
app.add_typer(runtime_app, name="runtime")
798+
799+
800+
@runtime_app.command("inject")
801+
def runtime_inject(
802+
team: str = typer.Argument(..., help="Team name"),
803+
agent: str = typer.Argument(..., help="Target agent name"),
804+
source: str = typer.Option("system", "--source", "-s", help="Runtime notification source"),
805+
channel: str = typer.Option("direct", "--channel", help="Runtime notification channel"),
806+
priority: str = typer.Option("medium", "--priority", help="Runtime notification priority"),
807+
summary: str = typer.Option(..., "--summary", help="Summary text for the injected notification"),
808+
evidence: list[str] = typer.Option([], "--evidence", "-e", help="Repeatable evidence line"),
809+
recommended_next_action: Optional[str] = typer.Option(
810+
None,
811+
"--recommended-next-action",
812+
help="Optional recommended next action",
813+
),
814+
):
815+
"""Inject a structured runtime notification into a running tmux agent."""
816+
from clawteam.spawn.tmux_backend import TmuxBackend
817+
from clawteam.team.routing_policy import RuntimeEnvelope
818+
819+
envelope = RuntimeEnvelope(
820+
source=source,
821+
target=agent,
822+
channel=channel,
823+
priority=priority,
824+
message_type="manual",
825+
summary=summary,
826+
evidence=list(evidence),
827+
recommended_next_action=recommended_next_action,
828+
)
829+
ok, status = TmuxBackend().inject_runtime_message(team, agent, envelope)
830+
if not ok:
831+
console.print(f"[red]{status}[/red]")
832+
raise typer.Exit(1)
833+
834+
_output(
835+
{"team": team, "agent": agent, "status": status},
836+
lambda data: console.print(f"[green]OK[/green] {data['status']}"),
837+
)
838+
839+
840+
@runtime_app.command("watch")
841+
def runtime_watch(
842+
team: str = typer.Argument(..., help="Team name"),
843+
agent: Optional[str] = typer.Option(None, "--agent", "-a", help="Agent name (default: from env)"),
844+
poll_interval: float = typer.Option(1.0, "--poll-interval", "-p", help="Poll interval in seconds"),
845+
exec_cmd: Optional[str] = typer.Option(
846+
None,
847+
"--exec",
848+
"-e",
849+
help="Shell command to run for each new message (msg data in env vars)",
850+
),
851+
):
852+
"""Watch an inbox and route new messages into the running tmux session."""
853+
from clawteam.identity import AgentIdentity
854+
from clawteam.team.mailbox import MailboxManager
855+
from clawteam.team.manager import TeamManager
856+
from clawteam.team.router import RuntimeRouter
857+
from clawteam.team.watcher import InboxWatcher
858+
859+
identity = AgentIdentity.from_env()
860+
agent_name = TeamManager.resolve_inbox(team, agent or identity.agent_name, identity.user)
861+
mailbox = MailboxManager(team)
862+
router = RuntimeRouter(
863+
team_name=team,
864+
agent_name=agent_name,
865+
session_agent_name=agent or identity.agent_name,
866+
)
867+
868+
if not _json_output:
869+
console.print(
870+
f"Watching runtime routes for '{agent_name}' in team '{team}'... (Ctrl+C to stop)"
871+
)
872+
if exec_cmd:
873+
console.print(f" exec: {exec_cmd}")
874+
875+
watcher = InboxWatcher(
876+
team_name=team,
877+
agent_name=agent_name,
878+
mailbox=mailbox,
879+
poll_interval=poll_interval,
880+
json_output=_json_output,
881+
exec_cmd=exec_cmd,
882+
runtime_router=router,
883+
)
884+
watcher.watch()
885+
886+
887+
@runtime_app.command("state")
888+
def runtime_state(
889+
team: str = typer.Argument(..., help="Team name"),
890+
):
891+
"""Show persisted Phase 1 runtime throttle and dispatch state."""
892+
from clawteam.team.routing_policy import DefaultRoutingPolicy
893+
894+
state = DefaultRoutingPolicy(team_name=team).read_state()
895+
896+
def _human(data):
897+
console.print(
898+
f"Runtime state for '{data['team']}' (throttle={data['throttleSeconds']}s)"
899+
)
900+
routes = data.get("routes", {})
901+
if not routes:
902+
console.print("[dim]No runtime route state.[/dim]")
903+
return
904+
for key in sorted(routes):
905+
route = routes[key]
906+
console.print(
907+
f" {route.get('source', '?')} -> {route.get('target', '?')} "
908+
f"pending={route.get('pendingCount', 0)} "
909+
f"status={route.get('lastDispatchStatus', 'idle')} "
910+
f"flushAfter={route.get('flushAfter', '') or '-'}"
911+
)
912+
913+
_output(state, _human)
914+
915+
792916
# ============================================================================
793917
# Task Commands
794918
# ============================================================================

clawteam/spawn/sessions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import json
6+
import os
67
from datetime import datetime, timezone
78
from pathlib import Path
89
from typing import Any
@@ -67,7 +68,7 @@ def save(
6768
tmp.write_text(
6869
session.model_dump_json(indent=2, by_alias=True), encoding="utf-8"
6970
)
70-
tmp.rename(path)
71+
os.replace(str(tmp), str(path))
7172
return session
7273

7374
def load(self, agent_name: str) -> SessionState | None:

clawteam/spawn/tmux_backend.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import sys
1111
import tempfile
1212
import time
13+
from xml.sax.saxutils import escape
1314

1415
from clawteam.spawn.base import SpawnBackend
1516
from clawteam.spawn.cli_env import build_spawn_path, resolve_clawteam_executable
@@ -289,6 +290,31 @@ def list_running(self) -> list[dict[str, str]]:
289290
for name, target in self._agents.items()
290291
]
291292

293+
def inject_runtime_message(self, team: str, agent_name: str, envelope) -> tuple[bool, str]:
294+
"""Best-effort runtime injection into an existing tmux agent pane."""
295+
if not shutil.which("tmux"):
296+
return False, "tmux not installed"
297+
298+
target = f"{self.session_name(team)}:{agent_name}"
299+
probe = subprocess.run(
300+
["tmux", "list-panes", "-t", target, "-F", "#{pane_id}"],
301+
capture_output=True,
302+
text=True,
303+
)
304+
if probe.returncode != 0 or not probe.stdout.strip():
305+
return False, f"tmux target '{target}' not found"
306+
307+
try:
308+
_inject_prompt_via_buffer(
309+
target,
310+
agent_name,
311+
_render_runtime_notification(envelope),
312+
)
313+
except Exception as exc:
314+
return False, f"runtime injection failed for '{target}': {exc}"
315+
316+
return True, f"Injected runtime notification into {target}"
317+
292318
@staticmethod
293319
def session_name(team_name: str) -> str:
294320
return f"clawteam-{team_name}"
@@ -665,3 +691,39 @@ def _inject_prompt_via_buffer(
665691
)
666692
finally:
667693
os.unlink(tmp_path)
694+
695+
def _render_runtime_notification(envelope) -> str:
696+
summary = str(getattr(envelope, "summary", "") or "").strip()
697+
if not summary:
698+
summary = "Runtime update"
699+
700+
evidence = getattr(envelope, "evidence", []) or []
701+
if isinstance(evidence, str):
702+
evidence = [evidence]
703+
evidence_block = "\n".join(str(item) for item in evidence if item)
704+
705+
lines = [
706+
'<clawteam_notification version="1"',
707+
f' source="{escape(str(getattr(envelope, "source", "system") or "system"))}"',
708+
f' target="{escape(str(getattr(envelope, "target", "") or ""))}"',
709+
f' channel="{escape(str(getattr(envelope, "channel", "direct") or "direct"))}"',
710+
f' priority="{escape(str(getattr(envelope, "priority", "medium") or "medium"))}">',
711+
"<summary>",
712+
escape(summary),
713+
"</summary>",
714+
]
715+
716+
if evidence_block:
717+
lines.extend(["<evidence>", escape(evidence_block), "</evidence>"])
718+
recommended_next_action = str(getattr(envelope, "recommended_next_action", "") or "").strip()
719+
if recommended_next_action:
720+
lines.extend(
721+
[
722+
"<recommended_next_action>",
723+
escape(recommended_next_action),
724+
"</recommended_next_action>",
725+
]
726+
)
727+
728+
lines.append("</clawteam_notification>")
729+
return "\n".join(lines)

clawteam/team/mailbox.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import json
6+
import os
67
import time
78
import uuid
89

@@ -55,7 +56,7 @@ def _log_event(self, msg: TeamMessage) -> None:
5556
msg.model_dump_json(indent=2, by_alias=True, exclude_none=True),
5657
encoding="utf-8",
5758
)
58-
tmp.rename(path)
59+
os.replace(str(tmp), str(path))
5960

6061
def get_event_log(self, limit: int = 100) -> list[TeamMessage]:
6162
"""Read event log (newest first). Non-destructive."""

clawteam/team/manager.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import json
6+
import os
67
import shutil
78
from pathlib import Path
89

@@ -43,7 +44,7 @@ def _save_config(config: TeamConfig) -> None:
4344
tmp.write_text(
4445
config.model_dump_json(indent=2, by_alias=True), encoding="utf-8"
4546
)
46-
tmp.rename(path)
47+
os.replace(str(tmp), str(path))
4748

4849

4950
class TeamManager:

clawteam/team/router.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""Thin runtime router for tmux live injection."""
2+
3+
from __future__ import annotations
4+
5+
import json
6+
from datetime import datetime
7+
8+
from clawteam.spawn.tmux_backend import TmuxBackend
9+
from clawteam.team.models import MessageType, TeamMessage
10+
from clawteam.team.routing_policy import DefaultRoutingPolicy, RouteDecision, RuntimeEnvelope
11+
12+
13+
class RuntimeRouter:
14+
"""Normalize inbox messages, ask policy for a decision, then dispatch."""
15+
16+
def __init__(
17+
self,
18+
team_name: str,
19+
agent_name: str,
20+
backend: TmuxBackend | None = None,
21+
policy: DefaultRoutingPolicy | None = None,
22+
session_agent_name: str | None = None,
23+
):
24+
self.team_name = team_name
25+
self.inbox_agent_name = agent_name
26+
self.agent_name = session_agent_name or agent_name
27+
self.backend = backend or TmuxBackend()
28+
self.policy = policy or DefaultRoutingPolicy(team_name=team_name)
29+
30+
def normalize_message(self, message: TeamMessage) -> RuntimeEnvelope:
31+
source = message.from_agent or "system"
32+
# Route to the live tmux pane name when the message does not carry an explicit target.
33+
target = message.to or self.agent_name
34+
channel = "team" if message.type == MessageType.broadcast else "direct"
35+
priority = self._priority_for_message(message)
36+
evidence = []
37+
if message.summary:
38+
evidence.append(f"summary: {message.summary}")
39+
if message.plan_file:
40+
evidence.append(f"planFile: {message.plan_file}")
41+
if message.status:
42+
evidence.append(f"status: {message.status}")
43+
if message.last_task:
44+
evidence.append(f"lastTask: {message.last_task}")
45+
if message.reason:
46+
evidence.append(f"reason: {message.reason}")
47+
if message.feedback:
48+
evidence.append(f"feedback: {message.feedback}")
49+
if message.request_id:
50+
evidence.append(f"requestId: {message.request_id}")
51+
52+
summary = (message.content or "").strip() or f"{message.type.value} from {source}"
53+
payload = json.loads(message.model_dump_json(by_alias=True, exclude_none=True))
54+
55+
return RuntimeEnvelope(
56+
source=source,
57+
target=target,
58+
channel=channel,
59+
priority=priority,
60+
message_type=message.type.value,
61+
summary=summary,
62+
evidence=evidence,
63+
recommended_next_action=self._recommended_next_action(message),
64+
payload=payload,
65+
dedupe_key=message.request_id or f"{source}:{target}:{message.type.value}:{message.timestamp}",
66+
created_at=message.timestamp,
67+
)
68+
69+
def route_message(
70+
self,
71+
message: TeamMessage,
72+
*,
73+
now: datetime | str | None = None,
74+
) -> RouteDecision:
75+
envelope = self.normalize_message(message)
76+
decision = self.policy.decide(envelope, now=now)
77+
self.dispatch(decision, now=now)
78+
return decision
79+
80+
def flush_due(self, *, now: datetime | str | None = None) -> list[RouteDecision]:
81+
decisions = self.policy.flush_due(target_agent=self.agent_name, now=now)
82+
for decision in decisions:
83+
self.dispatch(decision, now=now)
84+
return decisions
85+
86+
def dispatch(self, decision: RouteDecision, *, now: datetime | str | None = None) -> bool:
87+
if decision.action != "inject" or not decision.envelope.requires_injection:
88+
return False
89+
90+
if not hasattr(self.backend, "inject_runtime_message"):
91+
self.policy.record_dispatch_result(
92+
decision,
93+
success=False,
94+
now=now,
95+
error="backend does not support runtime injection",
96+
)
97+
return False
98+
99+
ok, reason = self.backend.inject_runtime_message(
100+
self.team_name,
101+
decision.envelope.target,
102+
decision.envelope,
103+
)
104+
self.policy.record_dispatch_result(
105+
decision,
106+
success=ok,
107+
now=now,
108+
error="" if ok else reason,
109+
)
110+
return ok
111+
112+
@staticmethod
113+
def _priority_for_message(message: TeamMessage) -> str:
114+
if message.type in {MessageType.shutdown_request, MessageType.shutdown_approved, MessageType.shutdown_rejected}:
115+
return "high"
116+
if message.type in {MessageType.idle, MessageType.plan_approval_request, MessageType.plan_rejected}:
117+
return "high"
118+
return "medium"
119+
120+
@staticmethod
121+
def _recommended_next_action(message: TeamMessage) -> str | None:
122+
if message.type == MessageType.plan_approval_request:
123+
return "Review the plan and respond with an approval decision."
124+
if message.type == MessageType.idle and message.last_task:
125+
return f"Check blocker status for {message.last_task}."
126+
return None

0 commit comments

Comments
 (0)