|
| 1 | +"""komi-learn — self-update: check PyPI for a newer release and upgrade in place. |
| 2 | +
|
| 3 | +`komi-learn update` should Just Work regardless of how the user installed us. The |
| 4 | +hard part isn't the PyPI check — it's upgrading *the right environment*. A blind |
| 5 | +`pip install -U` can hit the wrong interpreter, fight a pipx-managed venv, or need |
| 6 | +permissions it doesn't have. So we detect the install method first and run the |
| 7 | +command that matches it (pip-into-this-interpreter, or `pipx upgrade`). If we |
| 8 | +genuinely can't tell, we print the command instead of guessing and breaking the |
| 9 | +user's environment. |
| 10 | +
|
| 11 | +Everything here is best-effort and network-failure-safe: a flaky PyPI lookup never |
| 12 | +raises out of `check_latest`, it just returns ``None``. |
| 13 | +""" |
| 14 | + |
| 15 | +from __future__ import annotations |
| 16 | + |
| 17 | +import json |
| 18 | +import os |
| 19 | +import re |
| 20 | +import shutil |
| 21 | +import subprocess |
| 22 | +import sys |
| 23 | +import urllib.request |
| 24 | +from dataclasses import dataclass |
| 25 | +from typing import Optional |
| 26 | + |
| 27 | +DIST_NAME = "komi-learn" |
| 28 | +PYPI_JSON_URL = f"https://pypi.org/pypi/{DIST_NAME}/json" |
| 29 | +_PYPI_TIMEOUT = 8 # seconds — a stuck lookup must not hang the CLI |
| 30 | +_MAX_PYPI_BYTES = 5 * 1024 * 1024 # cap the response read (one project's JSON << this) |
| 31 | + |
| 32 | + |
| 33 | +class _NoRedirect(urllib.request.HTTPRedirectHandler): |
| 34 | + """A redirect handler that refuses every redirect (returns None).""" |
| 35 | + |
| 36 | + def redirect_request(self, *args, **kwargs): # noqa: D401 |
| 37 | + return None |
| 38 | + |
| 39 | + |
| 40 | +# Module-level opener so we don't rebuild it per call. Refuses 3xx redirects. |
| 41 | +_NO_REDIRECT_OPENER = urllib.request.build_opener(_NoRedirect) |
| 42 | + |
| 43 | + |
| 44 | +# ── version comparison ─────────────────────────────────────────────────────── |
| 45 | +# |
| 46 | +# A self-contained PEP 440-lite comparator. We deliberately do NOT depend on |
| 47 | +# `packaging`: the engine's ethos is zero required deps, and "is 0.4.0 newer than |
| 48 | +# 0.3.0" doesn't justify a runtime dependency that may or may not be present |
| 49 | +# (depending on whether the lib happened to compute the answer differently per |
| 50 | +# environment). This handles the version shapes komi-learn actually ships — |
| 51 | +# release tuples, pre-releases (a/b/rc), post/dev, and a leading epoch — with one |
| 52 | +# code path, so the answer never varies by environment. |
| 53 | + |
| 54 | +# pre-release phase ranks: anything pre sorts BEFORE the plain release; post AFTER. |
| 55 | +_PRE_RANK = {"a": 0, "alpha": 0, "b": 1, "beta": 1, "rc": 2, "c": 2, "pre": 2, |
| 56 | + "preview": 2} |
| 57 | +_FINAL = 3 # a plain release (no pre/post/dev) sits between pre and post |
| 58 | +_POST = 4 |
| 59 | +# .devN sorts before everything at the same release (earliest), so it gets a phase |
| 60 | +# below the lowest pre-rank. |
| 61 | +_DEV = -1 |
| 62 | + |
| 63 | +_RELEASE_RE = re.compile(r"^(?:(\d+)!)?(\d+(?:\.\d+)*)(.*)$") |
| 64 | +_PRE_RE = re.compile(r"[._-]?(a|b|c|rc|alpha|beta|pre|preview)[._-]?(\d*)") |
| 65 | +_POST_RE = re.compile(r"[._-]?(?:post|rev|r)[._-]?(\d*)|-(\d+)") |
| 66 | +_DEV_RE = re.compile(r"[._-]?dev[._-]?(\d*)") |
| 67 | + |
| 68 | + |
| 69 | +def _norm_release(rel: str) -> tuple: |
| 70 | + """Numeric release tuple with trailing zeros stripped so 1.0 == 1.0.0.""" |
| 71 | + parts = [int(p) for p in rel.split(".")] |
| 72 | + while len(parts) > 1 and parts[-1] == 0: |
| 73 | + parts.pop() |
| 74 | + return tuple(parts) |
| 75 | + |
| 76 | + |
| 77 | +def _version_key(v: str): |
| 78 | + """Map a version string to a tuple that sorts per PEP 440 (for the shapes we |
| 79 | + ship). Unparseable input sorts lowest so it never spuriously beats a real |
| 80 | + release. The key shape is: |
| 81 | + (epoch, release_tuple, phase, phase_num, dev_num) |
| 82 | + where ``phase`` orders dev < pre(a<b<rc) < final < post. |
| 83 | + """ |
| 84 | + s = (v or "").strip().lower() |
| 85 | + if s[:1] == "v": # tolerate a leading v/V tag |
| 86 | + s = s[1:] |
| 87 | + m = _RELEASE_RE.match(s) |
| 88 | + if not m: |
| 89 | + return (-1, (), _DEV, -1, -1) # lowest possible |
| 90 | + epoch = int(m.group(1) or 0) |
| 91 | + release = _norm_release(m.group(2)) |
| 92 | + suffix = m.group(3) or "" |
| 93 | + |
| 94 | + dev = _DEV_RE.search(suffix) |
| 95 | + pre = _PRE_RE.search(suffix) |
| 96 | + post = _POST_RE.search(suffix) |
| 97 | + |
| 98 | + # dev takes precedence as the earliest phase; then pre; then post; else final. |
| 99 | + if dev: |
| 100 | + phase, phase_num = _DEV, int(dev.group(1) or 0) |
| 101 | + elif pre: |
| 102 | + phase, phase_num = _PRE_RANK.get(pre.group(1), 2), int(pre.group(2) or 0) |
| 103 | + elif post: |
| 104 | + phase, phase_num = _POST, int(post.group(1) or post.group(2) or 0) |
| 105 | + else: |
| 106 | + phase, phase_num = _FINAL, 0 |
| 107 | + dev_num = int(dev.group(1) or 0) if dev else 0 |
| 108 | + return (epoch, release, phase, phase_num, dev_num) |
| 109 | + |
| 110 | + |
| 111 | +# kept as a public helper (tests + back-compat); now returns the release tuple. |
| 112 | +def _parse_version(v: str) -> tuple: |
| 113 | + """The numeric release tuple (trailing zeros stripped), e.g. "0.4.0rc1"->(0,4). |
| 114 | + For full ordering (incl. pre/post/epoch) use :func:`is_newer`.""" |
| 115 | + return _version_key(v)[1] or (0,) |
| 116 | + |
| 117 | + |
| 118 | +def is_newer(latest: str, current: str) -> bool: |
| 119 | + """True if ``latest`` is a strictly newer version than ``current`` (PEP 440-lite).""" |
| 120 | + return _version_key(latest) > _version_key(current) |
| 121 | + |
| 122 | + |
| 123 | +# ── PyPI lookup ────────────────────────────────────────────────────────────── |
| 124 | + |
| 125 | +def check_latest(*, timeout: int = _PYPI_TIMEOUT) -> Optional[str]: |
| 126 | + """Return the latest version string on PyPI, or ``None`` if it can't be |
| 127 | + determined (offline, PyPI down, malformed payload). Never raises.""" |
| 128 | + try: |
| 129 | + req = urllib.request.Request( |
| 130 | + PYPI_JSON_URL, headers={"Accept": "application/json", |
| 131 | + "User-Agent": f"{DIST_NAME}-updater"}) |
| 132 | + # Refuse redirects: the hardcoded https literal only guarantees the FIRST |
| 133 | + # hop. urllib's default handler would follow a 3xx to any host/scheme, |
| 134 | + # including http — a MITM could downgrade the lookup and feed us a fake |
| 135 | + # "newer" version to coerce an upgrade. PyPI's JSON API doesn't redirect |
| 136 | + # cross-scheme here, so refusing is safe. |
| 137 | + with _NO_REDIRECT_OPENER.open(req, timeout=timeout) as resp: # nosec B310 - https, no-redirect |
| 138 | + if not (resp.geturl() or "").lower().startswith("https://"): |
| 139 | + return None |
| 140 | + # Bound the read: a hostile endpoint must not be able to stream a huge |
| 141 | + # body and OOM the CLI. One project's JSON is far under this cap. |
| 142 | + raw = resp.read(_MAX_PYPI_BYTES + 1) |
| 143 | + if len(raw) > _MAX_PYPI_BYTES: |
| 144 | + return None |
| 145 | + data = json.loads(raw.decode("utf-8")) |
| 146 | + ver = (data.get("info") or {}).get("version") |
| 147 | + return ver or None |
| 148 | + except Exception: |
| 149 | + return None |
| 150 | + |
| 151 | + |
| 152 | +# ── install-method detection ───────────────────────────────────────────────── |
| 153 | + |
| 154 | +def _is_pipx() -> bool: |
| 155 | + """Are we running from a pipx-managed venv? pipx installs each app into |
| 156 | + ``.../pipx/venvs/<app>/``. |
| 157 | +
|
| 158 | + We key off the *interpreter prefix* — the one signal an attacker can't set |
| 159 | + just by exporting an env var. A bare ``PIPX_HOME`` is NOT sufficient on its |
| 160 | + own: that would let a hostile environment flip a plain pip install into the |
| 161 | + pipx branch (and then a planted ``pipx`` on PATH would run). The env var only |
| 162 | + *corroborates* a prefix that already lives under it. |
| 163 | + """ |
| 164 | + prefix = (sys.prefix or "").replace("\\", "/") |
| 165 | + low = prefix.lower() |
| 166 | + if "/pipx/venvs/" in low or "/pipx/shared" in low: |
| 167 | + return True |
| 168 | + # If PIPX_HOME is set AND our interpreter actually lives under it, trust it. |
| 169 | + pipx_home = (os.environ.get("PIPX_HOME") or "").replace("\\", "/") |
| 170 | + if pipx_home and prefix.startswith(pipx_home): |
| 171 | + return True |
| 172 | + return False |
| 173 | + |
| 174 | + |
| 175 | +def _pip_available() -> bool: |
| 176 | + try: |
| 177 | + import pip # noqa: F401 |
| 178 | + return True |
| 179 | + except Exception: |
| 180 | + return False |
| 181 | + |
| 182 | + |
| 183 | +@dataclass |
| 184 | +class UpgradePlan: |
| 185 | + """How we intend to upgrade. ``cmd`` is the argv to run; ``manager`` is for |
| 186 | + display; ``runnable`` is False when we couldn't determine a safe command (then |
| 187 | + ``cmd`` is the human-facing suggestion to print, not to execute).""" |
| 188 | + manager: str |
| 189 | + cmd: list |
| 190 | + runnable: bool |
| 191 | + |
| 192 | + def display(self) -> str: |
| 193 | + return " ".join(self.cmd) |
| 194 | + |
| 195 | + |
| 196 | +def plan_upgrade() -> UpgradePlan: |
| 197 | + """Decide the upgrade command for *this* environment.""" |
| 198 | + if _is_pipx(): |
| 199 | + # pipx manages its own venv; pip -U inside it is the wrong tool. Resolve |
| 200 | + # pipx to an ABSOLUTE path (never an unqualified "pipx" off PATH — that |
| 201 | + # would let a planted binary earlier in PATH run during `update`). If we |
| 202 | + # can't find it, refuse and print the command rather than guess. |
| 203 | + pipx_path = shutil.which("pipx") |
| 204 | + if not pipx_path: |
| 205 | + return UpgradePlan("pipx", ["pipx", "upgrade", DIST_NAME], runnable=False) |
| 206 | + return UpgradePlan("pipx", [pipx_path, "upgrade", DIST_NAME], runnable=True) |
| 207 | + if _pip_available(): |
| 208 | + # Upgrade into the very interpreter that's running komi-learn — this is the |
| 209 | + # one whose `import komi` the hooks use. Mirrors model_install.py. |
| 210 | + return UpgradePlan( |
| 211 | + "pip", |
| 212 | + [sys.executable, "-m", "pip", "install", "--upgrade", DIST_NAME], |
| 213 | + runnable=True, |
| 214 | + ) |
| 215 | + # Couldn't find a package manager we trust to drive — hand the user a command |
| 216 | + # rather than risk corrupting a standalone/frozen install. |
| 217 | + return UpgradePlan("pip", ["pip", "install", "--upgrade", DIST_NAME], runnable=False) |
| 218 | + |
| 219 | + |
| 220 | +# ── upgrade execution + re-verify ──────────────────────────────────────────── |
| 221 | + |
| 222 | +def installed_version_via_subprocess() -> Optional[str]: |
| 223 | + """Read komi-learn's version in a *fresh* interpreter. |
| 224 | +
|
| 225 | + importlib.metadata caches distribution info for the life of a process, so after |
| 226 | + an in-process pip upgrade the running interpreter still reports the OLD version. |
| 227 | + Shelling out to a clean python gets the truth post-upgrade. |
| 228 | + """ |
| 229 | + # Pass the dist name as argv (sys.argv[1]) rather than interpolating it into |
| 230 | + # the code string — keeps the snippet free of string-building even though |
| 231 | + # DIST_NAME is a trusted literal. |
| 232 | + code = ( |
| 233 | + "import sys\n" |
| 234 | + "try:\n" |
| 235 | + " from importlib.metadata import version\n" |
| 236 | + " sys.stdout.write(version(sys.argv[1]))\n" |
| 237 | + "except Exception:\n" |
| 238 | + " pass\n" |
| 239 | + ) |
| 240 | + try: |
| 241 | + r = subprocess.run([sys.executable, "-c", code, DIST_NAME], |
| 242 | + capture_output=True, text=True, timeout=30) |
| 243 | + out = (r.stdout or "").strip() |
| 244 | + return out or None |
| 245 | + except Exception: |
| 246 | + return None |
| 247 | + |
| 248 | + |
| 249 | +def run_upgrade(plan: UpgradePlan, *, timeout: int = 1200) -> tuple[bool, str]: |
| 250 | + """Execute the upgrade command. Returns (ok, detail). Output streams to the |
| 251 | + user's terminal so they see pip's progress (and any resolver errors).""" |
| 252 | + if not plan.runnable: |
| 253 | + return False, "no runnable upgrade command for this environment" |
| 254 | + try: |
| 255 | + r = subprocess.run(plan.cmd, timeout=timeout) |
| 256 | + except FileNotFoundError: |
| 257 | + return False, f"`{plan.cmd[0]}` not found on PATH" |
| 258 | + except subprocess.TimeoutExpired: |
| 259 | + return False, "upgrade timed out" |
| 260 | + except Exception as e: |
| 261 | + return False, f"upgrade failed to launch: {e}" |
| 262 | + if r.returncode != 0: |
| 263 | + return False, f"upgrade exited {r.returncode}" |
| 264 | + return True, "upgraded" |
| 265 | + |
| 266 | + |
| 267 | +__all__ = [ |
| 268 | + "DIST_NAME", "PYPI_JSON_URL", |
| 269 | + "check_latest", "is_newer", "plan_upgrade", "UpgradePlan", |
| 270 | + "run_upgrade", "installed_version_via_subprocess", |
| 271 | +] |
0 commit comments