diff --git a/infra/staging.nix b/infra/staging.nix
index b30f92c60..99a3889e9 100644
--- a/infra/staging.nix
+++ b/infra/staging.nix
@@ -1,7 +1,6 @@
 {
   config,
   pkgs,
-  lib,
   ...
 }:
 let
@@ -25,31 +24,6 @@ in
   };
   swapDevices = [ { device = "/dev/disk/by-label/swap"; } ];
 
-  # Given 16G RAM, this should be enough at the time of writing:
-  # - Postgres needs ~1.5G RAM
-  # - The application server needs <1G RAM
-  # - An evaluation of Nixpkgs peaks at 6-7G of RAM.
-  # - We run at most one instance of `nix-eval-jobs` concurrently.
-  #   See `config.services.web-security-tracker.maxJobProcessors` to make sure.
-  # - We observe ~2M store paths in an active evaluation, taking ~5GB
-  # Adjust parameters (or get a bigger machine) if it doesn't work out.
-  systemd.mounts = [
-    {
-      what = "tmpfs";
-      where = "/tmp";
-      type = "tmpfs";
-      mountConfig.Options = lib.concatStringsSep "," [
-        "mode=1777"
-        "strictatime"
-        "rw"
-        "nosuid"
-        "nodev"
-        "size=37%"
-        "nr_inodes=4m"
-      ];
-    }
-  ];
-
   systemd.network.networks."10-wan" = {
     matchConfig.MACAddress = "96:00:03:d9:7c:85";
     address = [
diff --git a/nix/tests/default.nix b/nix/tests/default.nix
index 9ce2a9083..92ebdf79c 100644
--- a/nix/tests/default.nix
+++ b/nix/tests/default.nix
@@ -130,6 +130,9 @@ pkgs.testers.runNixOSTest {
       server.wait_for_unit("${application}-worker.service")
       server.wait_for_unit("mock-channels.service")
 
+      with subtest("Check that no migrations were missed"):
+          server.succeed("wst-manage makemigrations --check --dry-run")
+
       with subtest("Check that channels are fetched and evaluations enqueued"):
          server.succeed("wst-manage fetch_all_channels")
          ${in-shell "succeed" ''
@@ -175,10 +178,23 @@ pkgs.testers.runNixOSTest {
      ${
        # XXX(@fricklerhandwerk): We do this at the end since it takes a while and would otherwise stall the Django tests.
        in-shell "wait_until_succeeds" ''
-         from shared.models import NixEvaluation
+         from shared.models import (
+             NixEvaluation,
+             NixDerivation,
+             NixDerivationMeta,
+             NixMaintainer,
+             NixLicense,
+         )
          assert NixEvaluation.objects.filter(
              state=NixEvaluation.EvaluationState.COMPLETED,
          ).count() == 3
+         for model, count in [
+             (NixDerivation, 3),
+             (NixDerivationMeta, 3),
+             (NixMaintainer, 1),
+             (NixLicense, 1),
+         ]:
+             assert model.objects.count() == count, f"{model._meta.object_name}: expected {count}, got {model.objects.count()}"
        ''
      }
    '';
diff --git a/src/project/settings.py b/src/project/settings.py
index 23b8a5b12..2871b6ad5 100644
--- a/src/project/settings.py
+++ b/src/project/settings.py
@@ -333,9 +333,9 @@ class AppSettings(BaseModel):
     MAX_PARALLEL_EVALUATION = 3
     # Where the stderr of each `nix-eval-jobs` run is stored.
     EVALUATION_LOGS_DIRECTORY: str = str(
-        Path(BASE_DIR / ".." / ".." / "nixpkgs-evaluation-logs").resolve()
+        Path(BASE_DIR / ".." / "nixpkgs-evaluation-logs").resolve()
     )
-    CVE_CACHE_DIR: str = str(Path(BASE_DIR / ".." / ".." / "cve-cache").resolve())
+    CVE_CACHE_DIR: str = str(Path(BASE_DIR / ".." / "cve-cache").resolve())
     # This can be tuned for your specific deployment;
     # it is used to wait for an evaluation slot to become available.
     # It should be around the average evaluation time on your machine.
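The new NixOS test subtest guards against forgotten migrations. The same check can be run outside the VM test through Django's management API; a minimal sketch, assuming a configured Django project:

```python
# Minimal sketch of the guard the new subtest adds. `--check` makes
# `makemigrations` exit with a non-zero status when model changes lack a
# corresponding migration; `--dry-run` ensures nothing is written to disk.
from django.core.management import call_command


def assert_no_missing_migrations() -> None:
    # Raises SystemExit with a non-zero code when migrations are missing.
    call_command("makemigrations", "--check", "--dry-run")
```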
diff --git a/src/shared/evaluation.py b/src/shared/evaluation.py
index 46a35bc80..e0a2e239b 100644
--- a/src/shared/evaluation.py
+++ b/src/shared/evaluation.py
@@ -1,24 +1,20 @@
 import json
 import logging
 import time
-from collections.abc import Callable, Generator, Iterable
+from collections.abc import Callable, Generator
 from dataclasses import dataclass, field
-from itertools import chain
 from typing import Any, TypeVar
 
-from dataclass_wizard import DumpMixin, JSONWizard, LoadMixin
+from dataclass_wizard import JSONWizard, LoadMixin
 from django.db.models import Model
-from django.db.utils import IntegrityError
 
 from shared.models.nix_evaluation import (
+    MAJOR_CHANNELS,
     NixDerivation,
     NixDerivationMeta,
-    NixDerivationOutput,
     NixEvaluation,
     NixLicense,
     NixMaintainer,
-    NixOutput,
-    NixStorePathOutput,
 )
 
 T = TypeVar("T", bound=Model)
@@ -47,7 +43,7 @@ class LicenseAttribute(JSONWizard):
 
 
 @dataclass
-class MetadataAttribute(JSONWizard, LoadMixin, DumpMixin):
+class MetadataAttribute(JSONWizard, LoadMixin):
     outputs_to_install: list[str] = field(default_factory=list)
     available: bool = True
     broken: bool = False
@@ -64,17 +60,6 @@ class MetadataAttribute(JSONWizard, LoadMixin, DumpMixin):
     platforms: list[str] = field(default_factory=list)
     known_vulnerabilities: list[str] = field(default_factory=list)
 
-    def __pre_as_dict__(self) -> None:
-        linearized_maintainers = []
-        for maintainer in self.maintainers:
-            if maintainer.get("scope") is not None:  # pyright: ignore generalTypeIssue
-                linearized_maintainers.extend(
-                    maintainer.get("members", [])  # pyright: ignore generalTypeIssue
-                )
-            else:
-                linearized_maintainers.append(maintainer)
-        self.maintainers = linearized_maintainers
-
 
 @dataclass
 class EvaluatedAttribute(JSONWizard):
@@ -86,12 +71,32 @@ class EvaluatedAttribute(JSONWizard):
     attr_path: list[str]
     name: str
     drv_path: str
-    # drv -> list of outputs.
-    input_drvs: dict[str, list[str]]
     meta: MetadataAttribute | None
     outputs: dict[str, str]
     system: str
 
+    def as_key(self) -> tuple[str, str, str, str | None]:
+        """
+        Unique dictionary key for a derivation.
+
+        These are the actual degrees of freedom for a derivation, judging from the data.
+        """
+        # FIXME(@fricklerhandwerk): We should only need the derivation path!
+        # Extract the extra fields to save more space.
+        # A `NixPackage` could indeed consist of just `pname` (parsed from `name`, validated against `attribute` and `drv_metadata.name`).
+        # Then we'd describe all occurrences of a `NixPackage` with
+        # - NixDerivation
+        # - attribute_name
+        # - metadata__name
+        # - parent_evaluation (also extracted, since derivation paths of close-to-root packages can be the same across evaluations)
+        # - version (also parsed from `name`, for easier querying)
+        return (
+            self.drv_path,
+            self.attr,
+            self.name,
+            self.meta.name or None if self.meta else None,
+        )
+
 
 @dataclass
 class PartialEvaluatedAttribute:
@@ -107,11 +112,11 @@ class PartialEvaluatedAttribute:
     evaluation: EvaluatedAttribute | None = None
 
 
-def parse_total_evaluation(raw: dict[str, Any]) -> EvaluatedAttribute:
+def fixup_evaluated_attribute(raw: dict[str, Any]) -> EvaluatedAttribute:
     # Various fixups to deal with... things.
     # my lord...
     if raw.get("meta", {}) is None:
-        print(raw)
+        logger.info(f"'{raw['attr']}' has no metadata")
 
     if (
         raw.get("meta", {}) is not None
@@ -132,7 +137,9 @@ def parse_total_evaluation(raw: dict[str, Any]) -> EvaluatedAttribute:
         and isinstance(raw.get("meta", {})["maintainers"], list)
     ):
         for maintainer in raw.get("meta", {})["maintainers"]:
-            if maintainer.get("scope") is not None:
+            if maintainer.get("shortName") is not None:
+                # FIXME(@fricklerhandwerk): This should actually never happen, judging from recent data.
+                logger.info(f"Maintainer '{maintainer['shortName']}' is actually a team")
                 new_maintainers.extend(maintainer["members"])
             else:
                 new_maintainers.append(maintainer)
@@ -147,22 +154,14 @@ def parse_evaluation_result(line: str) -> PartialEvaluatedAttribute:
         attr=raw.get("attr"),
         attr_path=raw.get("attr_path"),
         error=None,
-        evaluation=parse_total_evaluation(raw) if raw.get("error") is None else None,
+        evaluation=fixup_evaluated_attribute(raw) if raw.get("error") is None else None,
     )
 
 
-def parse_evaluation_results(
-    lines: Iterable[str],
-) -> Generator[PartialEvaluatedAttribute]:
-    for line in lines:
-        yield parse_evaluation_result(line)
-
-
-def bulkify[T](
+def by_drv_key[T](
     gen: Generator[tuple[EvaluatedAttribute, list[T]]],
-) -> Generator[tuple[str, list[T]]]:
-    for origin, elements in gen:
-        yield (origin.drv_path, elements)
+) -> dict[tuple[str, str, str, str | None], list[T]]:
+    return dict((origin.as_key(), elements) for origin, elements in gen)
 
 
 class SyncBatchAttributeIngester:
@@ -169,104 +168,106 @@ class SyncBatchAttributeIngester:
     """
    ... of a bunch of **evaluated** attribute synchronously.
    """
 
-    def __init__(self, evaluations: list[EvaluatedAttribute]) -> None:
+    def __init__(
+        self, evaluations: list[EvaluatedAttribute], parent_evaluation: NixEvaluation
+    ) -> None:
         self.evaluations = evaluations
+        self.parent_evaluation = parent_evaluation
+        # FIXME(@fricklerhandwerk): This will fall apart when we obtain the channel structure dynamically [ref:channel-structure]
+        self.rolling_release = (
+            MAJOR_CHANNELS[0] in self.parent_evaluation.channel.channel_branch
+        )
 
     def initialize(self) -> None:
         self.maintainers = list(NixMaintainer.objects.all())
         self.licenses = list(NixLicense.objects.all())
-        outputs = list(NixOutput.objects.all())
-        self.outputs = {model.output_name: model for model in outputs}
 
-    def ingest_maintainers(
+    def parse_maintainers(
         self, maintainers: list[MaintainerAttribute]
     ) -> list[NixMaintainer]:
-        ms = []
+        bulk = []
         seen = set()
+
         for m in maintainers:
-            # Maintainers without a GitHub or a GitHub ID cannot be reconciled.
+            # Maintainers without a GitHub ID cannot be reconciled.
             # This unfortunately creates a partial view of all maintainers of a
             # given package. If you want to fix this, you can start from
             # looking around https://github.com/NixOS/nixpkgs/pull/273220.
-            if m.github is None or m.github_id is None:
+            missing = []
+            if m.github is None:
+                missing.append("GitHub handle")
+            # FIXME(@fricklerhandwerk): We could try to recover the maintainer based on the handle alone.
+            if m.github_id is None:
+                missing.append("GitHub ID")
+            if missing:
+                logger.info(
+                    f"Skipping maintainer '{m.name}': no {' and no '.join(missing)}"
+                )
                 continue
-            # Duplicate...
             if m.github_id in seen:
                 continue
-            try:
-                ms.append(
-                    NixMaintainer.objects.update_or_create(
-                        defaults={
-                            "github": m.github,
-                            "email": m.email,
-                            "matrix": m.matrix,
-                            "name": m.name,
-                        },
-                        github_id=m.github_id,
-                    )
-                )
-            except IntegrityError:
-                # Skip this maintainer until we decide how to handle #657
-                logger.debug(f"Skipping maintainer {m.github} due to username conflict")
-                continue
+
+            bulk.append(
+                NixMaintainer(
+                    github_id=m.github_id,
+                    github=m.github,
+                    email=m.email,
+                    matrix=m.matrix,
+                    name=m.name,
+                )
+            )
             seen.add(m.github_id)
-        return [obj for obj, _ in ms]
+        return bulk
 
-    def ingest_licenses(self, licenses: list[LicenseAttribute]) -> list[NixLicense]:
-        lics = []
+    def parse_licenses(self, licenses: list[LicenseAttribute]) -> list[NixLicense]:
+        bulk = []
         seen = set()
         for lic in licenses:
-            if lic.spdx_id is None or lic.spdx_id in seen:
+            if lic.spdx_id is None:
+                logger.debug(f"Skipping license without SPDX-ID: {lic}")
                 continue
-            lics.append(
-                NixLicense.objects.get_or_create(
-                    defaults={
-                        "deprecated": lic.deprecated,
-                        "free": lic.free,
-                        "redistributable": lic.redistributable,
-                        "full_name": lic.full_name,
-                        "short_name": lic.short_name,
-                        "url": lic.url,
-                    },
+            if lic.spdx_id in seen:
+                continue
+
+            bulk.append(
+                NixLicense(
                     spdx_id=lic.spdx_id,
+                    deprecated=lic.deprecated,
+                    free=lic.free,
+                    redistributable=lic.redistributable,
+                    full_name=lic.full_name,
+                    short_name=lic.short_name,
+                    url=lic.url,
                 )
             )
             seen.add(lic.spdx_id)
-        return [obj for obj, _ in lics]
+        return bulk
 
-    def ingest_meta(
-        self, evaluation: EvaluatedAttribute
+    def parse_meta(
+        self, metadata: MetadataAttribute
     ) -> tuple[
         NixDerivationMeta,
-        DeferredThrough[NixMaintainer],
-        DeferredThrough[NixLicense],
+        list[NixMaintainer],
+        list[NixLicense],
     ]:
-        metadata = evaluation.meta
-        assert metadata is not None, (
-            "invalid ingest_meta call to an invalid metadata attribute"
-        )
+        maintainers = self.parse_maintainers(metadata.maintainers)
+        licenses = self.parse_licenses(metadata.license)
 
-        maintainers = self.ingest_maintainers(metadata.maintainers)
-        if isinstance(metadata.license, list):
-            licenses = self.ingest_licenses(metadata.license)
-        else:
-            licenses = self.ingest_licenses([metadata.license])
         meta = NixDerivationMeta(
             name=metadata.name,
             insecure=metadata.insecure,
@@ -273,250 +274,169 @@ class SyncBatchAttributeIngester:
             known_vulnerabilities=metadata.known_vulnerabilities,
         )
 
-        # Those thunks are here to delay the evaluation of the M2M throughs.
-        def thunk_maintainers_throughs(
-            meta_pk: int,
-        ) -> list[NixMaintainer]:
-            return [
-                NixDerivationMeta.maintainers.through(
-                    nixderivationmeta_id=meta_pk, nixmaintainer_id=maintainer.pk
-                )
-                for maintainer in maintainers
-            ]
-
-        def thunk_licenses_throughs(
-            meta_pk: int,
-        ) -> list[NixLicense]:
-            return [
-                NixDerivationMeta.licenses.through(
-                    nixderivationmeta_id=meta_pk, nixlicense_id=license.pk
-                )
-                for license in licenses
-            ]
-
-        return meta, thunk_maintainers_throughs, thunk_licenses_throughs
+        return meta, maintainers, licenses
 
-    def ingest_outputs(
-        self, evaluation: EvaluatedAttribute
-    ) -> list[NixStorePathOutput]:
-        store_paths = [f"{value}!{key}" for (key, value) in evaluation.outputs.items()]
-        existing = NixStorePathOutput.objects.in_bulk(
-            store_paths, field_name="store_path"
-        )
-        return list(existing.values()) + [
-            NixStorePathOutput(store_path=store_path)
-            for store_path in store_paths
-            if store_path not in existing
-        ]
-
-    def ingest_dependencies(
-        self, evaluation: EvaluatedAttribute
-    ) -> list[NixDerivationOutput]:
-        # FIXME(raitobezarius): bulk upsert the outputs
-        # then add them into the M2M.
-
-        return [
-            NixDerivationOutput(derivation_path=drvpath)
-            for drvpath in evaluation.input_drvs.keys()
-        ]
-
-    def ingest_derivation_shell(
+    def make_derivation_shell(
         self,
-        evaluation: EvaluatedAttribute,
-        parent_evaluation: NixEvaluation,
+        attribute: EvaluatedAttribute,
         metadata: NixDerivationMeta | None = None,
     ) -> NixDerivation:
         return NixDerivation(
-            attribute=evaluation.attr.removesuffix(f".{evaluation.system}"),
-            derivation_path=evaluation.drv_path,
-            name=evaluation.name,
+            attribute=attribute.attr.removesuffix(f".{attribute.system}"),
+            derivation_path=attribute.drv_path,
+            name=attribute.name,
             metadata=metadata,
-            system=evaluation.system,
-            parent_evaluation=parent_evaluation,
+            system=attribute.system,
+            parent_evaluation=self.parent_evaluation,
         )
 
-    def ingest(self, parent_evaluation: NixEvaluation) -> list[NixDerivation]:
+    def ingest(self) -> list[NixDerivation]:
         start = time.time()
-        dependencies = dict(
-            bulkify(
-                (evaluation, self.ingest_dependencies(evaluation))
-                for evaluation in self.evaluations
+        bulk_derivations: dict[tuple[str, str, str, str | None], NixDerivation] = {}
+        bulk_maintainers: dict[int, NixMaintainer] = {}
+        bulk_licenses: dict[str, NixLicense] = {}
+        metadata = []
+        meta_maintainers = []
+        meta_licenses = []
+        for index, attribute in enumerate(self.evaluations):
+            drv_metadata = None
+            if attribute.meta is not None:
+                (
+                    drv_metadata,
+                    drv_maintainers,
+                    drv_licenses,
+                ) = self.parse_meta(attribute.meta)
+
+                metadata.append(drv_metadata)
+                meta_maintainers.append(drv_maintainers)
+                meta_licenses.append(drv_licenses)
+                for maintainer in drv_maintainers:
+                    bulk_maintainers[maintainer.github_id] = maintainer
+                for license in drv_licenses:
+                    bulk_licenses[license.spdx_id] = license
+
+            bulk_derivations[attribute.as_key()] = self.make_derivation_shell(
+                attribute, drv_metadata
             )
-        )
-        NixDerivationOutput.objects.bulk_create(
-            chain.from_iterable(dependencies.values())
-        )
+
         logger.debug(
-            "Ingestion of all dependencies (%d) took %f s",
-            len(dependencies),
+            "Parsed %d maintainers and %d licenses for %d derivations in %f s",
+            len(bulk_maintainers),
+            len(bulk_licenses),
+            len(bulk_derivations),
             time.time() - start,
         )
-        outputs = dict(
-            bulkify(
-                (evaluation, self.ingest_outputs(evaluation))
-                for evaluation in self.evaluations
-            )
-        )
-        # When Django 5 will be available, we will be able to get PKs directly.
+        # Anything but the rolling release must be considered stale.
+        # Therefore we only add new rows if this is not a rolling release.
         start = time.time()
-        inserted = False
-        attempt = 0
-        store_path_outputs = {
-            item.store_path: item for item in chain.from_iterable(outputs.values())
-        }
-        new_store_path_outputs = [
-            spo for spo in store_path_outputs.values() if spo.pk is None
-        ]
-        while not inserted:
-            try:
-                for spo in NixStorePathOutput.objects.bulk_create(
-                    new_store_path_outputs
-                ):
-                    store_path_outputs[spo.store_path].pk = spo.pk
-                inserted = True
-                logger.debug(
-                    "Ingestion of all Nix store path outputs (%d) took %f s",
-                    len(store_path_outputs),
-                    time.time() - start,
-                )
-            except IntegrityError:
-                logger.debug(
-                    "Failed to bulk-insert all Nix store path outputs, attempt %d...",
-                    attempt,
-                )
-                attempt += 1
-                existing_new = NixStorePathOutput.objects.in_bulk(
-                    [spo.store_path for spo in new_store_path_outputs],
-                    field_name="store_path",
-                )
-                # Filter out existing new ones.
-                new_store_path_outputs = [
-                    spo
-                    for spo in new_store_path_outputs
-                    if spo.store_path not in existing_new
-                ]
-                # Extend existing new ones with IDs.
-                for spath, existing in existing_new.items():
-                    store_path_outputs[spath].pk = existing.pk
-                continue
-
-        # FIXME(raitobezarius): bulk ingest the maintainers or licenses themselves.
-        # This requires knowing in advance the maintainer PK or license PK
-        # and thunking it further.
-        derivations: dict[str, NixDerivation] = {}
-        thunked_maintainers_throughs = []
-        thunked_licenses_throughs = []
-        maintainers_throughs = []
-        licenses_throughs = []
-        metadatas = []
+        NixMaintainer.objects.bulk_create(
+            bulk_maintainers.values(),
+            # This will ignore existing rows and won't return primary keys when `True`.
+            # That's okay because we'll fetch the relevant objects afterwards unconditionally.
+            ignore_conflicts=not self.rolling_release,
+            update_conflicts=self.rolling_release,
+            unique_fields=["github_id"],
+            update_fields=["github", "email", "matrix", "name"],
+        )
+        db_maintainers = NixMaintainer.objects.in_bulk(
+            bulk_maintainers.keys(),
+            field_name="github_id",
+        )
+        logger.debug(
+            "Ingested %d maintainers for %d derivations in %f s",
+            len(bulk_maintainers),
+            len(bulk_derivations),
+            time.time() - start,
+        )
         start = time.time()
-        for index, evaluation in enumerate(self.evaluations):
-            eval_dependencies = dependencies[evaluation.drv_path]
-            eval_outputs = outputs[evaluation.drv_path]
-            metadata = None
-            if evaluation.meta is not None:
-                (
-                    metadata,
-                    drv_maintainers_throughs,
-                    drv_licenses_throughs,
-                ) = self.ingest_meta(evaluation)
-                metadata_index = len(metadatas)
-                thunked_maintainers_throughs.append(
-                    (metadata_index, drv_maintainers_throughs)
-                )
-                thunked_licenses_throughs.append(
-                    (metadata_index, drv_licenses_throughs)
-                )
-                metadatas.append(metadata)
-
-            derivations[evaluation.drv_path] = self.ingest_derivation_shell(
-                evaluation, parent_evaluation, metadata
-            )
+        NixLicense.objects.bulk_create(
+            bulk_licenses.values(),
+            ignore_conflicts=not self.rolling_release,
+            update_conflicts=self.rolling_release,
+            unique_fields=["spdx_id"],
+            update_fields=[
+                "deprecated",
+                "free",
+                "redistributable",
+                "full_name",
+                "short_name",
+                "url",
+            ],
+        )
+        # FIXME(@fricklerhandwerk): This duplicates metadata entries at least by the number of systems we evaluate.
+        # [ref:deduplicate-metadata]
+        db_licenses = NixLicense.objects.in_bulk(
+            bulk_licenses.keys(),
+            field_name="spdx_id",
+        )
         logger.debug(
-            "Ingestion of derivation shells (%d) and their maintainers or licenses took %f s",
-            len(derivations),
+            "Ingested %d licenses for %d derivations in %f s",
+            len(bulk_licenses),
+            len(bulk_derivations),
             time.time() - start,
         )
         start = time.time()
-        metadatas = NixDerivationMeta.objects.bulk_create(metadatas)
+        db_metadata = NixDerivationMeta.objects.bulk_create(metadata)
         logger.debug(
-            "Ingestion of all metadata (%d) took %f s",
-            len(metadatas),
+            "Ingested %d metadata entries for %d derivations in %f s",
+            len(metadata),
+            len(bulk_derivations),
             time.time() - start,
         )
-        derivations = {
-            drv.derivation_path: drv
-            for drv in NixDerivation.objects.bulk_create(derivations.values())
-        }
-        for index, thunk in thunked_maintainers_throughs:
-            maintainers_throughs.extend(thunk(metadatas[index].pk))
-
-        for index, thunk in thunked_licenses_throughs:
-            licenses_throughs.extend(thunk(metadatas[index].pk))
-
-        deps_throughs = []
-        outputs_throughs = []
-        for drvpath, eval_dependencies in dependencies.items():
-            assert all(dep.pk is not None for dep in eval_dependencies), (
-                "One dependency has no PK"
-            )
-            deps_throughs.extend(
+        start = time.time()
+        maintainers_throughs = []
+        licenses_throughs = []
+        for db_meta, maintainers, licenses in zip(
+            db_metadata, meta_maintainers, meta_licenses
+        ):
+            maintainers_throughs.extend(
                 [
-                    NixDerivation.dependencies.through(
-                        nixderivationoutput_id=dep.pk,
-                        nixderivation_id=derivations[drvpath].pk,
+                    NixDerivationMeta.maintainers.through(
+                        nixderivationmeta_id=db_meta.pk,
+                        nixmaintainer_id=db_maintainers[maintainer.github_id].pk,
                     )
-                    for dep in eval_dependencies
+                    for maintainer in maintainers
                 ]
             )
-
-        for drvpath, eval_outputs in outputs.items():
-            assert all(
-                store_path_outputs[output.store_path].pk is not None
-                for output in eval_outputs
-            ), "One output has no PK"
-            outputs_throughs.extend(
+            licenses_throughs.extend(
                 [
-                    NixDerivation.outputs.through(
-                        nixstorepathoutput_id=store_path_outputs[output.store_path].pk,
-                        nixderivation_id=derivations[drvpath].pk,
+                    NixDerivationMeta.licenses.through(
+                        nixderivationmeta_id=db_meta.pk,
+                        nixlicense_id=db_licenses[license.spdx_id].pk,
                     )
-                    for output in eval_outputs
+                    for license in licenses
                 ]
             )
-        start = time.time()
         NixDerivationMeta.maintainers.through.objects.bulk_create(maintainers_throughs)
         logger.debug(
-            "Ingestion of all maintainers M2M (%d) took %f s",
+            "Ingested %d maintainers M2Ms for %d derivations in %f s",
             len(maintainers_throughs),
+            len(bulk_derivations),
             time.time() - start,
         )
         start = time.time()
         NixDerivationMeta.licenses.through.objects.bulk_create(licenses_throughs)
         logger.debug(
-            "Ingestion of all licenses M2M (%d) took %f s",
+            "Ingested %d licenses M2Ms for %d derivations in %f s",
             len(licenses_throughs),
+            len(bulk_derivations),
             time.time() - start,
         )
         start = time.time()
-        NixDerivation.dependencies.through.objects.bulk_create(deps_throughs)
-        logger.debug(
-            "Ingestion of all dependencies M2M (%d) took %f s",
-            len(deps_throughs),
-            time.time() - start,
+        db_derivations_list = NixDerivation.objects.bulk_create(
+            bulk_derivations.values()
         )
-        start = time.time()
-        NixDerivation.outputs.through.objects.bulk_create(outputs_throughs)
+        db_derivations = dict(zip(bulk_derivations.keys(), db_derivations_list))
         logger.debug(
-            "Ingestion of all outputs M2M (%d) took %f s",
-            len(outputs_throughs),
+            "Ingested %d derivation shells in %f s",
+            len(bulk_derivations),
             time.time() - start,
         )
-        return list(derivations.values())
+        return list(db_derivations.values())
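The rewritten `ingest` above replaces per-row `update_or_create` calls with one `bulk_create` per model. One Django detail the code depends on: with `ignore_conflicts=True`, `bulk_create` skips existing rows and does not populate primary keys on the returned objects, so the relevant rows are re-fetched with `in_bulk` before the M2M through-rows are wired up. A minimal sketch of that pattern with the `NixMaintainer` model from this diff (the values are hypothetical):

```python
# Hedged sketch of the upsert-then-refetch pattern used by `ingest`.
parsed = {
    4242: NixMaintainer(github_id=4242, github="alice", name="Alice"),
    1337: NixMaintainer(github_id=1337, github="bob", name="Bob"),
}
NixMaintainer.objects.bulk_create(
    parsed.values(),
    # Rolling release: update existing rows in place on conflict.
    update_conflicts=True,
    unique_fields=["github_id"],
    update_fields=["github", "email", "matrix", "name"],
)
# Re-fetch to get usable objects regardless of the conflict strategy.
db_maintainers = NixMaintainer.objects.in_bulk(parsed.keys(), field_name="github_id")
```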
diff --git a/src/shared/github.py b/src/shared/github.py
index 214432a31..89ba433f5 100644
--- a/src/shared/github.py
+++ b/src/shared/github.py
@@ -68,32 +68,14 @@ def cvss_details() -> str:
         return ""
 
     def maintainers() -> str:
-        # Get all maintainer github_ids from currently active packages
-        active_package_maintainer_ids = {
-            maintainer["github_id"]
-            for package in cached_suggestion.payload["packages"].values()
-            for maintainer in package["maintainers"]
-            if "github_id" in maintainer
-        }
-
-        # Filter active maintainers to only those still in active packages
-        filtered_active_maintainers = [
-            maintainer
-            for maintainer in cached_suggestion.payload["categorized_maintainers"][
-                "active"
-            ]
-            if maintainer["github_id"] in active_package_maintainer_ids
-        ]
+        maintainers = cached_suggestion.payload["categorized_maintainers"]
 
         # We need to query for the latest username of each maintainer, because
         # those might have changed since they were written out in Nixpkgs; since
         # we have the user id (which is stable), we can ask the GitHub API
         maintainers_list = [
             get_maintainer_username(maintainer, github)
-            for maintainer in (
-                filtered_active_maintainers
-                + cached_suggestion.payload["categorized_maintainers"]["added"]
-            )
+            for maintainer in (maintainers["active"] + maintainers["added"])
             if "github_id" in maintainer and "github" in maintainer
         ]
diff --git a/src/shared/listeners/cache_suggestions.py b/src/shared/listeners/cache_suggestions.py
index 4ff1ca707..6d15f5c9b 100644
--- a/src/shared/listeners/cache_suggestions.py
+++ b/src/shared/listeners/cache_suggestions.py
@@ -175,8 +175,6 @@ def cache_new_suggestions(suggestion: CVEDerivationClusterProposal) -> None:
     derivations = list(
         suggestion.derivations.select_related("metadata", "parent_evaluation")
         .prefetch_related(
-            "outputs",
-            "dependencies",
             Prefetch(
                 "metadata__maintainers",
                 queryset=NixMaintainer.objects.distinct(),
@@ -197,10 +195,6 @@ def cache_new_suggestions(suggestion: CVEDerivationClusterProposal) -> None:
     )
     package_edits = list(suggestion.package_edits.all())
     packages = apply_package_edits(original_packages, package_edits)
-    # FIXME(@fricklerhandwerk): We should just pass `packages`, but a tangled legacy view is still using this seemingly internal function and wants to pass a dict.
-    categorized_maintainers = categorize_maintainers(
-        original_packages, maintainers_edits
-    )
 
     only_relevant_data = CachedSuggestion(
         pk=suggestion.pk,
@@ -213,7 +207,7 @@ def cache_new_suggestions(suggestion: CVEDerivationClusterProposal) -> None:
         original_packages=packages,
         packages=packages,
         metrics=[to_dict(m) for m in prefetched_metrics],
-        categorized_maintainers=categorized_maintainers,
+        categorized_maintainers=categorize_maintainers(packages, maintainers_edits),
     )
 
     _, created = CachedSuggestions.objects.update_or_create(
@@ -427,19 +421,17 @@ def maintainers_list(packages: dict, edits: list[MaintainersEdit]) -> list[dict]:
 
 
 def categorize_maintainers(
-    original_packages: dict[str, CachedSuggestion.Package],
+    packages: dict[str, CachedSuggestion.Package],
     maintainers_edits: list[MaintainersEdit],
 ) -> CachedSuggestion.CategorizedMaintainers:
     """
-    Categorize maintainers associated the packages of a suggestion.
+    Categorize maintainers associated with the packages of a suggestion.
     """
 
     # Collect all original maintainers from packages (deduplicated by github_id)
     original_maintainers_dict: dict[int, dict] = {}
-    for package in original_packages.values():
-        for maintainer_dict in package.maintainers:
-            github_id = maintainer_dict["github_id"]
-            if github_id not in original_maintainers_dict:
-                original_maintainers_dict[github_id] = maintainer_dict
+    for package in packages.values():
+        for maintainer in package.maintainers:
+            original_maintainers_dict[maintainer["github_id"]] = maintainer
 
     original_maintainers = list(original_maintainers_dict.values())
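A subtle behavior change in `categorize_maintainers` above: the old loop kept the first maintainer dict seen per `github_id`, while the simplified assignment keeps the last. The two are interchangeable only if duplicate entries are identical, which the cached payload presumably guarantees since all entries for one maintainer come from the same database row. A toy illustration with hypothetical data:

```python
# Unconditional assignment means "last write wins"; the old
# `if github_id not in ...` guard meant "first write wins".
maintainers = [
    {"github_id": 1, "github": "alice"},
    {"github_id": 1, "github": "alice-renamed"},
]
deduped: dict[int, dict] = {}
for m in maintainers:
    deduped[m["github_id"]] = m
assert deduped[1]["github"] == "alice-renamed"
```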
""" # Collect all original maintainers from packages (deduplicated by github_id) original_maintainers_dict: dict[int, dict] = {} - for package in original_packages.values(): - for maintainer_dict in package.maintainers: - github_id = maintainer_dict["github_id"] - if github_id not in original_maintainers_dict: - original_maintainers_dict[github_id] = maintainer_dict + for package in packages.values(): + for maintainer in package.maintainers: + original_maintainers_dict[maintainer["github_id"]] = maintainer original_maintainers = list(original_maintainers_dict.values()) diff --git a/src/shared/listeners/nix_evaluation.py b/src/shared/listeners/nix_evaluation.py index c19f66efa..6b849e279 100644 --- a/src/shared/listeners/nix_evaluation.py +++ b/src/shared/listeners/nix_evaluation.py @@ -30,7 +30,6 @@ async def perform_evaluation( working_tree: pathlib.Path, - eval_store: pathlib.Path, evaluation_log_fd: int, limit: int = 16 * 1024 * 1024, ) -> asyncio.subprocess.Process: @@ -50,13 +49,10 @@ async def perform_evaluation( nixpkgs_config = "{ config = { allowUnfree = true; inHydra = false; allowInsecurePredicate = (_: true); scrubJobs = false; }; };" evaluation_wrapper = f"(import {{ nixpkgsArgs = {nixpkgs_config} }})" arguments = [ - "--show-input-drvs", "--force-recurse", "--meta", "--repair", "--quiet", - "--eval-store", - eval_store, "--expr", evaluation_wrapper, "--include", @@ -109,9 +105,9 @@ async def realtime_batch_process_attributes( return None start = time.time() - ingester = SyncBatchAttributeIngester(evaluated) + ingester = SyncBatchAttributeIngester(evaluated, parent_evaluation) await sync_to_async(ingester.initialize)() - drvs = await sync_to_async(ingester.ingest)(parent_evaluation) + drvs = await sync_to_async(ingester.ingest)() elapsed = time.time() - start logger.info( @@ -207,56 +203,55 @@ async def evaluation_entrypoint( ) as working_tree, aiofiles.open(evaluation_log_filepath, "w") as eval_log, ): - with tempfile.TemporaryDirectory() as eval_store: - # Kickstart the evaluation asynchronously. - eval_process = await perform_evaluation( - working_tree.path, pathlib.Path(eval_store), eval_log.fileno() + # Kickstart the evaluation asynchronously. + eval_process = await perform_evaluation( + working_tree.path, eval_log.fileno() + ) + assert eval_process.stdout is not None, ( + "Expected a valid `stdout` pipe for the asynchronous evaluation process" + ) + + # The idea here is that we want to match as close as possible + # our evaluation speed. So, we read as much lines as possible + # and then insert them During the insertion time, more lines + # may come in our internal buffer. On the next read, we will + # drain them again. + # Adding an item in the database takes around 1s max. + # So we don't want to wait more than one second for all the lines we can get. + count = 0 + async for lines in drain_lines(eval_process.stdout): + await realtime_batch_process_attributes( + evaluation, [line.decode("utf8") for line in lines] ) - assert eval_process.stdout is not None, ( - "Expected a valid `stdout` pipe for the asynchronous evaluation process" + count += len(lines) + # Wait for `nix-eval-jobs` to exit, at this point, + # It should be fairly quick because EOF has been reached. 
+ rc = await eval_process.wait() + elapsed = time.time() - start + if rc in (SIGSEGV, SIGABRT): + raise RuntimeError("`nix-eval-jobs` crashed!") + elif rc != 0: + logger.error( + "`nix-eval-jobs` failed to evaluate (non-zero exit status), check the evaluation logs" + ) + await NixEvaluation.objects.filter(id=evaluation.pk).aupdate( + state=NixEvaluation.EvaluationState.FAILED, + elapsed=elapsed, + updated_at=timezone.now(), + ) + else: + logger.info( + "Processed %d attributes from %s in %f seconds", + count, + evaluation, + elapsed, + ) + await NixEvaluation.objects.filter(id=evaluation.pk).aupdate( + state=NixEvaluation.EvaluationState.COMPLETED, + elapsed=elapsed, + failure_reason=None, + updated_at=timezone.now(), ) - - # The idea here is that we want to match as close as possible - # our evaluation speed. So, we read as much lines as possible - # and then insert them During the insertion time, more lines - # may come in our internal buffer. On the next read, we will - # drain them again. - # Adding an item in the database takes around 1s max. - # So we don't want to wait more than one second for all the lines we can get. - count = 0 - async for lines in drain_lines(eval_process.stdout): - await realtime_batch_process_attributes( - evaluation, [line.decode("utf8") for line in lines] - ) - count += len(lines) - # Wait for `nix-eval-jobs` to exit, at this point, - # It should be fairly quick because EOF has been reached. - rc = await eval_process.wait() - elapsed = time.time() - start - if rc in (SIGSEGV, SIGABRT): - raise RuntimeError("`nix-eval-jobs` crashed!") - elif rc != 0: - logger.error( - "`nix-eval-jobs` failed to evaluate (non-zero exit status), check the evaluation logs" - ) - await NixEvaluation.objects.filter(id=evaluation.pk).aupdate( - state=NixEvaluation.EvaluationState.FAILED, - elapsed=elapsed, - updated_at=timezone.now(), - ) - else: - logger.info( - "Processed %d derivations from %s in %f seconds", - count, - evaluation, - elapsed, - ) - await NixEvaluation.objects.filter(id=evaluation.pk).aupdate( - state=NixEvaluation.EvaluationState.COMPLETED, - elapsed=elapsed, - failure_reason=None, - updated_at=timezone.now(), - ) except Exception as e: elapsed = time.time() - start logger.exception( diff --git a/src/shared/management/commands/clean_up_crashed_evaluations.py b/src/shared/management/commands/garbage_collect_derivations.py similarity index 64% rename from src/shared/management/commands/clean_up_crashed_evaluations.py rename to src/shared/management/commands/garbage_collect_derivations.py index e5bfa38f7..b7b1dfbf6 100644 --- a/src/shared/management/commands/clean_up_crashed_evaluations.py +++ b/src/shared/management/commands/garbage_collect_derivations.py @@ -1,4 +1,5 @@ from argparse import ArgumentParser +from itertools import chain from typing import Any from django.core.management.base import BaseCommand @@ -25,20 +26,33 @@ def handle(self, *args: Any, **options: Any) -> None: # FIXME(@fricklerhandwerk): This assumes derivations are deleted when their metadata is deleted, which is the case at the time of writing. # Derivations should instead be protected, but currently we're not deduplicating any metadata. # Fix this logic when derivation metadata is deduplicated! + # FIXME(@fricklerhandwerk): This orphans `NixDerivationOutput` row, which are currently already 100x redundant. + # It's impractical to garbage-collect them without copying, so the fix would be to rewire all live derivations in a migration, and drop that table. 
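`drain_lines` is referenced above but not defined in this diff. Judging from the comment, it batches whatever output `nix-eval-jobs` has already produced, so that database insertion keeps pace with evaluation; a plausible minimal sketch under that assumption:

```python
# Plausible sketch of `drain_lines` (not part of this diff): yield batches of
# lines from the pipe, waiting at most `max_wait` seconds per batch.
import asyncio
from collections.abc import AsyncGenerator


async def drain_lines(
    stream: asyncio.StreamReader, max_wait: float = 1.0
) -> AsyncGenerator[list[bytes], None]:
    loop = asyncio.get_running_loop()
    while True:
        line = await stream.readline()
        if not line:
            return  # EOF: `nix-eval-jobs` closed its stdout.
        batch = [line]
        deadline = loop.time() + max_wait
        while (remaining := deadline - loop.time()) > 0:
            try:
                line = await asyncio.wait_for(stream.readline(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            if not line:
                break  # EOF mid-batch; yield what we have.
            batch.append(line)
        yield batch
```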
diff --git a/src/shared/management/commands/clean_up_crashed_evaluations.py b/src/shared/management/commands/garbage_collect_derivations.py
similarity index 64%
rename from src/shared/management/commands/clean_up_crashed_evaluations.py
rename to src/shared/management/commands/garbage_collect_derivations.py
index e5bfa38f7..b7b1dfbf6 100644
--- a/src/shared/management/commands/clean_up_crashed_evaluations.py
+++ b/src/shared/management/commands/garbage_collect_derivations.py
@@ -1,4 +1,5 @@
 from argparse import ArgumentParser
+from itertools import chain
 from typing import Any
 
 from django.core.management.base import BaseCommand
@@ -25,20 +26,33 @@ def handle(self, *args: Any, **options: Any) -> None:
         # FIXME(@fricklerhandwerk): This assumes derivations are deleted when their metadata is deleted, which is the case at the time of writing.
         # Derivations should instead be protected, but currently we're not deduplicating any metadata.
         # Fix this logic when derivation metadata is deduplicated!
+        # FIXME(@fricklerhandwerk): This orphans `NixDerivationOutput` rows, which are currently already 100x redundant.
+        # It's impractical to garbage-collect them without copying, so the fix would be to rewire all live derivations in a migration, and drop that table.
         metas = NixDerivation.objects.filter(
-            parent_evaluation__state=NixEvaluation.EvaluationState.CRASHED
+            parent_evaluation__state__in=[
+                NixEvaluation.EvaluationState.CRASHED,
+                NixEvaluation.EvaluationState.FAILED,
+            ]
             # XXX(@fricklerhandwerk): We could exclude drvs from crashed evals that belong to accepted or published suggestions to be safe.
             # But checking the production database, all such instances have drvs from completed evals with the same attribute name,
             # and we're not creating any new problematic instances.
         ).values_list("metadata", flat=True)
 
-        self.stdout.write("Querying derivations from crashed evaluations...")
+        orphaned = NixDerivationMeta.objects.filter(
+            derivation__isnull=True
+        ).values_list("id", flat=True)
+
+        all_metas = chain(metas, orphaned)
+
+        self.stdout.write("Querying derivations from crashed and failed evaluations...")
 
         batch_size = options["batch_size"]
-        total = metas.count()
-        metas_list = list(metas)
+        metas_list = list(all_metas)
+        total = len(metas_list)
 
-        self.stdout.write(f"Found {total} derivations")
+        self.stdout.write(
+            f"Found {metas.count()} derivations and {orphaned.count()} orphaned metadata entries"
+        )
 
         for i in range(0, total, batch_size):
             batch = metas_list[i : i + batch_size]
diff --git a/src/shared/management/commands/run_evaluation.py b/src/shared/management/commands/run_evaluation.py
new file mode 100644
index 000000000..d9505aabf
--- /dev/null
+++ b/src/shared/management/commands/run_evaluation.py
@@ -0,0 +1,52 @@
+import asyncio
+import textwrap
+from argparse import ArgumentParser
+from typing import Any
+
+from django.conf import settings
+from django.core.management.base import BaseCommand, CommandError
+
+from shared.listeners.nix_channels import enqueue_evaluation_job
+from shared.listeners.nix_evaluation import evaluation_entrypoint
+from shared.models import NixChannel, NixEvaluation
+
+
+class Command(BaseCommand):
+    help = (
+        "Evaluate the given commit from a fetched channel and ingest the resulting data"
+    )
+
+    def add_arguments(self, parser: ArgumentParser) -> None:
+        parser.add_argument(
+            "commit",
+            type=str,
+            help="Nixpkgs commit to evaluate",
+        )
+
+    def handle(self, *args: Any, **kwargs: Any) -> str | None:
+        try:
+            channel = NixChannel.objects.get(head_sha1_commit=kwargs["commit"])
+        except NixChannel.DoesNotExist:
+            raise CommandError(
+                textwrap.dedent("""
+                    Need a commit from a fetched channel!
+
+                    To fetch all channels, run:
+
+                        manage fetch_all_channels
+                """)
+            )
+        try:
+            evaluation = NixEvaluation.objects.select_related("channel").get(
+                commit_sha1=kwargs["commit"]
+            )
+        except NixEvaluation.DoesNotExist:
+            enqueue_evaluation_job(channel)
+            evaluation = NixEvaluation.objects.select_related("channel").get(
+                commit_sha1=kwargs["commit"]
+            )
+        asyncio.run(
+            evaluation_entrypoint(
+                settings.DEFAULT_SLEEP_WAITING_FOR_EVALUATION_SLOT,
+                evaluation,
+            )
+        )
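The deletion loop of `garbage_collect_derivations` above is cut off after the batch slicing. Completing the picture under the FIXME's stated assumption: deleting `NixDerivationMeta` rows cascades to the `NixDerivation` rows referencing them (`on_delete=models.CASCADE`), so deleting metadata in batches garbage-collects the derivations too. A hedged sketch of the missing step:

```python
# Hypothetical completion of the loop body; the actual statement is not shown
# in this diff.
for i in range(0, total, batch_size):
    batch = metas_list[i : i + batch_size]
    # Cascades to the NixDerivation rows pointing at these metadata entries.
    NixDerivationMeta.objects.filter(pk__in=batch).delete()
```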
diff --git a/src/shared/migrations/0068_alter_nixmaintainer_github.py b/src/shared/migrations/0068_alter_nixmaintainer_github.py
new file mode 100644
index 000000000..36b64c9df
--- /dev/null
+++ b/src/shared/migrations/0068_alter_nixmaintainer_github.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.2.9 on 2026-02-07 05:50
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('shared', '0067_remove_nixchannel_pgpubsub_ae5b1_and_more'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='nixmaintainer',
+            name='github',
+            field=models.CharField(max_length=200),
+        ),
+    ]
diff --git a/src/shared/migrations/0069_remove_nixderivation_dependencies_and_more.py b/src/shared/migrations/0069_remove_nixderivation_dependencies_and_more.py
new file mode 100644
index 000000000..64c749bd9
--- /dev/null
+++ b/src/shared/migrations/0069_remove_nixderivation_dependencies_and_more.py
@@ -0,0 +1,37 @@
+# Generated by Django 5.2.9 on 2026-02-12 12:08
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('shared', '0068_alter_nixmaintainer_github'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='nixderivation',
+            name='dependencies',
+        ),
+        migrations.RemoveField(
+            model_name='nixderivationoutput',
+            name='outputs',
+        ),
+        migrations.RemoveField(
+            model_name='nixderivation',
+            name='outputs',
+        ),
+        migrations.DeleteModel(
+            name='NixDerivationDependencyThrough',
+        ),
+        migrations.DeleteModel(
+            name='NixDerivationOutput',
+        ),
+        migrations.DeleteModel(
+            name='NixOutput',
+        ),
+        migrations.DeleteModel(
+            name='NixStorePathOutput',
+        ),
+    ]
diff --git a/src/shared/models/issue.py b/src/shared/models/issue.py
index 90a8beb3b..56670e234 100644
--- a/src/shared/models/issue.py
+++ b/src/shared/models/issue.py
@@ -101,8 +101,7 @@ def generate_code(
         except IntegrityError:
             continue
     raise RuntimeError(
-        "Failed to generate unique issue code for '%s'",
-        instance.suggestion.cve.cve_id,
+        f"Failed to generate unique issue code for '{instance.suggestion.cve.cve_id}'"
     )
diff --git a/src/shared/models/linkage.py b/src/shared/models/linkage.py
index 9ab221ed2..cc4d0c443 100644
--- a/src/shared/models/linkage.py
+++ b/src/shared/models/linkage.py
@@ -59,6 +59,21 @@ def is_editable(self) -> bool:
             CVEDerivationClusterProposal.Status.ACCEPTED,
         ]
 
+    def ignore_package(self, package: str) -> None:
+        edit, created = self.package_edits.get_or_create(
+            package_attribute=package,
+            defaults={"edit_type": PackageEdit.EditType.REMOVE},
+        )
+        if not created and edit.edit_type != PackageEdit.EditType.REMOVE:
+            edit.edit_type = PackageEdit.EditType.REMOVE
+            edit.save()
+
+    def restore_package(self, package: str) -> None:
+        self.package_edits.filter(
+            package_attribute=package,
+            edit_type=PackageEdit.EditType.REMOVE,
+        ).delete()
+
 
 @pghistory.track(
     pghistory.ManualEvent("maintainers.add"),
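Moving the `PackageEdit` bookkeeping onto the model above gives the views (further below) one place to express both operations. Usage boils down to:

```python
# Usage sketch for the new helpers; "openssl" is a placeholder attribute name.
suggestion.ignore_package("openssl")   # upserts a REMOVE edit for the package
suggestion.restore_package("openssl")  # deletes the REMOVE edit again
```

Both helpers are idempotent: ignoring an already-ignored package or restoring a package without a REMOVE edit is a no-op.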
diff --git a/src/shared/models/nix_evaluation.py b/src/shared/models/nix_evaluation.py
index f70db5c41..ec318a15f 100644
--- a/src/shared/models/nix_evaluation.py
+++ b/src/shared/models/nix_evaluation.py
@@ -24,7 +24,7 @@ class NixMaintainer(models.Model):
     """
 
     github_id = models.IntegerField(unique=True, primary_key=True)
-    github = models.CharField(max_length=200, unique=True)
+    github = models.CharField(max_length=200)
     email = models.CharField(max_length=200, null=True)
     matrix = models.CharField(max_length=200, null=True)
     name = models.CharField(max_length=200, null=True)
@@ -135,45 +135,6 @@ class Meta:  # type: ignore[override]
         ]
 
 
-class NixOutput(models.Model):
-    """
-    This is all the known outputs names.
-    """
-
-    output_name = models.CharField(max_length=255, unique=True)
-
-    def __str__(self) -> str:
-        return self.output_name
-
-
-class NixStorePathOutput(models.Model):
-    """
-    This is all the outputs of a given derivation, e.g. out, doc, etc.
-    associated to their store paths.
-
-    This represents in database as '{store_path}!{out}'.
-    """
-
-    store_path = models.CharField(max_length=255, unique=True)
-
-    def __hash__(self) -> int:
-        return hash(self.store_path)
-
-
-class NixDerivationOutput(models.Model):
-    """
-    A derivation may depend on another derivation,
-    but it must specify two things:
-
-    - derivation path
-    - output depended upon
-
-    e.g. depending on /nix/store/eeeeeeeeeeeeeee-something.drv and its 'out' output.
-    """
-
-    outputs = models.ManyToManyField(NixOutput)
-    derivation_path = models.CharField(max_length=255)
-
-
 class NixChannel(TimeStampMixin):
     """
     This represents a "Nixpkgs" (*) channel, e.g.
@@ -270,22 +231,6 @@ class Meta:  # type: ignore[override]
         unique_together = ("channel", "commit_sha1")
 
 
-class NixDerivationDependencyThrough(models.Model):
-    pk = models.CompositePrimaryKey("nixderivation_id", "nixderivationoutput_id")
-
-    nixderivation = models.ForeignKey(
-        "NixDerivation",
-        on_delete=models.CASCADE,
-    )
-    nixderivationoutput = models.ForeignKey(
-        NixDerivationOutput,
-        on_delete=models.CASCADE,
-    )
-
-    class Meta:
-        db_table = "shared_nixderivation_dependencies"
-
-
 class NixDerivation(models.Model):
     """
     This represents a Nix derivation "evaluated",
@@ -297,18 +242,15 @@ class NixDerivation(models.Model):
 
     attribute = models.CharField(max_length=255)
     derivation_path = models.CharField(max_length=255)
-    dependencies = models.ManyToManyField(
-        NixDerivationOutput,
-        through=NixDerivationDependencyThrough,
-    )
     name = models.CharField(max_length=255)
+    # FIXME(@fricklerhandwerk): [tag:deduplicate-metadata] We always need the metadata, so it shouldn't be in a separate table.
+    # Also we're currently paying for the join on practically all accesses: only 2k out of 35M derivations have no metadata at all.
     metadata = models.OneToOneField(
         NixDerivationMeta,
         related_name="derivation",
         on_delete=models.CASCADE,
         null=True,
     )
-    outputs = models.ManyToManyField(NixStorePathOutput)
     system = models.CharField(max_length=255)
     parent_evaluation = models.ForeignKey(
         NixEvaluation, related_name="derivations", on_delete=models.CASCADE
     )
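Dropping `unique=True` from `NixMaintainer.github` (applied by migration 0068 above) reflects that GitHub handles are mutable and can collide across Nixpkgs snapshots, while the numeric `github_id` remains the stable identity. What the schema now permits, with hypothetical rows:

```python
# Two accounts that held the same handle at different points in time no longer
# raise an IntegrityError; only `github_id` is constrained.
NixMaintainer.objects.create(github_id=111, github="jdoe")
NixMaintainer.objects.create(github_id=222, github="jdoe")
```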
+    # The main problem here is that it results in very inefficient queries, as we can't express fetching related data in one go.
+    # A minor problem for now is obscured and coarse-grained access control, but there's no user story at the moment for which it's in the way.
     def _check_access_rights_and_get_suggestion(
         self, request: HttpRequest, suggestion_id: int
     ) -> tuple[CVEDerivationClusterProposal, SuggestionContext]:
diff --git a/src/webview/suggestions/views/packages.py b/src/webview/suggestions/views/packages.py
index 702b0ae4c..4ac5e0884 100644
--- a/src/webview/suggestions/views/packages.py
+++ b/src/webview/suggestions/views/packages.py
@@ -3,12 +3,16 @@
 from django.db import transaction
 from django.http import HttpRequest, HttpResponse
 
-from shared.listeners.cache_suggestions import apply_package_edits
+from shared.listeners.cache_suggestions import (
+    CachedSuggestion,
+    apply_package_edits,
+    categorize_maintainers,
+)
 from shared.models.linkage import (
     CVEDerivationClusterProposal,
-    PackageEdit,
 )
 from webview.suggestions.context.builders import (
+    get_maintainer_list_context,
     get_package_list_context,
 )
@@ -40,11 +44,19 @@ def post(
         try:
             with transaction.atomic():
                 self._perform_operation(suggestion, package_attr)
-                new_active_packages = apply_package_edits(
+                suggestion.cached.payload["packages"] = apply_package_edits(
                     suggestion.cached.payload["original_packages"],
                     suggestion.package_edits.all(),
                 )
-                suggestion.cached.payload["packages"] = new_active_packages
+                suggestion.cached.payload["categorized_maintainers"] = (
+                    categorize_maintainers(
+                        {
+                            k: CachedSuggestion.Package.model_validate(v)
+                            for k, v in suggestion.cached.payload["packages"].items()
+                        },
+                        suggestion.maintainers_edits.all(),
+                    ).model_dump()
+                )
                 suggestion.cached.save()
         except Exception:
             return self._handle_error(
@@ -55,6 +67,9 @@ def post(
 
         # Refresh the package list context and activity log
         suggestion_context.package_list_context = get_package_list_context(suggestion)
+        suggestion_context.maintainer_list_context = get_maintainer_list_context(
+            suggestion
+        )
         suggestion_context.activity_log = fetch_activity_log(suggestion.pk)
 
         # Handle response based on request type
@@ -88,13 +103,7 @@ def _perform_operation(
         self, suggestion: CVEDerivationClusterProposal, package_attr: str
     ) -> None:
         """Create or update PackageEdit to ignore the package."""
-        edit, created = suggestion.package_edits.get_or_create(
-            package_attribute=package_attr,
-            defaults={"edit_type": PackageEdit.EditType.REMOVE},
-        )
-        if not created and edit.edit_type != PackageEdit.EditType.REMOVE:
-            edit.edit_type = PackageEdit.EditType.REMOVE
-            edit.save()
+        suggestion.ignore_package(package_attr)
 
     def _get_operation_name(self) -> str:
         return "ignore"
@@ -107,10 +116,7 @@ def _perform_operation(
         self, suggestion: CVEDerivationClusterProposal, package_attr: str
     ) -> None:
         """Remove PackageEdit entries to restore the package."""
-        suggestion.package_edits.filter(
-            package_attribute=package_attr,
-            edit_type=PackageEdit.EditType.REMOVE,
-        ).delete()
+        suggestion.restore_package(package_attr)
 
     def _get_operation_name(self) -> str:
         return "restore"
diff --git a/src/webview/templates/suggestions/components/maintainers_list.html b/src/webview/templates/suggestions/components/maintainers_list.html
index f4dc6803f..cfefac4f0 100644
--- a/src/webview/templates/suggestions/components/maintainers_list.html
+++ b/src/webview/templates/suggestions/components/maintainers_list.html
@@ -11,7 +11,6 @@

Package maintainers

- {% if data.editable and user|is_maintainer_or_admin %}

Maintainers of ignored packages won't be notified.

{% endif %}
{% if data.active %}