11import json
2- from typing import Iterable
3- from typing import Optional
42
3+ import dateparser
54import requests
65from packageurl import PackageURL
76from univers .version_range import RANGE_CLASS_BY_SCHEMES
87
9- from vulnerabilities .importer import AdvisoryData
108from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipelineV2
9+ from vulnerabilities .pipes .osv_v2 import parse_advisory_data_v3
10+ from vulnerabilities .utils import fetch_response
11+
# OSV ecosystem name to use in OSV API queries, keyed by Package URL type.
# A PURL type that is absent from this mapping has no OSV ecosystem to query.
ECOSYSTEM_BY_PURL_TYPE = {
    "pypi": "PyPI",
    "npm": "npm",
    "maven": "Maven",
    "composer": "Packagist",
    "hex": "Hex",
    "gem": "RubyGems",
    "nuget": "NuGet",
    "cargo": "crates.io",
}
1122
1223
1324class GithubOSVLiveImporterPipeline (VulnerableCodeBaseImporterPipelineV2 ):
@@ -26,11 +37,12 @@ class GithubOSVLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
2637 def steps (cls ):
2738 return (
2839 cls .get_purl_inputs ,
40+ cls .get_osv_advisories_urls ,
2941 cls .collect_and_store_advisories ,
3042 )
3143
3244 def get_purl_inputs (self ):
33- purl = self .inputs [ "purl" ]
45+ purl = self .inputs . get ( "purl" )
3446 if not purl :
3547 raise ValueError ("PURL is required for GithubOSVLiveImporterPipeline" )
3648
@@ -51,106 +63,94 @@ def get_purl_inputs(self):
5163 self .purl = purl
5264
5365 def advisories_count (self ):
54- self .advisories = fetch_github_osv_advisories_for_purl (self .purl )
55- return len (self .advisories )
56-
57- def collect_advisories (self ) -> Iterable [AdvisoryData ]:
58- from vulnerabilities .importers .osv import parse_advisory_data_v2
59-
60- supported_ecosystems = [
61- "pypi" ,
62- "npm" ,
63- "maven" ,
64- # "golang",
65- "composer" ,
66- "hex" ,
67- "gem" ,
68- "nuget" ,
69- "cargo" ,
70- ]
71-
72- input_version = self .purl .version
73- vrc = RANGE_CLASS_BY_SCHEMES [self .purl .type ]
74- version_obj = vrc .version_class (input_version )
75-
76- for adv in self .advisories :
77- adv_id = adv .get ("id" )
78- advisory_url = build_github_repo_advisory_url (adv , adv_id )
79-
80- advisory = parse_advisory_data_v2 (
81- raw_data = adv ,
82- supported_ecosystems = supported_ecosystems ,
66+ return len (self .advisory_urls )
67+
68+ def collect_advisories (self ):
69+ """
70+ Fetch and parse advisory data from GitHub Advisory Database URLs, Filters the packages to
71+ ensure they match the exact type, name, and namespace of the target PURL, and ensure the target
72+ version falls within the affected or fixed version ranges and yield these related advisories
73+ """
74+ version_range = RANGE_CLASS_BY_SCHEMES .get (self .purl .type )
75+ version_obj = version_range .version_class (self .purl .version )
76+ for advisory_url in self .advisory_urls :
77+ response = fetch_response (advisory_url )
78+ raw_data = json .loads (response .content )
79+
80+ advisory = parse_advisory_data_v3 (
81+ raw_data = raw_data ,
82+ supported_ecosystems = self .supported_types ,
8383 advisory_url = advisory_url ,
84- advisory_text = json .dumps (adv , ensure_ascii = False ),
84+ advisory_text = json .dumps (raw_data , ensure_ascii = False ),
8585 )
8686
87- advisory . affected_packages = [
88- ap
89- for ap in advisory .affected_packages
90- if ap .package
91- and ap .package .type == self .purl .type
92- and ap .package .name == self .purl .name
93- and (ap .package .namespace or "" ) == (self .purl .namespace or "" )
87+ filtered_affected_packages = [
88+ affected_package
89+ for affected_package in advisory .affected_packages
90+ if affected_package .package
91+ and affected_package .package .type == self .purl .type
92+ and affected_package .package .name == self .purl .name
93+ and (affected_package .package .namespace or "" ) == (self .purl .namespace or "" )
9494 ]
9595
96- if not advisory . affected_packages :
96+ if not filtered_affected_packages :
9797 continue
9898
99- if any (
100- ap .affected_version_range and version_obj in ap .affected_version_range
101- for ap in advisory .affected_packages
102- ):
103- yield advisory
104-
105-
106- ECOSYSTEM_BY_PURL_TYPE = {
107- "pypi" : "PyPI" ,
108- "npm" : "npm" ,
109- "maven" : "Maven" ,
110- "composer" : "Packagist" ,
111- "hex" : "Hex" ,
112- "gem" : "RubyGems" ,
113- "nuget" : "NuGet" ,
114- "cargo" : "crates.io" ,
115- }
116-
117- # Map purl.type to directory names used in the advisory-database repository
118- REPO_DIR_BY_PURL_TYPE = {
119- "pypi" : "pypi" ,
120- "npm" : "npm" ,
121- "maven" : "maven" ,
122- "composer" : "composer" ,
123- "hex" : "hex" ,
124- "gem" : "rubygems" ,
125- "nuget" : "nuget" ,
126- "cargo" : "crates.io" ,
127- }
99+ for affected_package in filtered_affected_packages :
100+ if (
101+ affected_package .affected_version_range
102+ and version_obj in affected_package .affected_version_range
103+ ) or (
104+ affected_package .fixed_version_range
105+ and version_obj in affected_package .fixed_version_range
106+ ):
107+ yield advisory
108+
109+ def get_osv_advisories_urls (self ):
110+ """
111+ Fetch a list of OSV advisory dicts from the OSV API for a given PURL,
112+ filtered to only GitHub advisories (GHSA-*) and return the Advisories URLS.
113+ """
114+ ecosystem = ECOSYSTEM_BY_PURL_TYPE .get (self .purl .type )
115+ if not ecosystem :
116+ return []
128117
118+ # Query by package to get all advisories for that package; we filter GHSA below.
119+ body = {"package" : {"ecosystem" : ecosystem , "name" : _osv_package_name (self .purl )}}
120+ resp = requests .post ("https://api.osv.dev/v1/query" , json = body , timeout = 30 )
121+ if resp .status_code != 200 :
122+ return []
129123
130- def build_github_repo_advisory_url (adv : dict , adv_id : Optional [str ]) -> str :
124+ data = resp .json () or {}
125+ advisories = data .get ("vulns" ) or []
126+ self .advisory_urls = set ()
127+ for advisory in advisories :
128+ adv_id = advisory .get ("id" ) or ""
129+ aliases = advisory .get ("aliases" ) or []
130+ advisory_ids = [adv_id ] + aliases
131+ for ghsa_id in advisory_ids :
132+ if not ghsa_id .startswith ("GHSA-" ):
133+ continue
134+
135+ published_date = advisory .get ("published" )
136+ advisory_url = build_github_repo_advisory_url (
137+ published_date , ghsa_id , logger = self .log
138+ )
139+ self .advisory_urls .add (advisory_url )
140+
141+
def build_github_repo_advisory_url(published_date, advisory_id, logger):
    """
    Return the advisory JSON URL in the GitHub advisory-database repo, using the GHSA path:
    advisories/github-reviewed/YYYY/MM/GHSA-ID/GHSA-ID.json

    The year and month are taken from the ``published_date`` date string.
    ``logger`` is a callable accepting a message string.

    Return None, after reporting the reason through ``logger``, when ``published_date``
    is missing or cannot be parsed; callers must skip such a result.
    """
    if not published_date:
        logger(f"Cannot build URL for {advisory_id}: Missing published date")
        return None

    parsed_date = dateparser.parse(date_string=published_date)
    if not parsed_date:
        # dateparser reports an unparseable string by returning None.
        logger(
            f"Cannot build URL for {advisory_id}: "
            f"Unparseable published date {published_date!r}"
        )
        return None

    year = parsed_date.strftime("%Y")
    month = parsed_date.strftime("%m")
    return (
        "https://raw.githubusercontent.com/github/advisory-database/refs/heads/main/"
        f"advisories/github-reviewed/{year}/{month}/{advisory_id}/{advisory_id}.json"
    )
154154
155155
156156def _osv_package_name (purl : PackageURL ) -> str :
@@ -160,27 +160,3 @@ def _osv_package_name(purl: PackageURL) -> str:
160160 if purl .namespace :
161161 return f"{ purl .namespace } /{ purl .name } "
162162 return purl .name
163-
164-
165- def fetch_github_osv_advisories_for_purl (purl : PackageURL ):
166- """
167- Return a list of OSV advisory dicts from the OSV API for a given PURL,
168- filtered to only GitHub advisories (GHSA-*).
169- """
170- ecosystem = ECOSYSTEM_BY_PURL_TYPE .get (purl .type )
171- if not ecosystem :
172- return []
173-
174- pkg = {"ecosystem" : ecosystem , "name" : _osv_package_name (purl )}
175- # Query by package to get all advisories for that package; we filter GHSA below.
176- body = {"package" : pkg }
177- try :
178- resp = requests .post ("https://api.osv.dev/v1/query" , json = body , timeout = 30 )
179- if resp .status_code != 200 :
180- return []
181- data = resp .json () or {}
182- vulns = data .get ("vulns" ) or []
183- # Keep only GHSA advisories which correspond to GitHub Advisory Database
184- return [v for v in vulns if isinstance (v .get ("id" ), str ) and v ["id" ].startswith ("GHSA-" )]
185- except Exception :
186- return []
0 commit comments