Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions dream-server/bin/dream-host-agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ def _post_install_core_recreate(service_id: str) -> None:
"Post-install recreate of open-webui failed after openclaw install: %s",
err,
)
# Surface the silent failure into the progress file so the dashboard
# can show a follow-up toast. Status stays "started" — openclaw
# itself IS running; the warning communicates that the open-webui
# overlay won't take effect until a manual restart.
_write_progress(
service_id,
status="started",
warnings=[
f"Installed, but post-install recreate of open-webui failed: "
f"{err}. Run 'dream restart' to retry.",
],
)


def _parse_mem_value(s: str) -> float:
Expand All @@ -395,19 +407,32 @@ def _iso_now() -> str:


def _write_progress(service_id: str, status: str, phase_label: str = "",
error: str | None = None) -> None:
"""Atomically write install progress file."""
error: str | None = None,
warnings: list[str] | None = None) -> None:
"""Atomically write install progress file.

``warnings`` is an optional list of non-fatal messages surfaced alongside
a terminal status (e.g. ``"started"``). Used by callers like
``_post_install_core_recreate`` to flag that the install itself
succeeded but a follow-up step (overlay re-apply) silently failed. The
field is omitted from the JSON when no warnings are present so existing
consumers keep their current shape.
"""
progress_dir = DATA_DIR / "extension-progress"
progress_dir.mkdir(parents=True, exist_ok=True)
progress_file = progress_dir / f"{service_id}.json"
tmp_file = progress_file.with_suffix(".json.tmp")

# Preserve started_at from existing file
# Preserve started_at and any pre-existing warnings from existing file
started_at = _iso_now()
existing_warnings: list[str] = []
if progress_file.exists():
try:
existing = json.loads(progress_file.read_text(encoding="utf-8"))
started_at = existing.get("started_at", started_at)
prior = existing.get("warnings")
if isinstance(prior, list):
existing_warnings = [w for w in prior if isinstance(w, str)]
except (json.JSONDecodeError, OSError):
pass

Expand All @@ -421,6 +446,9 @@ def _write_progress(service_id: str, status: str, phase_label: str = "",
"started_at": started_at,
"updated_at": _iso_now(),
}
merged_warnings = existing_warnings + list(warnings or [])
if merged_warnings:
data["warnings"] = merged_warnings
tmp_file.write_text(json.dumps(data), encoding="utf-8")
os.rename(str(tmp_file), str(progress_file))

Expand Down
116 changes: 95 additions & 21 deletions dream-server/extensions/services/dashboard-api/routers/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
_SERVICE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]*$")
_MAX_EXTENSION_BYTES = 50 * 1024 * 1024 # 50 MB

# Per-probe budget for unmonitored user extensions (no /health endpoint).
# Probes run concurrently via asyncio.gather, so total catalog cost stays
# bounded even with many unmonitored exts.
_UNMONITORED_PROBE_TIMEOUT = 0.5


def _is_stale(iso_timestamp: str, max_age_seconds: int) -> bool:
"""Check if an ISO timestamp is older than max_age_seconds."""
Expand Down Expand Up @@ -141,6 +146,53 @@ def _sync_extension_config(service_id: str) -> bool:
return _call_agent_sync_config(service_id)


async def _probe_unmonitored_user_ext(sid: str, cfg: dict):
"""TCP-connect probe for user extensions that declare no /health endpoint.

Returns a ``ServiceStatus`` on connect-success, ``None`` on any failure
(timeout / connection refused / DNS / OS error). A ``None`` result is
intentional: it leaves the extension out of ``services_by_id`` so
``_compute_extension_status`` falls through to the ``stopped`` branch
rather than reporting a bogus ``healthy``.

The probe budget is intentionally tight (``_UNMONITORED_PROBE_TIMEOUT``);
callers run probes concurrently via ``asyncio.gather`` so the catalog
response stays well under the 8 s frontend abort.
"""
from models import ServiceStatus

host = cfg.get("host") or "127.0.0.1"
port = cfg.get("port") or 0
if not port:
return None

start = time.monotonic()
try:
_reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=_UNMONITORED_PROBE_TIMEOUT,
)
except (asyncio.TimeoutError, OSError):
return None

try:
writer.close()
with contextlib.suppress(OSError):
await writer.wait_closed()
except OSError:
pass

elapsed_ms = (time.monotonic() - start) * 1000.0
return ServiceStatus(
id=sid,
name=cfg.get("name", sid),
port=port,
external_port=cfg.get("external_port", port),
status="healthy",
response_time_ms=elapsed_ms,
)


def _compute_extension_status(ext: dict, services_by_id: dict) -> str:
"""Compute the runtime status of an extension."""
ext_id = ext["id"]
Expand Down Expand Up @@ -821,17 +873,23 @@ def _log_cleanup_error(f: asyncio.Future) -> None:
if not isinstance(result, BaseException):
services_by_id[sid] = result

# Extensions without health endpoints — assume running if scanned
# (presence in user_svc_configs means compose.yaml + manifest exist)
from models import ServiceStatus
for sid, cfg in user_svc_configs.items():
if not cfg.get("health") and sid not in services_by_id:
services_by_id[sid] = ServiceStatus(
id=sid, name=cfg.get("name", sid),
port=cfg.get("port", 0),
external_port=cfg.get("external_port", cfg.get("port", 0)),
status="healthy", response_time_ms=None,
)
# Extensions without a /health endpoint: TCP-probe their port concurrently
# so we can distinguish a running container ("enabled") from one that's not
# running ("stopped"). Probes that fail (timeout / refused / DNS) drop out
# of services_by_id and fall through to the "stopped" branch.
unmonitored = {
sid: cfg for sid, cfg in user_svc_configs.items()
if not cfg.get("health") and sid not in services_by_id
}
if unmonitored:
probe_results = await asyncio.gather(
*(_probe_unmonitored_user_ext(sid, cfg)
for sid, cfg in unmonitored.items()),
return_exceptions=True,
)
for sid, result in zip(unmonitored.keys(), probe_results):
if not isinstance(result, BaseException) and result is not None:
services_by_id[sid] = result

extensions = []
for ext in EXTENSION_CATALOG:
Expand Down Expand Up @@ -964,15 +1022,20 @@ async def extension_detail(
if not isinstance(result, BaseException):
services_by_id[sid] = result

from models import ServiceStatus
for sid, cfg in user_svc_configs.items():
if not cfg.get("health") and sid not in services_by_id:
services_by_id[sid] = ServiceStatus(
id=sid, name=cfg.get("name", sid),
port=cfg.get("port", 0),
external_port=cfg.get("external_port", cfg.get("port", 0)),
status="healthy", response_time_ms=None,
)
# Same TCP-probe fan-out as the catalog endpoint for unmonitored exts.
unmonitored = {
sid: cfg for sid, cfg in user_svc_configs.items()
if not cfg.get("health") and sid not in services_by_id
}
if unmonitored:
probe_results = await asyncio.gather(
*(_probe_unmonitored_user_ext(sid, cfg)
for sid, cfg in unmonitored.items()),
return_exceptions=True,
)
for sid, result in zip(unmonitored.keys(), probe_results):
if not isinstance(result, BaseException) and result is not None:
services_by_id[sid] = result

status = _compute_extension_status(ext, services_by_id)
installable = _is_installable(service_id)
Expand Down Expand Up @@ -1404,10 +1467,16 @@ def enable_extension(
if enabled_services:
_call_agent_invalidate_compose_cache()

# Start all enabled services via agent (outside lock)
# Start all enabled services via agent (outside lock).
# Mirror the stopped-path pattern: write an initial progress record so
# the dashboard's poll-loop has something to track, then surface a clear
# error progress on agent failure. Without this, the frontend polls
# /api/extensions/<sid>/progress, gets {"status": "idle"} forever, and
# never reaches a terminal toast.
agent_ok = True
warnings: list[str] = []
for svc_id in enabled_services:
_write_initial_progress(svc_id)
# pre_start failure is terminal for this service — do not start it
if not _call_agent_hook(svc_id, "pre_start"):
agent_ok = False
Expand All @@ -1418,6 +1487,11 @@ def enable_extension(
continue
if not _call_agent("start", svc_id):
agent_ok = False
_write_error_progress(
svc_id,
"Host agent failed to start extension. Run 'dream restart' to recover.",
)
continue
# post_start is non-terminal — log failure but don't fail the enable
if not _call_agent_hook(svc_id, "post_start"):
logger.warning("post_start hook failed for %s (non-fatal)", svc_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,127 @@ def test_install_error_progress_includes_restart_guidance(
assert data["status"] == "error"
assert "dream restart" in data["error"]

def test_unmonitored_user_ext_probe_refused_falls_through_to_stopped(
self, test_client, monkeypatch, tmp_path,
):
"""User extension with no /health endpoint: TCP probe refused → stopped.

Regression for #511 — previously the catalog synthesised a synthetic
``healthy`` ServiceStatus for any unmonitored user extension that had
a compose.yaml on disk, regardless of whether the container was
actually running. TCP-connect probe distinguishes "running" from
"stopped".
"""
user_dir = tmp_path / "user"
ext_dir = user_dir / "my-ext"
ext_dir.mkdir(parents=True)
(ext_dir / "compose.yaml").write_text(_SAFE_COMPOSE)
(ext_dir / "manifest.yaml").write_text(yaml.dump({
"schema_version": "dream.services.v1",
"service": {"id": "my-ext", "name": "My Ext", "port": 8080},
}))

catalog = [_make_catalog_ext("my-ext", "My Extension")]
_patch_extensions_config(monkeypatch, catalog, tmp_path=tmp_path)
monkeypatch.setattr("routers.extensions.USER_EXTENSIONS_DIR", user_dir)

async def _refuse(*_args, **_kwargs):
raise ConnectionRefusedError("port closed")

with patch("user_extensions.get_user_services_cached",
return_value={"my-ext": {"host": "127.0.0.1", "port": 8080,
"name": "My Ext"}}):
with patch("helpers.get_all_services", new_callable=AsyncMock,
return_value=[]):
with patch("routers.extensions.asyncio.open_connection",
side_effect=_refuse):
resp = test_client.get(
"/api/extensions/catalog",
headers=test_client.auth_headers,
)

assert resp.status_code == 200
ext = resp.json()["extensions"][0]
assert ext["status"] == "stopped"

def test_unmonitored_user_ext_probe_success_reports_enabled(
self, test_client, monkeypatch, tmp_path,
):
"""User extension with no /health endpoint: TCP probe succeeds → enabled."""
user_dir = tmp_path / "user"
ext_dir = user_dir / "my-ext"
ext_dir.mkdir(parents=True)
(ext_dir / "compose.yaml").write_text(_SAFE_COMPOSE)
(ext_dir / "manifest.yaml").write_text(yaml.dump({
"schema_version": "dream.services.v1",
"service": {"id": "my-ext", "name": "My Ext", "port": 8080},
}))

catalog = [_make_catalog_ext("my-ext", "My Extension")]
_patch_extensions_config(monkeypatch, catalog, tmp_path=tmp_path)
monkeypatch.setattr("routers.extensions.USER_EXTENSIONS_DIR", user_dir)

class _FakeWriter:
def close(self):
pass

async def wait_closed(self):
return None

async def _accept(*_args, **_kwargs):
return (object(), _FakeWriter())

with patch("user_extensions.get_user_services_cached",
return_value={"my-ext": {"host": "127.0.0.1", "port": 8080,
"name": "My Ext"}}):
with patch("helpers.get_all_services", new_callable=AsyncMock,
return_value=[]):
with patch("routers.extensions.asyncio.open_connection",
side_effect=_accept):
resp = test_client.get(
"/api/extensions/catalog",
headers=test_client.auth_headers,
)

assert resp.status_code == 200
ext = resp.json()["extensions"][0]
assert ext["status"] == "enabled"

def test_enable_disabled_writes_error_progress_on_agent_start_failure(
self, test_client, monkeypatch, tmp_path,
):
"""Activate-from-disabled path writes error progress when host agent
fails to start the service.

Regression: previously the loop set ``agent_ok = False`` but never
wrote an error progress file, so the dashboard's poll-loop got
``{"status": "idle"}`` forever and no terminal toast.
"""
user_dir = _setup_user_ext(tmp_path, "my-ext", enabled=False)
_patch_mutation_config(monkeypatch, tmp_path, user_dir=user_dir)
monkeypatch.setattr("routers.extensions._call_agent_hook",
lambda sid, hook: True)
monkeypatch.setattr("routers.extensions._call_agent_invalidate_compose_cache",
lambda: None)
monkeypatch.setattr("routers.extensions._call_agent",
lambda action, sid: False)

resp = test_client.post(
"/api/extensions/my-ext/enable",
headers=test_client.auth_headers,
)

assert resp.status_code == 200
assert resp.json()["restart_required"] is True

progress_file = Path(tmp_path) / "extension-progress" / "my-ext.json"
assert progress_file.exists(), (
"activate path must write progress on agent start failure"
)
data = json.loads(progress_file.read_text())
assert data["status"] == "error"
assert "dream restart" in data["error"]

def test_enable_stopped_rejects_malicious_compose(self, test_client, monkeypatch, tmp_path):
"""Enable stopped ext with malicious compose.yaml → 400."""
bad_compose = "services:\n svc:\n image: test\n privileged: true\n"
Expand Down
Loading
Loading