Skip to content

Commit c1dcc04

Browse files
committed
fix(swarm): unified reconcile + MCP keepalive + heartbeat coverage (#132)
Symptoms reported in #132 (WorkBuddy + DeepSeek MCP):
- MCP run_swarm blocked 30 min then errored, losing run_id
- get_swarm_status / get_run_result showed stale snapshot while tasks/*.json had real progress
- Host crash left runs "running" forever with no recovery path
- list_swarm_runs / SSE rendered the same stale snapshot

Architecture:
- store.reconcile_run(write=True): single read-side entry point. Hydrates live task files, recovers terminal-but-not-finalized state (host crashed between last layer sync and finalize), and reaps stale running runs whose events.jsonl has gone silent past compute_stale_threshold.
- Threshold is per-run: max(60, min(heartbeat_interval*10, retry_ceiling)) — fast detection when heartbeats work, retry-budget-clamped when not.
- HeartbeatTimer wraps registry.execute, llm.stream_chat, AND grounding fetch so no normal execution phase can produce >heartbeat_interval of silence. Slow first-token / reasoning-mode / pure tool-call responses no longer trip the reaper.
- run_swarm emits ctx.report_progress every poll with elapsed seconds, so MCP transport stays alive via progress notifications; first frame is fixed-format ``swarm_started run_id=<id>`` so transport-drop clients can still recover the handle.
Endpoints updated to use reconcile_run:
- MCP: get_swarm_status, get_run_result, list_runs, run_swarm polling
- API: /swarm/runs, /swarm/runs/{id}, /swarm/runs/{id}/events SSE
- SwarmTool: polling + budget-out path (no longer cancels on timeout)

Other fixes:
- SWARM_HEARTBEAT_INTERVAL_S parses with ValueError fallback in worker (was crashing import on bad config)
- reaper derives run terminal status from task statuses (no more hardcoded "failed" on a run whose tasks all completed)
- serialize_task surfaces started_at / completed_at / depends_on / blocked_by
- reap_stale_runs() MCP tool for explicit user-triggered cleanup

Tests:
22 new regressions in tests/test_swarm_status_hydration.py covering hydrate, reconcile, terminal recovery, stale reap, MCP keepalive cadence, auto-recovery on read, threshold math, env var robustness, heartbeat wiring around LLM + tool execution.

Verification:
- targeted suite: 22 passed
- full swarm/mcp suite: 169 passed, 4 skipped, 0 failed
- compileall + import smoke clean
1 parent 7fef09a commit c1dcc04

8 files changed

Lines changed: 1242 additions & 89 deletions

File tree

agent/api_server.py

Lines changed: 32 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -1687,46 +1687,45 @@ async def create_swarm_run(payload: dict, http_request: Request):
16871687

16881688
@app.get("/swarm/runs", dependencies=[Depends(require_auth)])
16891689
async def list_swarm_runs(limit: int = Query(20, ge=1, le=100)):
1690-
"""List swarm runs (newest first)."""
1690+
"""List swarm runs (newest first), reconciled."""
16911691
runtime = _get_swarm_runtime()
16921692
runs = runtime._store.list_runs(limit=limit)
1693-
return [
1694-
{
1695-
"id": r.id,
1696-
"preset_name": r.preset_name,
1697-
"status": r.status.value,
1698-
"created_at": r.created_at,
1699-
"task_count": len(r.tasks),
1700-
"completed_count": sum(1 for t in r.tasks if t.status.value == "completed"),
1701-
}
1702-
for r in runs
1703-
]
1693+
items = []
1694+
for r in runs:
1695+
# Reconcile each row: a zombie running run will be auto-finalized so
1696+
# the dashboard never shows a permanent "running" stuck row.
1697+
reconciled = runtime._store.reconcile_run(r, write=True)
1698+
items.append(
1699+
{
1700+
"id": reconciled.id,
1701+
"preset_name": reconciled.preset_name,
1702+
"status": reconciled.status.value,
1703+
"is_stale": runtime._store.is_run_stale(reconciled),
1704+
"created_at": reconciled.created_at,
1705+
"completed_at": reconciled.completed_at,
1706+
"task_count": len(reconciled.tasks),
1707+
"completed_count": sum(1 for t in reconciled.tasks if t.status.value == "completed"),
1708+
}
1709+
)
1710+
return items
17041711

17051712

17061713
@app.get("/swarm/runs/{run_id}", dependencies=[Depends(require_auth)])
17071714
async def get_swarm_run(run_id: str):
1708-
"""Swarm run detail including task statuses."""
1709-
from src.swarm.task_store import TaskStore
1710-
1715+
"""Swarm run detail including task statuses (reconciled)."""
17111716
_validate_path_param(run_id, "run_id")
17121717
runtime = _get_swarm_runtime()
1713-
run = runtime._store.load_run(run_id)
1714-
if not run:
1718+
loaded = runtime._store.load_run(run_id)
1719+
if not loaded:
17151720
raise HTTPException(status_code=404, detail=f"Run {run_id} not found")
17161721

1717-
# Merge real-time task statuses from task_store (updated during execution)
1718-
run_dir = runtime._store.run_dir(run_id)
1719-
tasks_dir = run_dir / "tasks"
1720-
if tasks_dir.exists():
1721-
task_store = TaskStore(run_dir)
1722-
live_tasks = task_store.load_all()
1723-
if live_tasks:
1724-
run.tasks = live_tasks
1722+
run = runtime._store.reconcile_run(loaded, write=True)
17251723

17261724
return {
17271725
"id": run.id,
17281726
"preset_name": run.preset_name,
17291727
"status": run.status.value,
1728+
"is_stale": runtime._store.is_run_stale(run),
17301729
"user_vars": run.user_vars,
17311730
"agents": [a.model_dump() for a in run.agents],
17321731
"tasks": [t.model_dump() for t in run.tasks],
@@ -1754,9 +1753,14 @@ async def event_stream():
17541753
idx += 1
17551754
yield f"id: {idx}\nevent: {evt.type}\ndata: {json.dumps(evt.model_dump(), ensure_ascii=False)}\n\n"
17561755
run = runtime._store.load_run(run_id)
1757-
if run and run.status.value in ("completed", "failed", "cancelled"):
1758-
yield f"event: done\ndata: {{\"status\": \"{run.status.value}\"}}\n\n"
1759-
break
1756+
if run:
1757+
# Reconcile so a zombie running run can still close this SSE
1758+
# stream cleanly — without it, a dead host would keep the
1759+
# stream open forever and block the dashboard's "done" state.
1760+
reconciled = runtime._store.reconcile_run(run, write=True)
1761+
if reconciled.status.value in ("completed", "failed", "cancelled"):
1762+
yield f"event: done\ndata: {{\"status\": \"{reconciled.status.value}\"}}\n\n"
1763+
break
17601764
await asyncio.sleep(2)
17611765

17621766
return StreamingResponse(event_stream(), media_type="text/event-stream")

agent/mcp_server.py

Lines changed: 164 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@
4040
if str(AGENT_DIR) not in sys.path:
4141
sys.path.insert(0, str(AGENT_DIR))
4242

43-
from fastmcp import FastMCP
43+
from fastmcp import Context, FastMCP
4444

4545
mcp = FastMCP("Vibe-Trading")
4646

@@ -356,23 +356,40 @@ def list_swarm_presets() -> str:
356356

357357

358358
@mcp.tool
359-
def run_swarm(preset_name: str, variables: dict[str, str]) -> str:
360-
"""Run a swarm multi-agent team and return the final report.
359+
async def run_swarm(
360+
preset_name: str,
361+
variables: dict[str, str],
362+
wait_seconds: int = 3600,
363+
start_only: bool = False,
364+
ctx: Context | None = None,
365+
) -> str:
366+
"""Run a swarm multi-agent team and stream progress back to the caller.
361367
362368
Assembles a team of specialized agents that collaborate through a DAG workflow.
363369
For example, the 'investment_committee' preset runs bull analyst, bear analyst,
364370
risk officer, and portfolio manager in sequence.
365371
366372
Use list_swarm_presets() to see available presets and their required variables.
367373
374+
The tool keeps the MCP call open via ``Context.report_progress`` while the
375+
swarm runs, so the caller sees live "N/M tasks complete" updates instead
376+
of timing out silently. Only if ``wait_seconds`` is exhausted does the
377+
tool return early with the current ``run_id`` — call ``get_run_result``
378+
afterwards to fetch the final report.
379+
368380
Args:
369381
preset_name: Swarm preset name (e.g. 'investment_committee', 'quant_strategy_desk').
370382
variables: Required variables for the preset (e.g. {"target": "AAPL.US", "market": "US"}).
383+
wait_seconds: Maximum seconds to keep the MCP call open. Default 3600
384+
(1 hour); the progress-notification keepalive means the transport
385+
stays connected for the full budget.
386+
start_only: If True, kick off the run and return immediately with
387+
``run_id`` + current status. Ignores ``wait_seconds``.
371388
"""
389+
import asyncio
372390
import time
373391
from src.swarm.runtime import SwarmRuntime
374392
from src.swarm.store import SwarmStore, swarm_runs_root
375-
from src.swarm.models import RunStatus
376393

377394
swarm_dir = swarm_runs_root()
378395
store = SwarmStore(base_dir=swarm_dir)
@@ -387,32 +404,62 @@ def run_swarm(preset_name: str, variables: dict[str, str]) -> str:
387404
except ValueError as exc:
388405
return json.dumps({"status": "error", "error": f"DAG validation failed: {exc}"}, ensure_ascii=False)
389406

390-
# Poll until complete (max 30 minutes)
391-
for _ in range(360):
392-
time.sleep(5)
393-
current = store.load_run(run.id)
394-
if current is None:
395-
return json.dumps({"status": "error", "error": "Run record lost"}, ensure_ascii=False)
396-
if current.status in (RunStatus.completed, RunStatus.failed, RunStatus.cancelled):
397-
from src.swarm.serialization import run_level_error, serialize_task
398-
399-
tasks = [serialize_task(t) for t in current.tasks]
400-
return json.dumps(
401-
{
402-
"status": current.status.value,
403-
"preset": preset_name,
404-
"run_id": current.id,
405-
"final_report": current.final_report,
406-
"error": run_level_error(current),
407-
"tasks": tasks,
408-
"total_input_tokens": current.total_input_tokens,
409-
"total_output_tokens": current.total_output_tokens,
410-
},
411-
ensure_ascii=False,
412-
indent=2,
413-
)
407+
if start_only or wait_seconds <= 0:
408+
return json.dumps(
409+
_build_run_payload(store, run.id, preset_name, timed_out=False),
410+
ensure_ascii=False,
411+
indent=2,
412+
)
414413

415-
return json.dumps({"status": "error", "error": "Swarm timed out after 30 minutes"}, ensure_ascii=False)
414+
# Surface the run_id immediately in a fixed-format progress message so a
415+
# caller whose transport drops mid-run (or whose MCP client enforces a
416+
# hard tool-call timeout that ignores progress notifications) can still
417+
# recover the run via ``get_run_result(run_id)``. Parsers should match
418+
# ``swarm_started run_id=<id>`` literally; later frames are free-form.
419+
if ctx is not None:
420+
try:
421+
await ctx.report_progress(
422+
progress=0,
423+
total=1,
424+
message=f"swarm_started run_id={run.id} preset={preset_name}",
425+
)
426+
except Exception:
427+
pass
428+
429+
terminal = {"completed", "failed", "cancelled"}
430+
started_at = time.monotonic()
431+
deadline = started_at + wait_seconds
432+
while True:
433+
payload = _build_run_payload(store, run.id, preset_name, timed_out=False)
434+
if payload["status"] == "error":
435+
return json.dumps(payload, ensure_ascii=False)
436+
if payload["status"] in terminal:
437+
return json.dumps(payload, ensure_ascii=False, indent=2)
438+
439+
# Emit a progress frame every loop, NOT only on state change — MCP
440+
# clients use these as transport keepalive. A long task that doesn't
441+
# transition for 30 minutes still needs ticks or the client times out.
442+
# ``elapsed`` keeps the message content fresh so dedup-on-message
443+
# clients still see updates.
444+
if ctx is not None:
445+
tasks = payload.get("tasks") or []
446+
total = max(1, len(tasks))
447+
done = sum(1 for t in tasks if t.get("status") in terminal)
448+
elapsed = int(time.monotonic() - started_at)
449+
try:
450+
await ctx.report_progress(
451+
progress=done,
452+
total=total,
453+
message=f"{done}/{total} tasks complete · {elapsed}s elapsed (run {run.id})",
454+
)
455+
except Exception:
456+
pass
457+
458+
remaining = deadline - time.monotonic()
459+
if remaining <= 0:
460+
payload = _build_run_payload(store, run.id, preset_name, timed_out=True)
461+
return json.dumps(payload, ensure_ascii=False, indent=2)
462+
await asyncio.sleep(min(5.0, remaining))
416463

417464

418465
# ---------------------------------------------------------------------------
@@ -561,28 +608,63 @@ def _get_swarm_store():
561608
return SwarmStore(base_dir=swarm_dir)
562609

563610

564-
def _run_to_dict(run) -> dict:
611+
def _run_to_dict(run, *, timed_out: bool = False, is_stale: bool = False) -> dict:
612+
"""Public projection of a (live-hydrated) :class:`SwarmRun`.
613+
614+
``timed_out`` flips on only for the ``run_swarm`` wait-budget path. It does
615+
not change the run's actual status — callers can still see ``running`` and
616+
fetch the final report later via :func:`get_run_result`.
617+
618+
``is_stale`` is a read-only signal: ``True`` means the run is still
619+
``running`` but its events.jsonl has been silent past the per-run
620+
threshold. No disk state is changed by setting this — the explicit
621+
:func:`reap_stale_runs` tool is what finalizes a stale run.
622+
"""
565623
from src.swarm.serialization import run_level_error, serialize_task
566624

567625
return {
568626
"run_id": run.id,
569627
"status": run.status.value,
570628
"preset": run.preset_name,
571629
"created_at": run.created_at,
630+
"completed_at": run.completed_at,
572631
"error": run_level_error(run),
573632
"tasks": [serialize_task(t) for t in run.tasks],
574633
"final_report": run.final_report,
575634
"total_input_tokens": run.total_input_tokens,
576635
"total_output_tokens": run.total_output_tokens,
636+
"timed_out": timed_out,
637+
"is_stale": is_stale,
577638
}
578639

579640

641+
def _build_run_payload(store, run_id: str, preset_name: str | None, *, timed_out: bool) -> dict:
642+
"""Reconcile + project a run for the MCP response.
643+
644+
Used by ``run_swarm`` (polling + start_only). Returns a normal payload on
645+
success and a ``{"status": "error", ...}`` envelope when the run record
646+
disappears (mid-run directory wipe / sandbox eviction).
647+
"""
648+
run = store.load_run(run_id)
649+
if run is None:
650+
return {"status": "error", "error": "Run record lost", "run_id": run_id}
651+
reconciled = store.reconcile_run(run, write=True)
652+
payload = _run_to_dict(
653+
reconciled,
654+
timed_out=timed_out,
655+
is_stale=store.is_run_stale(reconciled),
656+
)
657+
if preset_name:
658+
payload["preset"] = preset_name
659+
return payload
660+
661+
580662
@mcp.tool
581663
def get_swarm_status(run_id: str) -> str:
582664
"""Get the current status of a swarm run.
583665
584-
Returns status, task progress, and token usage for the specified run.
585-
Use this to poll a long-running swarm without blocking.
666+
Returns status, task progress, token usage, and an ``is_stale`` flag for
667+
the specified run. Use this to poll a long-running swarm without blocking.
586668
587669
Args:
588670
run_id: The run ID returned by run_swarm.
@@ -591,15 +673,22 @@ def get_swarm_status(run_id: str) -> str:
591673
run = store.load_run(run_id)
592674
if run is None:
593675
return json.dumps({"status": "error", "error": f"Run {run_id} not found"}, ensure_ascii=False)
594-
return json.dumps(_run_to_dict(run), ensure_ascii=False, indent=2)
676+
reconciled = store.reconcile_run(run, write=True)
677+
return json.dumps(
678+
_run_to_dict(reconciled, is_stale=store.is_run_stale(reconciled)),
679+
ensure_ascii=False,
680+
indent=2,
681+
)
595682

596683

597684
@mcp.tool
598685
def get_run_result(run_id: str) -> str:
599-
"""Get the final report and task summaries of a completed swarm run.
686+
"""Get the final report and task summaries of a swarm run.
600687
601-
Returns the final_report text and per-task summaries. If the run is
602-
still in progress, returns current status instead.
688+
Reconciles the run on read: an orphaned ``running`` run whose host
689+
process exited will be transitioned to its real terminal status
690+
(``completed`` / ``failed`` / ``cancelled`` derived from the task
691+
statuses), so the caller never sees a permanent zombie.
603692
604693
Args:
605694
run_id: The run ID returned by run_swarm.
@@ -608,15 +697,18 @@ def get_run_result(run_id: str) -> str:
608697
run = store.load_run(run_id)
609698
if run is None:
610699
return json.dumps({"status": "error", "error": f"Run {run_id} not found"}, ensure_ascii=False)
611-
return json.dumps(_run_to_dict(run), ensure_ascii=False, indent=2)
700+
reconciled = store.reconcile_run(run, write=True)
701+
payload = _run_to_dict(reconciled, is_stale=store.is_run_stale(reconciled))
702+
payload["ready"] = payload["status"] in {"completed", "failed", "cancelled"}
703+
return json.dumps(payload, ensure_ascii=False, indent=2)
612704

613705

614706
@mcp.tool
615707
def list_runs(limit: int = 20) -> str:
616708
"""List recent swarm runs sorted by creation time (newest first).
617709
618-
Returns run IDs, presets, statuses, and creation timestamps.
619-
Use get_run_result(run_id) to fetch full details for a specific run.
710+
Each row includes task counts and an ``is_stale`` flag so callers can
711+
spot abandoned runs without a follow-up status call.
620712
621713
Args:
622714
limit: Maximum number of runs to return (default 20).
@@ -625,19 +717,46 @@ def list_runs(limit: int = 20) -> str:
625717
runs = store.list_runs(limit=limit)
626718
items = []
627719
for run in runs:
720+
# write=True so a zombie listed alongside live runs gets finalized;
721+
# the cost is bounded by ``limit`` (default 20) and most rows are
722+
# already terminal — reconcile is a no-op for those.
723+
reconciled = store.reconcile_run(run, write=True)
724+
counts = {"total": len(reconciled.tasks)}
725+
for t in reconciled.tasks:
726+
counts[t.status.value] = counts.get(t.status.value, 0) + 1
628727
items.append(
629728
{
630-
"run_id": run.id,
631-
"preset": run.preset_name,
632-
"status": run.status.value,
633-
"created_at": run.created_at,
634-
"total_input_tokens": run.total_input_tokens,
635-
"total_output_tokens": run.total_output_tokens,
729+
"run_id": reconciled.id,
730+
"preset": reconciled.preset_name,
731+
"status": reconciled.status.value,
732+
"is_stale": store.is_run_stale(reconciled),
733+
"created_at": reconciled.created_at,
734+
"completed_at": reconciled.completed_at,
735+
"task_counts": counts,
736+
"total_input_tokens": reconciled.total_input_tokens,
737+
"total_output_tokens": reconciled.total_output_tokens,
636738
}
637739
)
638740
return json.dumps(items, ensure_ascii=False, indent=2)
639741

640742

743+
@mcp.tool
744+
def reap_stale_runs() -> str:
745+
"""Mark every ``running`` run whose host process died as ``failed``.
746+
747+
Walks the swarm store, applies the per-run stale threshold, and
748+
finalizes any run that has gone silent past it (writes ``run.json`` +
749+
``tasks/*.json`` + appends a ``run_reaped`` event). Already-terminal
750+
runs and still-alive runs are left untouched.
751+
752+
Returns:
753+
JSON list of reaped run IDs (empty when nothing was stale).
754+
"""
755+
store = _get_swarm_store()
756+
reaped = store.reap_stale_running_runs()
757+
return json.dumps({"reaped": reaped}, ensure_ascii=False, indent=2)
758+
759+
641760
# ---------------------------------------------------------------------------
642761
# Trade journal tool
643762
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)