From 293b3583eab3f2b2bf457661ce24e8ff2e2f847b Mon Sep 17 00:00:00 2001
From: Lochlan McElroy <85311595+lochlanmcelroy@users.noreply.github.com>
Date: Thu, 31 Jul 2025 23:21:28 -0500
Subject: [PATCH] Add comprehensive CVE data validation to prevent processing crashes

---
NOTE(review): the line breaks and indentation in this copy of the patch were
reconstructed from a whitespace-collapsed original. Hunk sizes match the hunk
headers and the diffstat, but confirm against the commit before applying.

Review findings (no hunk content was changed):
* cve_sync.py: the lookups `cve["cve"]["CVE_data_meta"]["ID"]` and
  `impact["baseMetric" + version]["cvss" + version][...]` are still guarded
  only by `except KeyError`. A non-dict intermediate value (None, list, str)
  raises TypeError, which escapes, so malformed items can still crash the run.
* cve_sync.py: `isinstance(cvss_base_score, (int, float))` also accepts bool,
  because bool is a subclass of int; True/False pass as scores 1/0.
* cve_sync.py: `cve_map.pop(cve_id, None)` runs before the new score/version
  checks, so a CVE skipped by those checks has already been removed from
  cve_map. Whether that matters depends on how the leftover entries are used
  outside this hunk; verify.
* cve_sync.py: malformed input is handled two ways: some cases raise
  ValueError and abort the batch, others log a warning and `continue`.
* tests: the new tests cover non-dict input, non-list CVE_Items, a non-numeric
  score and an out-of-range score. The non-dict item, non-string ID, non-dict
  impact and non-string version branches have no test in this patch.

 src/cyhy_cvesync/cve_sync.py | 57 ++++++++++++++++++++++++++++++------
 tests/test_cvesync.py        | 50 +++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 9 deletions(-)

diff --git a/src/cyhy_cvesync/cve_sync.py b/src/cyhy_cvesync/cve_sync.py
index 1c8c1e0..fe40444 100644
--- a/src/cyhy_cvesync/cve_sync.py
+++ b/src/cyhy_cvesync/cve_sync.py
@@ -45,10 +45,16 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
     created_cve_docs_count = 0
     updated_cve_docs_count = 0
 
+    # Validate top-level JSON structure
+    if not isinstance(cve_json, dict):
+        raise ValueError("CVE data must be a dictionary, got: %s" % type(cve_json).__name__)
+
     if cve_json.get("CVE_data_type") != "CVE":
         raise ValueError("JSON does not look like valid CVE data.")
 
     cve_items = cve_json.get("CVE_Items", [])
+    if not isinstance(cve_items, list):
+        raise ValueError("CVE_Items must be a list, got: %s" % type(cve_items).__name__)
 
     logger.info(
         "Async task %d: Starting to process %d CVEs",
@@ -57,35 +63,66 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
     )
     for cve in cve_items:
         try:
+            # Validate CVE item structure
+            if not isinstance(cve, dict):
+                logger.warning("Skipping non-dict CVE item: %s", type(cve).__name__)
+                continue
+
             cve_id = cve["cve"]["CVE_data_meta"]["ID"]
         except KeyError:
             # JSON might be malformed, so we'll log what the CVE object looks like
             # and then raise an error
-            logger.error("CVE object: %s", cve)
+            logger.error("CVE object missing required fields: %s", cve)
             raise ValueError("JSON does not look like valid CVE data.")
-        # All fields are there but "ID" field is empty
-        if not cve_id:
-            raise ValueError("CVE ID is empty.")
+
+        # Validate CVE ID format and content
+        if not cve_id or not isinstance(cve_id, str):
+            raise ValueError("CVE ID is empty or not a string.")
+
+        if not cve_id.startswith("CVE-"):
+            logger.warning("CVE ID has unexpected format: %s", cve_id)
 
         # Only process CVEs that have CVSS V2 or V3 data
-        if any(k in cve["impact"] for k in ["baseMetricV2", "baseMetricV3"]):
+        impact = cve.get("impact", {})
+        if not isinstance(impact, dict):
+            logger.warning("Skipping CVE %s: invalid impact data", cve_id)
+            continue
+
+        if any(k in impact for k in ["baseMetricV2", "baseMetricV3"]):
             # Check if the CVE document already exists in the database
             global cve_map
             async with cve_map_lock:
                 cve_doc = cve_map.pop(cve_id, None)
 
-            version = "V3" if "baseMetricV3" in cve["impact"] else "V2"
+            version = "V3" if "baseMetricV3" in impact else "V2"
             try:
-                cvss_base_score = cve["impact"]["baseMetric" + version][
+                cvss_base_score = impact["baseMetric" + version][
                     "cvss" + version
                 ]["baseScore"]
-                cvss_version_temp = cve["impact"]["baseMetric" + version][
+                cvss_version_temp = impact["baseMetric" + version][
                     "cvss" + version
                 ]["version"]
             except KeyError:
-                logger.error("CVE object: %s", cve)
+                logger.error("CVE object missing CVSS data: %s", cve)
                 raise ValueError("JSON does not look like valid CVE data.")
 
+            # Validate CVSS score
+            if not isinstance(cvss_base_score, (int, float)):
+                logger.warning("Skipping CVE %s: CVSS score is not numeric: %s",
+                               cve_id, cvss_base_score)
+                continue
+
+            if not (0.0 <= cvss_base_score <= 10.0):
+                logger.warning("Skipping CVE %s: CVSS score out of range (0.0-10.0): %s",
+                               cve_id, cvss_base_score)
+                continue
+
+            # Validate CVSS version
+            if not isinstance(cvss_version_temp, str):
+                logger.warning("Skipping CVE %s: CVSS version is not a string: %s",
+                               cve_id, cvss_version_temp)
+                continue
+
             if cve_doc:  # Update existing CVE doc
                 if (
                     cve_doc.cvss_score != cvss_base_score
@@ -106,6 +143,8 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
                 await cve_doc.save()
                 logger.info("Created CVE document with id: %s", cve_id)
                 created_cve_docs_count += 1
+        else:
+            logger.debug("Skipping CVE %s: no CVSS data", cve_id)
     logger.info(
         "Async task %d: Created %d CVE document(s), updated %d CVE document(s)",
         id(asyncio.current_task()),
diff --git a/tests/test_cvesync.py b/tests/test_cvesync.py
index 915da43..6702051 100644
--- a/tests/test_cvesync.py
+++ b/tests/test_cvesync.py
@@ -44,6 +44,56 @@ async def test_process_cve_json_invalid_cve_data_type():
         await process_cve_json({"CVE_data_type": "INVALID", "CVE_Items": []})
 
 
+async def test_process_cve_json_non_dict():
+    """Test processing non-dictionary CVE data."""
+    with pytest.raises(ValueError, match="CVE data must be a dictionary"):
+        await process_cve_json("not a dict")
+
+
+async def test_process_cve_json_invalid_cve_items():
+    """Test processing CVE data with invalid CVE_Items."""
+    with pytest.raises(ValueError, match="CVE_Items must be a list"):
+        await process_cve_json({"CVE_data_type": "CVE", "CVE_Items": "not a list"})
+
+
+async def test_process_cve_json_invalid_cvss_score():
+    """Test processing CVE with invalid CVSS score."""
+    cve_json_invalid_score = {
+        "CVE_data_type": "CVE",
+        "CVE_Items": [
+            {
+                "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
+                "impact": {
+                    "baseMetricV3": {"cvssV3": {"baseScore": "not_a_number", "version": "3.1"}}
+                },
+            }
+        ],
+    }
+    # Should not raise exception but skip the invalid CVE
+    created, updated = await process_cve_json(cve_json_invalid_score)
+    assert created == 0
+    assert updated == 0
+
+
+async def test_process_cve_json_cvss_score_out_of_range():
+    """Test processing CVE with CVSS score out of range."""
+    cve_json_out_of_range = {
+        "CVE_data_type": "CVE",
+        "CVE_Items": [
+            {
+                "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
+                "impact": {
+                    "baseMetricV3": {"cvssV3": {"baseScore": 15.0, "version": "3.1"}}  # Invalid: > 10.0
+                },
+            }
+        ],
+    }
+    # Should not raise exception but skip the invalid CVE
+    created, updated = await process_cve_json(cve_json_out_of_range)
+    assert created == 0
+    assert updated == 0
+
+
 async def test_process_cve_json_malformed_1():
     """Test processing malformed CVE JSON data."""
     with pytest.raises(ValueError, match="JSON does not look like valid CVE data."):