Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 48 additions & 9 deletions src/cyhy_cvesync/cve_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,16 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
created_cve_docs_count = 0
updated_cve_docs_count = 0

# Validate top-level JSON structure
if not isinstance(cve_json, dict):
raise ValueError("CVE data must be a dictionary, got: %s" % type(cve_json).__name__)

if cve_json.get("CVE_data_type") != "CVE":
raise ValueError("JSON does not look like valid CVE data.")

cve_items = cve_json.get("CVE_Items", [])
if not isinstance(cve_items, list):
raise ValueError("CVE_Items must be a list, got: %s" % type(cve_items).__name__)

logger.info(
"Async task %d: Starting to process %d CVEs",
Expand All @@ -57,35 +63,66 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
)
for cve in cve_items:
try:
# Validate CVE item structure
if not isinstance(cve, dict):
logger.warning("Skipping non-dict CVE item: %s", type(cve).__name__)
continue

cve_id = cve["cve"]["CVE_data_meta"]["ID"]
except KeyError:
# JSON might be malformed, so we'll log what the CVE object looks like
# and then raise an error
logger.error("CVE object: %s", cve)
logger.error("CVE object missing required fields: %s", cve)
raise ValueError("JSON does not look like valid CVE data.")
# All fields are there but "ID" field is empty
if not cve_id:
raise ValueError("CVE ID is empty.")

# Validate CVE ID format and content
if not cve_id or not isinstance(cve_id, str):
raise ValueError("CVE ID is empty or not a string.")

if not cve_id.startswith("CVE-"):
logger.warning("CVE ID has unexpected format: %s", cve_id)

# Only process CVEs that have CVSS V2 or V3 data
if any(k in cve["impact"] for k in ["baseMetricV2", "baseMetricV3"]):
impact = cve.get("impact", {})
if not isinstance(impact, dict):
logger.warning("Skipping CVE %s: invalid impact data", cve_id)
continue

if any(k in impact for k in ["baseMetricV2", "baseMetricV3"]):
# Check if the CVE document already exists in the database
global cve_map
async with cve_map_lock:
cve_doc = cve_map.pop(cve_id, None)

version = "V3" if "baseMetricV3" in cve["impact"] else "V2"
version = "V3" if "baseMetricV3" in impact else "V2"
try:
cvss_base_score = cve["impact"]["baseMetric" + version][
cvss_base_score = impact["baseMetric" + version][
"cvss" + version
]["baseScore"]
cvss_version_temp = cve["impact"]["baseMetric" + version][
cvss_version_temp = impact["baseMetric" + version][
"cvss" + version
]["version"]
except KeyError:
logger.error("CVE object: %s", cve)
logger.error("CVE object missing CVSS data: %s", cve)
raise ValueError("JSON does not look like valid CVE data.")

# Validate CVSS score
if not isinstance(cvss_base_score, (int, float)):
logger.warning("Skipping CVE %s: CVSS score is not numeric: %s",
cve_id, cvss_base_score)
continue

if not (0.0 <= cvss_base_score <= 10.0):
logger.warning("Skipping CVE %s: CVSS score out of range (0.0-10.0): %s",
cve_id, cvss_base_score)
continue

# Validate CVSS version
if not isinstance(cvss_version_temp, str):
logger.warning("Skipping CVE %s: CVSS version is not a string: %s",
cve_id, cvss_version_temp)
continue

if cve_doc: # Update existing CVE doc
if (
cve_doc.cvss_score != cvss_base_score
Expand All @@ -106,6 +143,8 @@ async def process_cve_json(cve_json: dict) -> Tuple[int, int]:
await cve_doc.save()
logger.info("Created CVE document with id: %s", cve_id)
created_cve_docs_count += 1
else:
logger.debug("Skipping CVE %s: no CVSS data", cve_id)
logger.info(
"Async task %d: Created %d CVE document(s), updated %d CVE document(s)",
id(asyncio.current_task()),
Expand Down
50 changes: 50 additions & 0 deletions tests/test_cvesync.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,56 @@ async def test_process_cve_json_invalid_cve_data_type():
await process_cve_json({"CVE_data_type": "INVALID", "CVE_Items": []})


async def test_process_cve_json_non_dict():
    """Check that a non-dictionary payload is rejected with a ValueError."""
    payload = "not a dict"
    expected_message = "CVE data must be a dictionary"

    with pytest.raises(ValueError, match=expected_message):
        await process_cve_json(payload)


async def test_process_cve_json_invalid_cve_items():
    """Check that a non-list "CVE_Items" value is rejected with a ValueError."""
    payload = {"CVE_data_type": "CVE", "CVE_Items": "not a list"}
    expected_message = "CVE_Items must be a list"

    with pytest.raises(ValueError, match=expected_message):
        await process_cve_json(payload)


async def test_process_cve_json_invalid_cvss_score():
    """Check that a CVE with a non-numeric CVSS base score is skipped."""
    # Build the payload from the inside out: the CVSS V3 record carries a
    # string where a numeric base score is expected.
    cvss_v3 = {"baseScore": "not_a_number", "version": "3.1"}
    cve_item = {
        "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
        "impact": {"baseMetricV3": {"cvssV3": cvss_v3}},
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [cve_item]}

    # The call is expected to return normally (no exception) and report
    # that nothing was created or updated.
    created_count, updated_count = await process_cve_json(payload)
    assert (created_count, updated_count) == (0, 0)


async def test_process_cve_json_cvss_score_out_of_range():
    """Check that a CVE whose CVSS base score exceeds 10.0 is skipped."""
    # Invalid: > 10.0
    cvss_v3 = {"baseScore": 15.0, "version": "3.1"}
    cve_item = {
        "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
        "impact": {"baseMetricV3": {"cvssV3": cvss_v3}},
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [cve_item]}

    # The call is expected to return normally (no exception) and report
    # that nothing was created or updated.
    created_count, updated_count = await process_cve_json(payload)
    assert (created_count, updated_count) == (0, 0)


async def test_process_cve_json_malformed_1():
"""Test processing malformed CVE JSON data."""
with pytest.raises(ValueError, match="JSON does not look like valid CVE data."):
Expand Down
Loading