Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions hermeto/core/models/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def show_error(error: "ErrorDict") -> str:
# Add experimental package managers (or package managers whose implementation is in progress)
# here with an x- prefix (e.g. "x-foo"):
"x-maven",
"x-uv",
]


Expand Down Expand Up @@ -407,6 +408,12 @@ class RpmPackageInput(_PackageInputBase):
binary: RpmBinaryFilters | None = None


class UvPackageInput(_PackageInputBase):
"""Accepted input for a uv package."""

type: Literal["x-uv"]


class YarnPackageInput(_PackageInputBase):
"""Accepted input for a yarn package."""

Expand All @@ -431,6 +438,7 @@ def _workspaces_not_empty(cls, workspaces: list[str] | None) -> list[str] | None
| PipPackageInput
| PnpmPackageInput
| RpmPackageInput
| UvPackageInput
| YarnPackageInput,
# https://pydantic-docs.helpmanual.io/usage/types/#discriminated-unions-aka-tagged-unions
pydantic.Field(discriminator="type"),
Expand Down Expand Up @@ -548,6 +556,11 @@ def rpm_packages(self) -> list[RpmPackageInput]:
"""Get the rpm packages specified for this request."""
return self._packages_by_type(RpmPackageInput)

@property
def uv_packages(self) -> list[UvPackageInput]:
"""Get the uv packages specified for this request."""
return self._packages_by_type(UvPackageInput)

@property
def yarn_packages(self) -> list[YarnPackageInput]:
"""Get the yarn packages specified for this request."""
Expand Down
4 changes: 4 additions & 0 deletions hermeto/core/package_managers/uv/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-only
from hermeto.core.package_managers.uv.main import fetch_uv_source

__all__ = ["fetch_uv_source"]
210 changes: 210 additions & 0 deletions hermeto/core/package_managers/uv/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# SPDX-License-Identifier: GPL-3.0-only
import asyncio
import logging
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import NamedTuple
from urllib.parse import urlparse

from hermeto.core.checksum import must_match_any_checksum
from hermeto.core.config import get_config
from hermeto.core.errors import MissingChecksum, PackageRejected
from hermeto.core.models.input import Request
from hermeto.core.models.output import EnvironmentVariable, ProjectFile, RequestOutput
from hermeto.core.models.sbom import Component
from hermeto.core.package_managers.general import async_download_files
from hermeto.core.package_managers.pip.project_files import PyProjectTOML
from hermeto.core.package_managers.uv.models import UvArtifact, UvLock, UvPackage
from hermeto.core.rooted_path import RootedPath
from hermeto.core.scm import clone_as_tarball
from hermeto.core.utils import run_cmd

log = logging.getLogger(__name__)

DEFAULT_LOCKFILE_NAME = "uv.lock"


class _DownloadItem(NamedTuple):
package: UvPackage
artifact: UvArtifact
target: Path


@dataclass
class UvPackageInfo:
"""Resolved uv package with everything fetch-deps produced for it."""

name: str
version: str | None
components: list[Component]
rewritten_lockfile: ProjectFile | None


def fetch_uv_source(request: Request) -> RequestOutput:
"""Resolve and fetch uv dependencies for the given request."""
components: list[Component] = []
project_files: list[ProjectFile] = []

for package in request.uv_packages:
package_dir = request.source_dir.join_within_root(package.path)
info = _resolve_uv(package_dir, request.output_dir)
components.extend(info.components)
if info.rewritten_lockfile is not None:
project_files.append(info.rewritten_lockfile)

environment_variables = _generate_environment_variables()

return RequestOutput.from_obj_list(components, environment_variables, project_files)


def _resolve_uv(package_dir: RootedPath, output_dir: RootedPath) -> UvPackageInfo:
pyproject = package_dir.join_within_root("pyproject.toml")
if not pyproject.path.exists():
raise PackageRejected(
reason="pyproject.toml not found",
solution="A uv project requires a pyproject.toml next to uv.lock.",
)

name, version = _get_pyproject_metadata(package_dir)

_validate_lockfile(package_dir)

lock = UvLock.from_file(package_dir)
log.debug("Parsed %d packages from %s", len(lock.packages), DEFAULT_LOCKFILE_NAME)

_download_dependencies(output_dir, lock)

return UvPackageInfo(name=name, version=version, components=[], rewritten_lockfile=None)


def _validate_lockfile(package_dir: RootedPath) -> None:
"""`uv lock --check` validates without modifying the lockfile; non-zero on mismatch."""
log.info("Validating uv.lock at %s", package_dir)
try:
run_cmd(["uv", "lock", "--check", "--no-cache"], params={"cwd": package_dir.path})
except subprocess.CalledProcessError as e:
detail = (e.stderr or "").strip() or "uv.lock is not in sync with pyproject.toml"
raise PackageRejected(
reason=f"`uv lock --check` failed for {package_dir}: {detail}",
solution="Regenerate the lockfile with `uv lock`",
) from e


def _download_dependencies(output_dir: RootedPath, lock: UvLock) -> None:
"""Fetch every remote artifact recorded in uv.lock into a flat deps/uv directory."""
deps_dir = output_dir.join_within_root("deps", "uv")
deps_dir.path.mkdir(parents=True, exist_ok=True)

items: list[_DownloadItem] = []
files_to_download: dict[str, Path] = {}
for package in lock.packages:
if package.source.kind == "git":
_download_git_package(package, deps_dir)
continue
for artifact in _artifacts_to_download(package):
if artifact.url is None:
# should not happen
raise RuntimeError(f"artifact {artifact.filename!r} has no download URL")
target = deps_dir.join_within_root(_get_artifact_filename(artifact)).path
items.append(_DownloadItem(package, artifact, target))
if not target.exists():
files_to_download[artifact.url] = target

if files_to_download:
log.info("Downloading %d artifacts from %s", len(files_to_download), DEFAULT_LOCKFILE_NAME)
asyncio.run(async_download_files(files_to_download, get_config().runtime.concurrency_limit))

for package, artifact, target in items:
if artifact.checksum_info is not None:
must_match_any_checksum(target, [artifact.checksum_info])
else:
log.warning("Missing checksum for %s==%s", package.name, package.version)


def _artifacts_to_download(package: UvPackage) -> list[UvArtifact]:
"""Return the remote artifacts to fetch for a package.

Only sdists are fetched for now. Like pip's process_package_distributions,
this is the single place where binary filters will decide the sdist/wheel
split once they are supported. Local sources need no fetching and git
sources are cloned rather than downloaded, so both yield nothing here.
"""
if package.source.kind == "registry":
# registry checksums are optional in uv.lock; a missing hash is
# tolerated here and reported by the download phase
if package.sdist is None or package.sdist.url is None:
raise PackageRejected(
reason=f"{package.name}=={package.version} has no sdist in uv.lock",
solution=None,
)
return [package.sdist]

if package.source.kind == "url":
# a url source points at a single sdist or wheel file; uv records the
# mandatory hash under `sdist` or `wheels`, but the download URL only
# in the source itself
recorded = next((a for a in (package.sdist, *package.wheels) if a and a.hash), None)
if recorded is None or recorded.hash is None:
raise MissingChecksum(
f"{package.name}=={package.version}",
solution=(
"uv requires a hash for URL dependencies, so this lockfile looks "
"corrupted. Regenerate it with `uv lock`."
),
)
return [UvArtifact(url=package.source.location, hash=recorded.hash)]

return []


def _get_artifact_filename(artifact: UvArtifact) -> str:
"""Get the file name under deps/uv for a remote artifact."""
if artifact.filename:
return artifact.filename
filename = Path(urlparse(artifact.url or "").path).name
if not filename:
raise PackageRejected(
reason=f"Cannot determine a file name from artifact URL {artifact.url!r} in uv.lock",
solution="Regenerate the lockfile with `uv lock`.",
)
return filename


def _download_git_package(package: UvPackage, deps_dir: RootedPath) -> None:
"""Clone a git package at its resolved commit and archive it as a tarball."""
commit = package.source.get_git_commit()
tarball = deps_dir.join_within_root(f"{package.name}-gitcommit-{commit}.tar.gz")
if tarball.path.exists():
log.debug("%s already exists, skipping clone", tarball.path.name)
return

log.info("Cloning git repository for %s==%s", package.name, package.version)
clone_as_tarball(package.source.git_clone_url, commit, to_path=tarball.path)


def _get_pyproject_metadata(package_dir: RootedPath) -> tuple[str, str | None]:
"""Read the project's name/version from pyproject.toml's [project] table."""
pyproject = PyProjectTOML(package_dir)

name = pyproject.get_name()
if not name:
Comment thread
a-ovchinnikov marked this conversation as resolved.
raise PackageRejected(
reason="pyproject.toml does not declare a project name",
solution="Add a [project] table with a `name` field to pyproject.toml.",
)

version = pyproject.get_version()
if version is None:
log.warning("Could not resolve version from pyproject.toml at %s", package_dir)

return name, version


def _generate_environment_variables() -> list[EnvironmentVariable]:
return [
EnvironmentVariable(name="UV_OFFLINE", value="true"),
EnvironmentVariable(name="UV_FIND_LINKS", value="${output_dir}/deps/uv"),
EnvironmentVariable(name="UV_FROZEN", value="true"),
EnvironmentVariable(name="UV_NO_BINARY", value="true"),
]
Loading