Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,18 @@ def _get_service_data_info(service_id: str) -> dict | None:
_AGENT_LOG_TIMEOUT = 30  # seconds; urllib timeout for log-fetch POSTs — log fetches should be fast


def _fetch_agent_logs(url: str, headers: dict, data: bytes, timeout: int) -> str:
    """Synchronously POST *data* to the host agent and return the body as text.

    Kept as a plain blocking function so async endpoints can run it off the
    event loop with ``asyncio.to_thread``. Any ``urllib.error.HTTPError`` /
    ``URLError`` raised by the request is deliberately NOT caught here — it
    propagates to the caller, which owns the error handling.
    """
    request = urllib.request.Request(url, data=data, headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode()


def _call_agent(action: str, service_id: str) -> bool:
"""Call host agent to start/stop a service. Returns True on success."""
url = f"{AGENT_URL}/v1/extension/{action}"
Expand All @@ -487,8 +499,11 @@ def _call_agent(action: str, service_id: str) -> bool:
try:
with urllib.request.urlopen(req, timeout=_AGENT_TIMEOUT) as resp:
return resp.status == 200
except Exception:
logger.warning("Host agent unreachable at %s — fallback to restart_required", AGENT_URL)
except (urllib.error.URLError, urllib.error.HTTPError, OSError, TimeoutError) as exc:
logger.warning(
"Host agent unreachable at %s — fallback to restart_required: %s",
AGENT_URL, exc,
)
return False


Expand All @@ -503,9 +518,10 @@ def _call_agent_invalidate_compose_cache() -> None:
logger.warning(
"compose-flags cache invalidation returned HTTP %d", resp.status,
)
except Exception:
except (urllib.error.URLError, urllib.error.HTTPError, OSError, TimeoutError) as exc:
logger.warning(
"Host agent unreachable for compose-flags invalidation at %s", AGENT_URL,
"Host agent unreachable for compose-flags invalidation at %s: %s",
AGENT_URL, exc,
)


Expand Down Expand Up @@ -583,8 +599,10 @@ def _call_agent_compose_rename(action: str, service_id: str) -> bool:
try:
with urllib.request.urlopen(req, timeout=_AGENT_LOG_TIMEOUT) as resp:
return resp.status == 200
except Exception:
logger.warning("Host agent unreachable for compose rename at %s", AGENT_URL)
except (urllib.error.URLError, urllib.error.HTTPError, OSError, TimeoutError) as exc:
logger.warning(
"Host agent unreachable for compose rename at %s: %s", AGENT_URL, exc,
)
return False


Expand All @@ -603,7 +621,7 @@ def _check_agent_health() -> bool:
req = urllib.request.Request(f"{AGENT_URL}/health")
with urllib.request.urlopen(req, timeout=3) as resp:
available = resp.status == 200
except Exception:
except (urllib.error.URLError, urllib.error.HTTPError, OSError, TimeoutError):
available = False
with _agent_cache_lock:
_agent_cache.update(available=available, checked_at=time.monotonic())
Expand Down Expand Up @@ -645,7 +663,16 @@ async def extensions_catalog(
api_key: str = Depends(verify_api_key),
):
"""Get the extensions catalog with computed status."""
asyncio.get_running_loop().run_in_executor(None, _cleanup_stale_progress)
_cleanup_future = asyncio.get_running_loop().run_in_executor(
None, _cleanup_stale_progress,
)

def _log_cleanup_error(f: asyncio.Future) -> None:
exc = f.exception()
if exc is not None:
logger.error("stale-progress cleanup failed: %s", exc, exc_info=exc)

_cleanup_future.add_done_callback(_log_cleanup_error)

from helpers import get_cached_services, get_all_services

Expand Down Expand Up @@ -752,12 +779,14 @@ async def extensions_catalog(
except OSError:
lib_available = False

agent_available = await asyncio.to_thread(_check_agent_health)

return {
"extensions": extensions,
"summary": summary,
"gpu_backend": GPU_BACKEND,
"library_available": lib_available,
"agent_available": _check_agent_health(),
"agent_available": agent_available,
}


Expand Down Expand Up @@ -863,10 +892,11 @@ async def extension_logs(
"Authorization": f"Bearer {DREAM_AGENT_KEY}",
}
data = json.dumps({"service_id": service_id, "tail": 100}).encode()
req = urllib.request.Request(url, data=data, headers=headers, method="POST")
try:
with urllib.request.urlopen(req, timeout=_AGENT_LOG_TIMEOUT) as resp:
return json.loads(resp.read().decode())
body = await asyncio.to_thread(
_fetch_agent_logs, url, headers, data, _AGENT_LOG_TIMEOUT,
)
return json.loads(body)
except urllib.error.HTTPError as exc:
try:
err_body = json.loads(exc.read().decode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2278,3 +2278,63 @@ def test_assert_not_core_blocks_always_on(self, service_id):
with pytest.raises(HTTPException) as exc_info:
_assert_not_core(service_id)
assert exc_info.value.status_code == 403


class TestCallAgentErrorNarrowing:
    """_call_agent swallows network errors but not programmer errors."""

    def test_call_agent_returns_false_on_urlerror(self, monkeypatch, caplog):
        """A network failure yields False plus a warning — callers rely on this."""
        import logging
        import urllib.error
        from routers import extensions as ext_module

        def _fail(*_args, **_kwargs):
            raise urllib.error.URLError("timeout")

        monkeypatch.setattr(ext_module.urllib.request, "urlopen", _fail)
        with caplog.at_level(logging.WARNING, logger="routers.extensions"):
            outcome = ext_module._call_agent("start", "svc-x")

        assert outcome is False
        messages = [record.message for record in caplog.records]
        assert any("Host agent unreachable" in message for message in messages)

    def test_call_agent_reraises_non_network_errors(self, monkeypatch):
        """Programmer errors (e.g. AttributeError) must not be swallowed."""
        from routers import extensions as ext_module

        def _fail(*_args, **_kwargs):
            raise AttributeError("boom")

        monkeypatch.setattr(ext_module.urllib.request, "urlopen", _fail)
        with pytest.raises(AttributeError):
            ext_module._call_agent("start", "svc-x")

    def test_catalog_logs_when_cleanup_future_fails(
        self, test_client, monkeypatch, tmp_path, caplog,
    ):
        """A failing stale-progress cleanup is logged, not lost to fire-and-forget."""
        import logging

        _patch_extensions_config(
            monkeypatch,
            [_make_catalog_ext("test-svc", "Test Service")],
            tmp_path=tmp_path,
        )

        def _explode():
            raise RuntimeError("cleanup exploded")

        monkeypatch.setattr(
            "routers.extensions._cleanup_stale_progress", _explode,
        )

        with caplog.at_level(logging.ERROR, logger="routers.extensions"):
            with patch("helpers.get_all_services", new_callable=AsyncMock,
                       return_value=[]):
                resp = test_client.get(
                    "/api/extensions/catalog",
                    headers=test_client.auth_headers,
                )

        assert resp.status_code == 200
        logged = [record.message for record in caplog.records]
        assert any("stale-progress cleanup failed" in message for message in logged)
Loading