Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
dd42560
Improve generate_rocky_config.py version matching and naming flexibility
rockythorn Nov 10, 2025
95319c3
Simplify conditional logic in generate_rocky_config.py
rockythorn Nov 12, 2025
cb9aacc
Return all advisories with CVEs
rockythorn Oct 21, 2025
531bbd0
Remove redundant comments from OSV API
rockythorn Nov 12, 2025
f1d918b
Simplify OSV API advisory filtering logic
rockythorn Nov 13, 2025
25f74e0
Fix CSAF parser for modular packages and add EUS filtering
rockythorn Nov 7, 2025
48a57c8
Fix CSV merge to prioritize changes.csv over releases.csv
rockythorn Nov 7, 2025
c555bcf
Add web UI for managing CSAF index timestamp
rockythorn Nov 7, 2025
ced651a
Fix test_csaf_processing to work with refactored CSAF parser and Bazel
rockythorn Nov 10, 2025
d43f40c
Refactor EUS product identifiers into file-level constants
rockythorn Nov 13, 2025
12695ae
Simplify package extraction logic
rockythorn Nov 13, 2025
43ee733
Use Pythonic empty set check
rockythorn Nov 13, 2025
6b997e5
Remove redundant comments from CSAF processing code
rockythorn Nov 13, 2025
c87bb75
Improve exception handling in database_service
rockythorn Nov 13, 2025
db07012
Refactor nested functions to pure functions
rockythorn Nov 13, 2025
0dc061c
Move EUS-only check earlier to avoid unnecessary work
rockythorn Nov 13, 2025
42aea5c
Fix config import validation issues
rockythorn Nov 3, 2025
2e3b51f
Update tests for integer Decimal serialization
rockythorn Nov 6, 2025
6c7eb9e
Remove unnecessary comments
rockythorn Nov 13, 2025
98d9112
Remove redundant comments from validation module
rockythorn Nov 13, 2025
7182460
Add active field to mirror configuration
rockythorn Nov 4, 2025
35258c3
Fix active checkbox not saving when unchecked
rockythorn Nov 4, 2025
7616756
Add mirror sorting and status visualization
rockythorn Nov 4, 2025
3ab6eeb
Skip inactive mirrors in RHMatcherWorkflow
rockythorn Nov 5, 2025
25fee12
Fix duplicate loop bug in block_remaining_rh_advisories
rockythorn Nov 5, 2025
10e11f3
Simplify checkbox handling for mirror active field
rockythorn Nov 6, 2025
ada78f0
Add tests for active field checkbox handling
rockythorn Nov 6, 2025
7e01a2d
Remove redundant comments from mirror active field code
rockythorn Nov 13, 2025
0f8300e
Extract duplicate repomd form validation into helper function
rockythorn Nov 13, 2025
d4d670c
Remove unnecessary comments
rockythorn Nov 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
node_modules
.venv
.ijwb
.idea
.idea
temp
csaf_analysis
bazel-*
.git
container_data
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ jobs:
bazel test //apollo/tests:test_auth --test_output=all
bazel test //apollo/tests:test_validation --test_output=all
bazel test //apollo/tests:test_admin_routes_supported_products --test_output=all
bazel test //apollo/tests:test_api_osv --test_output=all
bazel test //apollo/tests:test_database_service --test_output=all

- name: Integration Tests
run: ./build/scripts/test.bash
1 change: 1 addition & 0 deletions apollo/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class SupportedProductsRhMirror(Model):
match_major_version = fields.IntField()
match_minor_version = fields.IntField(null=True)
match_arch = fields.CharField(max_length=255)
active = fields.BooleanField(default=True)

rpm_repomds: fields.ReverseRelation["SupportedProductsRpmRepomd"]
rpm_rh_overrides: fields.ReverseRelation["SupportedProductsRpmRhOverride"]
Expand Down
11 changes: 11 additions & 0 deletions apollo/migrations/20251104111759_add_mirror_active_field.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- migrate:up
alter table supported_products_rh_mirrors
add column active boolean not null default true;

create index supported_products_rh_mirrors_active_idx
on supported_products_rh_mirrors(active);


-- migrate:down
drop index if exists supported_products_rh_mirrors_active_idx;
alter table supported_products_rh_mirrors drop column active;
242 changes: 183 additions & 59 deletions apollo/rhcsaf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,86 @@
from common.logger import Logger
from apollo.rpm_helpers import parse_nevra

# Initialize Info before Logger for this module

logger = Logger()

# CPE product fields (4th colon-separated component, e.g.
# "cpe:/a:redhat:rhel_e4s:9.0::appstream" -> "rhel_e4s") that identify
# extended-support RHEL streams to be filtered out.
EUS_CPE_PRODUCTS = frozenset([
    "rhel_eus",  # Extended Update Support
    "rhel_e4s",  # Update Services for SAP Solutions
    "rhel_aus",  # Advanced Update Support (IBM Power)
    "rhel_tus",  # Telecommunications Update Service
])

# Substrings matched against the lowercased product name as a fallback
# when the CPE does not identify the product as an EUS stream.
EUS_PRODUCT_NAME_KEYWORDS = frozenset([
    "e4s",
    "eus",
    "aus",
    "tus",
    "extended update support",
    "update services for sap",
    "advanced update support",
    "telecommunications update service",
])

def _is_eus_product(product_name: str, cpe: str) -> bool:
    """
    Decide whether a product is an EUS-style stream (EUS/E4S/AUS/TUS).

    The CPE string is checked first: its 4th colon-separated field is
    compared against EUS_CPE_PRODUCTS. If that is inconclusive, the
    lowercased product name is scanned for EUS_PRODUCT_NAME_KEYWORDS.

    Args:
        product_name: Full product name
            (e.g. "Red Hat Enterprise Linux AppStream E4S (v.9.0)")
        cpe: CPE string (e.g. "cpe:/a:redhat:rhel_e4s:9.0::appstream")

    Returns:
        True if the product is EUS/E4S/AUS/TUS, False otherwise.
    """
    if cpe:
        fields = cpe.split(":")
        if len(fields) > 3 and fields[3] in EUS_CPE_PRODUCTS:
            return True

    if not product_name:
        return False

    lowered = product_name.lower()
    return any(keyword in lowered for keyword in EUS_PRODUCT_NAME_KEYWORDS)


def extract_rhel_affected_products_for_db(csaf: dict) -> set:
"""
Extracts all needed info for red_hat_advisory_affected_products table from CSAF product_tree.
Expands 'noarch' to all main arches and maps names to user-friendly values.
Returns a set of tuples: (variant, name, major_version, minor_version, arch)
"""
# Maps architecture short names to user-friendly product names
arch_name_map = {
"aarch64": "Red Hat Enterprise Linux for ARM 64",
"x86_64": "Red Hat Enterprise Linux for x86_64",
"s390x": "Red Hat Enterprise Linux for IBM z Systems",
"ppc64le": "Red Hat Enterprise Linux for Power, little endian",
}
# List of main architectures to expand 'noarch'
main_arches = list(arch_name_map.keys())
affected_products = set()
product_tree = csaf.get("product_tree", {})
if not product_tree:
logger.warning("No product tree found in CSAF document")
return affected_products

# Iterate over all vendor branches in the product tree
for vendor_branch in product_tree.get("branches", []):
# Find the product_family branch for RHEL
family_branch = None
arches = set()
for branch in vendor_branch.get("branches", []):
if branch.get("category") == "product_family" and branch.get("name") == "Red Hat Enterprise Linux":
family_branch = branch
# Collect all architecture branches at the same level as product_family
elif branch.get("category") == "architecture":
arch = branch.get("name")
if arch:
arches.add(arch)
# If 'noarch' is present, expand to all main architectures
if "noarch" in arches:
arches = set(main_arches)
if not family_branch:
continue
# Find the product_name branch for CPE/version info
prod_name = None
cpe = None
for branch in family_branch.get("branches", []):
Expand All @@ -59,51 +95,167 @@ def extract_rhel_affected_products_for_db(csaf: dict) -> set:
if not prod_name or not cpe:
continue

# Parses the CPE string to extract major and minor version numbers
if _is_eus_product(prod_name, cpe):
logger.debug(f"Skipping EUS product: {prod_name}")
continue

# Example CPE: "cpe:/a:redhat:enterprise_linux:9::appstream"
parts = cpe.split(":") # Split the CPE string by colon
parts = cpe.split(":")
major = None
minor = None
if len(parts) > 4:
version = parts[4] # The version is typically the 5th field (index 4)
version = parts[4]
if version:
if "." in version:
# If the version contains a dot, split into major and minor
major, minor = version.split(".", 1)
major = int(major)
minor = int(minor)
else:
# If no dot, only major version is present
major = int(version)

# For each architecture, add a tuple with product info to the set
for arch in arches:
name = arch_name_map.get(arch)
if name is None:
logger.warning(f"'{arch}' not in arch_name_map, skipping.")
continue
if major:
affected_products.add((
family_branch.get("name"), # variant (e.g., "Red Hat Enterprise Linux")
name, # user-friendly architecture name
major, # major version number
minor, # minor version number (may be None)
arch # architecture short name
family_branch.get("name"),
name,
major,
minor,
arch
))
logger.debug(f"Number of affected products: {len(affected_products)}")
return affected_products


def _traverse_for_eus(branches, product_eus_map=None):
    """
    Recursively walk CSAF branches and record each product's EUS status.

    Every "product_name" branch contributes one entry: its product_id
    mapped to the result of _is_eus_product() on the product's name and
    CPE identification helper.

    Args:
        branches: List of CSAF branch dictionaries to traverse.
        product_eus_map: Optional dict used as the accumulator; a fresh
            dict is created when omitted.

    Returns:
        Dict mapping product_id -> bool (True when the product is EUS).
    """
    eus_map = {} if product_eus_map is None else product_eus_map

    for node in branches:
        if node.get("category") == "product_name":
            product = node.get("product", {})
            pid = product.get("product_id")
            if pid:
                cpe = product.get("product_identification_helper", {}).get("cpe", "")
                eus_map[pid] = _is_eus_product(product.get("name", ""), cpe)

        if "branches" in node:
            _traverse_for_eus(node["branches"], eus_map)

    return eus_map


def _extract_packages_from_branches(branches, product_eus_map, packages=None):
"""
Recursively traverse CSAF branches to extract package NEVRAs.

Args:
branches: List of CSAF branch dictionaries to traverse
product_eus_map: Dict mapping product_id to EUS status
packages: Optional set to accumulate results

Returns:
Set of NEVRA strings
"""
if packages is None:
packages = set()

for branch in branches:
category = branch.get("category")

if category == "product_version":
prod = branch.get("product", {})
product_id = prod.get("product_id")
purl = prod.get("product_identification_helper", {}).get("purl")

if not product_id:
continue

if purl and not purl.startswith("pkg:rpm/"):
continue

# Product IDs for packages can have format: "AppStream-9.0.0.Z.E4S:package-nevra"
# or just "package-nevra" for packages in product_version entries
skip_eus = False
for eus_prod_id, is_eus in product_eus_map.items():
if is_eus and (":" in product_id and product_id.startswith(eus_prod_id + ":")):
skip_eus = True
break

if skip_eus:
continue

# Format: "package-epoch:version-release.arch" or "package-epoch:version-release.arch::module:stream"
packages.add(product_id.split("::")[0])

if "branches" in branch:
_extract_packages_from_branches(branch["branches"], product_eus_map, packages)

return packages


def _extract_packages_from_product_tree(csaf: dict) -> set:
    """
    Extract fixed-package NEVRAs from a CSAF document's product_tree.

    Works for both regular and modular packages by reading product_id
    fields directly; products identified as EUS are filtered out.

    Args:
        csaf: CSAF document dict.

    Returns:
        Set of NEVRA strings (empty when there is no product tree).
    """
    vendor_branches = csaf.get("product_tree", {}).get("branches", [])

    # First pass: classify every product_id as EUS or not.
    eus_map = {}
    for vendor in vendor_branches:
        _traverse_for_eus(vendor.get("branches", []), eus_map)

    # Second pass: collect NEVRAs, skipping EUS-prefixed entries.
    nevras = set()
    for vendor in vendor_branches:
        _extract_packages_from_branches(vendor.get("branches", []), eus_map, nevras)

    return nevras


def red_hat_advisory_scraper(csaf: dict):
# At the time of writing there are ~254 advisories that do not have any vulnerabilities.
if not csaf.get("vulnerabilities"):
logger.warning("No vulnerabilities found in CSAF document")
return None

# red_hat_advisories table values
red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"] # "2025-02-24T03:42:46+00:00"
red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"] # "2025-04-17T12:08:56+00:00"
name = csaf["document"]["tracking"]["id"] # "RHSA-2025:1234"
red_hat_synopsis = csaf["document"]["title"] # "Red Hat Bug Fix Advisory: Red Hat Quay v3.13.4 bug fix release"
name = csaf["document"]["tracking"]["id"]

red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)
if not red_hat_affected_products:
logger.info(f"Skipping advisory {name}: all products are EUS-only")
return None

red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"]
red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"]
red_hat_synopsis = csaf["document"]["title"]
red_hat_description = None
topic = None
for item in csaf["document"]["notes"]:
Expand All @@ -112,59 +264,31 @@ def red_hat_advisory_scraper(csaf: dict):
elif item["category"] == "summary":
topic = item["text"]
kind_lookup = {"RHSA": "Security", "RHBA": "Bug Fix", "RHEA": "Enhancement"}
kind = kind_lookup[name.split("-")[0]] # "RHSA-2025:1234" --> "Security"
severity = csaf["document"]["aggregate_severity"]["text"] # "Important"
kind = kind_lookup[name.split("-")[0]]
severity = csaf["document"]["aggregate_severity"]["text"]

# To maintain consistency with the existing database, we need to replace the
# To maintain consistency with the existing database, replace
# "Red Hat [KIND] Advisory:" prefixes with the severity level.
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Bug Fix Advisory: ", f"{severity}:")
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Security Advisory:", f"{severity}:")
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Enhancement Advisory: ", f"{severity}:")

# red_hat_advisory_packages table values
red_hat_fixed_packages = set()
red_hat_fixed_packages = _extract_packages_from_product_tree(csaf)

red_hat_cve_set = set()
red_hat_bugzilla_set = set()
product_id_suffix_list = (
".aarch64",
".i386",
".i686",
".noarch",
".ppc",
".ppc64",
".ppc64le",
".s390",
".s390x",
".src",
".x86_64"
) # TODO: find a better way to filter product IDs. This is a workaround for the fact that
# the product IDs in the CSAF documents also contain artifacts like container images
# and we only are interested in RPMs.

for vulnerability in csaf["vulnerabilities"]:
for product_id in vulnerability["product_status"]["fixed"]:
if product_id.endswith(product_id_suffix_list):
# These IDs are in the format product:package_nevra
# ie- AppStream-9.4.0.Z.EUS:rsync-0:3.2.3-19.el9_4.1.aarch64"
split_on_colon = product_id.split(":")
product = split_on_colon[0]
package_nevra = ":".join(split_on_colon[-2:])
red_hat_fixed_packages.add(package_nevra)

# red_hat_advisory_cves table values. Many older advisories do not have CVEs and so we need to handle that.
cve_id = vulnerability.get("cve", None)
cve_cvss3_scoring_vector = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("vectorString", None)
cve_cvss3_base_score = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("baseScore", None)
cve_cwe = vulnerability.get("cwe", {}).get("id", None)
red_hat_cve_set.add((cve_id, cve_cvss3_scoring_vector, cve_cvss3_base_score, cve_cwe))

# red_hat_advisory_bugzilla_bugs table values
for bug_id in vulnerability.get("ids", []):
if bug_id.get("system_name") == "Red Hat Bugzilla ID":
red_hat_bugzilla_set.add(bug_id["text"])

# red_hat_advisory_affected_products table values
red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)

return {
"red_hat_issued_at": str(red_hat_issued_at),
"red_hat_updated_at": str(red_hat_updated_at),
Expand Down
Loading
Loading