Skip to content

Commit dfb106c

Browse files
Teerapat-Vatpitakwarren618
authored andcommitted
fix(swarm): surface errors, output contract, Windows-safe store, redact paths (HKUDS#119)
* fix(swarm): surface task error at read boundaries run_swarm / get_swarm_status / get_run_result and the in-process run_swarm tool each hand-maintained a field allowlist that omitted SwarmTask.error. A misconfigured provider returned status="failed" with no diagnosable reason, though the error was captured on disk the whole time. Add src/swarm/serialization.py as the single source of truth for the per-task projection (now includes error + iterations) plus a run-level error summary; route all three boundaries through it so the allowlist cannot silently drift again. Additive JSON change; the only shape normalization is _format_result missing-summary "" -> null. (cherry picked from commit 95f2717ac32bd02829f650bc971e3cd6dd7fffa6) * fix(swarm): enforce worker output contract A worker that emitted only its plan ("Phase 1 — Plan"), mock data, unparsed tool markup, or (for a data agent) made no tool call and wrote no report was returned status="completed". runtime.py also folded timeout/token_limit into completed. The run reported success with a stub as final_report (P01); degraded workers were invisible (P03). Add WorkerStatus.incomplete and a hybrid output contract in worker.py: content-sanity for every agent, tool-evidence only for data agents so tool-less synthesis/editor roles are not false-rejected. Fail-fast to incomplete (no corrective retry — left for a follow-up). runtime.py only maps a true completed to TaskStatus.completed; anything else fails the run, so final_report can no longer be a non-deliverable. (cherry picked from commit 774b26824509678c953cd0200ef7a56c03f23201) * fix(swarm): thread shell tools + unify runs root The MCP run_swarm wrapper called start_run() without include_shell_tools, so stdio swarm workers silently lost bash and could not run the scripts their own execution rules mandate (P03-B). build_filtered_registry also dropped a requested-but-unavailable tool with no log, hiding the contradiction. Pass the server's shell-tool policy into start_run and warn on a dropped tool. Add swarm_runs_root() as the single source of truth for the run store location, shared by mcp_server and the path_utils run-dir allow-list so the two can no longer drift (P03-A: an installed-wheel layout had put every worker run_dir outside it). (cherry picked from commit 2975fda09b5ad44d15580d9330890f72f43c00ce) * fix(swarm): make store atomic write Windows-safe SwarmStore._atomic_write did tmp.replace(target) with no retry. On Windows os.replace raises PermissionError WinError 5/32 when the target is concurrently open by load_run on the poll path, crashing the worker thread and leaving the run stuck pending (P13). POSIX os.replace is atomic and never raises these. Add a WinError-scoped, bounded retry around the rename and a matching transient read/parse retry in load_run (the other side of the same race). Non-transient errors re-raise immediately; off-Windows the loop runs once, so POSIX behavior is unchanged. Pre-existing, not introduced by the P01/P03/P04 fixes — surfaced by E2E dogfooding. (cherry picked from commit 46a779cf436353591e7840879f8c32f763dde826) * style(swarm): drop unused imports (ruff F401) Remove an unused `from pathlib import Path` in the PR03 test added by 2975fda, and the pre-existing unused `from src.swarm.mailbox import Mailbox` in runtime.py (file already in this branch's diff). Net ruff delta for PR1-PR4 is now -1. The remaining mcp_server.py:41 E402 is pre-existing on upstream and structural — left out of scope. (cherry picked from commit e96f56af1ead9b5b7058c90ec0d4df738b45e473) * fix: redact internal absolute paths in errors Unknown preset / workspace-escape / missing run-or-task dir errors embedded the full install path (OS username, .venvs/site-packages topology) — CWE-209/497, reachable via MCP/API/agent/CLI (P10). Drop the absolute path while keeping the actionable bits: preset name + Available list, the workspace-root boundary, and the logical run/task id (basename only). Two test_path_safety assertions that matched the old leaking message are updated to the redacted wording (behavior unchanged — still raises and rejects). Verified-clean sites (caller's own input / env-var / names list) left untouched. (cherry picked from commit 69ea45295410cd25ea3079c25b7e3c4c73390b80) * fix(security): redact internal paths in errors Add agent/src/tools/redaction.py: anchored prefix redaction (no regex, idempotent, None-safe) that hides only known-internal root prefixes while keeping the relative tail for diagnosability. Wire it through the swarm error read boundaries (serialization, runtime, store) and the file tools' final broad except so a leaked absolute path cannot reach a user-facing string (CWE-209/497, P10). ValueError branches and the protected providers/llm.py are left untouched. (cherry picked from commit 21dc9cdfacf17cccd31cacfbe34481f62f1390af) * fix(security): redact swarm event error payloads --------- Co-authored-by: Haozhe Wu <haozhe_wu@connect.hku.hk>
1 parent 37d465b commit dfb106c

22 files changed

Lines changed: 1139 additions & 146 deletions

agent/mcp_server.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -371,15 +371,17 @@ def run_swarm(preset_name: str, variables: dict[str, str]) -> str:
371371
"""
372372
import time
373373
from src.swarm.runtime import SwarmRuntime
374-
from src.swarm.store import SwarmStore
374+
from src.swarm.store import SwarmStore, swarm_runs_root
375375
from src.swarm.models import RunStatus
376376

377-
swarm_dir = AGENT_DIR / ".swarm" / "runs"
377+
swarm_dir = swarm_runs_root()
378378
store = SwarmStore(base_dir=swarm_dir)
379379
runtime = SwarmRuntime(store=store)
380380

381381
try:
382-
run = runtime.start_run(preset_name, variables)
382+
run = runtime.start_run(
383+
preset_name, variables, include_shell_tools=_include_shell_tools
384+
)
383385
except FileNotFoundError as exc:
384386
return json.dumps({"status": "error", "error": str(exc)}, ensure_ascii=False)
385387
except ValueError as exc:
@@ -392,16 +394,16 @@ def run_swarm(preset_name: str, variables: dict[str, str]) -> str:
392394
if current is None:
393395
return json.dumps({"status": "error", "error": "Run record lost"}, ensure_ascii=False)
394396
if current.status in (RunStatus.completed, RunStatus.failed, RunStatus.cancelled):
395-
tasks = [
396-
{"id": t.id, "agent_id": t.agent_id, "status": t.status.value, "summary": t.summary}
397-
for t in current.tasks
398-
]
397+
from src.swarm.serialization import run_level_error, serialize_task
398+
399+
tasks = [serialize_task(t) for t in current.tasks]
399400
return json.dumps(
400401
{
401402
"status": current.status.value,
402403
"preset": preset_name,
403404
"run_id": current.id,
404405
"final_report": current.final_report,
406+
"error": run_level_error(current),
405407
"tasks": tasks,
406408
"total_input_tokens": current.total_input_tokens,
407409
"total_output_tokens": current.total_output_tokens,
@@ -552,28 +554,23 @@ def get_market_data(
552554

553555

554556
def _get_swarm_store():
555-
swarm_dir = AGENT_DIR / ".swarm" / "runs"
556-
swarm_dir.mkdir(parents=True, exist_ok=True)
557-
from src.swarm.store import SwarmStore
557+
from src.swarm.store import SwarmStore, swarm_runs_root
558558

559+
swarm_dir = swarm_runs_root()
560+
swarm_dir.mkdir(parents=True, exist_ok=True)
559561
return SwarmStore(base_dir=swarm_dir)
560562

561563

562564
def _run_to_dict(run) -> dict:
565+
from src.swarm.serialization import run_level_error, serialize_task
566+
563567
return {
564568
"run_id": run.id,
565569
"status": run.status.value,
566570
"preset": run.preset_name,
567571
"created_at": run.created_at,
568-
"tasks": [
569-
{
570-
"id": t.id,
571-
"agent_id": t.agent_id,
572-
"status": t.status.value,
573-
"summary": t.summary,
574-
}
575-
for t in run.tasks
576-
],
572+
"error": run_level_error(run),
573+
"tasks": [serialize_task(t) for t in run.tasks],
577574
"final_report": run.final_report,
578575
"total_input_tokens": run.total_input_tokens,
579576
"total_output_tokens": run.total_output_tokens,

agent/src/swarm/models.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,23 @@ class RunStatus(str, Enum):
4040
cancelled = "cancelled"
4141

4242

43+
class WorkerStatus(str, Enum):
44+
"""Terminal status a worker returns.
45+
46+
``incomplete`` is distinct from ``failed``: the worker ran without an
47+
exception but produced no substantive deliverable (plan-only stub,
48+
fabricated/mock numbers, unparsed tool markup, or a data agent that
49+
made no tool call and wrote no report). It must never be folded into
50+
``completed`` (see P01/P03).
51+
"""
52+
53+
completed = "completed"
54+
failed = "failed"
55+
timeout = "timeout"
56+
token_limit = "token_limit"
57+
incomplete = "incomplete"
58+
59+
4360
class SwarmAgentSpec(BaseModel):
4461
"""Role definition for a single agent in a Swarm.
4562
@@ -203,7 +220,7 @@ class WorkerResult(BaseModel):
203220
"""Return value after worker execution completes.
204221
205222
Attributes:
206-
status: "completed" or "failed".
223+
status: WorkerStatus — completed|failed|timeout|token_limit|incomplete.
207224
summary: Execution summary.
208225
artifact_paths: List of generated artifact file paths.
209226
iterations: Actual ReAct iterations executed.
@@ -212,7 +229,7 @@ class WorkerResult(BaseModel):
212229
output_tokens: Cumulative output tokens (exact or estimated).
213230
"""
214231

215-
status: str
232+
status: WorkerStatus
216233
summary: str
217234
artifact_paths: list[str] = Field(default_factory=list)
218235
iterations: int = 0

agent/src/swarm/presets.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def load_preset(name: str) -> dict:
3939
if not path.exists():
4040
available = [p.stem for p in PRESETS_DIR.glob("*.yaml")] if PRESETS_DIR.exists() else []
4141
raise FileNotFoundError(
42-
f"Preset '{name}' not found at {path}. Available: {available}"
42+
f"Preset {name!r} not found. Available: {available}"
4343
)
4444
return yaml.safe_load(path.read_text(encoding="utf-8"))
4545

agent/src/swarm/runtime.py

Lines changed: 65 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from typing import Callable
2222

2323
from src.swarm import grounding
24-
from src.swarm.mailbox import Mailbox
2524
from src.swarm.models import (
2625
RunStatus,
2726
SwarmAgentSpec,
@@ -39,6 +38,7 @@
3938
topological_layers,
4039
validate_dag,
4140
)
41+
from src.tools.redaction import redact_internal_paths
4242
from src.swarm.worker import run_worker
4343

4444
logger = logging.getLogger(__name__)
@@ -269,11 +269,12 @@ def _execute_run(
269269
run.total_input_tokens += result.input_tokens
270270
run.total_output_tokens += result.output_tokens
271271

272-
if result.status in ("completed", "timeout", "token_limit"):
272+
if result.status == "completed":
273273
task_summaries[tid] = result.summary
274274
now_iso = datetime.now(timezone.utc).isoformat()
275275
task_store.update_status(
276-
tid, TaskStatus.completed,
276+
tid,
277+
TaskStatus.completed,
277278
summary=result.summary,
278279
completed_at=now_iso,
279280
artifacts=result.artifact_paths,
@@ -282,41 +283,51 @@ def _execute_run(
282283
resolve_dependencies(run_dir / "tasks", tid)
283284
self._emit_event(
284285
run_id,
285-
self._make_event("task_completed", task_id=tid,
286-
data={"status": result.status,
287-
"iterations": result.iterations,
288-
"input_tokens": result.input_tokens,
289-
"output_tokens": result.output_tokens}),
286+
self._make_event(
287+
"task_completed",
288+
task_id=tid,
289+
data={
290+
"status": result.status,
291+
"iterations": result.iterations,
292+
"input_tokens": result.input_tokens,
293+
"output_tokens": result.output_tokens,
294+
},
295+
),
290296
)
291297
else:
292298
all_succeeded = False
293299
task_store.update_status(
294-
tid, TaskStatus.failed,
295-
error=result.error or "Unknown error",
300+
tid,
301+
TaskStatus.failed,
302+
error=redact_internal_paths(result.error)
303+
or f"worker did not complete (status={result.status})",
296304
completed_at=datetime.now(timezone.utc).isoformat(),
297305
worker_iterations=result.iterations,
298306
)
299307
self._emit_event(
300308
run_id,
301-
self._make_event("task_failed", task_id=tid,
302-
data={"error": result.error,
303-
"input_tokens": result.input_tokens,
304-
"output_tokens": result.output_tokens}),
309+
self._make_event(
310+
"task_failed",
311+
task_id=tid,
312+
data={
313+
"error": redact_internal_paths(result.error),
314+
"input_tokens": result.input_tokens,
315+
"output_tokens": result.output_tokens,
316+
},
317+
),
305318
)
306319

307320
except Exception as exc:
308321
logger.error("Run %s failed with exception", run_id, exc_info=True)
309322
all_succeeded = False
310323
self._emit_event(
311324
run_id,
312-
self._make_event("run_error", data={"error": str(exc)}),
325+
self._make_event("run_error", data={"error": redact_internal_paths(str(exc))}),
313326
)
314327

315328
# Finalize run
316329
final_status = (
317-
RunStatus.cancelled if cancel_event.is_set()
318-
else RunStatus.completed if all_succeeded
319-
else RunStatus.failed
330+
RunStatus.cancelled if cancel_event.is_set() else RunStatus.completed if all_succeeded else RunStatus.failed
320331
)
321332
run.status = final_status
322333
run.completed_at = datetime.now(timezone.utc).isoformat()
@@ -350,7 +361,9 @@ def _prefetch_grounding_data(self, run: SwarmRun) -> None:
350361
if len(symbols) > symbol_limit:
351362
logger.warning(
352363
"grounding: limiting run %s symbols from %d to %d",
353-
run.id, len(symbols), symbol_limit,
364+
run.id,
365+
len(symbols),
366+
symbol_limit,
354367
)
355368
symbols = symbols[:symbol_limit]
356369

@@ -359,7 +372,9 @@ def _prefetch_grounding_data(self, run: SwarmRun) -> None:
359372
except Exception:
360373
logger.warning(
361374
"grounding: pre-fetch failed for run %s symbols=%s",
362-
run.id, symbols, exc_info=True,
375+
run.id,
376+
symbols,
377+
exc_info=True,
363378
)
364379
return
365380

@@ -416,14 +431,16 @@ def _event_callback(event: SwarmEvent) -> None:
416431
agent_spec = agent_map.get(task.agent_id)
417432
if agent_spec is None:
418433
results[tid] = WorkerResult(
419-
status="failed", summary="",
434+
status="failed",
435+
summary="",
420436
error=f"Agent '{task.agent_id}' not found in preset",
421437
)
422438
continue
423439

424440
# Mark task as in_progress
425441
task_store.update_status(
426-
tid, TaskStatus.in_progress,
442+
tid,
443+
TaskStatus.in_progress,
427444
started_at=datetime.now(timezone.utc).isoformat(),
428445
)
429446
self._emit_event(
@@ -467,7 +484,8 @@ def _event_callback(event: SwarmEvent) -> None:
467484
except Exception as exc:
468485
logger.error("Worker for task %s raised exception", tid, exc_info=True)
469486
results[tid] = WorkerResult(
470-
status="failed", summary="",
487+
status="failed",
488+
summary="",
471489
error=str(exc),
472490
)
473491
except FuturesTimeoutError:
@@ -477,10 +495,12 @@ def _event_callback(event: SwarmEvent) -> None:
477495
pending.cancel()
478496
logger.error(
479497
"Worker for task %s exceeded layer deadline (%ds)",
480-
tid, layer_deadline,
498+
tid,
499+
layer_deadline,
481500
)
482501
results[tid] = WorkerResult(
483-
status="timeout", summary="",
502+
status="timeout",
503+
summary="",
484504
error=f"Worker exceeded layer deadline of {layer_deadline}s",
485505
)
486506
except KeyboardInterrupt:
@@ -539,13 +559,18 @@ def _run_worker_with_retries(
539559
"task_retry",
540560
agent_id=agent_spec.id,
541561
task_id=task.id,
542-
data={"attempt": attempt + 1, "max_retries": max_retries,
543-
"previous_error": result.error if result else None},
562+
data={
563+
"attempt": attempt + 1,
564+
"max_retries": max_retries,
565+
"previous_error": result.error if result else None,
566+
},
544567
),
545568
)
546569
logger.info(
547570
"Retrying task %s (attempt %d/%d)",
548-
task.id, attempt + 1, max_retries + 1,
571+
task.id,
572+
attempt + 1,
573+
max_retries + 1,
549574
)
550575

551576
result = run_worker(
@@ -564,18 +589,22 @@ def _run_worker_with_retries(
564589

565590
if result.status != "failed":
566591
# Success (or timeout/token_limit/completed) — no more retries
567-
result = result.model_copy(update={
568-
"input_tokens": cumulative_input_tokens,
569-
"output_tokens": cumulative_output_tokens,
570-
})
592+
result = result.model_copy(
593+
update={
594+
"input_tokens": cumulative_input_tokens,
595+
"output_tokens": cumulative_output_tokens,
596+
}
597+
)
571598
return result
572599

573600
# All retries exhausted, return the last failed result with cumulative tokens
574601
if result is not None:
575-
result = result.model_copy(update={
576-
"input_tokens": cumulative_input_tokens,
577-
"output_tokens": cumulative_output_tokens,
578-
})
602+
result = result.model_copy(
603+
update={
604+
"input_tokens": cumulative_input_tokens,
605+
"output_tokens": cumulative_output_tokens,
606+
}
607+
)
579608
return result # type: ignore[return-value]
580609

581610
def _cancel_remaining_tasks(

agent/src/swarm/serialization.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Shared serialization helpers for the swarm read boundaries.
2+
3+
Single source of truth for projecting a :class:`SwarmTask` into the per-task
4+
JSON dict returned by the MCP tools (``run_swarm`` / ``get_swarm_status`` /
5+
``get_run_result``) and the in-process ``run_swarm`` agent tool.
6+
7+
Before this module each boundary hand-maintained its own field allowlist and
8+
all three silently omitted ``SwarmTask.error``: a misconfigured provider
9+
produced ``status="failed"`` with no diagnosable reason anywhere the caller
10+
could see, even though the error was captured on disk (see P04).
11+
"""
12+
13+
from __future__ import annotations
14+
15+
from typing import Any
16+
17+
from src.tools.redaction import redact_internal_paths
18+
19+
20+
def serialize_task(task: Any) -> dict:
21+
"""Project a SwarmTask into its public per-task dict.
22+
23+
``error`` and ``iterations`` are always included so a failed or degraded
24+
task is diagnosable from every read path, not only the on-disk artifacts.
25+
"""
26+
status = task.status.value if hasattr(task.status, "value") else str(task.status)
27+
return {
28+
"id": task.id,
29+
"agent_id": task.agent_id,
30+
"status": status,
31+
"summary": task.summary,
32+
"iterations": getattr(task, "worker_iterations", 0),
33+
"error": redact_internal_paths(getattr(task, "error", None)) or None,
34+
}
35+
36+
37+
def run_level_error(run: Any) -> str | None:
38+
"""First failed task's error, for a top-level ``error`` field.
39+
40+
Returns ``None`` (an explicit null, not an absent key) when no task carries
41+
an error, so a caller that only reads the top level still gets a signal.
42+
"""
43+
for task in getattr(run, "tasks", None) or []:
44+
err = getattr(task, "error", None)
45+
if err:
46+
return f"{task.id}/{task.agent_id}: {redact_internal_paths(err)}"
47+
return None

0 commit comments

Comments
 (0)