diff --git a/dream-server/bin/dream-host-agent.py b/dream-server/bin/dream-host-agent.py index 73d6b15cf..21fc7642b 100755 --- a/dream-server/bin/dream-host-agent.py +++ b/dream-server/bin/dream-host-agent.py @@ -80,6 +80,8 @@ def _to_bash_path(path: Path) -> str: # Model download state — only one download at a time _model_download_lock = threading.Lock() _model_download_thread: threading.Thread | None = None +_model_download_proc: subprocess.Popen | None = None +_model_download_cancel = threading.Event() # Model activation lock — prevent concurrent .env writes and Docker restarts _model_activate_lock = threading.Lock() @@ -597,6 +599,8 @@ def do_POST(self): self._handle_service_logs() elif self.path == "/v1/model/download": self._handle_model_download() + elif self.path == "/v1/model/download/cancel": + self._handle_model_download_cancel() elif self.path == "/v1/model/activate": self._handle_model_activate() elif self.path == "/v1/model/delete": @@ -1312,8 +1316,10 @@ def _handle_model_download(self): json_response(self, 409, {"error": "Another download is in progress"}) return + _model_download_cancel.clear() + def _download(): - import time as _time + global _model_download_proc status_path = INSTALL_DIR / "data" / "model-download-status.json" try: models_dir.mkdir(parents=True, exist_ok=True) @@ -1321,6 +1327,8 @@ def _download(): _write_model_status(status_path, "downloading", label, 0, 0) for part_idx, (part_file_name, part_url) in enumerate(download_plan, 1): + if _model_download_cancel.is_set(): + break part_target = models_dir / part_file_name part_tmp = models_dir / f"{part_file_name}.part" part_label = part_file_name if len(download_plan) == 1 else f"{part_file_name} (part {part_idx}/{len(download_plan)})" @@ -1343,11 +1351,19 @@ def _download(): _write_model_status(status_path, "downloading", part_label, 0, part_total) - # Progress polling: update status by checking .part file size + # Progress polling: update status by checking .part file size. + # Also kills the active curl process when cancel is requested. _stop_progress = threading.Event() def _poll_progress(): while not _stop_progress.is_set(): + if _model_download_cancel.is_set(): + proc_ref = _model_download_proc + if proc_ref is not None: + try: + proc_ref.kill() + except (OSError, AttributeError): + pass try: if part_tmp.exists(): current = part_tmp.stat().st_size @@ -1359,18 +1375,32 @@ def _poll_progress(): progress_thread = threading.Thread(target=_poll_progress, daemon=True) progress_thread.start() - # Download with retry + # Download with retry. Use Popen (not run) so the process can + # be killed from the cancel handler or _poll_progress thread. success = False for attempt in range(1, 4): + if _model_download_cancel.is_set(): + break if attempt > 1: logger.info("Model download retry %d/3 for %s", attempt, part_file_name) - _time.sleep(5) - result = subprocess.run( + # Use wait() instead of sleep() so cancel is honored immediately + _model_download_cancel.wait(5) + proc = subprocess.Popen( ["curl", "-fSL", "-C", "-", "--connect-timeout", "30", "-o", str(part_tmp), part_url], - capture_output=True, text=True, timeout=14400, + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) - if result.returncode == 0: + _model_download_proc = proc + try: + proc.wait(timeout=14400) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + _model_download_proc = None + + if _model_download_cancel.is_set(): + break + if proc.returncode == 0: _stop_progress.set() part_tmp.rename(part_target) success = True @@ -1378,6 +1408,13 @@ def _poll_progress(): _write_model_status(status_path, "downloading", part_label, 0, part_total, f"Retry {attempt}/3") _stop_progress.set() + progress_thread.join(timeout=3) + + if _model_download_cancel.is_set(): + part_tmp.unlink(missing_ok=True) + _write_model_status(status_path, "cancelled", gguf_file, 0, 0, "Download cancelled by user") + logger.info("Model download cancelled: %s", gguf_file) + return if not success: part_tmp.unlink(missing_ok=True) @@ -1390,6 +1427,9 @@ def _poll_progress(): # entry. Empty checksum -> warn (do not silently skip), so # missing catalog entries surface during operator review. import hashlib + if _model_download_cancel.is_set(): + _write_model_status(status_path, "cancelled", gguf_file, 0, 0, "Download cancelled by user") + return for part_idx, (part_file_name, _) in enumerate(download_plan, 1): expected = expected_sha_by_file.get(part_file_name, "") final_target = models_dir / part_file_name @@ -1434,6 +1474,25 @@ def _poll_progress(): json_response(self, 200, {"status": "started"}) + def _handle_model_download_cancel(self): + """Cancel an in-progress model download.""" + if not check_auth(self): + return + with _model_download_lock: + if _model_download_thread is None or not _model_download_thread.is_alive(): + json_response(self, 200, {"status": "no_download"}) + return + _model_download_cancel.set() + # Capture local reference to avoid TOCTOU race — the download thread + # may null out _model_download_proc between the check and kill. + proc_ref = _model_download_proc + if proc_ref is not None: + try: + proc_ref.kill() + except (OSError, AttributeError): + pass + json_response(self, 200, {"status": "cancelling"}) + def _handle_model_activate(self): """Swap active model: update .env + models.ini + restart llama-server.""" if not check_auth(self): @@ -1458,6 +1517,7 @@ def _handle_model_activate(self): def _do_model_activate(self, model_id: str): """Inner activate logic — called with _model_activate_lock held.""" + import time # Look up model in library library_path = INSTALL_DIR / "config" / "model-library.json" model = None @@ -1477,9 +1537,14 @@ def _do_model_activate(self, model_id: str): gguf_file = model.get("gguf_file", "") llm_model_name = model.get("llm_model_name", model_id) context_length = model.get("context_length", 32768) + llama_server_image = model.get("llama_server_image") - # Verify GGUF exists on disk - target = INSTALL_DIR / "data" / "models" / gguf_file + # Verify GGUF exists on disk (with path traversal protection) + models_dir = INSTALL_DIR / "data" / "models" + target = (models_dir / gguf_file).resolve() + if not target.is_relative_to(models_dir.resolve()): + json_response(self, 400, {"error": "Invalid model file path"}) + return if not target.exists(): json_response(self, 400, {"error": f"Model file not downloaded: {gguf_file}"}) return @@ -1489,6 +1554,10 @@ def _do_model_activate(self, model_id: str): lemonade_yaml = INSTALL_DIR / "config" / "litellm" / "lemonade.yaml" try: + # Read current env BEFORE modification — needed for gpu_backend guard + env_pre = load_env(env_path) + gpu_backend = env_pre.get("GPU_BACKEND", "nvidia") + # Save rollback snapshot env_backup = env_path.read_text(encoding="utf-8") if env_path.exists() else "" ini_backup = models_ini.read_text(encoding="utf-8") if models_ini.exists() else "" @@ -1503,10 +1572,10 @@ def _do_model_activate(self, model_id: str): "CTX_SIZE": str(context_length), "MAX_CONTEXT": str(context_length), } - # Update server image if the model requires a specific build - # (e.g., Gemma 4 needs server-cuda-b8648 instead of b8248) - if model.get("llama_server_image"): - updates["LLAMA_SERVER_IMAGE"] = model["llama_server_image"] + # Only update LLAMA_SERVER_IMAGE on Docker backends. + # macOS runs llama-server natively (no Docker image to pull). + if llama_server_image and gpu_backend != "apple": + updates["LLAMA_SERVER_IMAGE"] = llama_server_image new_lines = [] seen = set() for line in lines: @@ -1558,18 +1627,60 @@ def _do_model_activate(self, model_id: str): logger.info("Regenerated lemonade.yaml for model: extra.%s", gguf_file) # Restart llama-server with the new model. - # Two strategies depending on where the agent runs: - # - Host-native (Linux/macOS): docker compose stop+up, same as - # bootstrap-upgrade.sh. Simple, correct, preserves all config. - # - Containerized (Docker Desktop WSL2): docker inspect+run. - # Compose can't be used because relative bind-mount paths - # resolve to the agent container's filesystem, not the host. + # Three strategies depending on platform / agent location: + # - apple (macOS): llama-server runs natively via Metal, not Docker. + # Managed via PID file — SIGTERM the old process, launch new one. + # - _in_container (Docker Desktop / WSL2): docker inspect+run. + # Compose can't be used because relative bind-mount paths resolve + # to the agent container's filesystem, not the host. + # - Host-native Linux: docker compose stop+up, same as bootstrap-upgrade.sh. env = load_env(env_path) - gpu_backend = env.get("GPU_BACKEND", "nvidia") _in_container = bool(os.environ.get("DREAM_HOST_INSTALL_DIR")) - if _in_container: - override_image = model.get("llama_server_image") or "" + if gpu_backend == "apple": + # macOS: manage native llama-server process via PID file + pid_file = INSTALL_DIR / "data" / ".llama-server.pid" + llama_bin = INSTALL_DIR / "bin" / "llama-server" + llama_log = INSTALL_DIR / "data" / "llama-server.log" + + if not llama_bin.exists(): + env_path.write_text(env_backup, encoding="utf-8") + models_ini.write_text(ini_backup, encoding="utf-8") + json_response(self, 500, {"error": "llama-server binary not found — re-run installer"}) + return + + # Stop existing native process + if pid_file.exists(): + try: + old_pid = int(pid_file.read_text(encoding="utf-8").strip()) + # Verify PID is llama-server before killing (prevent PID reuse accidents) + try: + ps_result = subprocess.run( + ["ps", "-p", str(old_pid), "-o", "comm="], + capture_output=True, text=True, timeout=5, + ) + if "llama" not in ps_result.stdout.lower(): + raise OSError("PID is not llama-server") + except (subprocess.TimeoutExpired, OSError): + pid_file.unlink(missing_ok=True) + raise OSError("stale PID") + os.kill(old_pid, signal.SIGTERM) + for _ in range(20): + try: + os.kill(old_pid, 0) + time.sleep(0.5) + except OSError: + break + else: + os.kill(old_pid, signal.SIGKILL) + except (ValueError, OSError): + pass + pid_file.unlink(missing_ok=True) + + # Re-launch native llama-server with new model + _launch_native_llama_server(env_path, llama_bin, llama_log, pid_file) + elif _in_container: + override_image = llama_server_image or "" _recreate_llama_server(env, override_image=override_image) else: _compose_restart_llama_server(env) @@ -1577,16 +1688,17 @@ def _do_model_activate(self, model_id: str): # Health check (up to 5 min) # Use container name on docker network (localhost is the agent # container when running containerized, not the llama-server). - import time # Determine health check URL based on where the agent runs: # - Inside a container (DREAM_HOST_INSTALL_DIR set): use docker # network name + internal port 8080 - # - On the host (native systemd): use localhost + OLLAMA_PORT + # - On the host (native systemd or macOS): use 127.0.0.1 + OLLAMA_PORT. + # (Use 127.0.0.1, not localhost — localhost resolves to ::1 on + # IPv6-enabled hosts but Docker binds to 127.0.0.1 only.) if os.environ.get("DREAM_HOST_INSTALL_DIR"): llama_host = "dream-llama-server" llama_port = "8080" else: - llama_host = "localhost" + llama_host = "127.0.0.1" llama_port = env.get("OLLAMA_PORT", "8080") health_path = "/api/v1/health" if gpu_backend == "amd" else "/health" health_url = f"http://{llama_host}:{llama_port}{health_path}" @@ -1654,7 +1766,35 @@ def _do_model_activate(self, model_id: str): if lemonade_backup is not None: lemonade_yaml.write_text(lemonade_backup, encoding="utf-8") rollback_env = load_env(env_path) - if _in_container: + if gpu_backend == "apple": + # Stop newly launched native process, re-launch with old params + if pid_file.exists(): + try: + new_pid = int(pid_file.read_text(encoding="utf-8").strip()) + try: + ps_result = subprocess.run( + ["ps", "-p", str(new_pid), "-o", "comm="], + capture_output=True, text=True, timeout=5, + ) + if "llama" not in ps_result.stdout.lower(): + raise OSError("PID is not llama-server") + except (subprocess.TimeoutExpired, OSError): + pid_file.unlink(missing_ok=True) + raise OSError("stale PID") + os.kill(new_pid, signal.SIGTERM) + for _ in range(20): + try: + os.kill(new_pid, 0) + time.sleep(0.5) + except OSError: + break + else: + os.kill(new_pid, signal.SIGKILL) + except (ValueError, OSError): + pass + pid_file.unlink(missing_ok=True) + _launch_native_llama_server(env_path, llama_bin, llama_log, pid_file) + elif _in_container: _recreate_llama_server(rollback_env) else: _compose_restart_llama_server(rollback_env) @@ -1787,21 +1927,49 @@ def _write_lemonade_config(install_dir: Path, gguf_file: str): config_path.write_text(content, encoding="utf-8") logger.info("Wrote lemonade.yaml for model: extra.%s", gguf_file) +def _launch_native_llama_server(env_path: Path, llama_bin: Path, llama_log: Path, pid_file: Path): + """Launch the native (Metal) llama-server process and write its PID file. + + Reads the current .env for GGUF_FILE, CTX_SIZE, and LLAMA_REASONING so + the caller only needs to ensure .env is up-to-date before calling. + """ + env = load_env(env_path) + gguf_file = env.get("GGUF_FILE", "") + ctx_size = env.get("CTX_SIZE", "32768") + model_path = INSTALL_DIR / "data" / "models" / gguf_file + reasoning = env.get("LLAMA_REASONING", "off") + reasoning_fmt = {"off": "none", "on": "deepseek"}.get(reasoning, reasoning) + with open(llama_log, "a") as log_f: + proc = subprocess.Popen( + [str(llama_bin), + "--host", "0.0.0.0", "--port", "8080", + "--model", str(model_path), + "--ctx-size", ctx_size, + "--n-gpu-layers", "999", + "--reasoning-format", reasoning_fmt, + "--metrics"], + stdout=log_f, stderr=log_f, + ) + pid_file.write_text(str(proc.pid), encoding="utf-8") + logger.info("Native llama-server launched (pid %d, model %s)", proc.pid, gguf_file) + def _compose_restart_llama_server(env: dict): """Restart llama-server via docker compose (host-native path). - Primary restart strategy for Linux (systemd) and macOS (launchd) where the - agent runs natively on the host. Mirrors bootstrap-upgrade.sh lines 289-304. + This is the primary restart strategy for Linux (systemd) where the agent + runs natively on the host. It mirrors the proven pattern from + bootstrap-upgrade.sh lines 289-304. + Uses resolve_compose_flags() so the compose stack is always built from the + current install state — avoids stale or missing .compose-flags files. + Uses stop + up -d (not restart) so that updated .env values are picked up + by the new container. Raises RuntimeError on any docker-layer failure so _do_model_activate can surface the error immediately instead of waiting for the health-check loop. """ gpu_backend = env.get("GPU_BACKEND", "nvidia") - compose_flags = [] - flags_file = INSTALL_DIR / ".compose-flags" - if flags_file.exists(): - compose_flags = flags_file.read_text(encoding="utf-8").strip().split() + compose_flags = resolve_compose_flags() def _run(argv, timeout): result = subprocess.run( @@ -1815,11 +1983,14 @@ def _run(argv, timeout): ) if gpu_backend == "amd": - # Lemonade: restart preserves cached binary, reads models.ini on boot + # Lemonade reads models.ini on boot, so stop + up preserves the named + # cache volumes while ensuring the fresh config is picked up. if compose_flags: - _run(["docker", "compose"] + compose_flags + ["restart", "llama-server"], 300) + _run(["docker", "compose"] + compose_flags + ["stop", "llama-server"], 120) + _run(["docker", "compose"] + compose_flags + ["up", "-d", "llama-server"], 300) else: - _run(["docker", "restart", "dream-llama-server"], 300) + _run(["docker", "stop", "dream-llama-server"], 120) + _run(["docker", "start", "dream-llama-server"], 300) else: # llama.cpp: recreate to pick up new GGUF_FILE from .env if compose_flags: diff --git a/dream-server/extensions/services/dashboard-api/routers/models.py b/dream-server/extensions/services/dashboard-api/routers/models.py index 917c4dfa0..be503496b 100644 --- a/dream-server/extensions/services/dashboard-api/routers/models.py +++ b/dream-server/extensions/services/dashboard-api/routers/models.py @@ -214,6 +214,13 @@ def download_model(model_id: str, api_key: str = Depends(verify_api_key)): return result +@router.post("/api/models/download/cancel") +def cancel_download(api_key: str = Depends(verify_api_key)): + """Cancel an in-progress model download.""" + result = _call_agent_model("/v1/model/download/cancel", {}) + return result + + @router.post("/api/models/{model_id}/load") def load_model(model_id: str, api_key: str = Depends(verify_api_key)): """Activate a model — update config and restart llama-server.""" diff --git a/dream-server/extensions/services/dashboard-api/tests/test_host_agent.py b/dream-server/extensions/services/dashboard-api/tests/test_host_agent.py index 36ed90bb1..fef1430bf 100644 --- a/dream-server/extensions/services/dashboard-api/tests/test_host_agent.py +++ b/dream-server/extensions/services/dashboard-api/tests/test_host_agent.py @@ -449,3 +449,44 @@ def test_500_missing_schema(self, env_update_env): assert handler.response_code == 500 assert ".env.schema.json not found" in handler.parse_response()["error"] + + +class TestHandleModelDownloadCancel: + + def test_returns_no_download_when_idle(self, monkeypatch): + handler = _FakeHandler(b"") + monkeypatch.setattr(_mod, "AGENT_API_KEY", "test-key") + monkeypatch.setattr(_mod, "_model_download_thread", None) + _mod._model_download_cancel.clear() + + _mod.AgentHandler._handle_model_download_cancel(handler) + + assert handler.response_code == 200 + assert handler.parse_response()["status"] == "no_download" + assert _mod._model_download_cancel.is_set() is False + + def test_sets_cancel_flag_and_kills_active_proc(self, monkeypatch): + class _AliveThread: + def is_alive(self): + return True + + class _FakeProc: + def __init__(self): + self.killed = False + + def kill(self): + self.killed = True + + handler = _FakeHandler(b"") + proc = _FakeProc() + monkeypatch.setattr(_mod, "AGENT_API_KEY", "test-key") + monkeypatch.setattr(_mod, "_model_download_thread", _AliveThread()) + monkeypatch.setattr(_mod, "_model_download_proc", proc) + _mod._model_download_cancel.clear() + + _mod.AgentHandler._handle_model_download_cancel(handler) + + assert handler.response_code == 200 + assert handler.parse_response()["status"] == "cancelling" + assert _mod._model_download_cancel.is_set() is True + assert proc.killed is True diff --git a/dream-server/extensions/services/dashboard-api/tests/test_model_activate.py b/dream-server/extensions/services/dashboard-api/tests/test_model_activate.py index c6b023d00..f5df911ce 100644 --- a/dream-server/extensions/services/dashboard-api/tests/test_model_activate.py +++ b/dream-server/extensions/services/dashboard-api/tests/test_model_activate.py @@ -16,6 +16,8 @@ _check_lemonade_health = _mod._check_lemonade_health _send_lemonade_warmup = _mod._send_lemonade_warmup _write_lemonade_config = _mod._write_lemonade_config +_compose_restart_llama_server = _mod._compose_restart_llama_server +_launch_native_llama_server = _mod._launch_native_llama_server # --- _check_lemonade_health --- @@ -136,6 +138,78 @@ def test_file_path(self, tmp_path): assert (litellm_dir / "lemonade.yaml").exists() +class TestComposeRestartLlamaServer: + + def test_amd_uses_stop_then_up(self, monkeypatch, tmp_path): + calls = [] + + def fake_run(cmd, **kwargs): + calls.append(cmd) + return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="") + + monkeypatch.setattr(_mod, "INSTALL_DIR", tmp_path) + monkeypatch.setattr( + _mod, + "resolve_compose_flags", + lambda: ["--env-file", ".env", "-f", "docker-compose.base.yml"], + ) + monkeypatch.setattr(subprocess, "run", fake_run) + + _compose_restart_llama_server({"GPU_BACKEND": "amd"}) + + assert calls == [ + [ + "docker", "compose", "--env-file", ".env", "-f", + "docker-compose.base.yml", "stop", "llama-server", + ], + [ + "docker", "compose", "--env-file", ".env", "-f", + "docker-compose.base.yml", "up", "-d", "llama-server", + ], + ] + + +class TestLaunchNativeLlamaServer: + + def test_reads_env_and_writes_pid(self, monkeypatch, tmp_path): + env_path = tmp_path / ".env" + env_path.write_text( + "GGUF_FILE=test-model.gguf\nCTX_SIZE=8192\nLLAMA_REASONING=on\n", + encoding="utf-8", + ) + (tmp_path / "data" / "models").mkdir(parents=True) + (tmp_path / "data").mkdir(exist_ok=True) + llama_bin = tmp_path / "bin" / "llama-server" + llama_bin.parent.mkdir(parents=True) + llama_bin.write_text("", encoding="utf-8") + llama_log = tmp_path / "data" / "llama-server.log" + pid_file = tmp_path / "data" / ".llama-server.pid" + + calls = [] + + class _FakeProc: + pid = 4321 + + def fake_popen(cmd, **kwargs): + calls.append((cmd, kwargs)) + return _FakeProc() + + monkeypatch.setattr(_mod, "INSTALL_DIR", tmp_path) + monkeypatch.setattr(subprocess, "Popen", fake_popen) + + _launch_native_llama_server(env_path, llama_bin, llama_log, pid_file) + + assert pid_file.read_text(encoding="utf-8") == "4321" + cmd, _kwargs = calls[0] + assert cmd[0] == str(llama_bin) + assert "--model" in cmd + assert str(tmp_path / "data" / "models" / "test-model.gguf") in cmd + assert "--ctx-size" in cmd + assert "8192" in cmd + assert "--reasoning-format" in cmd + assert "deepseek" in cmd + + # --- Rollback integration --- diff --git a/dream-server/extensions/services/dashboard/src/hooks/useDownloadProgress.js b/dream-server/extensions/services/dashboard/src/hooks/useDownloadProgress.js index 79e1567ce..8d3b89590 100644 --- a/dream-server/extensions/services/dashboard/src/hooks/useDownloadProgress.js +++ b/dream-server/extensions/services/dashboard/src/hooks/useDownloadProgress.js @@ -34,10 +34,10 @@ export function useDownloadProgress(pollIntervalMs = 1000) { } else if (data.status === 'complete' || data.status === 'idle') { setIsDownloading(false) setProgress(null) - } else if (data.status === 'failed' || data.status === 'error') { + } else if (data.status === 'failed' || data.status === 'error' || data.status === 'cancelled') { setIsDownloading(false) setProgress({ - error: data.error || data.message || 'Download failed', + error: data.error || data.message || (data.status === 'cancelled' ? 'Download cancelled' : 'Download failed'), model: data.model }) } @@ -76,11 +76,21 @@ export function useDownloadProgress(pollIntervalMs = 1000) { return eta } + const cancelDownload = useCallback(async () => { + try { + await fetch('/api/models/download/cancel', { method: 'POST' }) + fetchProgress() + } catch (err) { + console.error('Failed to cancel download:', err) + } + }, [fetchProgress]) + return { isDownloading, progress, formatBytes, formatEta, - refresh: fetchProgress + refresh: fetchProgress, + cancelDownload } } diff --git a/dream-server/extensions/services/dashboard/src/pages/Models.jsx b/dream-server/extensions/services/dashboard/src/pages/Models.jsx index aeed979ab..90c9572e8 100644 --- a/dream-server/extensions/services/dashboard/src/pages/Models.jsx +++ b/dream-server/extensions/services/dashboard/src/pages/Models.jsx @@ -319,9 +319,19 @@ function DownloadProgressBar({ progress, helpers }) {
- - {progress.percent?.toFixed(0) || 0}% - +