Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
bazel test //apollo/tests:test_auth --test_output=all
bazel test //apollo/tests:test_validation --test_output=all
bazel test //apollo/tests:test_admin_routes_supported_products --test_output=all
bazel test //apollo/tests:test_api_osv --test_output=all

- name: Integration Tests
run: ./build/scripts/test.bash
14 changes: 5 additions & 9 deletions apollo/server/routes/api_osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
for pkg in affected_packages:
x = pkg[0]
nevra = pkg[1]
# Only process "src" packages
if nevra.group(5) != "src":
continue
if x.nevra in processed_nvra:
Expand Down Expand Up @@ -198,11 +197,9 @@ def to_osv_advisory(ui_url: str, advisory: Advisory) -> OSVAdvisory:
if advisory.red_hat_advisory:
osv_credits.append(OSVCredit(name="Red Hat"))

# Calculate severity by finding the highest CVSS score
highest_cvss_base_score = 0.0
final_score_vector = None
for x in advisory.cves:
# Convert cvss3_scoring_vector to a float
base_score = x.cvss3_base_score
if base_score and base_score != "UNKNOWN":
base_score = float(base_score)
Expand Down Expand Up @@ -255,15 +252,14 @@ async def get_advisories_osv(
cve,
synopsis,
severity,
kind="Security",
kind=None,
fetch_related=True,
)
count = fetch_adv[0]
advisories = fetch_adv[1]

ui_url = await get_setting(UI_URL)
osv_advisories = [to_osv_advisory(ui_url, x) for x in advisories]
page = create_page(osv_advisories, count, params)
osv_advisories = [to_osv_advisory(ui_url, adv) for adv in advisories if adv.cves]
page = create_page(osv_advisories, len(osv_advisories), params)

state = await RedHatIndexState.first()
page.last_updated_at = (
Expand All @@ -282,7 +278,7 @@ async def get_advisories_osv(
)
async def get_advisory_osv(advisory_id: str):
advisory = (
await Advisory.filter(name=advisory_id, kind="Security")
await Advisory.filter(name=advisory_id)
.prefetch_related(
"packages",
"cves",
Expand All @@ -295,7 +291,7 @@ async def get_advisory_osv(advisory_id: str):
.get_or_none()
)

if not advisory:
if not advisory or not advisory.cves:
raise HTTPException(404)

ui_url = await get_setting(UI_URL)
Expand Down
8 changes: 8 additions & 0 deletions apollo/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,11 @@ py_test(
"//apollo/server:server_lib",
],
)

py_test(
name = "test_api_osv",
srcs = ["test_api_osv.py"],
deps = [
"//apollo/server:server_lib",
],
)
248 changes: 248 additions & 0 deletions apollo/tests/test_api_osv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
"""
Tests for OSV API CVE filtering functionality
"""

import unittest
import datetime
from unittest.mock import Mock

from apollo.server.routes.api_osv import to_osv_advisory


class MockSupportedProduct:
"""Mock SupportedProduct model"""

def __init__(self, variant="Rocky Linux", vendor="Rocky Enterprise Software Foundation"):
self.variant = variant
self.vendor = vendor


class MockSupportedProductsRhMirror:
"""Mock SupportedProductsRhMirror model"""

def __init__(self, match_major_version=9):
self.match_major_version = match_major_version


class MockPackage:
"""Mock Package model"""

def __init__(
self,
nevra,
product_name="Rocky Linux 9",
repo_name="BaseOS",
supported_product=None,
supported_products_rh_mirror=None,
):
self.nevra = nevra
self.product_name = product_name
self.repo_name = repo_name
self.supported_product = supported_product or MockSupportedProduct()
self.supported_products_rh_mirror = supported_products_rh_mirror


class MockCVE:
"""Mock CVE model"""

def __init__(
self,
cve="CVE-2024-1234",
cvss3_base_score="7.5",
cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
):
self.cve = cve
self.cvss3_base_score = cvss3_base_score
self.cvss3_scoring_vector = cvss3_scoring_vector


class MockFix:
"""Mock Fix model"""

def __init__(self, source="https://bugzilla.redhat.com/show_bug.cgi?id=1234567"):
self.source = source


class MockAdvisory:
"""Mock Advisory model"""

def __init__(
self,
name="RLSA-2024:1234",
synopsis="Important: test security update",
description="A security update for test package",
published_at=None,
updated_at=None,
packages=None,
cves=None,
fixes=None,
red_hat_advisory=None,
):
self.name = name
self.synopsis = synopsis
self.description = description
self.published_at = published_at or datetime.datetime.now(
datetime.timezone.utc
)
self.updated_at = updated_at or datetime.datetime.now(datetime.timezone.utc)
self.packages = packages or []
self.cves = cves or []
self.fixes = fixes or []
self.red_hat_advisory = red_hat_advisory


class TestOSVCVEFiltering(unittest.TestCase):
"""Test CVE filtering logic in OSV API"""

def test_advisory_with_cve_has_upstream_references(self):
"""Test that advisories with CVEs have upstream references populated"""
packages = [
MockPackage(
nevra="pcs-0:0.11.8-2.el9_5.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [MockCVE(cve="CVE-2024-1234")]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

self.assertIsNotNone(result.upstream)
self.assertEqual(len(result.upstream), 1)
self.assertIn("CVE-2024-1234", result.upstream)

def test_advisory_with_multiple_cves(self):
"""Test that advisories with multiple CVEs include all in upstream"""
packages = [
MockPackage(
nevra="openssl-1:3.0.7-28.el9_5.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [
MockCVE(cve="CVE-2024-1111"),
MockCVE(cve="CVE-2024-2222"),
MockCVE(cve="CVE-2024-3333"),
]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

self.assertIsNotNone(result.upstream)
self.assertEqual(len(result.upstream), 3)
self.assertIn("CVE-2024-1111", result.upstream)
self.assertIn("CVE-2024-2222", result.upstream)
self.assertIn("CVE-2024-3333", result.upstream)

def test_advisory_without_cves_has_empty_upstream(self):
"""Test that advisories without CVEs have empty upstream list"""
packages = [
MockPackage(
nevra="kernel-0:5.14.0-427.el9.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]

advisory = MockAdvisory(packages=packages, cves=[])
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

self.assertIsNotNone(result.upstream)
self.assertEqual(len(result.upstream), 0)

def test_source_packages_only(self):
"""Test that only source packages are processed, not binary packages"""
packages = [
MockPackage(
nevra="httpd-0:2.4.57-8.el9.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
MockPackage(
nevra="httpd-0:2.4.57-8.el9.x86_64",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
MockPackage(
nevra="httpd-0:2.4.57-8.el9.aarch64",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [MockCVE()]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

# Should only have 1 affected package (the source package)
self.assertEqual(len(result.affected), 1)
self.assertEqual(result.affected[0].package.name, "httpd")

def test_severity_from_highest_cvss(self):
"""Test that severity uses the highest CVSS score from multiple CVEs"""
packages = [
MockPackage(
nevra="vim-2:9.0.1592-1.el9.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [
MockCVE(
cve="CVE-2024-1111",
cvss3_base_score="5.5",
cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:L/I:N/A:N",
),
MockCVE(
cve="CVE-2024-2222",
cvss3_base_score="9.8",
cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
),
MockCVE(
cve="CVE-2024-3333",
cvss3_base_score="7.5",
cvss3_scoring_vector="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
),
]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

self.assertIsNotNone(result.severity)
self.assertEqual(len(result.severity), 1)
self.assertEqual(result.severity[0].type, "CVSS_V3")
self.assertEqual(
result.severity[0].score, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
)

def test_ecosystem_format(self):
"""Test that ecosystem field is formatted correctly"""
packages = [
MockPackage(
nevra="bash-0:5.1.8-9.el9.src",
product_name="Rocky Linux 9",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [MockCVE()]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

self.assertEqual(len(result.affected), 1)
self.assertEqual(result.affected[0].package.ecosystem, "Rocky Linux:9")

def test_version_format_with_epoch(self):
"""Test that fixed version includes epoch in epoch:version-release format"""
packages = [
MockPackage(
nevra="systemd-0:252-38.el9_5.src",
supported_products_rh_mirror=MockSupportedProductsRhMirror(9),
),
]
cves = [MockCVE()]

advisory = MockAdvisory(packages=packages, cves=cves)
result = to_osv_advisory("https://errata.rockylinux.org", advisory)

fixed_version = result.affected[0].ranges[0].events[1].fixed
self.assertEqual(fixed_version, "0:252-38.el9_5")


if __name__ == "__main__":
unittest.main(verbosity=2)
Loading