diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 2454fd2..394e28c 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -37,6 +37,8 @@ jobs:
           bazel test //apollo/tests:test_auth --test_output=all
           bazel test //apollo/tests:test_validation --test_output=all
           bazel test //apollo/tests:test_admin_routes_supported_products --test_output=all
+          bazel test //apollo/tests:test_api_osv --test_output=all
+          bazel test //apollo/tests:test_database_service --test_output=all
       - name: Integration Tests
         run: ./build/scripts/test.bash
diff --git a/apollo/rhcsaf/__init__.py b/apollo/rhcsaf/__init__.py
index 762cedf..95175c4 100644
--- a/apollo/rhcsaf/__init__.py
+++ b/apollo/rhcsaf/__init__.py
@@ -4,24 +4,65 @@ from common.logger import Logger
 from apollo.rpm_helpers import parse_nevra
 
-# Initialize Info before Logger for this module
-
 logger = Logger()
 
+EUS_CPE_PRODUCTS = frozenset([
+    "rhel_eus",  # Extended Update Support
+    "rhel_e4s",  # Update Services for SAP Solutions
+    "rhel_aus",  # Advanced Update Support
+    "rhel_tus",  # Telecommunications Update Service
+])
+
+EUS_PRODUCT_NAME_KEYWORDS = frozenset([
+    "e4s",
+    "eus",
+    "aus",
+    "tus",
+    "extended update support",
+    "update services for sap",
+    "advanced update support",
+    "telecommunications update service",
+])
+
+
+def _is_eus_product(product_name: str, cpe: str) -> bool:
+    """
+    Detects if a product is EUS-related based on product name and CPE.
+
+    Args:
+        product_name: Full product name (e.g., "Red Hat Enterprise Linux AppStream E4S (v.9.0)")
+        cpe: CPE string (e.g., "cpe:/a:redhat:rhel_e4s:9.0::appstream")
+
+    Returns:
+        True if product is EUS/E4S/AUS/TUS, False otherwise
+    """
+    if cpe:
+        parts = cpe.split(":")
+        if len(parts) > 3:
+            cpe_product = parts[3]
+            if cpe_product in EUS_CPE_PRODUCTS:
+                return True
+
+    if product_name:
+        name_lower = product_name.lower()
+        for keyword in EUS_PRODUCT_NAME_KEYWORDS:
+            if keyword in name_lower:
+                return True
+
+    return False
+
+
 def extract_rhel_affected_products_for_db(csaf: dict) -> set:
     """
     Extracts all needed info for red_hat_advisory_affected_products table from CSAF product_tree.
     Expands 'noarch' to all main arches and maps names to user-friendly values.
     Returns a set of tuples: (variant, name, major_version, minor_version, arch)
     """
-    # Maps architecture short names to user-friendly product names
     arch_name_map = {
         "aarch64": "Red Hat Enterprise Linux for ARM 64",
         "x86_64": "Red Hat Enterprise Linux for x86_64",
         "s390x": "Red Hat Enterprise Linux for IBM z Systems",
         "ppc64le": "Red Hat Enterprise Linux for Power, little endian",
     }
-    # List of main architectures to expand 'noarch'
     main_arches = list(arch_name_map.keys())
     affected_products = set()
     product_tree = csaf.get("product_tree", {})
@@ -29,25 +70,20 @@ def extract_rhel_affected_products_for_db(csaf: dict) -> set:
         logger.warning("No product tree found in CSAF document")
         return affected_products
 
-    # Iterate over all vendor branches in the product tree
     for vendor_branch in product_tree.get("branches", []):
-        # Find the product_family branch for RHEL
         family_branch = None
         arches = set()
         for branch in vendor_branch.get("branches", []):
             if branch.get("category") == "product_family" and branch.get("name") == "Red Hat Enterprise Linux":
                 family_branch = branch
-            # Collect all architecture branches at the same level as product_family
             elif branch.get("category") == "architecture":
                 arch = branch.get("name")
                 if arch:
                     arches.add(arch)
-        # If 'noarch' is present, expand to all main architectures
         if "noarch" in arches:
             arches = set(main_arches)
         if not family_branch:
             continue
-        # Find the product_name branch for CPE/version info
         prod_name = None
         cpe = None
         for branch in family_branch.get("branches", []):
@@ -59,24 +95,24 @@ def extract_rhel_affected_products_for_db(csaf: dict) -> set:
         if not prod_name or not cpe:
             continue
 
-        # Parses the CPE string to extract major and minor version numbers
+        if _is_eus_product(prod_name, cpe):
+            logger.debug(f"Skipping EUS product: {prod_name}")
+            continue
+
         # Example CPE: "cpe:/a:redhat:enterprise_linux:9::appstream"
-        parts = cpe.split(":")  # Split the CPE string by colon
+        parts = cpe.split(":")
         major = None
         minor = None
         if len(parts) > 4:
-            version = parts[4]  # The version is typically the 5th field (index 4)
+            version = parts[4]
             if version:
                 if "." in version:
-                    # If the version contains a dot, split into major and minor
                     major, minor = version.split(".", 1)
                     major = int(major)
                     minor = int(minor)
                 else:
-                    # If no dot, only major version is present
                    major = int(version)
-        # For each architecture, add a tuple with product info to the set
         for arch in arches:
             name = arch_name_map.get(arch)
             if name is None:
@@ -84,26 +120,142 @@ def extract_rhel_affected_products_for_db(csaf: dict) -> set:
                 continue
             if major:
                 affected_products.add((
-                    family_branch.get("name"),  # variant (e.g., "Red Hat Enterprise Linux")
-                    name,  # user-friendly architecture name
-                    major,  # major version number
-                    minor,  # minor version number (may be None)
-                    arch  # architecture short name
+                    family_branch.get("name"),
+                    name,
+                    major,
+                    minor,
+                    arch
                 ))
     logger.debug(f"Number of affected products: {len(affected_products)}")
     return affected_products
+
+
+def _traverse_for_eus(branches, product_eus_map=None):
+    """
+    Recursively traverse CSAF branches to build EUS product map.
+
+    Args:
+        branches: List of CSAF branch dictionaries to traverse
+        product_eus_map: Optional dict to accumulate results
+
+    Returns:
+        Dict mapping product_id to boolean indicating if product is EUS
+    """
+    if product_eus_map is None:
+        product_eus_map = {}
+
+    for branch in branches:
+        category = branch.get("category")
+
+        if category == "product_name":
+            prod = branch.get("product", {})
+            product_id = prod.get("product_id")
+
+            if product_id:
+                product_name = prod.get("name", "")
+                cpe = prod.get("product_identification_helper", {}).get("cpe", "")
+                is_eus = _is_eus_product(product_name, cpe)
+                product_eus_map[product_id] = is_eus
+
+        if "branches" in branch:
+            _traverse_for_eus(branch["branches"], product_eus_map)
+
+    return product_eus_map
+
+
+def _extract_packages_from_branches(branches, product_eus_map, packages=None):
+    """
+    Recursively traverse CSAF branches to extract package NEVRAs.
+
+    Args:
+        branches: List of CSAF branch dictionaries to traverse
+        product_eus_map: Dict mapping product_id to EUS status
+        packages: Optional set to accumulate results
+
+    Returns:
+        Set of NEVRA strings
+    """
+    if packages is None:
+        packages = set()
+
+    for branch in branches:
+        category = branch.get("category")
+
+        if category == "product_version":
+            prod = branch.get("product", {})
+            product_id = prod.get("product_id")
+            purl = prod.get("product_identification_helper", {}).get("purl")
+
+            if not product_id:
+                continue
+
+            if purl and not purl.startswith("pkg:rpm/"):
+                continue
+
+            # Product IDs for packages can have format: "AppStream-9.0.0.Z.E4S:package-nevra"
+            # or just "package-nevra" for packages in product_version entries
+            skip_eus = False
+            for eus_prod_id, is_eus in product_eus_map.items():
+                if is_eus and (":" in product_id and product_id.startswith(eus_prod_id + ":")):
+                    skip_eus = True
+                    break
+
+            if skip_eus:
+                continue
+
+            # Format: "package-epoch:version-release.arch" or "package-epoch:version-release.arch::module:stream"
+            packages.add(product_id.split("::")[0])
+
+        if "branches" in branch:
+            _extract_packages_from_branches(branch["branches"], product_eus_map, packages)
+
+    return packages
+
+
+def _extract_packages_from_product_tree(csaf: dict) -> set:
+    """
+    Extracts fixed packages from CSAF product_tree using product_id fields.
+    Handles both regular and modular packages by extracting NEVRAs directly from product_id.
+    Filters out EUS products.
+
+    Args:
+        csaf: CSAF document dict
+
+    Returns:
+        Set of NEVRA strings
+    """
+    product_tree = csaf.get("product_tree", {})
+
+    if not product_tree:
+        return set()
+
+    product_eus_map = {}
+    for vendor_branch in product_tree.get("branches", []):
+        product_eus_map = _traverse_for_eus(vendor_branch.get("branches", []), product_eus_map)
+
+    packages = set()
+    for vendor_branch in product_tree.get("branches", []):
+        packages = _extract_packages_from_branches(vendor_branch.get("branches", []), product_eus_map, packages)
+
+    return packages
+
+
 def red_hat_advisory_scraper(csaf: dict):
     # At the time of writing there are ~254 advisories that do not have any vulnerabilities.
     if not csaf.get("vulnerabilities"):
         logger.warning("No vulnerabilities found in CSAF document")
         return None
 
-    # red_hat_advisories table values
-    red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"]  # "2025-02-24T03:42:46+00:00"
-    red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"]  # "2025-04-17T12:08:56+00:00"
-    name = csaf["document"]["tracking"]["id"]  # "RHSA-2025:1234"
-    red_hat_synopsis = csaf["document"]["title"]  # "Red Hat Bug Fix Advisory: Red Hat Quay v3.13.4 bug fix release"
+    name = csaf["document"]["tracking"]["id"]
+
+    red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)
+    if not red_hat_affected_products:
+        logger.info(f"Skipping advisory {name}: all products are EUS-only")
+        return None
+
+    red_hat_issued_at = csaf["document"]["tracking"]["initial_release_date"]
+    red_hat_updated_at = csaf["document"]["tracking"]["current_release_date"]
+    red_hat_synopsis = csaf["document"]["title"]
     red_hat_description = None
     topic = None
     for item in csaf["document"]["notes"]:
@@ -112,59 +264,31 @@ def red_hat_advisory_scraper(csaf: dict):
         elif item["category"] == "summary":
             topic = item["text"]
     kind_lookup = {"RHSA": "Security", "RHBA": "Bug Fix", "RHEA": "Enhancement"}
-    kind = kind_lookup[name.split("-")[0]]  # "RHSA-2025:1234" --> "Security"
-    severity = csaf["document"]["aggregate_severity"]["text"]  # "Important"
+    kind = kind_lookup[name.split("-")[0]]
+    severity = csaf["document"]["aggregate_severity"]["text"]
 
-    # To maintain consistency with the existing database, we need to replace the
+    # To maintain consistency with the existing database, replace
     # "Red Hat [KIND] Advisory:" prefixes with the severity level.
     red_hat_synopsis = red_hat_synopsis.replace("Red Hat Bug Fix Advisory: ", f"{severity}:")
     red_hat_synopsis = red_hat_synopsis.replace("Red Hat Security Advisory:", f"{severity}:")
     red_hat_synopsis = red_hat_synopsis.replace("Red Hat Enhancement Advisory: ", f"{severity}:")
 
-    # red_hat_advisory_packages table values
-    red_hat_fixed_packages = set()
+    red_hat_fixed_packages = _extract_packages_from_product_tree(csaf)
+
     red_hat_cve_set = set()
     red_hat_bugzilla_set = set()
-    product_id_suffix_list = (
-        ".aarch64",
-        ".i386",
-        ".i686",
-        ".noarch",
-        ".ppc",
-        ".ppc64",
-        ".ppc64le",
-        ".s390",
-        ".s390x",
-        ".src",
-        ".x86_64"
-    )  # TODO: find a better way to filter product IDs. This is a workaround for the fact that
-    # the product IDs in the CSAF documents also contain artifacts like container images
-    # and we only are interested in RPMs.
+
     for vulnerability in csaf["vulnerabilities"]:
-        for product_id in vulnerability["product_status"]["fixed"]:
-            if product_id.endswith(product_id_suffix_list):
-                # These IDs are in the format product:package_nevra
-                # ie- AppStream-9.4.0.Z.EUS:rsync-0:3.2.3-19.el9_4.1.aarch64"
-                split_on_colon = product_id.split(":")
-                product = split_on_colon[0]
-                package_nevra = ":".join(split_on_colon[-2:])
-                red_hat_fixed_packages.add(package_nevra)
-
-        # red_hat_advisory_cves table values. Many older advisories do not have CVEs and so we need to handle that.
         cve_id = vulnerability.get("cve", None)
         cve_cvss3_scoring_vector = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("vectorString", None)
         cve_cvss3_base_score = vulnerability.get("scores", [{}])[0].get("cvss_v3", {}).get("baseScore", None)
         cve_cwe = vulnerability.get("cwe", {}).get("id", None)
         red_hat_cve_set.add((cve_id, cve_cvss3_scoring_vector, cve_cvss3_base_score, cve_cwe))
 
-        # red_hat_advisory_bugzilla_bugs table values
         for bug_id in vulnerability.get("ids", []):
             if bug_id.get("system_name") == "Red Hat Bugzilla ID":
                 red_hat_bugzilla_set.add(bug_id["text"])
 
-    # red_hat_advisory_affected_products table values
-    red_hat_affected_products = extract_rhel_affected_products_for_db(csaf)
-
     return {
         "red_hat_issued_at": str(red_hat_issued_at),
         "red_hat_updated_at": str(red_hat_updated_at),
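Reviewer note: a minimal, self-contained sketch of the CPE check that the new `_is_eus_product` helper above performs. This reimplements only the CPE branch for illustration (the name-keyword branch is omitted); the sample CPE strings are taken from the docstrings and tests in this diff.

```python
# Illustrative only: mirrors the CPE branch of _is_eus_product above.
EUS_CPE_PRODUCTS = frozenset(["rhel_eus", "rhel_e4s", "rhel_aus", "rhel_tus"])

def cpe_is_eus(cpe: str) -> bool:
    # "cpe:/a:redhat:rhel_e4s:9.0::appstream".split(":")[3] == "rhel_e4s"
    parts = cpe.split(":")
    return len(parts) > 3 and parts[3] in EUS_CPE_PRODUCTS

assert cpe_is_eus("cpe:/a:redhat:rhel_e4s:9.0::appstream")
assert not cpe_is_eus("cpe:/o:redhat:enterprise_linux:9::appstream")
```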
diff --git a/apollo/rhworker/poll_rh_activities.py b/apollo/rhworker/poll_rh_activities.py
index e592136..85a4380 100644
--- a/apollo/rhworker/poll_rh_activities.py
+++ b/apollo/rhworker/poll_rh_activities.py
@@ -651,8 +651,11 @@ async def fetch_csv_with_dates(session, url):
     releases = await fetch_csv_with_dates(session, base_url + "releases.csv")
     deletions = await fetch_csv_with_dates(session, base_url + "deletions.csv")
 
-    # Merge changes and releases, keeping the most recent timestamp for each advisory
-    all_advisories = {**changes, **releases}
+    # Merge changes and releases, prioritizing changes.csv for updated timestamps
+    # changes.csv contains the most recent modification time for each advisory
+    # releases.csv contains original publication dates
+    # We want changes.csv to take precedence to catch updates to existing advisories
+    all_advisories = {**releases, **changes}
     # Remove deletions
     for advisory_id in deletions:
         all_advisories.pop(advisory_id, None)
diff --git a/apollo/server/routes/admin_workflows.py b/apollo/server/routes/admin_workflows.py
index ef319dc..cfb26ec 100644
--- a/apollo/server/routes/admin_workflows.py
+++ b/apollo/server/routes/admin_workflows.py
@@ -21,7 +21,8 @@ async def admin_workflows(request: Request, user: User = Depends(admin_user_sche
     """Render admin workflows page for manual workflow triggering"""
     db_service = DatabaseService()
     env_info = await db_service.get_environment_info()
-
+    index_state = await db_service.get_last_indexed_at()
+
     return templates.TemplateResponse(
         "admin_workflows.jinja", {
             "request": request,
@@ -29,6 +30,8 @@ async def admin_workflows(request: Request, user: User = Depends(admin_user_sche
             "env_name": env_info["environment"],
             "is_production": env_info["is_production"],
             "reset_allowed": env_info["reset_allowed"],
+            "last_indexed_at": index_state.get("last_indexed_at_iso"),
+            "last_indexed_exists": index_state.get("exists", False),
         }
     )
 
@@ -92,6 +95,39 @@ async def trigger_poll_rhcsaf(
     return RedirectResponse(url="/admin/workflows", status_code=303)
 
 
+@router.post("/workflows/update-index-timestamp")
+async def update_index_timestamp(
+    request: Request,
+    new_timestamp: str = Form(...),
+    user: User = Depends(admin_user_scheme)
+):
+    """Update the last_indexed_at timestamp in red_hat_index_state"""
+    try:
+        # Parse the timestamp
+        timestamp_dt = datetime.fromisoformat(new_timestamp.replace("Z", "+00:00"))
+
+        db_service = DatabaseService()
+        result = await db_service.update_last_indexed_at(timestamp_dt, user.email)
+
+        Logger().info(f"Admin user {user.email} updated last_indexed_at to {new_timestamp}")
+
+        # Store success message in session
+        request.session["workflow_message"] = result["message"]
+        request.session["workflow_type"] = "success"
+
+    except ValueError as e:
+        Logger().error(f"Invalid timestamp format: {str(e)}")
+        request.session["workflow_message"] = f"Invalid timestamp format: {str(e)}"
+        request.session["workflow_type"] = "error"
+
+    except Exception as e:
+        Logger().error(f"Error updating last_indexed_at: {str(e)}")
+        request.session["workflow_message"] = f"Error updating timestamp: {str(e)}"
+        request.session["workflow_type"] = "error"
+
+    return RedirectResponse(url="/admin/workflows", status_code=303)
+
+
 @router.get("/workflows/database/preview-reset")
 async def preview_database_reset(
     request: Request,
diff --git a/apollo/server/routes/api_osv.py b/apollo/server/routes/api_osv.py
index f0022ee..debf89a 100644
--- a/apollo/server/routes/api_osv.py
+++ b/apollo/server/routes/api_osv.py
@@ -143,7 +143,6 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
     for pkg in affected_packages:
         x = pkg[0]
         nevra = pkg[1]
-        # Only process "src" packages
         if nevra.group(5) != "src":
             continue
         if x.nevra in processed_nvra:
@@ -198,11 +197,9 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
     if advisory.red_hat_advisory:
         osv_credits.append(OSVCredit(name="Red Hat"))
 
-    # Calculate severity by finding the highest CVSS score
     highest_cvss_base_score = 0.0
     final_score_vector = None
     for x in advisory.cves:
-        # Convert cvss3_scoring_vector to a float
         base_score = x.cvss3_base_score
         if base_score and base_score != "UNKNOWN":
             base_score = float(base_score)
@@ -255,15 +252,14 @@ async def get_advisories_osv(
         cve,
         synopsis,
         severity,
-        kind="Security",
+        kind=None,
         fetch_related=True,
     )
-
     count = fetch_adv[0]
     advisories = fetch_adv[1]
 
     ui_url = await get_setting(UI_URL)
-    osv_advisories = [to_osv_advisory(ui_url, x) for x in advisories]
-    page = create_page(osv_advisories, count, params)
+    osv_advisories = [to_osv_advisory(ui_url, adv) for adv in advisories if adv.cves]
+    page = create_page(osv_advisories, len(osv_advisories), params)
 
     state = await RedHatIndexState.first()
     page.last_updated_at = (
@@ -282,7 +278,7 @@
 )
 async def get_advisory_osv(advisory_id: str):
     advisory = (
-        await Advisory.filter(name=advisory_id, kind="Security")
+        await Advisory.filter(name=advisory_id)
         .prefetch_related(
             "packages",
             "cves",
@@ -295,7 +291,7 @@ async def get_advisory_osv(advisory_id: str):
         .get_or_none()
     )
 
-    if not advisory:
+    if not advisory or not advisory.cves:
         raise HTTPException(404)
 
     ui_url = await get_setting(UI_URL)
diff --git a/apollo/server/services/database_service.py b/apollo/server/services/database_service.py
index 78d6fb0..0a66800 100644
--- a/apollo/server/services/database_service.py
+++ b/apollo/server/services/database_service.py
@@ -123,4 +123,67 @@ async def get_environment_info(self) -> Dict[str, str]:
             "environment": env_name,
             "is_production": self.is_production_environment(),
             "reset_allowed": not self.is_production_environment()
-        }
\ No newline at end of file
+        }
+
+    async def get_last_indexed_at(self) -> Dict[str, Any]:
+        """
+        Get the current last_indexed_at timestamp from red_hat_index_state
+
+        Returns:
+            Dictionary with timestamp information
+        """
+        index_state = await RedHatIndexState.first()
+
+        if not index_state or not index_state.last_indexed_at:
+            return {
+                "last_indexed_at": None,
+                "last_indexed_at_iso": None,
+                "exists": False
+            }
+
+        return {
+            "last_indexed_at": index_state.last_indexed_at,
+            "last_indexed_at_iso": index_state.last_indexed_at.isoformat(),
+            "exists": True
+        }
+
+    async def update_last_indexed_at(self, new_timestamp: datetime, user_email: str) -> Dict[str, Any]:
+        """
+        Update the last_indexed_at timestamp in red_hat_index_state
+
+        Args:
+            new_timestamp: New timestamp to set
+            user_email: Email of user making the change (for logging)
+
+        Returns:
+            Dictionary with operation results
+
+        Raises:
+            ValueError: If timestamp is invalid
+        """
+        logger = Logger()
+
+        try:
+            # Get or create index state
+            index_state = await RedHatIndexState.first()
+
+            old_timestamp = None
+            if index_state:
+                old_timestamp = index_state.last_indexed_at
+                index_state.last_indexed_at = new_timestamp
+                await index_state.save()
+                logger.info(f"Updated last_indexed_at by {user_email}: {old_timestamp} -> {new_timestamp}")
+            else:
+                await RedHatIndexState.create(last_indexed_at=new_timestamp)
+                logger.info(f"Created last_indexed_at by {user_email}: {new_timestamp}")
+
+            return {
+                "success": True,
+                "old_timestamp": old_timestamp.isoformat() if old_timestamp else None,
+                "new_timestamp": new_timestamp.isoformat(),
+                "message": f"Successfully updated last_indexed_at to {new_timestamp.isoformat()}"
+            }
+
+        except Exception as e:
+            logger.error(f"Failed to update last_indexed_at: {e}")
+            raise RuntimeError(f"Failed to update timestamp: {e}") from e
\ No newline at end of file
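Reviewer note: the `update-index-timestamp` route above rewrites a trailing "Z" before parsing because `datetime.fromisoformat` only accepts the "Z" suffix on Python 3.11 and later. A quick sketch of that normalization, assuming an ISO-8601 string like the one the admin form would submit:

```python
from datetime import datetime, timezone

raw = "2025-07-01T00:00:00Z"  # hypothetical form value with a UTC "Z" suffix
parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
assert parsed == datetime(2025, 7, 1, tzinfo=timezone.utc)
```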
diff --git a/apollo/server/templates/admin_workflows.jinja b/apollo/server/templates/admin_workflows.jinja
index 6dd396a..b3f7916 100644
--- a/apollo/server/templates/admin_workflows.jinja
+++ b/apollo/server/templates/admin_workflows.jinja
@@ -71,7 +71,7 @@
 [markup lost in extraction; recovered text:]
 Poll RH CSAF Advisories Workflow
 Polls Red Hat for new CSAF (Common Security Advisory Framework) advisories.
-[removed line: markup not recovered]
+[added line: markup not recovered]
@@ -80,6 +80,44 @@
+[new card and form markup not recovered; recovered text and template logic:]
+Update CSAF Index Timestamp
+Set the last_indexed_at timestamp to control which advisories are processed by the Poll RHCSAF workflow.
+{% if last_indexed_exists %}
+Current last_indexed_at: {{ last_indexed_at }}
+{% else %}
+No timestamp set - workflow will process all advisories
+{% endif %}
+The workflow will process advisories with timestamps after this date.
+Time will be set to 00:00:00 UTC.
 {% if reset_allowed %}
@@ -114,7 +152,7 @@
-[removed markup not recovered, including a {% if reset_allowed %} ... {% endif %} guard]
 {% endblock %}
\ No newline at end of file
diff --git a/apollo/tests/BUILD.bazel b/apollo/tests/BUILD.bazel
index b658f79..b08f0d9 100644
--- a/apollo/tests/BUILD.bazel
+++ b/apollo/tests/BUILD.bazel
@@ -61,3 +61,21 @@ py_test(
         "//apollo/server:server_lib",
     ],
 )
+
+py_test(
+    name = "test_api_osv",
+    srcs = ["test_api_osv.py"],
+    deps = [
+        "//apollo/server:server_lib",
+    ],
+)
+
+py_test(
+    name = "test_database_service",
+    srcs = ["test_database_service.py"],
+    deps = [
+        "//apollo/server:server_lib",
+        "//apollo/db:db_lib",
+        "//common:common_lib",
+    ],
+)
diff --git a/apollo/tests/test_api_osv.py b/apollo/tests/test_api_osv.py
new file mode 100644
index 0000000..6422c3d
--- /dev/null
+++ b/apollo/tests/test_api_osv.py
@@ -0,0 +1,248 @@
+"""
+Tests for OSV API CVE filtering functionality
+"""
+
+import unittest
+import datetime
+from unittest.mock import Mock
+
+from apollo.server.routes.api_osv import to_osv_advisory
+
+
+class MockSupportedProduct:
+    """Mock SupportedProduct model"""
+
+    def __init__(self, variant="Rocky Linux", vendor="Rocky Enterprise Software Foundation"):
+        self.variant = variant
+        self.vendor = vendor
+
+
+class MockSupportedProductsRhMirror:
+    """Mock SupportedProductsRhMirror model"""
+
+    def __init__(self, match_major_version=9):
+        self.match_major_version = match_major_version
+
+
+class MockPackage:
+    """Mock Package model"""
+
+    def __init__(
+        self,
+        nevra,
+        product_name="Rocky Linux 9",
+        repo_name="BaseOS",
+        supported_product=None,
+        supported_products_rh_mirror=None,
+    ):
+        self.nevra = nevra
+        self.product_name = product_name
+        self.repo_name = repo_name
+        self.supported_product = supported_product or MockSupportedProduct()
+        self.supported_products_rh_mirror = supported_products_rh_mirror
+
+
+class MockCVE:
+    """Mock CVE model"""
+
+    def __init__(
+        self,
+        cve="CVE-2024-1234",
+        cvss3_base_score="7.5",
+        cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
+    ):
+        self.cve = cve
+        self.cvss3_base_score = cvss3_base_score
+        self.cvss3_scoring_vector = cvss3_scoring_vector
+
+
+class MockFix:
+    """Mock Fix model"""
+
+    def __init__(self, source="https://bugzilla.redhat.com/show_bug.cgi?id=1234567"):
+        self.source = source
+
+
+class MockAdvisory:
+    """Mock Advisory model"""
+
+    def __init__(
+        self,
+        name="RLSA-2024:1234",
+        synopsis="Important: test security update",
+        description="A security update for test package",
+        published_at=None,
+        updated_at=None,
+        packages=None,
+        cves=None,
+        fixes=None,
+        red_hat_advisory=None,
+    ):
+        self.name = name
+        self.synopsis = synopsis
+        self.description = description
+        self.published_at = published_at or datetime.datetime.now(
+            datetime.timezone.utc
+        )
+        self.updated_at = updated_at or datetime.datetime.now(datetime.timezone.utc)
+        self.packages = packages or []
+        self.cves = cves or []
+        self.fixes = fixes or []
+        self.red_hat_advisory = red_hat_advisory
+
+
+class TestOSVCVEFiltering(unittest.TestCase):
+    """Test CVE filtering logic in OSV API"""
+
+    def test_advisory_with_cve_has_upstream_references(self):
+        """Test that advisories with CVEs have upstream references populated"""
+        packages = [
+            MockPackage(
+                nevra="pcs-0:0.11.8-2.el9_5.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [MockCVE(cve="CVE-2024-1234")]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        self.assertIsNotNone(result.upstream)
- -{% if reset_allowed %} -{% endif %} {% endblock %} \ No newline at end of file diff --git a/apollo/tests/BUILD.bazel b/apollo/tests/BUILD.bazel index b658f79..b08f0d9 100644 --- a/apollo/tests/BUILD.bazel +++ b/apollo/tests/BUILD.bazel @@ -61,3 +61,21 @@ py_test( "//apollo/server:server_lib", ], ) + +py_test( + name = "test_api_osv", + srcs = ["test_api_osv.py"], + deps = [ + "//apollo/server:server_lib", + ], +) + +py_test( + name = "test_database_service", + srcs = ["test_database_service.py"], + deps = [ + "//apollo/server:server_lib", + "//apollo/db:db_lib", + "//common:common_lib", + ], +) diff --git a/apollo/tests/test_api_osv.py b/apollo/tests/test_api_osv.py new file mode 100644 index 0000000..6422c3d --- /dev/null +++ b/apollo/tests/test_api_osv.py @@ -0,0 +1,248 @@ +""" +Tests for OSV API CVE filtering functionality +""" + +import unittest +import datetime +from unittest.mock import Mock + +from apollo.server.routes.api_osv import to_osv_advisory + + +class MockSupportedProduct: + """Mock SupportedProduct model""" + + def __init__(self, variant="Rocky Linux", vendor="Rocky Enterprise Software Foundation"): + self.variant = variant + self.vendor = vendor + + +class MockSupportedProductsRhMirror: + """Mock SupportedProductsRhMirror model""" + + def __init__(self, match_major_version=9): + self.match_major_version = match_major_version + + +class MockPackage: + """Mock Package model""" + + def __init__( + self, + nevra, + product_name="Rocky Linux 9", + repo_name="BaseOS", + supported_product=None, + supported_products_rh_mirror=None, + ): + self.nevra = nevra + self.product_name = product_name + self.repo_name = repo_name + self.supported_product = supported_product or MockSupportedProduct() + self.supported_products_rh_mirror = supported_products_rh_mirror + + +class MockCVE: + """Mock CVE model""" + + def __init__( + self, + cve="CVE-2024-1234", + cvss3_base_score="7.5", + cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N", + ): + self.cve = cve + self.cvss3_base_score = cvss3_base_score + self.cvss3_scoring_vector = cvss3_scoring_vector + + +class MockFix: + """Mock Fix model""" + + def __init__(self, source="https://bugzilla.redhat.com/show_bug.cgi?id=1234567"): + self.source = source + + +class MockAdvisory: + """Mock Advisory model""" + + def __init__( + self, + name="RLSA-2024:1234", + synopsis="Important: test security update", + description="A security update for test package", + published_at=None, + updated_at=None, + packages=None, + cves=None, + fixes=None, + red_hat_advisory=None, + ): + self.name = name + self.synopsis = synopsis + self.description = description + self.published_at = published_at or datetime.datetime.now( + datetime.timezone.utc + ) + self.updated_at = updated_at or datetime.datetime.now(datetime.timezone.utc) + self.packages = packages or [] + self.cves = cves or [] + self.fixes = fixes or [] + self.red_hat_advisory = red_hat_advisory + + +class TestOSVCVEFiltering(unittest.TestCase): + """Test CVE filtering logic in OSV API""" + + def test_advisory_with_cve_has_upstream_references(self): + """Test that advisories with CVEs have upstream references populated""" + packages = [ + MockPackage( + nevra="pcs-0:0.11.8-2.el9_5.src", + supported_products_rh_mirror=MockSupportedProductsRhMirror(9), + ), + ] + cves = [MockCVE(cve="CVE-2024-1234")] + + advisory = MockAdvisory(packages=packages, cves=cves) + result = to_osv_advisory("https://errata.rockylinux.org", advisory) + + self.assertIsNotNone(result.upstream) + 
+        self.assertEqual(len(result.upstream), 1)
+        self.assertIn("CVE-2024-1234", result.upstream)
+
+    def test_advisory_with_multiple_cves(self):
+        """Test that advisories with multiple CVEs include all in upstream"""
+        packages = [
+            MockPackage(
+                nevra="openssl-1:3.0.7-28.el9_5.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [
+            MockCVE(cve="CVE-2024-1111"),
+            MockCVE(cve="CVE-2024-2222"),
+            MockCVE(cve="CVE-2024-3333"),
+        ]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        self.assertIsNotNone(result.upstream)
+        self.assertEqual(len(result.upstream), 3)
+        self.assertIn("CVE-2024-1111", result.upstream)
+        self.assertIn("CVE-2024-2222", result.upstream)
+        self.assertIn("CVE-2024-3333", result.upstream)
+
+    def test_advisory_without_cves_has_empty_upstream(self):
+        """Test that advisories without CVEs have empty upstream list"""
+        packages = [
+            MockPackage(
+                nevra="kernel-0:5.14.0-427.el9.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+
+        advisory = MockAdvisory(packages=packages, cves=[])
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        self.assertIsNotNone(result.upstream)
+        self.assertEqual(len(result.upstream), 0)
+
+    def test_source_packages_only(self):
+        """Test that only source packages are processed, not binary packages"""
+        packages = [
+            MockPackage(
+                nevra="httpd-0:2.4.57-8.el9.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+            MockPackage(
+                nevra="httpd-0:2.4.57-8.el9.x86_64",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+            MockPackage(
+                nevra="httpd-0:2.4.57-8.el9.aarch64",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [MockCVE()]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        # Should only have 1 affected package (the source package)
+        self.assertEqual(len(result.affected), 1)
+        self.assertEqual(result.affected[0].package.name, "httpd")
+
+    def test_severity_from_highest_cvss(self):
+        """Test that severity uses the highest CVSS score from multiple CVEs"""
+        packages = [
+            MockPackage(
+                nevra="vim-2:9.0.1592-1.el9.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [
+            MockCVE(
+                cve="CVE-2024-1111",
+                cvss3_base_score="5.5",
+                cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:N/A:N",
+            ),
+            MockCVE(
+                cve="CVE-2024-2222",
+                cvss3_base_score="9.8",
+                cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
+            ),
+            MockCVE(
+                cve="CVE-2024-3333",
+                cvss3_base_score="7.5",
+                cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
+            ),
+        ]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        self.assertIsNotNone(result.severity)
+        self.assertEqual(len(result.severity), 1)
+        self.assertEqual(result.severity[0].type, "CVSS_V3")
+        self.assertEqual(
+            result.severity[0].score, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
+        )
+
+    def test_ecosystem_format(self):
+        """Test that ecosystem field is formatted correctly"""
+        packages = [
+            MockPackage(
+                nevra="bash-0:5.1.8-9.el9.src",
+                product_name="Rocky Linux 9",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [MockCVE()]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        self.assertEqual(len(result.affected), 1)
+        self.assertEqual(result.affected[0].package.ecosystem, "Rocky Linux:9")
+
+    def test_version_format_with_epoch(self):
+        """Test that fixed version includes epoch in epoch:version-release format"""
+        packages = [
+            MockPackage(
+                nevra="systemd-0:252-38.el9_5.src",
+                supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
+            ),
+        ]
+        cves = [MockCVE()]
+
+        advisory = MockAdvisory(packages=packages, cves=cves)
+        result = to_osv_advisory("https://errata.rockylinux.org", advisory)
+
+        fixed_version = result.affected[0].ranges[0].events[1].fixed
+        self.assertEqual(fixed_version, "0:252-38.el9_5")
+
+
+if __name__ == "__main__":
+    unittest.main(verbosity=2)
diff --git a/apollo/tests/test_csaf_processing.py b/apollo/tests/test_csaf_processing.py
index dbd0f91..aa31c10 100644
--- a/apollo/tests/test_csaf_processing.py
+++ b/apollo/tests/test_csaf_processing.py
@@ -22,17 +22,10 @@
 )
 class TestCsafProcessing(unittest.IsolatedAsyncioTestCase):
-    @classmethod
-    async def asyncSetUp(cls):
-        # Initialize test database for all tests in this class
+    async def asyncSetUp(self):
+        # Initialize test database before each test
         await initialize_test_db()
-
-    @classmethod
-    async def asyncTearDown(cls):
-        # Close database connections when tests are done
-        await close_test_db()
-
     def setUp(self):
         # Create sample CSAF data matching schema requirements
         self.sample_csaf = {
             "document": {
@@ -69,10 +62,35 @@ def setUp(self):
                     "name": "Red Hat Enterprise Linux 9",
                     "product": {
                         "name": "Red Hat Enterprise Linux 9",
+                        "product_id": "AppStream-9.4.0.Z.MAIN",
                         "product_identification_helper": {
-                            "cpe": "cpe:/o:redhat:enterprise_linux:9.4"
+                            "cpe": "cpe:/o:redhat:enterprise_linux:9::appstream"
+                        }
+                    },
+                    "branches": [
+                        {
+                            "category": "product_version",
+                            "name": "rsync-0:3.2.3-19.el9_4.1.x86_64",
+                            "product": {
+                                "name": "rsync-0:3.2.3-19.el9_4.1.x86_64",
+                                "product_id": "rsync-0:3.2.3-19.el9_4.1.x86_64",
+                                "product_identification_helper": {
+                                    "purl": "pkg:rpm/redhat/rsync@3.2.3-19.el9_4.1?arch=x86_64"
+                                }
+                            }
+                        },
+                        {
+                            "category": "product_version",
+                            "name": "rsync-0:3.2.3-19.el9_4.1.src",
+                            "product": {
+                                "name": "rsync-0:3.2.3-19.el9_4.1.src",
+                                "product_id": "rsync-0:3.2.3-19.el9_4.1.src",
+                                "product_identification_helper": {
+                                    "purl": "pkg:rpm/redhat/rsync@3.2.3-19.el9_4.1?arch=src"
+                                }
+                            }
                         }
-                    }
+                    ]
                 }
             ]
         },
@@ -95,8 +113,8 @@ def setUp(self):
             ],
             "product_status": {
                 "fixed": [
-                    "AppStream-9.4.0.Z.EUS:rsync-0:3.2.3-19.el9_4.1.x86_64",
-                    "AppStream-9.4.0.Z.EUS:rsync-0:3.2.3-19.el9_4.1.src"
+                    "AppStream-9.4.0.Z.MAIN:rsync-0:3.2.3-19.el9_4.1.x86_64",
+                    "AppStream-9.4.0.Z.MAIN:rsync-0:3.2.3-19.el9_4.1.src"
                 ]
             },
             "scores": [{
@@ -117,28 +135,31 @@ def setUp(self):
             }
         ]
     }
-
+
     # Create a temporary file with the sample data
     self.test_file = pathlib.Path("test_csaf.json")
     with open(self.test_file, "w") as f:
         json.dump(self.sample_csaf, f)
 
-    async def tearDown(self):
-        # Clean up database and temporary files after each test
+    async def asyncTearDown(self):
+        # Clean up database entries and temporary files after each test
         await RedHatAdvisory.all().delete()
         await RedHatAdvisoryPackage.all().delete()
         await RedHatAdvisoryCVE.all().delete()
-        await RedHatAdvisoryBugzillaBug.all().delete()
+        await RedHatAdvisoryBugzillaBug.all().delete()
         await RedHatAdvisoryAffectedProduct.all().delete()
-
-        # Clean up temporary file
+
+        # Close database connections
+        await close_test_db()
+
+        # Clean up temporary files
         self.test_file.unlink(missing_ok=True)
         pathlib.Path("invalid_csaf.json").unlink(missing_ok=True)
 
     async def test_new_advisory_creation(self):
         # Test creating a new advisory with a real test database
         result = await process_csaf_file(self.sample_csaf, "test.json")
-
+
         # Verify advisory was created correctly
         advisory = await RedHatAdvisory.get_or_none(name="RHSA-2025:1234")
         self.assertIsNotNone(advisory)
@@ -176,7 +197,8 @@ async def test_new_advisory_creation(self):
         self.assertEqual(products[0].variant, "Red Hat Enterprise Linux")
         self.assertEqual(products[0].arch, "x86_64")
         self.assertEqual(products[0].major_version, 9)
-        self.assertEqual(products[0].minor_version, 4)
+        # Minor version is None because CPE doesn't include minor version
+        self.assertIsNone(products[0].minor_version)
 
     async def test_advisory_update(self):
         # First create an advisory with different values
@@ -224,12 +246,13 @@ async def test_no_vulnerabilities(self):
         self.assertEqual(count, 0)
 
     async def test_no_fixed_packages(self):
-        # Test CSAF with vulnerabilities but no fixed packages
+        # Test CSAF with vulnerabilities but no fixed packages in product_tree
         csaf = self.sample_csaf.copy()
-        csaf["vulnerabilities"][0]["product_status"]["fixed"] = []
+        # Remove product_version entries from product_tree to simulate no fixed packages
+        csaf["product_tree"]["branches"][0]["branches"][0]["branches"][0].pop("branches", None)
         result = await process_csaf_file(csaf, "test.json")
         self.assertIsNone(result)
-
+
         # Verify nothing was created
         count = await RedHatAdvisory.all().count()
         self.assertEqual(count, 0)
@@ -239,4 +262,7 @@ async def test_db_exception(self, mock_get_or_none):
         # Simulate a database error
         mock_get_or_none.side_effect = Exception("DB error")
         with self.assertRaises(Exception):
-            await process_csaf_file(self.sample_csaf, "test.json")
\ No newline at end of file
+            await process_csaf_file(self.sample_csaf, "test.json")
+
+if __name__ == '__main__':
+    unittest.main()
\ No newline at end of file
diff --git a/apollo/tests/test_database_service.py b/apollo/tests/test_database_service.py
new file mode 100644
index 0000000..0a8c215
--- /dev/null
+++ b/apollo/tests/test_database_service.py
@@ -0,0 +1,226 @@
+"""
+Tests for DatabaseService functionality
+Tests utility functions for database operations including timestamp management
+"""
+
+import unittest
+import asyncio
+from datetime import datetime, timezone
+from unittest.mock import Mock, AsyncMock, patch
+import os
+
+# Add the project root to the Python path
+import sys
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
+
+from apollo.server.services.database_service import DatabaseService
+
+
+class TestEnvironmentDetection(unittest.TestCase):
+    """Test environment detection functionality."""
+
+    def test_is_production_when_env_is_production(self):
+        """Test production detection when ENV=production."""
+        with patch.dict(os.environ, {"ENV": "production"}):
+            service = DatabaseService()
+            self.assertTrue(service.is_production_environment())
+
+    def test_is_not_production_when_env_is_development(self):
+        """Test production detection when ENV=development."""
+        with patch.dict(os.environ, {"ENV": "development"}):
+            service = DatabaseService()
+            self.assertFalse(service.is_production_environment())
+
+    def test_is_not_production_when_env_not_set(self):
+        """Test production detection when ENV is not set."""
+        with patch.dict(os.environ, {}, clear=True):
+            service = DatabaseService()
+            self.assertFalse(service.is_production_environment())
+
+    def test_is_not_production_with_staging_env(self):
+        """Test production detection with staging environment."""
+        with patch.dict(os.environ, {"ENV": "staging"}):
+            service = DatabaseService()
+            self.assertFalse(service.is_production_environment())
+
+    def test_get_environment_info_production(self):
+        """Test getting environment info for production."""
+        with patch.dict(os.environ, {"ENV": "production"}):
+            service = DatabaseService()
+            result = asyncio.run(service.get_environment_info())
+
+            self.assertEqual(result["environment"], "production")
+            self.assertTrue(result["is_production"])
+            self.assertFalse(result["reset_allowed"])
+
+    def test_get_environment_info_development(self):
+        """Test getting environment info for development."""
+        with patch.dict(os.environ, {"ENV": "development"}):
+            service = DatabaseService()
+            result = asyncio.run(service.get_environment_info())
+
+            self.assertEqual(result["environment"], "development")
+            self.assertFalse(result["is_production"])
+            self.assertTrue(result["reset_allowed"])
+
+
+class TestLastIndexedAtOperations(unittest.TestCase):
+    """Test last_indexed_at timestamp operations."""
+
+    def test_get_last_indexed_at_when_exists(self):
+        """Test getting last_indexed_at when record exists."""
+        mock_index_state = Mock()
+        test_time = datetime(2025, 7, 1, 0, 0, 0, tzinfo=timezone.utc)
+        mock_index_state.last_indexed_at = test_time
+
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state:
+            mock_state.first = AsyncMock(return_value=mock_index_state)
+
+            service = DatabaseService()
+            result = asyncio.run(service.get_last_indexed_at())
+
+            self.assertEqual(result["last_indexed_at"], test_time)
+            self.assertEqual(result["last_indexed_at_iso"], "2025-07-01T00:00:00+00:00")
+            self.assertTrue(result["exists"])
+
+    def test_get_last_indexed_at_when_not_exists(self):
+        """Test getting last_indexed_at when no record exists."""
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state:
+            mock_state.first = AsyncMock(return_value=None)
+
+            service = DatabaseService()
+            result = asyncio.run(service.get_last_indexed_at())
+
+            self.assertIsNone(result["last_indexed_at"])
+            self.assertIsNone(result["last_indexed_at_iso"])
+            self.assertFalse(result["exists"])
+
+    def test_get_last_indexed_at_when_timestamp_is_none(self):
+        """Test getting last_indexed_at when timestamp field is None."""
+        mock_index_state = Mock()
+        mock_index_state.last_indexed_at = None
+
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state:
+            mock_state.first = AsyncMock(return_value=mock_index_state)
+
+            service = DatabaseService()
+            result = asyncio.run(service.get_last_indexed_at())
+
+            self.assertIsNone(result["last_indexed_at"])
+            self.assertIsNone(result["last_indexed_at_iso"])
+            self.assertFalse(result["exists"])
+
+    def test_update_last_indexed_at_existing_record(self):
+        """Test updating last_indexed_at for existing record."""
+        old_time = datetime(2025, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
+        new_time = datetime(2025, 7, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+        mock_index_state = Mock()
+        mock_index_state.last_indexed_at = old_time
+        mock_index_state.save = AsyncMock()
+
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state, \
+             patch("apollo.server.services.database_service.Logger"):
+            mock_state.first = AsyncMock(return_value=mock_index_state)
+
+            service = DatabaseService()
+            result = asyncio.run(service.update_last_indexed_at(new_time, "admin@example.com"))
+
+            self.assertTrue(result["success"])
+            self.assertEqual(result["old_timestamp"], "2025-06-01T00:00:00+00:00")
+            self.assertEqual(result["new_timestamp"], "2025-07-01T00:00:00+00:00")
+            self.assertIn("Successfully updated", result["message"])
+
+            # Verify save was called
+            mock_index_state.save.assert_called_once()
+            # Verify timestamp was updated
+            self.assertEqual(mock_index_state.last_indexed_at, new_time)
+
+    def test_update_last_indexed_at_create_new_record(self):
+        """Test updating last_indexed_at when no record exists (creates new)."""
+        new_time = datetime(2025, 7, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state, \
+             patch("apollo.server.services.database_service.Logger"):
+            mock_state.first = AsyncMock(return_value=None)
+            mock_state.create = AsyncMock()
+
+            service = DatabaseService()
+            result = asyncio.run(service.update_last_indexed_at(new_time, "admin@example.com"))
+
+            self.assertTrue(result["success"])
+            self.assertIsNone(result["old_timestamp"])
+            self.assertEqual(result["new_timestamp"], "2025-07-01T00:00:00+00:00")
+            self.assertIn("Successfully updated", result["message"])
+
+            # Verify create was called with correct timestamp
+            mock_state.create.assert_called_once_with(last_indexed_at=new_time)
+
+    def test_update_last_indexed_at_handles_exception(self):
+        """Test that update_last_indexed_at handles database exceptions."""
+        new_time = datetime(2025, 7, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+        with patch("apollo.server.services.database_service.RedHatIndexState") as mock_state, \
+             patch("apollo.server.services.database_service.Logger"):
+            mock_state.first = AsyncMock(side_effect=Exception("Database error"))
+
+            service = DatabaseService()
+
+            with self.assertRaises(RuntimeError) as cm:
+                asyncio.run(service.update_last_indexed_at(new_time, "admin@example.com"))
+
+            self.assertIn("Failed to update timestamp", str(cm.exception))
+
+
+class TestPartialResetValidation(unittest.TestCase):
+    """Test partial reset validation logic."""
+
+    def test_preview_partial_reset_blocks_in_production(self):
+        """Test that preview_partial_reset raises error in production."""
+        with patch.dict(os.environ, {"ENV": "production"}):
+            service = DatabaseService()
+            cutoff_date = datetime(2025, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+            with self.assertRaises(ValueError) as cm:
+                asyncio.run(service.preview_partial_reset(cutoff_date))
+
+            self.assertIn("production environment", str(cm.exception))
+
+    def test_preview_partial_reset_rejects_future_date(self):
+        """Test that preview_partial_reset rejects future dates."""
+        with patch.dict(os.environ, {"ENV": "development"}):
+            service = DatabaseService()
+            future_date = datetime(2099, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+            with self.assertRaises(ValueError) as cm:
+                asyncio.run(service.preview_partial_reset(future_date))
+
+            self.assertIn("must be in the past", str(cm.exception))
+
+    def test_perform_partial_reset_blocks_in_production(self):
+        """Test that perform_partial_reset raises error in production."""
+        with patch.dict(os.environ, {"ENV": "production"}):
+            service = DatabaseService()
+            cutoff_date = datetime(2025, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+            with self.assertRaises(ValueError) as cm:
+                asyncio.run(service.perform_partial_reset(cutoff_date, "admin@example.com"))
+
+            self.assertIn("production environment", str(cm.exception))
+
+    def test_perform_partial_reset_rejects_future_date(self):
+        """Test that perform_partial_reset rejects future dates."""
+        with patch.dict(os.environ, {"ENV": "development"}):
+            service = DatabaseService()
+            future_date = datetime(2099, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
+
+            with self.assertRaises(ValueError) as cm:
+                asyncio.run(service.perform_partial_reset(future_date, "admin@example.com"))
+
+            self.assertIn("must be in the past", str(cm.exception))
+
+
+if __name__ == "__main__":
+    # Run with verbose output
+    unittest.main(verbosity=2)
diff --git a/apollo/tests/test_rhcsaf.py b/apollo/tests/test_rhcsaf.py
index 1c62f0a..2b4fc62 100644
--- a/apollo/tests/test_rhcsaf.py
+++ b/apollo/tests/test_rhcsaf.py
@@ -52,7 +52,29 @@ def setUp(self):
                     "product_identification_helper": {
                         "cpe": "cpe:/o:redhat:enterprise_linux:9.4"
                     }
-                }
+                },
+                "branches": [
+                    {
+                        "category": "product_version",
+                        "name": "rsync-0:3.2.3-19.el9_4.1.x86_64",
+                        "product": {
+                            "product_id": "rsync-0:3.2.3-19.el9_4.1.x86_64",
+                            "product_identification_helper": {
+                                "purl": "pkg:rpm/redhat/rsync@3.2.3-19.el9_4.1?arch=x86_64"
+                            }
+                        }
+                    },
+                    {
+                        "category": "product_version",
+                        "name": "rsync-0:3.2.3-19.el9_4.1.src",
+                        "product": {
+                            "product_id": "rsync-0:3.2.3-19.el9_4.1.src",
+                            "product_identification_helper": {
+                                "purl": "pkg:rpm/redhat/rsync@3.2.3-19.el9_4.1?arch=src"
+                            }
+                        }
+                    }
+                ]
             }
         ]
     },
@@ -252,4 +274,227 @@ def test_major_only_version(self):
         self.assertIn(
             ("Red Hat Enterprise Linux", "Red Hat Enterprise Linux for x86_64", 9, None, "x86_64"),
             result
-        )
\ No newline at end of file
+        )
+
+
+class TestEUSDetection(unittest.TestCase):
+    """Test EUS product detection and filtering"""
+
+    def setUp(self):
+        with patch('common.logger.Logger') as mock_logger_class:
+            mock_logger_class.return_value = MagicMock()
+            from apollo.rhcsaf import _is_eus_product
+            self._is_eus_product = _is_eus_product
+
+    def test_detect_eus_via_cpe(self):
+        """Test EUS detection via CPE product field"""
+        # EUS CPE products
+        self.assertTrue(self._is_eus_product("Some Product", "cpe:/a:redhat:rhel_eus:9.4::appstream"))
+        self.assertTrue(self._is_eus_product("Some Product", "cpe:/a:redhat:rhel_e4s:9.0::appstream"))
+        self.assertTrue(self._is_eus_product("Some Product", "cpe:/a:redhat:rhel_aus:8.2::appstream"))
+        self.assertTrue(self._is_eus_product("Some Product", "cpe:/a:redhat:rhel_tus:8.8::appstream"))
+
+        # Non-EUS CPE product
+        self.assertFalse(self._is_eus_product("Some Product", "cpe:/a:redhat:enterprise_linux:9::appstream"))
+
+    def test_detect_eus_via_name(self):
+        """Test EUS detection via product name keywords"""
+        self.assertTrue(self._is_eus_product("Red Hat Enterprise Linux AppStream EUS (v.9.4)", ""))
+        self.assertTrue(self._is_eus_product("Red Hat Enterprise Linux AppStream E4S (v.9.0)", ""))
+        self.assertTrue(self._is_eus_product("Red Hat Enterprise Linux AppStream AUS (v.8.2)", ""))
+        self.assertTrue(self._is_eus_product("Red Hat Enterprise Linux AppStream TUS (v.8.8)", ""))
+
+        # Non-EUS product name
+        self.assertFalse(self._is_eus_product("Red Hat Enterprise Linux AppStream", ""))
+
+    def test_eus_filtering_in_affected_products(self):
+        """Test that EUS products are filtered from affected products"""
+        csaf = {
+            "product_tree": {
+                "branches": [
+                    {
+                        "branches": [
+                            {
+                                "category": "product_family",
+                                "name": "Red Hat Enterprise Linux",
+                                "branches": [
+                                    {
+                                        "category": "product_name",
+                                        "product": {
+                                            "name": "Red Hat Enterprise Linux AppStream EUS (v.9.4)",
+                                            "product_identification_helper": {
+                                                "cpe": "cpe:/a:redhat:rhel_eus:9.4::appstream"
+                                            }
+                                        }
+                                    }
+                                ]
+                            },
+                            {
+                                "category": "architecture",
+                                "name": "x86_64"
+                            }
+                        ]
+                    }
+                ]
+            }
+        }
+
+        result = extract_rhel_affected_products_for_db(csaf)
+        # Should be empty because the only product is EUS
+        self.assertEqual(len(result), 0)
+
+
+class TestModularPackages(unittest.TestCase):
+    """Test modular package extraction"""
+
+    def test_extract_modular_packages(self):
+        """Test extraction of modular packages with ::module:stream suffix"""
+        csaf = {
+            "document": {
+                "tracking": {
+                    "initial_release_date": "2025-07-28T00:00:00+00:00",
+                    "current_release_date": "2025-07-28T00:00:00+00:00",
+                    "id": "RHSA-2025:12008"
+                },
+                "title": "Red Hat Security Advisory: Important: redis:7 security update",
+                "aggregate_severity": {"text": "Important"},
+                "notes": [
+                    {"category": "general", "text": "Test description"},
+                    {"category": "summary", "text": "Test topic"}
+                ]
+            },
+            "product_tree": {
+                "branches": [
+                    {
+                        "branches": [
+                            {
+                                "category": "product_family",
+                                "name": "Red Hat Enterprise Linux",
+                                "branches": [
+                                    {
+                                        "category": "product_name",
+                                        "name": "Red Hat Enterprise Linux 9",
+                                        "product": {
+                                            "name": "Red Hat Enterprise Linux 9",
+                                            "product_identification_helper": {
+                                                "cpe": "cpe:/o:redhat:enterprise_linux:9::appstream"
+                                            }
+                                        },
+                                        "branches": [
+                                            {
+                                                "category": "product_version",
+                                                "name": "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.x86_64::redis:7",
+                                                "product": {
+                                                    "product_id": "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.x86_64::redis:7",
+                                                    "product_identification_helper": {
+                                                        "purl": "pkg:rpm/redhat/redis@7.2.10-1.module+el9.6.0+23332+115a3b01?arch=x86_64&rpmmod=redis:7:9060020250716081121:9"
+                                                    }
+                                                }
+                                            },
+                                            {
+                                                "category": "product_version",
+                                                "name": "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.src::redis:7",
+                                                "product": {
+                                                    "product_id": "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.src::redis:7",
+                                                    "product_identification_helper": {
+                                                        "purl": "pkg:rpm/redhat/redis@7.2.10-1.module+el9.6.0+23332+115a3b01?arch=src&rpmmod=redis:7:9060020250716081121:9"
+                                                    }
+                                                }
+                                            }
+                                        ]
+                                    }
+                                ]
+                            },
+                            {
+                                "category": "architecture",
+                                "name": "x86_64"
+                            }
+                        ]
+                    }
+                ]
+            },
+            "vulnerabilities": [
+                {
+                    "cve": "CVE-2025-12345",
+                    "ids": [{"system_name": "Red Hat Bugzilla ID", "text": "123456"}],
+                    "product_status": {"fixed": []},
+                    "scores": [{"cvss_v3": {"vectorString": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "baseScore": 9.8}}],
+                    "cwe": {"id": "CWE-79"}
+                }
+            ]
+        }
+
+        result = red_hat_advisory_scraper(csaf)
+
+        # Check that modular packages were extracted with ::module:stream stripped
+        self.assertIn("redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.x86_64", result["red_hat_fixed_packages"])
+        self.assertIn("redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.src", result["red_hat_fixed_packages"])
+
+        # Verify epoch is preserved
+        for pkg in result["red_hat_fixed_packages"]:
+            if "redis" in pkg:
+                self.assertIn("0:", pkg, "Epoch should be preserved in NEVRA")
+
+
+class TestEUSAdvisoryFiltering(unittest.TestCase):
+    """Test that EUS-only advisories are filtered out"""
+
+    def test_eus_only_advisory_returns_none(self):
+        """Test that advisory with only EUS products returns None"""
+        csaf = {
+            "document": {
+                "tracking": {
+                    "initial_release_date": "2025-01-01T00:00:00+00:00",
+                    "current_release_date": "2025-01-01T00:00:00+00:00",
+                    "id": "RHSA-2025:9756"
+                },
+                "title": "Red Hat Security Advisory: Important: package security update",
+                "aggregate_severity": {"text": "Important"},
+                "notes": [
+                    {"category": "general", "text": "EUS advisory"},
+                    {"category": "summary", "text": "EUS topic"}
+                ]
+            },
+            "product_tree": {
+                "branches": [
+                    {
+                        "branches": [
+                            {
+                                "category": "product_family",
+                                "name": "Red Hat Enterprise Linux",
+                                "branches": [
+                                    {
+                                        "category": "product_name",
+                                        "name": "Red Hat Enterprise Linux AppStream EUS (v.9.4)",
+                                        "product": {
+                                            "name": "Red Hat Enterprise Linux AppStream EUS (v.9.4)",
+                                            "product_identification_helper": {
+                                                "cpe": "cpe:/a:redhat:rhel_eus:9.4::appstream"
+                                            }
+                                        }
+                                    }
+                                ]
+                            },
+                            {
+                                "category": "architecture",
+                                "name": "x86_64"
+                            }
+                        ]
+                    }
+                ]
+            },
+            "vulnerabilities": [
+                {
+                    "cve": "CVE-2025-99999",
+                    "ids": [{"system_name": "Red Hat Bugzilla ID", "text": "999999"}],
+                    "product_status": {"fixed": []},
+                    "scores": [{"cvss_v3": {"vectorString": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", "baseScore": 9.8}}],
+                    "cwe": {"id": "CWE-79"}
+                }
+            ]
+        }
+
+        result = red_hat_advisory_scraper(csaf)
+
+        # Advisory should be filtered out (return None) because all products are EUS
+        self.assertIsNone(result)
\ No newline at end of file
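Reviewer note: a standalone illustration of the modular-package handling exercised by TestModularPackages above. Per `_extract_packages_from_branches` in this diff, the `::module:stream` suffix on a CSAF product_id is dropped with a single `split("::")[0]`, keeping the full NEVRA (epoch included); the sample ID comes from the redis test case.

```python
product_id = "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.x86_64::redis:7"
nevra = product_id.split("::")[0]  # drop the "::module:stream" suffix, keep the NEVRA
assert nevra == "redis-0:7.2.10-1.module+el9.6.0+23332+115a3b01.x86_64"
assert "0:" in nevra  # the epoch survives, matching the "Epoch should be preserved" assertion
```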
diff --git a/scripts/generate_rocky_config.py b/scripts/generate_rocky_config.py
index 1ae0438..1aafea8 100644
--- a/scripts/generate_rocky_config.py
+++ b/scripts/generate_rocky_config.py
@@ -602,7 +602,7 @@ def parse_repomd_path(repomd_url: str, base_url: str) -> Dict[str, str]:
 
 
 def build_mirror_config(
-    version: str, arch: str, name_suffix: Optional[str] = None
+    version: str, arch: str, name_suffix: Optional[str] = None, mirror_name_base: Optional[str] = None
 ) -> Dict[str, Any]:
     """
     Build a mirror configuration dictionary.
@@ -611,15 +611,19 @@ def build_mirror_config(
         version: Rocky Linux version
         arch: Architecture
         name_suffix: Optional suffix for mirror name
+        mirror_name_base: Optional custom base for mirror name (e.g., "Rocky Linux 9")
 
     Returns:
         Mirror configuration dictionary
     """
-    # Build mirror name with optional suffix
-    if name_suffix is not None and name_suffix != "":
-        mirror_name = f"Rocky Linux {version} {name_suffix} {arch}"
+    # Build mirror name with optional custom base or suffix
+    if not mirror_name_base:
+        mirror_name_base = f"Rocky Linux {version}"
+
+    if name_suffix:
+        mirror_name = f"{mirror_name_base} {name_suffix} {arch}"
     else:
-        mirror_name = f"Rocky Linux {version} {arch}"
+        mirror_name = f"{mirror_name_base} {arch}"
 
     # Parse version to extract major and minor components
     if version != UNKNOWN_VALUE and "." in version:
@@ -690,6 +694,7 @@ def generate_rocky_config(
     include_source: bool = True,
     architectures: List[str] = None,
     name_suffix: Optional[str] = None,
+    mirror_name_base: Optional[str] = None,
 ) -> List[Dict[str, Any]]:
     """
     Generate Rocky Linux configuration by discovering repository structure.
@@ -702,6 +707,7 @@ def generate_rocky_config(
         include_source: Whether to include source repository URLs (default: True)
         architectures: List of architectures to include (default: auto-detect)
         name_suffix: Optional suffix to add to mirror names (e.g., "test", "staging")
+        mirror_name_base: Optional custom base for mirror name (e.g., "Rocky Linux 9")
 
     Returns:
         List of configuration dictionaries ready for JSON export
@@ -730,10 +736,12 @@ def generate_rocky_config(
             continue
 
         # Skip if version filter specified and doesn't match
+        # Supports both exact version match (e.g., "9.5") and major version match (e.g., "9")
         if (
             version
             and metadata["version"] != version
             and metadata["version"] != UNKNOWN_VALUE
+            and metadata["version"].split(".")[0] != version
         ):
             continue
 
@@ -773,7 +781,7 @@ def generate_rocky_config(
         if not detected_version:
             detected_version = UNKNOWN_VALUE
 
-        mirror_config = build_mirror_config(detected_version, arch, name_suffix)
+        mirror_config = build_mirror_config(detected_version, arch, name_suffix, mirror_name_base)
 
         # Group repos by name and type
         repo_groups = {}
@@ -828,6 +836,8 @@ def main():
   %(prog)s https://mirror.example.com/pub/rocky/ --output rocky_config.json
   %(prog)s https://mirror.example.com/pub/rocky/ --name-suffix test --version 9.6
   %(prog)s https://staging.example.com/pub/rocky/ --name-suffix staging --arch riscv64
+  %(prog)s https://mirror.example.com/pub/rocky/ --mirror-name-base "Rocky Linux 9" --version 9.6
+  %(prog)s https://mirror.example.com/pub/rocky/ --mirror-name-base "Rocky Linux 9 (Legacy)" --version 9
         """,
     )
 
@@ -880,6 +890,11 @@ def main():
         help="Optional suffix to add to mirror names (e.g., 'test', 'staging')",
     )
 
+    parser.add_argument(
+        "--mirror-name-base",
+        help="Optional custom base for mirror name (e.g., 'Rocky Linux 9' instead of 'Rocky Linux 9.6')",
+    )
+
     parser.add_argument("--output", "-o", help="Output file path (default: stdout)")
 
     parser.add_argument(
@@ -926,6 +941,7 @@ def main():
         include_source=not args.no_source,
         architectures=args.arch,
         name_suffix=args.name_suffix,
+        mirror_name_base=args.mirror_name_base,
     )
 
     if not config:
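Reviewer note: to close, a minimal sketch of the naming rule `build_mirror_config` applies after this change. `mirror_name` below is a hypothetical standalone helper that reimplements only the name composition for illustration; it is not part of the script.

```python
from typing import Optional

def mirror_name(version: str, arch: str,
                name_suffix: Optional[str] = None,
                mirror_name_base: Optional[str] = None) -> str:
    """Reproduce only the naming rule from build_mirror_config."""
    # Default base is "Rocky Linux <version>" unless a custom base is given.
    base = mirror_name_base or f"Rocky Linux {version}"
    # An optional suffix (e.g., "test", "staging") sits between base and arch.
    return f"{base} {name_suffix} {arch}" if name_suffix else f"{base} {arch}"

assert mirror_name("9.6", "x86_64") == "Rocky Linux 9.6 x86_64"
assert mirror_name("9.6", "x86_64", name_suffix="staging") == "Rocky Linux 9.6 staging x86_64"
# --mirror-name-base "Rocky Linux 9" keeps the name stable across point releases:
assert mirror_name("9.6", "x86_64", mirror_name_base="Rocky Linux 9") == "Rocky Linux 9 x86_64"
```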