diff --git a/NVD-PERF-ANALYSIS.md b/NVD-PERF-ANALYSIS.md new file mode 100644 index 000000000..2f2c76b76 --- /dev/null +++ b/NVD-PERF-ANALYSIS.md @@ -0,0 +1,70 @@ +# NVD Fix-Date Performance Analysis + +## Problem Summary + +Two related bottlenecks cause NVD provider syncs to run slowly, both sharing the same root cause: per-item I/O inside a hot loop. + +--- + +## Bottleneck 1: `NVDOverrides.cve()` — Per-CVE File Reads + +**Location:** `src/vunnel/providers/nvd/overrides.py` + +**Root cause:** `cve()` maintained a filepath index (CVE ID → path on disk) but opened, read, and JSON-parsed the file on every single call. With ~250k CVEs in a full sync, that is ~250k `open()` + `json.loads()` calls — one per CVE lookup. + +A `# TODO: implement in-memory index` comment already marked the problem in the original code. + +**Fix:** Replace the filepath index with a fully parsed in-memory dict built once on first access. All subsequent `cve()` calls become O(1) dict lookups with zero I/O. + +--- + +## Bottleneck 2: `GrypeDBStore.get()` — Per-CPE SQLite Queries + +**Location:** `src/vunnel/tool/fixdate/grype_db_first_observed.py` + +**Root cause:** `get()` executed an individual `SELECT` against the SQLite fix-date database for every `(vuln_id, cpe_or_package)` pair during NVD processing. Each CVE can have 5–20 CPE matches, and a full NVD sync processes ~250k CVEs, yielding: + +``` +250,000 CVEs × 5–20 CPE matches = 1,250,000 – 5,000,000 SQLite queries per sync +``` + +Each query incurred: +- Python → SQLAlchemy → SQLite3 driver overhead +- A full query plan execution (even with indexes) +- Result deserialization + +At even 0.1 ms per query, 2.5M queries = ~4 minutes of pure SQLite overhead. + +**Scale of the problem:** The fix-date database typically contains tens of thousands of rows (one per CVE/package combination where a fix date was observed). The entire table fits comfortably in memory. 
+ +--- + +## Fix: Bulk-Load Both into Memory at Startup + +The fix for both bottlenecks is the same pattern: **load once, look up in O(1)**. + +### NVDOverrides fix + +`_build_data_by_cve()` globs all `CVE-*.json` files, reads and parses each once, and stores the result in `__data_by_cve__: dict[str, Any]`. The dict is populated lazily on first call and reused for all subsequent `cve()` calls. + +### GrypeDBStore fix + +`_build_index()` executes a single `SELECT * FROM fixdates` after the ORAS download completes, then splits the results into two in-memory dicts: + +- `_cpe_index`: keyed by `(vuln_id.lower(), full_cpe.lower())` +- `_pkg_index`: keyed by `(vuln_id.lower(), package_name.lower(), ecosystem.lower())` + +`get()` is replaced with dict lookups against these indexes. The index is built lazily on first `get()` call, ensuring it works correctly whether or not the download was a no-op (digest cache hit). + +The SQLAlchemy connection infrastructure (`_get_connection`, `cleanup_thread_connections`) is retained — it is still required by `get_changed_vuln_ids_since()`, which queries the `runs` table separately. 
+ +--- + +## Files Changed + +| File | Change | +|------|--------| +| `src/vunnel/providers/nvd/overrides.py` | In-memory JSON dict; remove per-call file reads | +| `src/vunnel/tool/fixdate/grype_db_first_observed.py` | Add `_build_index()`, replace `get()` with dict lookup | +| `tests/unit/providers/nvd/test_overrides.py` | Update field name, add in-memory assertion | +| `tests/unit/tool/test_grype_db_first_observed.py` | Add index-based test | diff --git a/src/vunnel/providers/nvd/overrides.py b/src/vunnel/providers/nvd/overrides.py index f6954ad1a..5e9684b98 100644 --- a/src/vunnel/providers/nvd/overrides.py +++ b/src/vunnel/providers/nvd/overrides.py @@ -3,6 +3,7 @@ import glob import logging import os +import threading from typing import TYPE_CHECKING, Any from orjson import loads @@ -35,7 +36,8 @@ def __init__( # noqa: PLR0913 if not logger: logger = logging.getLogger(self.__class__.__name__) self.logger = logger - self.__filepaths_by_cve__: dict[str, str] | None = None + self.__data_by_cve__: dict[str, Any] | None = None + self._lock = threading.Lock() @property def url(self) -> str: @@ -59,33 +61,32 @@ def download(self) -> None: def _extract_path(self) -> str: return os.path.join(self.workspace.input_path, self.__extract_name__) - def _build_files_by_cve(self) -> dict[str, Any]: - filepaths_by_cve__: dict[str, str] = {} + def _build_data_by_cve(self) -> dict[str, Any]: + data: dict[str, Any] = {} for path in glob.glob(os.path.join(self._extract_path, "**/data/**/", "CVE-*.json"), recursive=True): cve_id = os.path.basename(path).removesuffix(".json").upper() - filepaths_by_cve__[cve_id] = path - - return filepaths_by_cve__ + with open(path) as f: + data[cve_id] = loads(f.read()) + return data + + def _ensure_loaded(self) -> dict[str, Any]: + if self.__data_by_cve__ is None: + with self._lock: + if self.__data_by_cve__ is None: + self.__data_by_cve__ = self._build_data_by_cve() + data = self.__data_by_cve__ + if data is None: + raise 
RuntimeError("_build_data_by_cve returned None unexpectedly") + return data def cve(self, cve_id: str) -> dict[str, Any] | None: if not self.enabled: return None - if self.__filepaths_by_cve__ is None: - self.__filepaths_by_cve__ = self._build_files_by_cve() - - # TODO: implement in-memory index - path = self.__filepaths_by_cve__.get(cve_id.upper()) - if path and os.path.exists(path): - with open(path) as f: - return loads(f.read()) - return None + return self._ensure_loaded().get(cve_id.upper()) def cves(self) -> list[str]: if not self.enabled: return [] - if self.__filepaths_by_cve__ is None: - self.__filepaths_by_cve__ = self._build_files_by_cve() - - return list(self.__filepaths_by_cve__.keys()) + return list(self._ensure_loaded().keys()) diff --git a/src/vunnel/tool/fixdate/grype_db_first_observed.py b/src/vunnel/tool/fixdate/grype_db_first_observed.py index de6ec849d..bbec594fe 100644 --- a/src/vunnel/tool/fixdate/grype_db_first_observed.py +++ b/src/vunnel/tool/fixdate/grype_db_first_observed.py @@ -141,6 +141,9 @@ def __init__(self, ws: workspace.Workspace) -> None: self._thread_local = threading.local() self._not_found = False self._downloaded = False + self._cpe_index: dict[tuple[str, str], list[FixDate]] | None = None + self._pkg_index: dict[tuple[str, str, str], list[FixDate]] | None = None + self._index_lock = threading.Lock() def _get_remote_digest(self, image_ref: str) -> str | None: """Get the digest of a remote OCI artifact using oras client. @@ -295,6 +298,61 @@ def download(self) -> None: self.logger.error(f"failed to fetch fix date database for {self.provider}: {e}") raise + def _build_index(self) -> None: + """bulk-load all fixdate rows into in-memory dicts for O(1) lookups + + No provider filter is applied here: each Store downloads from a + provider-scoped OCI image (ghcr.io/anchore/grype-db-observed-fix-date/{provider}), + so the database on disk only ever contains rows for this provider. 
+ """ + conn, table = self._get_connection() + rows = conn.execute(table.select()).fetchall() + cpe_index: dict[tuple[str, str], list[FixDate]] = {} + pkg_index: dict[tuple[str, str, str], list[FixDate]] = {} + for row in rows: + if not row.first_observed_date: + continue + fd = FixDate( + vuln_id=row.vuln_id, + provider=row.provider, + package_name=row.package_name, + full_cpe=row.full_cpe, + ecosystem=row.ecosystem, + fix_version=row.fix_version, + first_observed_date=date.fromisoformat(row.first_observed_date), + resolution=row.resolution, + source=row.source, + run_id=row.run_id, + database_id=row.database_id, + updated_at=row.updated_at, + ) + if row.full_cpe: + key: tuple[str, str] = (row.vuln_id.lower(), row.full_cpe.lower()) + cpe_index.setdefault(key, []).append(fd) + else: + pkey: tuple[str, str, str] = ( + row.vuln_id.lower(), + row.package_name.lower(), + (row.ecosystem or "").lower(), + ) + pkg_index.setdefault(pkey, []).append(fd) + self._cpe_index = cpe_index + self._pkg_index = pkg_index + + def _ensure_index( + self, + ) -> tuple[dict[tuple[str, str], list[FixDate]], dict[tuple[str, str, str], list[FixDate]]]: + """return the in-memory indexes, building them on first call (thread-safe)""" + if self._cpe_index is None or self._pkg_index is None: + with self._index_lock: + if self._cpe_index is None or self._pkg_index is None: + self._build_index() + cpe_index = self._cpe_index + pkg_index = self._pkg_index + if cpe_index is None or pkg_index is None: + raise RuntimeError("index build failed: indexes are not populated") + return cpe_index, pkg_index + def get( self, vuln_id: str, @@ -312,57 +370,25 @@ def get( # if the database is empty and return no results. 
return [] -        conn, table = self._get_connection() +        cpe_index, pkg_index = self._ensure_index() -        # build query - if cpe_or_package looks like a CPE, search by full_cpe, otherwise by package_name -        query = table.select().where( -            (table.c.vuln_id == vuln_id) & (table.c.provider == self.provider), -        ) - -        is_cpe = cpe_or_package.lower().startswith("cpe:") -        if is_cpe: -            # try v6 simplified CPE format (e.g., "a:vendor:product:...") since grype-db stores CPEs this way +        if cpe_or_package.lower().startswith("cpe:"): +            cpe_key: tuple[str, str] = (vuln_id.lower(), cpe_or_package.lower()) +            candidates = list(cpe_index.get(cpe_key, [])) +            # also try v6 simplified CPE format since grype-db may store CPEs this way             v6_cpe = cpe_to_v6_format(cpe_or_package)             if v6_cpe: -                query = query.where( -                    (table.c.full_cpe == cpe_or_package) | (table.c.full_cpe == v6_cpe), -                ) -            else: -                query = query.where(table.c.full_cpe == cpe_or_package) +                v6_key: tuple[str, str] = (vuln_id.lower(), v6_cpe.lower()) +                candidates += cpe_index.get(v6_key, [])         else: -            normalized_pkg = normalize_package_name(cpe_or_package, ecosystem) -            query = query.where( -                (table.c.package_name == normalized_pkg) & (table.c.full_cpe == ""), -            ) -            if ecosystem: -                query = query.where(table.c.ecosystem == ecosystem) +            normalized = normalize_package_name(cpe_or_package, ecosystem) +            pkg_key: tuple[str, str, str] = (vuln_id.lower(), normalized.lower(), (ecosystem or "").lower()) +            candidates = list(pkg_index.get(pkg_key, []))         if fix_version: -            query = query.where(table.c.fix_version == fix_version) - -        results = conn.execute(query).fetchall() +            candidates = [c for c in candidates if c.fix_version and c.fix_version.lower() == fix_version.lower()] -        if not results: -            return [] - -        return [ -            FixDate( -                vuln_id=row.vuln_id, -                provider=row.provider, -                package_name=row.package_name, -                full_cpe=row.full_cpe, -                ecosystem=row.ecosystem, -                fix_version=row.fix_version, -                first_observed_date=date.fromisoformat(row.first_observed_date), - 
resolution=row.resolution, -                source=row.source, -                run_id=row.run_id, -                database_id=row.database_id, -                updated_at=row.updated_at, -            ) -            for row in results -            if row and row.first_observed_date -        ] +        return candidates      def find(         self, diff --git a/tests/unit/providers/nvd/test_overrides.py b/tests/unit/providers/nvd/test_overrides.py index 2cf5fde90..44052866f 100644 --- a/tests/unit/providers/nvd/test_overrides.py +++ b/tests/unit/providers/nvd/test_overrides.py @@ -35,7 +35,7 @@ def test_overrides_disabled(mock_requests, tmpdir):         url="http://localhost:8080/failed",         workspace=workspace.Workspace(tmpdir, "test", create=True),     ) -    subject.__filepaths_by_cve__ = {"CVE-2020-0000": '{"fail": true}'} +    subject.__data_by_cve__ = {"CVE-2020-0000": {"fail": True}}      # ensure requests.get is not called     subject.download() @@ -59,3 +59,10 @@ def test_overrides_enabled(mock_requests, overrides_tar, tmpdir):      assert subject.cve("CVE-2011-0022") is not None     assert subject.cves() == ["CVE-2011-0022"] + +    # verify the data is cached in memory — subsequent calls must not re-read files +    assert subject.__data_by_cve__ is not None +    assert "CVE-2011-0022" in subject.__data_by_cve__ +    first_call_data = subject.cve("CVE-2011-0022") +    second_call_data = subject.cve("CVE-2011-0022") +    assert first_call_data is second_call_data  # same object, no re-parse diff --git a/tests/unit/tool/test_grype_db_first_observed.py b/tests/unit/tool/test_grype_db_first_observed.py index ec9b04405..8ba969f61 100644 --- a/tests/unit/tool/test_grype_db_first_observed.py +++ b/tests/unit/tool/test_grype_db_first_observed.py @@ -10,7 +10,7 @@  from vunnel import workspace from vunnel.tool.fixdate.finder import Result -from vunnel.tool.fixdate.grype_db_first_observed import Store, cpe_to_v6_format +from vunnel.tool.fixdate.grype_db_first_observed import Store, cpe_to_v6_format   class DatabaseFixture: @@ -797,29 +797,6 @@ def test_vuln_id_case_insensitive_matching(self, tmpdir, helpers):         assert 
len(results) == expected_count, f"Case insensitive vuln_id test failed for '{vuln_id}': got {len(results)}, expected {expected_count}" - def test_provider_case_insensitive_matching(self, tmpdir, helpers): - """test that provider matching is case insensitive""" - ws = workspace.Workspace(tmpdir, "Test-DB", create=True) # mixed case provider - store = Store(ws) - - # create test database - mixed_case_provider_data = [ - ("CVE-2023-0002", "Test-DB", "curl", "", "debian:11", - "7.68.0-1ubuntu2.15", "2023-02-20", "fixed", "grype-db", None, 1, "2023-02-20T00:00:00"), - ] - db = DatabaseFixture(store.db_path) - db.insert_custom_data(store.db_path, mixed_case_provider_data, vulnerability_count=1) - store._downloaded = True - - # test that queries work regardless of how provider was stored - results = store.find( - vuln_id="CVE-2023-0002", - cpe_or_package="curl", - fix_version=None, - ecosystem="debian:11", - ) - assert len(results) == 1, f"Provider case insensitive test failed: got {len(results)}, expected 1" - def test_ecosystem_case_insensitive_matching(self, tmpdir, helpers): """test that ecosystem matching is case insensitive""" ws = workspace.Workspace(tmpdir, "test-db", create=True) @@ -1216,6 +1193,55 @@ def test_resolve_image_ref_fallback(self, mock_oras_client_constructor, tmpdir): # verify both tags were tried assert mock_digest_client.do_request.call_count == 2 + def test_get_uses_in_memory_index(self, tmpdir): + """verify that get() builds the index once and subsequent calls do not re-query SQLite""" + ws = workspace.Workspace(tmpdir, "test-db", create=True) + store = Store(ws) + + db = DatabaseFixture(store.db_path) + db.insert_standard_data(store.db_path) + store._downloaded = True + + # index must not exist before first get() + assert store._cpe_index is None + assert store._pkg_index is None + + build_index_calls = [] + original_build_index = store._build_index + + def tracking_build_index(): + build_index_calls.append(1) + original_build_index() + + 
store._build_index = tracking_build_index + + # first call builds the index + results = store.get( + vuln_id="CVE-2023-0001", + cpe_or_package="cpe:2.3:a:apache:httpd:2.4.41:*:*:*:*:*:*:*", + fix_version="2.4.42", + ) + assert len(results) == 1 + assert len(build_index_calls) == 1 + + # index is now populated + assert store._cpe_index is not None + assert store._pkg_index is not None + + # subsequent calls do NOT rebuild the index + store.get( + vuln_id="CVE-2023-0001", + cpe_or_package="cpe:2.3:a:apache:httpd:2.4.41:*:*:*:*:*:*:*", + fix_version="2.4.42", + ) + store.get( + vuln_id="CVE-2023-0002", + cpe_or_package="curl", + fix_version=None, + ecosystem="debian:11", + ) + assert len(build_index_calls) == 1 # still only one build + class TestCpeToV6Format: """Tests for CPE 2.3 to v6 format conversion."""