Closed

Commits
20 commits
dd42560
Improve generate_rocky_config.py version matching and naming flexibility
rockythorn Nov 10, 2025
95319c3
Simplify conditional logic in generate_rocky_config.py
rockythorn Nov 12, 2025
cb9aacc
Return all advisories with CVEs
rockythorn Oct 21, 2025
531bbd0
Remove redundant comments from OSV API
rockythorn Nov 12, 2025
f1d918b
Simplify OSV API advisory filtering logic
rockythorn Nov 13, 2025
25f74e0
Fix CSAF parser for modular packages and add EUS filtering
rockythorn Nov 7, 2025
48a57c8
Fix CSV merge to prioritize changes.csv over releases.csv
rockythorn Nov 7, 2025
c555bcf
Add web UI for managing CSAF index timestamp
rockythorn Nov 7, 2025
ced651a
Fix test_csaf_processing to work with refactored CSAF parser and Bazel
rockythorn Nov 10, 2025
d43f40c
Refactor EUS product identifiers into file-level constants
rockythorn Nov 13, 2025
12695ae
Simplify package extraction logic
rockythorn Nov 13, 2025
43ee733
Use Pythonic empty set check
rockythorn Nov 13, 2025
6b997e5
Remove redundant comments from CSAF processing code
rockythorn Nov 13, 2025
c87bb75
Improve exception handling in database_service
rockythorn Nov 13, 2025
db07012
Refactor nested functions to pure functions
rockythorn Nov 13, 2025
0dc061c
Move EUS-only check earlier to avoid unnecessary work
rockythorn Nov 13, 2025
99fcfa4
Fix config import validation issues
rockythorn Nov 3, 2025
4a786bd
Update tests for integer Decimal serialization
rockythorn Nov 6, 2025
b73a5f4
Remove unnecessary comments
rockythorn Nov 13, 2025
046bfb7
Remove redundant comments from validation module
rockythorn Nov 13, 2025
7 changes: 6 additions & 1 deletion .dockerignore
@@ -1,4 +1,9 @@
node_modules
.venv
.ijwb
.idea
.idea
temp
csaf_analysis
bazel-*
.git
container_data
2 changes: 2 additions & 0 deletions .github/workflows/test.yaml
@@ -37,6 +37,8 @@ jobs:
bazel test //apollo/tests:test_auth --test_output=all
bazel test //apollo/tests:test_validation --test_output=all
bazel test //apollo/tests:test_admin_routes_supported_products --test_output=all
bazel test //apollo/tests:test_api_osv --test_output=all
bazel test //apollo/tests:test_database_service --test_output=all

- name: Integration Tests
run: ./build/scripts/test.bash
242 changes: 183 additions & 59 deletions apollo/rhcsaf/__init__.py
@@ -4,50 +4,86 @@
from common.logger import Logger
from apollo.rpm_helpers import parse_nevra

# Initialize Info before Logger for this module

logger = Logger()

EUS_CPE_PRODUCTS = frozenset([
"rhel_eus", # Extended Update Support
"rhel_e4s", # Update Services for SAP Solutions
"rhel_aus", # Advanced Update Support (IBM Power)
"rhel_tus", # Telecommunications Update Service
])

EUS_PRODUCT_NAME_KEYWORDS = frozenset([
"e4s",
"eus",
"aus",
"tus",
"extended update support",
"update services for sap",
"advanced update support",
"telecommunications update service",
])

def _is_eus_product(product_name: str, cpe: str) -> bool:
"""
Detects if a product is EUS-related based on product name and CPE.

Args:
product_name: Full product name (e.g., "Red Hat Enterprise Linux AppStream E4S (v.9.0)")
cpe: CPE string (e.g., "cpe:/a:redhat:rhel_e4s:9.0::appstream")

Returns:
True if product is EUS/E4S/AUS/TUS, False otherwise
"""
if cpe:
parts = cpe.split(":")
if len(parts) > 3:
cpe_product = parts[3]
if cpe_product in EUS_CPE_PRODUCTS:
return True

if product_name:
name_lower = product_name.lower()
for keyword in EUS_PRODUCT_NAME_KEYWORDS:
if keyword in name_lower:
return True

return False
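
For illustration, the detector should behave as follows on the docstring's sample inputs (a minimal sketch, assuming the constants defined above):

# Sketch: expected behavior of _is_eus_product on the docstring's examples.
assert _is_eus_product(
    "Red Hat Enterprise Linux AppStream E4S (v.9.0)",
    "cpe:/a:redhat:rhel_e4s:9.0::appstream",
) is True   # CPE field at index 3 is "rhel_e4s", an EUS CPE product
assert _is_eus_product(
    "Red Hat Enterprise Linux AppStream (v.9)",
    "cpe:/a:redhat:enterprise_linux:9::appstream",
) is False  # mainline CPE product, no EUS keyword in the name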


def extract_rhel_affected_products_for_db(csaf: dict) -> set:
"""
Extracts all needed info for red_hat_advisory_affected_products table from CSAF product_tree.
Expands 'noarch' to all main arches and maps names to user-friendly values.
Returns a set of tuples: (variant, name, major_version, minor_version, arch)
"""
# Maps architecture short names to user-friendly product names
arch_name_map = {
"aarch64": "Red Hat Enterprise Linux for ARM 64",
"x86_64": "Red Hat Enterprise Linux for x86_64",
"s390x": "Red Hat Enterprise Linux for IBM z Systems",
"ppc64le": "Red Hat Enterprise Linux for Power, little endian",
}
# List of main architectures to expand 'noarch'
main_arches = list(arch_name_map.keys())
affected_products = set()
product_tree = csaf.get("product_tree", {})
if not product_tree:
logger.warning("No product tree found in CSAF document")
return affected_products

# Iterate over all vendor branches in the product tree
for vendor_branch in product_tree.get("branches", []):
# Find the product_family branch for RHEL
family_branch = None
arches = set()
for branch in vendor_branch.get("branches", []):
if branch.get("category") == "product_family" and branch.get("name") == "Red Hat Enterprise Linux":
family_branch = branch
# Collect all architecture branches at the same level as product_family
elif branch.get("category") == "architecture":
arch = branch.get("name")
if arch:
arches.add(arch)
# If 'noarch' is present, expand to all main architectures
if "noarch" in arches:
arches = set(main_arches)
if not family_branch:
continue
# Find the product_name branch for CPE/version info
prod_name = None
cpe = None
for branch in family_branch.get("branches", []):
@@ -59,51 +95,167 @@ def extract_rhel_affected_products_for_db(csaf: dict) -> set:
if not prod_name or not cpe:
continue

# Parses the CPE string to extract major and minor version numbers
if _is_eus_product(prod_name, cpe):
logger.debug(f"Skipping EUS product: {prod_name}")
continue

# Example CPE: "cpe:/a:redhat:enterprise_linux:9::appstream"
parts = cpe.split(":") # Split the CPE string by colon
parts = cpe.split(":")
major = None
minor = None
if len(parts) > 4:
version = parts[4] # The version is typically the 5th field (index 4)
version = parts[4]
if version:
if "." in version:
# If the version contains a dot, split into major and minor
major, minor = version.split(".", 1)
major = int(major)
minor = int(minor)
else:
# If no dot, only major version is present
major = int(version)

# For each architecture, add a tuple with product info to the set
for arch in arches:
name = arch_name_map.get(arch)
if name is None:
logger.warning(f"'{arch}' not in arch_name_map, skipping.")
continue
if major:
affected_products.add((
family_branch.get("name"), # variant (e.g., "Red Hat Enterprise Linux")
name, # user-friendly architecture name
major, # major version number
minor, # minor version number (may be None)
arch # architecture short name
family_branch.get("name"),
name,
major,
minor,
arch
))
logger.debug(f"Number of affected products: {len(affected_products)}")
return affected_products
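
To make the version parsing concrete, a hypothetical CPE of the form shown in the inline comment splits like this (sketch; field positions as in the code above):

# Hypothetical CPE following the comment's example format.
cpe = "cpe:/a:redhat:enterprise_linux:9.4::appstream"
version = cpe.split(":")[4]           # "9.4" (the 5th field, index 4)
major, minor = version.split(".", 1)  # ("9", "4"), cast to int above
# A 'noarch' architecture branch then yields one tuple per entry in
# arch_name_map: aarch64, x86_64, s390x, and ppc64le.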


def _traverse_for_eus(branches, product_eus_map=None):
"""
Recursively traverse CSAF branches to build EUS product map.

Args:
branches: List of CSAF branch dictionaries to traverse
product_eus_map: Optional dict to accumulate results

Returns:
Dict mapping product_id to boolean indicating if product is EUS
"""
if product_eus_map is None:
product_eus_map = {}

for branch in branches:
category = branch.get("category")

if category == "product_name":
prod = branch.get("product", {})
product_id = prod.get("product_id")

if product_id:
product_name = prod.get("name", "")
cpe = prod.get("product_identification_helper", {}).get("cpe", "")
is_eus = _is_eus_product(product_name, cpe)
product_eus_map[product_id] = is_eus

if "branches" in branch:
_traverse_for_eus(branch["branches"], product_eus_map)

return product_eus_map
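
A hypothetical pair of product_name branches shows the map this builds (sketch; only the CSAF fields the function reads are included, with made-up product IDs):

branches = [
    {"category": "product_name",
     "product": {"product_id": "AppStream-9.0.0.Z.E4S",
                 "name": "Red Hat Enterprise Linux AppStream E4S (v.9.0)",
                 "product_identification_helper": {"cpe": "cpe:/a:redhat:rhel_e4s:9.0::appstream"}}},
    {"category": "product_name",
     "product": {"product_id": "AppStream-9.4.0.GA",
                 "name": "Red Hat Enterprise Linux AppStream (v.9)",
                 "product_identification_helper": {"cpe": "cpe:/a:redhat:enterprise_linux:9::appstream"}}},
]
_traverse_for_eus(branches)
# -> {"AppStream-9.0.0.Z.E4S": True, "AppStream-9.4.0.GA": False}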


def _extract_packages_from_branches(branches, product_eus_map, packages=None):
"""
Recursively traverse CSAF branches to extract package NEVRAs.

Args:
branches: List of CSAF branch dictionaries to traverse
product_eus_map: Dict mapping product_id to EUS status
packages: Optional set to accumulate results

Returns:
Set of NEVRA strings
"""
if packages is None:
packages = set()

for branch in branches:
category = branch.get("category")

if category == "product_version":
prod = branch.get("product", {})
product_id = prod.get("product_id")
purl = prod.get("product_identification_helper", {}).get("purl")

if not product_id:
continue

if purl and not purl.startswith("pkg:rpm/"):
continue

# Product IDs for packages can have format: "AppStream-9.0.0.Z.E4S:package-nevra"
# or just "package-nevra" for packages in product_version entries
skip_eus = False
for eus_prod_id, is_eus in product_eus_map.items():
if is_eus and (":" in product_id and product_id.startswith(eus_prod_id + ":")):
skip_eus = True
break

if skip_eus:
continue

# Format: "package-epoch:version-release.arch" or "package-epoch:version-release.arch::module:stream"
packages.add(product_id.split("::")[0])

if "branches" in branch:
_extract_packages_from_branches(branch["branches"], product_eus_map, packages)

return packages
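
For a modular package, the trailing "::module:stream" qualifier is stripped so only the NEVRA remains (hypothetical product_id, following the format comments above):

product_id = "nodejs-1:16.19.1-1.module+el9.2.0+123+abcdef01.x86_64::nodejs:16"
product_id.split("::")[0]
# -> "nodejs-1:16.19.1-1.module+el9.2.0+123+abcdef01.x86_64"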


def _extract_packages_from_product_tree(csaf: dict) -> set:
"""
Extracts fixed packages from CSAF product_tree using product_id fields.
Handles both regular and modular packages by extracting NEVRAs directly from product_id.
Filters out EUS products.

Args:
csaf: CSAF document dict

Returns:
Set of NEVRA strings
"""
product_tree = csaf.get("product_tree", {})

if not product_tree:
return set()

product_eus_map = {}
for vendor_branch in product_tree.get("branches", []):
product_eus_map = _traverse_for_eus(vendor_branch.get("branches", []), product_eus_map)

packages = set()
for vendor_branch in product_tree.get("branches", []):
packages = _extract_packages_from_branches(vendor_branch.get("branches", []), product_eus_map, packages)

return packages
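
Putting the two passes together on a minimal, hypothetical product tree: the first pass flags the E4S product, and the second drops the package whose product_id carries that product's prefix (sketch; product IDs and purl values are illustrative):

csaf = {"product_tree": {"branches": [{"branches": [
    {"category": "product_name",
     "product": {"product_id": "AppStream-9.0.0.Z.E4S",
                 "name": "Red Hat Enterprise Linux AppStream E4S (v.9.0)",
                 "product_identification_helper": {"cpe": "cpe:/a:redhat:rhel_e4s:9.0::appstream"}}},
    {"category": "product_version",
     "product": {"product_id": "AppStream-9.0.0.Z.E4S:kernel-0:5.14.0-70.el9_0.x86_64",
                 "product_identification_helper": {"purl": "pkg:rpm/redhat/kernel@5.14.0-70.el9_0?arch=x86_64"}}},
    {"category": "product_version",
     "product": {"product_id": "kernel-0:5.14.0-70.el9.x86_64",
                 "product_identification_helper": {"purl": "pkg:rpm/redhat/kernel@5.14.0-70.el9?arch=x86_64"}}},
]}]}}
_extract_packages_from_product_tree(csaf)
# -> {"kernel-0:5.14.0-70.el9.x86_64"}  (the E4S-prefixed entry is filtered out)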


def red_hat_advisory_scraper(csaf: dict):
# At the time of writing there are ~254 advisories that do not have any vulnerabilities.
if not csaf.get("vulnerabilities"):
logger.warning("No vulnerabilities found in CSAF document")
return None

# red_hat_advisories table values
red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"] # "2025-02-24T03:42:46+00:00"
red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"] # "2025-04-17T12:08:56+00:00"
name = csaf["document"]["tracking"]["id"] # "RHSA-2025:1234"
red_hat_synopsis = csaf["document"]["title"] # "Red Hat Bug Fix Advisory: Red Hat Quay v3.13.4 bug fix release"
name = csaf["document"]["tracking"]["id"]

red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)
if not red_hat_affected_products:
logger.info(f"Skipping advisory {name}: all products are EUS-only")
return None

red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"]
red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"]
red_hat_synopsis = csaf["document"]["title"]
red_hat_description = None
topic = None
for item in csaf["document"]["notes"]:
@@ -112,59 +264,31 @@ def red_hat_advisory_scraper(csaf: dict):
elif item["category"] == "summary":
topic = item["text"]
kind_lookup = {"RHSA": "Security", "RHBA": "Bug Fix", "RHEA": "Enhancement"}
kind = kind_lookup[name.split("-")[0]] # "RHSA-2025:1234" --> "Security"
severity = csaf["document"]["aggregate_severity"]["text"] # "Important"
kind = kind_lookup[name.split("-")[0]]
severity = csaf["document"]["aggregate_severity"]["text"]

# To maintain consistency with the existing database, we need to replace the
# To maintain consistency with the existing database, replace
# "Red Hat [KIND] Advisory:" prefixes with the severity level.
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Bug Fix Advisory: ", f"{severity}:")
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Security Advisory:", f"{severity}:")
red_hat_synopsis = red_hat_synopsis.replace("Red Hat Enhancement Advisory: ", f"{severity}:")

# red_hat_advisory_packages table values
red_hat_fixed_packages = set()
red_hat_fixed_packages = _extract_packages_from_product_tree(csaf)

red_hat_cve_set = set()
red_hat_bugzilla_set = set()
product_id_suffix_list = (
".aarch64",
".i386",
".i686",
".noarch",
".ppc",
".ppc64",
".ppc64le",
".s390",
".s390x",
".src",
".x86_64"
) # TODO: find a better way to filter product IDs. This is a workaround for the fact that
# the product IDs in the CSAF documents also contain artifacts like container images
# and we only are interested in RPMs.

for vulnerability in csaf["vulnerabilities"]:
for product_id in vulnerability["product_status"]["fixed"]:
if product_id.endswith(product_id_suffix_list):
# These IDs are in the format product:package_nevra
# ie- AppStream-9.4.0.Z.EUS:rsync-0:3.2.3-19.el9_4.1.aarch64"
split_on_colon = product_id.split(":")
product = split_on_colon[0]
package_nevra = ":".join(split_on_colon[-2:])
red_hat_fixed_packages.add(package_nevra)

# red_hat_advisory_cves table values. Many older advisories do not have CVEs, so missing values must be handled.
cve_id = vulnerability.get("cve", None)
cve_cvss3_scoring_vector = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("vectorString", None)
cve_cvss3_base_score = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("baseScore", None)
cve_cwe = vulnerability.get("cwe", {}).get("id", None)
red_hat_cve_set.add((cve_id, cve_cvss3_scoring_vector, cve_cvss3_base_score, cve_cwe))

# red_hat_advisory_bugzilla_bugs table values
for bug_id in vulnerability.get("ids", []):
if bug_id.get("system_name") == "Red Hat Bugzilla ID":
red_hat_bugzilla_set.add(bug_id["text"])

# red_hat_advisory_affected_products table values
red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)

return {
"red_hat_issued_at": str(red_hat_issued_at),
"red_hat_updated_at": str(red_hat_updated_at),
7 changes: 5 additions & 2 deletions apollo/rhworker/poll_rh_activities.py
@@ -651,8 +651,11 @@ async def fetch_csv_with_dates(session, url):
releases = await fetch_csv_with_dates(session, base_url + "releases.csv")
deletions = await fetch_csv_with_dates(session, base_url + "deletions.csv")

# Merge changes and releases, keeping the most recent timestamp for each advisory
all_advisories = {**changes, **releases}
# Merge changes and releases, prioritizing changes.csv for updated timestamps
# changes.csv contains the most recent modification time for each advisory
# releases.csv contains original publication dates
# We want changes.csv to take precedence to catch updates to existing advisories
all_advisories = {**releases, **changes}
# Remove deletions
for advisory_id in deletions:
all_advisories.pop(advisory_id, None)
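
In a dict merge the right-hand operand wins on key collisions, which is what gives changes.csv precedence (sketch with hypothetical timestamps):

releases = {"RHSA-2025:1234": "2025-01-10T00:00:00+00:00"}  # original publication
changes = {"RHSA-2025:1234": "2025-04-17T12:08:56+00:00"}   # latest modification
merged = {**releases, **changes}
merged["RHSA-2025:1234"]  # -> "2025-04-17T12:08:56+00:00", from changes.csv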
2 changes: 2 additions & 0 deletions apollo/server/routes/admin_supported_products.py
@@ -1296,6 +1296,8 @@ async def _get_mirror_config_data(mirror: SupportedProductsRhMirror) -> Dict[str
def _json_serializer(obj):
"""Custom JSON serializer for non-standard types"""
if isinstance(obj, Decimal):
if obj % 1 == 0:
return int(obj)
return float(obj)
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
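
With this change, whole-number Decimals serialize as JSON integers while fractional ones remain floats (sketch; assumes json and Decimal are imported in the module):

import json
from decimal import Decimal

json.dumps({"score": Decimal("7.0")}, default=_json_serializer)  # '{"score": 7}'
json.dumps({"score": Decimal("7.5")}, default=_json_serializer)  # '{"score": 7.5}'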
