11 changes: 10 additions & 1 deletion src/website/shared/channels.py
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from pgpubsub.channel import TriggerChannel
from pgpubsub.channel import Channel, TriggerChannel

from shared.models import NixDerivation
from shared.models.cve import Container
@@ -46,3 +46,12 @@ class CVEDerivationClusterProposalChannel(TriggerChannel):
# We don't need to lock notifications.
# If we are caching twice the same proposal, we will just replace it.
lock_notifications = False


@dataclass
class NixEvaluationCompleteChannel(Channel):
evaluation_id: int
# We do not want to perform attribute path tracking twice.
# It is expensive, and the second time around we would just be constructing the identity mapping.
# We may revisit this if needed.
lock_notifications = True
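
For reference, a plain `Channel` (unlike a `TriggerChannel`) is notified explicitly from application code and consumed by a decorated listener. A minimal sketch of that pairing, mirroring the pgpubsub calls used elsewhere in this PR; the listener name and the evaluation id value are placeholders:

```python
import pgpubsub

from shared.channels import NixEvaluationCompleteChannel


@pgpubsub.listener(NixEvaluationCompleteChannel)
def handle_evaluation_complete(evaluation_id: int) -> None:
    # The notification payload fields match the channel's dataclass fields.
    ...


# Emitted from application code once the evaluation has finished:
pgpubsub.notify(
    "shared.channels.NixEvaluationCompleteChannel",
    evaluation_id=42,  # hypothetical evaluation primary key
)
```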
125 changes: 124 additions & 1 deletion src/website/shared/listeners/nix_evaluation.py
@@ -13,13 +13,18 @@
from django.conf import settings
from django.db.models import Avg

from shared.channels import NixEvaluationChannel
from shared.channels import NixEvaluationChannel, NixEvaluationCompleteChannel
from shared.evaluation import (
SyncBatchAttributeIngester,
parse_evaluation_result,
)
from shared.git import GitRepo
from shared.listeners.cache_suggestions import cache_new_suggestions
from shared.models import NixDerivation, NixEvaluation
from shared.models.linkage import (
CVEDerivationClusterProposal,
DerivationClusterProposalLink,
)

logger = logging.getLogger(__name__)

@@ -240,6 +245,14 @@ async def evaluation_entrypoint(
state=NixEvaluation.EvaluationState.COMPLETED,
elapsed=elapsed,
)

# Notify that we have a new evaluation ready; any listeners should now
# proceed to a global update of old derivations via attribute path.
pgpubsub.notify(
"shared.channels.NixEvaluationCompleteChannel",
evaluation_id=evaluation.pk,
)
except Exception as e:
elapsed = time.time() - start
logger.exception(
@@ -279,3 +292,113 @@ def run_evaluation_job(old: NixEvaluation, new: NixEvaluation) -> None:
new,
)
)


def rewire_new_derivations_following_attribute_paths(
proposals: list[CVEDerivationClusterProposal], evaluation: NixEvaluation
) -> None:
"""
This takes a list of proposals which have derivations from an older Nix channel attached to it.
The passed evaluation is supposed to be the result of a newer Nix channel evaluation.

The game is to update all the M2M links from that proposal to the newer derivations.
How to do this? Attribute paths.

Attribute paths are supposed to be constant and can relate from a channel to another.
This is what Hydra does to track what happens to a package build time history, etc.

This falls short whenever we will rename a package, move it to another attribute path, e.g. promoting a GNOME variant
to the top-level space of packages.

In those situations, this is not a big deal, this should not change intrinsically that we made the suggestion
for that GNOME variant based on intrinsic parameters of said package and therefore, we will lose its trace in
this new evaluation and will just "lose" it from this suggestion.

Once we obtain the ability to upgrade existing suggestions, we may re-attach it.
Nonetheless, we should return the list of lost derivations by rewiring with the new evaluation.
"""

# Ideally, we would settle this in a single UPDATE statement, roughly:
#   UPDATE derivation_cluster_proposal_link SET derivation_id = nnd.id
#   FROM nixderivations AS ond, nixderivations AS nnd
#   WHERE ond.id = derivation_id
#     AND nnd.attribute_path = ond.attribute_path
#     AND proposal_id IN (eligible_proposals)
#     AND nnd.parent_evaluation_id = new_evaluation_id;
# But this is a complicated one and it does not handle the situation where the join has missing items, i.e. lost derivations.
# Let's do it the slow way and we will see the impact in production.
# In general, we do not expect the size of a proposal (:= number of derivations in the proposal) to be large, and the number of proposals should stay low.
# If we end up doing one query for _all_ proposals, we are therefore looking at O(size of all proposals merged) in terms of query complexity.

current_links = {
link.derivation.attribute: link
for link in DerivationClusterProposalLink.objects.select_related("derivation")
.filter(proposal__in=proposals)
.iterator()
}
attribute_paths = list(current_links.keys())
new_derivations = {
attribute: derivation_id
for attribute, derivation_id in NixDerivation.objects.filter(
attribute__in=attribute_paths, parent_evaluation=evaluation
)
.values_list("attribute", "id")
.iterator()
}
updates = []
# This loop is O(size of all proposals merged), which is ≤ O(sum of the sizes of all proposals).
# Depending on the situation, we may have many suggestions that share the same derivations because they are suggestions
# for different CVEs that end up affecting the same package set.
# In that scenario, we are reduced to O(size of the largest proposal).
# In the other scenario, where suggestions affect all of nixpkgs uniformly, we are reduced to O(sum of the sizes of all proposals).
# The reality is probably between those two extremes.
for apath in attribute_paths:
current_link = current_links[apath]
# This is a lost derivation.
if apath not in new_derivations:
logger.warning(
"We lost the trace of '%s' following the channel update in evaluation '%s', marking that proposal's derivation as outdated.",
apath,
evaluation.pk,
)
current_link.outdated = True
updates.append(current_link)
# This is a known derivation!
else:
current_link.derivation_id = new_derivations[apath]
updates.append(current_link)

DerivationClusterProposalLink.objects.bulk_update(
updates, fields=("outdated", "derivation"), batch_size=1_000
)

# TODO: how to handle the activity log here?
# Bulk updates may not trigger anything, so we would need special rendering here
# to inform that the service updated N derivations and could not deal with M derivations (the outdated ones).


@pgpubsub.listener(NixEvaluationCompleteChannel)
def run_attribute_tracking_job(evaluation_id: int) -> None:
# Our objective is to update:
# - pending suggestions
# - accepted suggestions
# TODO: in the future, we should not update "mitigated" issues so we can easily revisit _which_ derivation was the last one.
try:
evaluation = NixEvaluation.objects.get(pk=evaluation_id)
except NixEvaluation.DoesNotExist:
logger.warning(
"Evaluation ID '%d' disappeared when we were updating the attributes it induced; this might be normal if we are recovering from a very old state",
evaluation_id,
)
return

# At this point, we have a known full evaluation.
# We would like to select all pending or accepted proposals.
eligible_proposals = list(
CVEDerivationClusterProposal.objects.filter(
status__in=(
CVEDerivationClusterProposal.Status.PENDING,
CVEDerivationClusterProposal.Status.ACCEPTED,
)
)
)
rewire_new_derivations_following_attribute_paths(eligible_proposals, evaluation)

for proposal in eligible_proposals:
# Once a proposal is rewired, we need to recache it.
# TODO(Raito): performance-wise, we could do a more complicated recalculation if we had better guarantees on the schema of derivations inside of it.
# We do not, so we will not until we prove there's a concern about performance.
cache_new_suggestions(proposal)
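
Since the channel payload is just the evaluation primary key, the same rewiring job could presumably be re-triggered by hand for backfilling, e.g. from a Django shell. A hedged sketch, where the choice of evaluation is purely illustrative:

```python
import pgpubsub

from shared.models import NixEvaluation

# Hypothetical backfill: re-run attribute path tracking against the most
# recent completed evaluation. Assumes a pgpubsub listener process is running
# and will pick up the notification.
latest = (
    NixEvaluation.objects.filter(state=NixEvaluation.EvaluationState.COMPLETED)
    .order_by("-pk")
    .first()
)
if latest is not None:
    pgpubsub.notify(
        "shared.channels.NixEvaluationCompleteChannel",
        evaluation_id=latest.pk,
    )
```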
4 changes: 4 additions & 0 deletions src/website/shared/models/linkage.py
@@ -65,6 +65,10 @@ class DerivationClusterProposalLink(models.Model):

derivation = models.ForeignKey(NixDerivation, on_delete=models.CASCADE)

# Whether this M2M link is obsolete with regard to the existence of a younger NixEvaluation containing
# a potentially newer derivation.
outdated = models.BooleanField(default=False)

# TODO: how to design the integrity here?
# we probably want to add a fancy check here.
provenance_flags = models.IntegerField()
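
Since `outdated` is a new non-nullable column on an existing table, the accompanying migration needs a default (or a two-step backfill). A minimal sketch of what `makemigrations` would roughly generate, assuming `default=False`; the migration dependency name is a placeholder:

```python
from django.db import migrations, models


class Migration(migrations.Migration):
    dependencies = [
        ("shared", "0001_previous"),  # placeholder dependency
    ]

    operations = [
        migrations.AddField(
            model_name="derivationclusterproposallink",
            name="outdated",
            field=models.BooleanField(default=False),
        ),
    ]
```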