Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infra/production.nix
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ in
EMAIL_USE_SSL = true;
EMAIL_HOST_USER = "noreply-securitytracker@nixos.org";
DEFAULT_FROM_EMAIL = "noreply-securitytracker@nixos.org";
ACTIVE_MATCHING_ALGORITHM_VERSION = 1;
};

secrets = {
Expand Down
1 change: 1 addition & 0 deletions infra/staging.nix
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ in
GH_SECURITY_TEAM = "sectracker-testing-security";
GH_COMMITTERS_TEAM = "sectracker-testing-committers";
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend";
ACTIVE_MATCHING_ALGORITHM_VERSION = 1;
};

secrets = {
Expand Down
19 changes: 19 additions & 0 deletions src/project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ class DjangoSettings(BaseModel):
""",
default=1_000,
)
ACTIVE_MATCHING_ALGORITHM_VERSION: int = Field(
description="""
Controls which registered matching algorithm version is used when
linking CVEs to derivations. Must match a VERSION defined in
shared/listeners/algorithms/. Bump this setting to activate a new
algorithm version without changing code.
Comment on lines +174 to +177
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work. Changing an environment variable for us is exactly equivalent to changing code, because there's no such thing as a deployment that doesn't touch the source.

That means, for our purposes at this point in development, it would be just as good (and a lot simpler) to hard-code it in e.g. the suggestion model, like we have it for caching (and as discussed in #722 (comment)):

@classproperty
def CURRENT_SCHEMA_VERSION(cls) -> int: # noqa: N802, N805
return 2

""",
default=1,
)
# Version of the algorithm under evaluation, if any. Read by
# `candidate_algorithm()` in shared/listeners/algorithms/__init__.py, which
# returns None when this setting is None.
CANDIDATE_MATCHING_ALGORITHM_VERSION: int | None = Field(
description="""
Optional. When set, identifies a new algorithm version being evaluated
in parallel. The candidate version does not run automatically — it is
only invoked by the test-run management command, which generates proposals
tagged with this version number for later metric comparison.
Set to None when no candidate is under evaluation.
""",
default=None,
)
Comment on lines +181 to +190
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we actually need that parameter (regardless of where it lives). Arguably we could just dry-run all non-active versions above the active one. We can control the number of them by putting them in that directory or not.

SHOW_DEMO_DISCLAIMER: bool = Field(
description="""
When set to True, the application will display a disclaimer about
Expand Down
57 changes: 57 additions & 0 deletions src/shared/listeners/algorithms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
Algorithm registry for CVE-to-derivation matching.

Each algorithm version lives in its own module (v1.py, v2.py, ...) and must expose:
- VERSION
- build_new_links

Modules register themselves by calling `register()` when they are imported.
Callers obtain the active algorithm through `current_algorithm()`.
"""

from typing import Protocol

from django.conf import settings

from shared.models.cve import Container


class MatchingAlgorithm(Protocol):
    """Structural interface that every matching-algorithm module must satisfy."""

    # Integer key under which the algorithm is stored in the registry.
    VERSION: int

    def build_new_links(self, container: Container) -> bool:
        """
        Run the matching algorithm for ``container``.

        The v1 implementation returns True when it created a proposal and
        False when it skipped the container.
        """
        ...
Comment thread
adekoder marked this conversation as resolved.
Dismissed


# Maps each algorithm's VERSION to the registered algorithm object; filled by
# `register()` and read by `_resolve()`.
_registry: dict[int, MatchingAlgorithm] = {}


def register(module: MatchingAlgorithm) -> None:
    """
    Store ``module`` in the registry, keyed by its ``VERSION``.

    A later registration under the same version replaces the earlier entry.
    """
    version = module.VERSION
    _registry[version] = module
Comment on lines +27 to +29
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just read the directory and map the v*.py filenames to versions automatically?



def _resolve(version: int) -> MatchingAlgorithm:
    """
    Return the registered algorithm that matches ``version``.

    Raises:
        RuntimeError: if the registry is empty, e.g. because no algorithm
            module has been imported (and thereby registered) yet.
        KeyError: if algorithms are registered, but none under ``version``.
    """
    if not _registry:
        raise RuntimeError("No matching algorithm registered.")
    try:
        return _registry[version]
    except KeyError:
        # Only the descriptive error below is useful to the caller; suppress
        # the bare dict-lookup failure instead of chaining it implicitly,
        # which would print a misleading "During handling of the above
        # exception, another exception occurred" traceback.
        raise KeyError(
            f"No matching algorithm registered for version {version}."
        ) from None
Comment on lines +32 to +39
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be perfectly enough to check in a test that selecting the active and candidate version works, simply to guard against typos and omissions. Since for now we're controlling the selection in code, all we need to guarantee is basic consistency.

Then the actual selection is simply _registry[version] right there at the call site.



def current_algorithm() -> MatchingAlgorithm:
    """Look up the algorithm selected by ``settings.ACTIVE_MATCHING_ALGORITHM_VERSION``."""
    active_version = settings.ACTIVE_MATCHING_ALGORITHM_VERSION
    return _resolve(active_version)


# TODO(@adekoder): will be used once we create the process that runs the inactive new
# algorithm version.
def candidate_algorithm() -> MatchingAlgorithm | None:
    """
    Look up the algorithm selected by
    ``settings.CANDIDATE_MATCHING_ALGORITHM_VERSION``.

    Returns None when that setting is None, i.e. no candidate is configured.
    """
    candidate_version = settings.CANDIDATE_MATCHING_ALGORITHM_VERSION
    return None if candidate_version is None else _resolve(candidate_version)
196 changes: 196 additions & 0 deletions src/shared/listeners/algorithms/v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
"""
Matching algorithm version 1.

Candidates are found by matching package name / product name (case-insensitive
substring) against derivation names in the latest completed evaluation of each
major channel. No version constraint checking is applied at this stage.
"""

import logging
import sys

from django.conf import settings
from django.db.models import (
Case,
Exists,
F,
IntegerField,
OuterRef,
Q,
QuerySet,
Value,
When,
Window,
)
from django.db.models.functions import RowNumber

from shared.models.cve import AffectedProduct, Container, Cpe
from shared.models.linkage import CVEDerivationClusterProposal, ProvenanceFlags
from shared.models.nix_evaluation import MAJOR_CHANNELS, NixDerivation, NixEvaluation

from . import register

# Version under which this module registers itself and with which every
# proposal it creates is tagged (`algorithm_version`).
VERSION: int = 1

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def produce_linkage_candidates(
    container: Container,
    filtered_affected: QuerySet[AffectedProduct],
) -> dict[NixDerivation, ProvenanceFlags]:
    """
    Collect derivations whose name contains an affected package or product name.

    Only derivations belonging to the most recently updated completed
    evaluation of each major channel are considered. Matching is a
    case-insensitive substring test on the derivation name.

    Args:
        container: Not read by this function.
        filtered_affected: Affected products whose distinct non-null
            ``package_name`` and ``product`` values serve as search terms.

    Returns:
        A mapping from each matching derivation to the provenance flags that
        record whether it matched by package name, by product, or by both.
        Empty when there are no search terms.
    """

    def name_contains_any(terms) -> Q:
        # OR together one case-insensitive substring condition per term.
        condition = Q()
        for term in terms:
            condition |= Q(name__icontains=term)
        return condition

    # FIXME(@fricklerhandwerk): This will fall apart when we obtain the channel structure dynamically [ref:channel-structure]
    channel_filter = Q()
    for branch in MAJOR_CHANNELS:
        channel_filter |= Q(channel__channel_branch__contains=branch)

    # Rank completed evaluations per channel, newest update first, and keep
    # only the top-ranked one of every channel.
    completed = NixEvaluation.objects.filter(
        channel_filter,
        state=NixEvaluation.EvaluationState.COMPLETED,
    )
    ranked = completed.annotate(
        row_num=Window(
            expression=RowNumber(),
            partition_by=[F("channel")],
            order_by=F("updated_at").desc(),
        ),
    )
    latest_complete_channels = ranked.filter(row_num=1)

    package_q = name_contains_any(
        filtered_affected.exclude(package_name__isnull=True)
        .values_list("package_name", flat=True)
        .distinct()
    )
    product_q = name_contains_any(
        filtered_affected.exclude(product__isnull=True)
        .values_list("product", flat=True)
        .distinct()
    )

    any_match_q = package_q | product_q
    if not any_match_q:
        # No search terms at all: nothing can match.
        return {}

    # Annotate each derivation with the flag of every condition it satisfies;
    # a condition without search terms gets no annotation.
    flag_sources = (
        ("package_match", package_q, ProvenanceFlags.PACKAGE_NAME_MATCH),
        ("product_match", product_q, ProvenanceFlags.PRODUCT_MATCH),
    )
    annotations = {
        field_name: Case(
            When(condition, then=Value(flag)),
            default=Value(0),
            output_field=IntegerField(),
        )
        for field_name, condition, flag in flag_sources
        if condition
    }

    matches = NixDerivation.objects.filter(
        any_match_q,
        parent_evaluation__in=list(latest_complete_channels),
    ).annotate(**annotations)

    # A missing annotation counts as "no match" for that condition.
    return {
        drv: ProvenanceFlags(
            getattr(drv, "package_match", 0) | getattr(drv, "product_match", 0)
        )
        for drv in matches.iterator()
    }


def _reject_without_match(
    container: Container,
    reason: CVEDerivationClusterProposal.RejectionReason,
) -> None:
    """Record a REJECTED proposal for the container's CVE with ``reason``."""
    CVEDerivationClusterProposal.objects.create(
        cve=container.cve,
        status=CVEDerivationClusterProposal.Status.REJECTED,
        rejection_reason=reason,
        algorithm_version=VERSION,
    )


def build_new_links(container: Container) -> bool:
    """
    Create a version-tagged proposal linking the container's CVE to derivations.

    The container is skipped (returning False) when its CVE is already
    triaged, when a proposal for this CVE and algorithm version already
    exists, when no derivation matches, or when more than
    ``settings.MAX_MATCHES`` derivations match. Containers tagged
    ``exclusively-hosted-service`` and containers whose affected products all
    carry only hardware CPEs get a REJECTED proposal instead of a match.

    Returns:
        True when a proposal (rejected or with derivations) was created,
        False when the container was skipped.
    """
    cve = container.cve

    if cve.triaged:
        logger.info(
            "Container received for '%s', but already triaged, skipping linkage.",
            cve,
        )
        return False

    already_proposed = CVEDerivationClusterProposal.objects.filter(
        cve=cve, algorithm_version=VERSION
    ).exists()
    if already_proposed:
        logger.info("Suggestion already exists for '%s', skipping", cve)
        return False

    if container.tags.filter(value="exclusively-hosted-service").exists():
        logger.info(
            "Container for '%s' is exclusively-hosted-service, rejecting without match.",
            cve,
        )
        _reject_without_match(
            container,
            CVEDerivationClusterProposal.RejectionReason.EXCLUSIVELY_HOSTED_SERVICE,
        )
        return True

    # Drop affected products that have CPEs, all of which are hardware CPEs
    # (prefix "cpe:2.3:h:"). Products without any CPE are kept.
    cpes_of_product = Cpe.objects.filter(affectedproduct=OuterRef("pk"))
    has_any_cpe = Exists(cpes_of_product)
    has_non_hardware_cpe = Exists(
        cpes_of_product.exclude(name__istartswith="cpe:2.3:h:")
    )
    filtered_affected = container.affected.exclude(has_any_cpe & ~has_non_hardware_cpe)

    if container.affected.exists() and not filtered_affected.exists():
        logger.info(
            "Container for '%s' has only hardware CPEs, rejecting without match.",
            cve,
        )
        _reject_without_match(
            container,
            CVEDerivationClusterProposal.RejectionReason.HARDWARE_ONLY_CPE,
        )
        return True

    drvs = produce_linkage_candidates(container, filtered_affected)
    if not drvs:
        logger.info("No derivations matching '%s', ignoring", cve)
        return False

    if len(drvs) > settings.MAX_MATCHES:
        logger.warning(
            "More than '%d' derivations matching '%s', ignoring",
            settings.MAX_MATCHES,
            cve,
        )
        return False

    proposal = CVEDerivationClusterProposal.objects.create(
        cve=cve,
        algorithm_version=VERSION,
    )

    # One through-row per matched derivation, carrying its provenance flags.
    through_model = CVEDerivationClusterProposal.derivations.through
    link_rows = [
        through_model(
            proposal_id=proposal.pk, derivation_id=drv.pk, provenance_flags=flags
        )
        for drv, flags in drvs.items()
    ]
    through_model.objects.bulk_create(link_rows)

    # `drvs` is non-empty at this point, so there is always at least one row.
    logger.info(
        "Matching suggestion for '%s': %d derivations found.",
        cve,
        len(link_rows),
    )
    return True


# Self-register when imported: the module object itself is stored in the
# registry under VERSION, so importing this module is what makes this
# algorithm version selectable.
# NOTE(review): the `type: ignore` is presumably needed because the type
# checker does not accept a module object as a `MatchingAlgorithm` — confirm.
register(sys.modules[__name__]) # type: ignore[arg-type]
Loading