Skip to content

Commit 15d338c

Browse files
authored
Merge pull request #2137 from aboutcode-org/debian_importer_v2
Migrate debian importer to v2
2 parents a535807 + ecc2622 commit 15d338c

File tree

4 files changed

+335
-4
lines changed

4 files changed

+335
-4
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
4848
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
4949
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
50+
from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
5051
from vulnerabilities.pipelines.v2_importers import (
5152
elixir_security_importer as elixir_security_importer_v2,
5253
)
@@ -103,6 +104,7 @@
103104
ruby_importer_v2.RubyImporterPipeline,
104105
epss_importer_v2.EPSSImporterPipeline,
105106
nginx_importer_v2.NginxImporterPipeline,
107+
debian_importer_v2.DebianImporterPipeline,
106108
mattermost_importer_v2.MattermostImporterPipeline,
107109
apache_tomcat_v2.ApacheTomcatImporterPipeline,
108110
nvd_importer.NVDImporterPipeline,
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import re
11+
from typing import Any
12+
from typing import Iterable
13+
from typing import Mapping
14+
15+
from packageurl import PackageURL
16+
from univers.version_range import DebianVersionRange
17+
18+
from vulnerabilities.importer import AdvisoryData
19+
from vulnerabilities.importer import AffectedPackageV2
20+
from vulnerabilities.importer import ReferenceV2
21+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
22+
from vulnerabilities.utils import create_weaknesses_list
23+
from vulnerabilities.utils import dedupe
24+
from vulnerabilities.utils import fetch_response
25+
from vulnerabilities.utils import get_item
26+
27+
28+
class DebianImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
29+
"""Debian Importer Pipeline"""
30+
31+
pipeline_id = "debian_importer_v2"
32+
spdx_license_expression = "LicenseRef-scancode-other-permissive"
33+
license_url = "https://www.debian.org/license"
34+
notice = """
35+
From: Tushar Goel <tgoel@nexb.com>
36+
Date: Thu, May 12, 2022 at 11:42 PM +00:00
37+
Subject: Usage of Debian Security Data in VulnerableCode
38+
To: <team@security.debian.org>
39+
40+
Hey,
41+
42+
We would like to integrate the debian security data in vulnerablecode
43+
[1][2] which is a FOSS db of FOSS vulnerability data. We were not able
44+
to know under which license the debian security data comes. We would
45+
be grateful to have your acknowledgement over usage of the debian
46+
security data in vulnerablecode and have some kind of licensing
47+
declaration from your side.
48+
49+
[1] - https://github.com/nexB/vulnerablecode
50+
[2] - https://github.com/nexB/vulnerablecode/pull/723
51+
52+
Regards,
53+
54+
From: Moritz Mühlenhoff <jmm@inutil.org>
55+
Date: Wed, May 17, 2022, 19:12 PM +00:00
56+
Subject: Re: Usage of Debian Security Data in VulnerableCode
57+
To: Tushar Goel <tgoel@nexb.com>
58+
Cc: <team@security.debian.org>
59+
60+
61+
Am Thu, May 12, 2022 at 05:12:48PM +0530 schrieb Tushar Goel:
62+
> Hey,
63+
>
64+
> We would like to integrate the debian security data in vulnerablecode
65+
> [1][2] which is a FOSS db of FOSS vulnerability data. We were not able
66+
> to know under which license the debian security data comes. We would
67+
> be grateful to have your acknowledgement over usage of the debian
68+
> security data in vulnerablecode and have some kind of licensing
69+
> declaration from your side.
70+
71+
We don't have a specific license, but you have our endorsemen to
72+
reuse the data by all means :-)
73+
74+
Cheers,
75+
Moritz
76+
"""
77+
78+
api_url = "https://security-tracker.debian.org/tracker/data/json"
79+
response = None
80+
81+
@classmethod
82+
def steps(cls):
83+
return (cls.collect_and_store_advisories,)
84+
85+
def get_response(self):
86+
try:
87+
response = fetch_response(self.api_url)
88+
if response:
89+
return response.json()
90+
return {}
91+
except Exception as e:
92+
self.log(f"Error fetching data from {self.api_url!r}: {e}")
93+
return {}
94+
95+
def advisories_count(self) -> int:
96+
adv_count = 0
97+
if not self.response:
98+
self.response = self.get_response()
99+
for pkg in self.response:
100+
recs = len(self.response[pkg])
101+
adv_count += recs
102+
return adv_count
103+
104+
def collect_advisories(self) -> Iterable[AdvisoryData]:
105+
if not self.response:
106+
self.response = self.get_response()
107+
for pkg_name, records in self.response.items():
108+
yield from self.parse(pkg_name, records)
109+
110+
def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryData]:
111+
112+
for record_identifier, record in records.items():
113+
affected_versions = []
114+
fixed_versions = []
115+
116+
releases = record["releases"].items()
117+
for release_name, release_record in releases:
118+
version = get_item(release_record, "repositories", release_name)
119+
120+
if not version:
121+
self.log(
122+
f"Version not found for {release_name} in {record} in package {pkg_name}"
123+
)
124+
continue
125+
126+
purl = PackageURL(
127+
name=pkg_name,
128+
type="deb",
129+
namespace="debian",
130+
qualifiers={"distro": release_name},
131+
)
132+
133+
if release_record.get("status", "") == "resolved":
134+
fixed_versions.append(version)
135+
else:
136+
affected_versions.append(version)
137+
138+
if release_record.get("fixed_version"):
139+
fixed_versions.append(release_record["fixed_version"])
140+
141+
references = []
142+
debianbug = record.get("debianbug")
143+
if debianbug:
144+
bug_url = f"https://bugs.debian.org/cgi-bin/bugreport.cgi?bug={debianbug}"
145+
references.append(ReferenceV2(url=bug_url, reference_id=str(debianbug)))
146+
affected_versions = dedupe(affected_versions)
147+
fixed_versions = dedupe(fixed_versions)
148+
if affected_versions:
149+
affected_version_range = DebianVersionRange.from_versions(affected_versions)
150+
else:
151+
affected_version_range = None
152+
affected_packages = []
153+
for fixed_version in fixed_versions:
154+
affected_packages.append(
155+
AffectedPackageV2(
156+
package=purl,
157+
affected_version_range=affected_version_range,
158+
fixed_version_range=DebianVersionRange.from_versions([fixed_version]),
159+
)
160+
)
161+
weaknesses = get_cwe_from_debian_advisory(record)
162+
163+
yield AdvisoryData(
164+
advisory_id=f"{pkg_name}/{record_identifier}",
165+
aliases=[record_identifier],
166+
summary=record.get("description", ""),
167+
affected_packages=affected_packages,
168+
references=references,
169+
weaknesses=weaknesses,
170+
url=f"https://security-tracker.debian.org/tracker/{record_identifier}",
171+
)
172+
173+
174+
def get_cwe_from_debian_advisory(record):
175+
"""
176+
Extracts CWE ID strings from the given raw_data and returns a list of CWE IDs.
177+
178+
>>> get_cwe_from_debian_advisory({"description":"PEAR HTML_QuickForm version 3.2.14 contains an eval injection (CWE-95) vulnerability in HTML_QuickForm's getSubmitValue method, HTML_QuickForm's validate method, HTML_QuickForm_hierselect's _setOptions method, HTML_QuickForm_element's _findValue method, HTML_QuickForm_element's _prepareValue method. that can result in Possible information disclosure, possible impact on data integrity and execution of arbitrary code. This attack appear to be exploitable via A specially crafted query string could be utilised, e.g. http://www.example.com/admin/add_practice_type_id[1]=fubar%27])%20OR%20die(%27OOK!%27);%20//&mode=live. This vulnerability appears to have been fixed in 3.2.15."})
179+
[95]
180+
>>> get_cwe_from_debian_advisory({"description":"There is no WEAKNESS DATA"})
181+
[]
182+
"""
183+
description = record.get("description") or ""
184+
pattern = r"CWE-\d+"
185+
cwe_strings = re.findall(pattern, description)
186+
weaknesses = create_weaknesses_list(cwe_strings)
187+
return weaknesses
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from unittest.mock import MagicMock
11+
from unittest.mock import patch
12+
13+
import pytest
14+
from packageurl import PackageURL
15+
from univers.version_range import DebianVersionRange
16+
17+
from vulnerabilities.importer import AdvisoryData
18+
from vulnerabilities.importer import AffectedPackageV2
19+
from vulnerabilities.importer import ReferenceV2
20+
from vulnerabilities.pipelines.v2_importers.debian_importer import DebianImporterPipeline
21+
from vulnerabilities.pipelines.v2_importers.debian_importer import get_cwe_from_debian_advisory
22+
23+
24+
@pytest.fixture
25+
def importer():
26+
return DebianImporterPipeline()
27+
28+
29+
@pytest.fixture
30+
def sample_response():
31+
return {
32+
"openssl": {
33+
"CVE-2023-1234": {
34+
"description": "Some vulnerability description (CWE-79)",
35+
"debianbug": 123456,
36+
"releases": {
37+
"bullseye": {
38+
"status": "resolved",
39+
"repositories": {"bullseye": "1.1.1k-1"},
40+
"fixed_version": "1.1.1k-2",
41+
},
42+
"bookworm": {
43+
"status": "open",
44+
"repositories": {"bookworm": "1.1.1l-1"},
45+
},
46+
},
47+
}
48+
}
49+
}
50+
51+
52+
def test_get_cwe_from_debian_advisory_with_cwe():
53+
record = {"description": "This issue relates to improper input validation (CWE-20)."}
54+
55+
weaknesses = get_cwe_from_debian_advisory(record)
56+
57+
assert len(weaknesses) == 1
58+
assert weaknesses[0] == 20
59+
60+
61+
def test_get_cwe_from_debian_advisory_without_cwe():
62+
record = {"description": "No weakness mentioned here."}
63+
64+
weaknesses = get_cwe_from_debian_advisory(record)
65+
66+
assert weaknesses == []
67+
68+
69+
@patch("vulnerabilities.pipelines.v2_importers.debian_importer.fetch_response")
70+
def test_get_response_success(mock_fetch, importer, sample_response):
71+
mock_resp = MagicMock()
72+
mock_resp.json.return_value = sample_response
73+
mock_fetch.return_value = mock_resp
74+
75+
response = importer.get_response()
76+
77+
assert response == sample_response
78+
mock_fetch.assert_called_once_with(importer.api_url)
79+
80+
81+
@patch("vulnerabilities.pipelines.v2_importers.debian_importer.fetch_response")
82+
def test_get_response_failure(mock_fetch, importer):
83+
mock_fetch.side_effect = Exception("network error")
84+
85+
response = importer.get_response()
86+
87+
assert response == {}
88+
89+
90+
def test_advisories_count(importer, sample_response):
91+
importer.response = sample_response
92+
93+
count = importer.advisories_count()
94+
95+
assert count == 1
96+
97+
98+
def test_collect_advisories(importer, sample_response):
99+
importer.response = sample_response
100+
101+
advisories = list(importer.collect_advisories())
102+
103+
assert len(advisories) == 1
104+
advisory = advisories[0]
105+
106+
assert isinstance(advisory, AdvisoryData)
107+
assert advisory.advisory_id == "openssl/CVE-2023-1234"
108+
assert advisory.summary.startswith("Some vulnerability")
109+
110+
111+
def test_affected_packages_generation(importer, sample_response):
112+
importer.response = sample_response
113+
114+
advisory = next(importer.collect_advisories())
115+
affected_packages = advisory.affected_packages
116+
117+
assert len(affected_packages) == 2
118+
119+
for pkg in affected_packages:
120+
assert isinstance(pkg, AffectedPackageV2)
121+
assert isinstance(pkg.package, PackageURL)
122+
assert isinstance(pkg.fixed_version_range, DebianVersionRange)
123+
124+
125+
def test_debian_bug_reference(importer, sample_response):
126+
importer.response = sample_response
127+
128+
advisory = next(importer.collect_advisories())
129+
references = advisory.references
130+
131+
assert len(references) == 1
132+
ref = references[0]
133+
134+
assert isinstance(ref, ReferenceV2)
135+
assert ref.reference_id == "123456"
136+
assert "bugs.debian.org" in ref.url

vulnerabilities/utils.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -389,10 +389,16 @@ def fetch_response(url):
389389
"""
390390
Fetch and return `response` from the `url`
391391
"""
392-
response = requests.get(url)
393-
if response.status_code == HTTPStatus.OK:
394-
return response
395-
raise Exception(f"Failed to fetch data from {url!r} with status code: {response.status_code!r}")
392+
try:
393+
response = requests.get(url)
394+
if response.status_code == HTTPStatus.OK:
395+
return response
396+
raise Exception(
397+
f"Failed to fetch data from {url!r} with status code: {response.status_code!r}"
398+
)
399+
except Exception as e:
400+
logger.error(f"Error fetching data from {url!r}: {e}")
401+
return None
396402

397403

398404
# This should be a method on PackageURL

0 commit comments

Comments
 (0)