-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_cvesync.py
More file actions
320 lines (274 loc) · 11.5 KB
/
test_cvesync.py
File metadata and controls
320 lines (274 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
"""Test the cve_sync module."""
# Standard Python Libraries
import asyncio
import json
import os
from unittest.mock import Mock, patch
# Third-Party Libraries
from aiohttp import ClientResponseError, ClientSession
from pymongo import AsyncMongoClient
import pytest
# cisagov Libraries
from cyhy_cvesync import DEFAULT_CVE_URL_PATTERN, __version__
from cyhy_cvesync.cve_sync import fetch_cve_data, process_cve_json, process_urls
# Version strings compared by the release check: the tag taken from the
# environment (None when the variable is unset) and the package's own version.
RELEASE_TAG = os.environ.get("RELEASE_TAG")
PROJECT_VERSION = __version__
@pytest.mark.skipif(
    not RELEASE_TAG, reason="this is not a release (RELEASE_TAG not set)"
)
def test_release_version():
    """Check that the release tag equals the project version prefixed with 'v'."""
    expected_tag = f"v{PROJECT_VERSION}"
    assert RELEASE_TAG == expected_tag, "RELEASE_TAG does not match the project version"
async def test_connection_motor(db_uri, db_name):
    """Verify that the database answers a ``ping`` command.

    Both arguments are presumably supplied by pytest fixtures defined outside
    this file.

    Args:
        db_uri: MongoDB connection URI handed to ``AsyncMongoClient``.
        db_name: Name of the database the ``ping`` command is issued against.
    """
    client = AsyncMongoClient(db_uri)
    try:
        ping_result = await client[db_name].command("ping")
        assert ping_result["ok"] == 1.0, "Direct database ping failed"
    finally:
        # Release the client's connections even if the ping or assertion fails.
        await client.close()
async def test_process_cve_json_invalid_cve_data_type():
    """Verify that an unexpected ``CVE_data_type`` value raises ValueError."""
    bad_type_json = {"CVE_data_type": "INVALID", "CVE_Items": []}
    # ``match`` is a regular expression searched against the message, so the
    # trailing period is escaped to require a literal "." instead of any
    # character.
    with pytest.raises(
        ValueError, match=r"JSON does not look like valid CVE data\."
    ):
        await process_cve_json(bad_type_json)
async def test_process_cve_json_non_dict():
    """Check that passing something other than a dictionary raises ValueError."""
    not_a_mapping = "not a dict"
    expect_rejection = pytest.raises(
        ValueError, match="CVE data must be a dictionary"
    )
    with expect_rejection:
        await process_cve_json(not_a_mapping)
async def test_process_cve_json_invalid_cve_items():
    """Check that a ``CVE_Items`` value that is not a list raises ValueError."""
    payload = {"CVE_data_type": "CVE", "CVE_Items": "not a list"}
    expect_rejection = pytest.raises(ValueError, match="CVE_Items must be a list")
    with expect_rejection:
        await process_cve_json(payload)
async def test_process_cve_json_invalid_cvss_score():
    """Check that a CVE whose CVSS base score is not a number is not counted."""
    cvss_v3 = {"baseScore": "not_a_number", "version": "3.1"}
    bad_item = {
        "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
        "impact": {"baseMetricV3": {"cvssV3": cvss_v3}},
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [bad_item]}

    # No exception is expected here; the item should simply be left out of
    # both counters.
    counts = await process_cve_json(payload)
    created, updated = counts
    assert created == 0
    assert updated == 0
async def test_process_cve_json_cvss_score_out_of_range():
    """Check that a CVE whose CVSS base score exceeds 10.0 is not counted."""
    # 15.0 is deliberately invalid: it is greater than 10.0.
    cvss_v3 = {"baseScore": 15.0, "version": "3.1"}
    bad_item = {
        "cve": {"CVE_data_meta": {"ID": "CVE-2023-1234"}},
        "impact": {"baseMetricV3": {"cvssV3": cvss_v3}},
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [bad_item]}

    # No exception is expected here; the item should simply be left out of
    # both counters.
    counts = await process_cve_json(payload)
    created, updated = counts
    assert created == 0
    assert updated == 0
async def test_process_cve_json_malformed_1():
    """Verify that an item whose ``CVE_data_meta`` lacks an ``ID`` key is rejected.

    The single item carries an ``INVALID`` key in place of ``ID`` and has no
    ``impact`` section; processing it is expected to raise ValueError.
    """
    item_without_id = {"cve": {"CVE_data_meta": {"INVALID": "FOOBAR"}}}
    payload = {"CVE_data_type": "CVE", "CVE_Items": [item_without_id]}
    # ``match`` is a regular expression searched against the message, so the
    # trailing period is escaped to require a literal "." instead of any
    # character.
    with pytest.raises(
        ValueError, match=r"JSON does not look like valid CVE data\."
    ):
        await process_cve_json(payload)
async def test_process_cve_json_malformed_2():
    """Verify that an item with an empty ``cvssV3`` mapping is rejected.

    The single item has an ID but its ``cvssV3`` dictionary contains neither a
    ``baseScore`` nor a ``version``; processing it is expected to raise
    ValueError.
    """
    item_without_score = {
        "cve": {"CVE_data_meta": {"ID": "TEST"}},
        "impact": {"baseMetricV3": {"cvssV3": {}}},
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [item_without_score]}
    # ``match`` is a regular expression searched against the message, so the
    # trailing period is escaped to require a literal "." instead of any
    # character.
    with pytest.raises(
        ValueError, match=r"JSON does not look like valid CVE data\."
    ):
        await process_cve_json(payload)
async def test_process_cve_json_empty_id():
    """Verify that a CVE item whose ID is an empty string raises ValueError."""
    empty_id_item = {
        "cve": {"CVE_data_meta": {"ID": ""}},
        "impact": {
            "baseMetricV3": {"cvssV3": {"baseScore": 9.8, "version": "3.1"}}
        },
    }
    payload = {"CVE_data_type": "CVE", "CVE_Items": [empty_id_item]}
    # ``match`` is a regular expression searched against the message, so the
    # trailing period is escaped to require a literal "." instead of any
    # character.
    with pytest.raises(ValueError, match=r"CVE ID is empty\."):
        await process_cve_json(payload)
async def test_fetch_cve_data_invalid_url_scheme():
    """Check that an ``ftp://`` URL is rejected with ValueError."""
    ftp_url = "ftp://example.com/cve.json"
    expect_rejection = pytest.raises(
        ValueError, match="Invalid URL scheme in CVE JSON URL: ftp"
    )
    with expect_rejection:
        async with ClientSession() as http_session:
            await fetch_cve_data(http_session, ftp_url, gzipped=False)
@patch("aiohttp.client.ClientSession.get")
async def test_fetch_cve_data_json_decode_error(mock_get):
    """Check that a 200 response whose body is not JSON raises JSONDecodeError.

    Args:
        mock_get: Mock replacing ``ClientSession.get``, injected by ``patch``.
    """
    # read() returns an already-resolved future, so awaiting it yields the
    # bytes below.
    body = asyncio.Future()
    body.set_result(b"Invalid JSON")

    fake_response = Mock(status=200)
    fake_response.read.return_value = body
    # The patched get() is used as an async context manager that yields the
    # fake response.
    mock_get.return_value.__aenter__.return_value = fake_response

    with pytest.raises(json.JSONDecodeError):
        async with ClientSession() as http_session:
            await fetch_cve_data(
                http_session, "https://example.com/cve.json", gzipped=False
            )
@patch("aiohttp.client.ClientSession.get")
async def test_fetch_cve_data_non_200_response(mock_urlopen):
    """Check that an HTTP 500 response raises ClientResponseError.

    Args:
        mock_urlopen: Mock replacing ``ClientSession.get``, injected by
            ``patch``.
    """
    fake_response = Mock(status=500)
    # The patched get() is used as an async context manager that yields the
    # fake response.
    mock_urlopen.return_value.__aenter__.return_value = fake_response

    expect_failure = pytest.raises(
        ClientResponseError, match="Failed to retrieve CVE data."
    )
    with expect_failure:
        async with ClientSession() as http_session:
            await fetch_cve_data(
                http_session, "https://example.com/cve.json", gzipped=False
            )
@patch("aiohttp.client.ClientSession.get")
async def test_fetch_cve_data_empty_response(mock_urlopen):
    """Verify that a 200 response with an empty body raises ValueError.

    Args:
        mock_urlopen: Mock replacing ``ClientSession.get``, injected by
            ``patch``.
    """
    # read() returns an already-resolved future, so awaiting it yields an
    # empty byte string.
    empty_body = asyncio.Future()
    empty_body.set_result(b"")

    fake_response = Mock(status=200)
    fake_response.read.return_value = empty_body
    # The patched get() is used as an async context manager that yields the
    # fake response.
    mock_urlopen.return_value.__aenter__.return_value = fake_response

    # ``match`` is a regular expression searched against the message, so the
    # trailing period is escaped to require a literal "." instead of any
    # character.
    with pytest.raises(
        ValueError, match=r"Empty response received from the server\."
    ):
        async with ClientSession() as http_session:
            await fetch_cve_data(
                http_session, "https://example.com/cve.json", gzipped=False
            )
async def test_fetch_real_cve_data():
    """Fetch the gzipped 2024 feed and check that it contains CVE items.

    The URL is built from ``DEFAULT_CVE_URL_PATTERN``; no mocking is applied
    in this test, so the fetch goes through a real ``ClientSession``.
    """
    feed_url = DEFAULT_CVE_URL_PATTERN.format(year=2024)
    async with ClientSession() as http_session:
        feed = await fetch_cve_data(http_session, feed_url, gzipped=True)

    assert "CVE_Items" in feed, "Expected 'CVE_Items' in CVE data"
    items = feed["CVE_Items"]
    assert len(items) > 0, "Expected at least one CVE item in CVE data"
async def test_process_urls_create_cves():
    """Check that processing a feed of three CVEs reports three creations.

    NOTE(review): the expected counts presumably assume that none of these
    CVE IDs exist in the database beforehand -- confirm against the fixtures.
    """
    id_and_score = [("CVE-TEST-1", 9.8), ("CVE-TEST-2", 8.5), ("CVE-TEST-3", 4.0)]
    feed = {
        "CVE_data_type": "CVE",
        "CVE_Items": [
            {
                "cve": {"CVE_data_meta": {"ID": cve_id}},
                "impact": {
                    "baseMetricV3": {
                        "cvssV3": {"baseScore": score, "version": "3.1"}
                    }
                },
            }
            for cve_id, score in id_and_score
        ],
    }

    # Replace the network fetch so process_urls receives the feed built above.
    with patch("cyhy_cvesync.cve_sync.fetch_cve_data", return_value=feed):
        counts = await process_urls(
            ["https://example.com/cve.json"], cve_data_gzipped=False, concurrency=1
        )

    created, updated, deleted = counts
    assert created == 3, "Expected 3 CVEs to be created"
    assert updated == 0, "Expected no CVEs to be updated"
    assert deleted == 0, "Expected no CVEs to be deleted"
async def test_process_urls_update_cves():
    """Check that processing a feed with changed scores reports two updates.

    NOTE(review): the expected counts presumably depend on documents left in
    the database by an earlier test -- confirm the intended test ordering.
    """
    id_and_score = [("CVE-TEST-1", 9.1), ("CVE-TEST-2", 8.5), ("CVE-TEST-3", 7.2)]
    feed = {
        "CVE_data_type": "CVE",
        "CVE_Items": [
            {
                "cve": {"CVE_data_meta": {"ID": cve_id}},
                "impact": {
                    "baseMetricV3": {
                        "cvssV3": {"baseScore": score, "version": "3.1"}
                    }
                },
            }
            for cve_id, score in id_and_score
        ],
    }

    # Replace the network fetch so process_urls receives the feed built above.
    with patch("cyhy_cvesync.cve_sync.fetch_cve_data", return_value=feed):
        counts = await process_urls(
            ["https://example.com/cve.json"], cve_data_gzipped=False, concurrency=1
        )

    created, updated, deleted = counts
    assert created == 0, "Expected no CVEs to be created"
    assert updated == 2, "Expected 2 CVEs to be updated"
    assert deleted == 0, "Expected no CVEs to be deleted"
async def test_process_urls_delete_cves():
    """Check that processing a feed missing one CVE reports one deletion.

    NOTE(review): the expected counts presumably depend on documents left in
    the database by earlier tests -- confirm the intended test ordering.
    """
    id_and_score = [("CVE-TEST-1", 9.1), ("CVE-TEST-3", 7.2)]
    feed = {
        "CVE_data_type": "CVE",
        "CVE_Items": [
            {
                "cve": {"CVE_data_meta": {"ID": cve_id}},
                "impact": {
                    "baseMetricV3": {
                        "cvssV3": {"baseScore": score, "version": "3.1"}
                    }
                },
            }
            for cve_id, score in id_and_score
        ],
    }

    # Replace the network fetch so process_urls receives the feed built above.
    with patch("cyhy_cvesync.cve_sync.fetch_cve_data", return_value=feed):
        counts = await process_urls(
            ["https://example.com/cve.json"], cve_data_gzipped=False, concurrency=1
        )

    created, updated, deleted = counts
    assert created == 0, "Expected no CVEs to be created"
    assert updated == 0, "Expected no CVEs to be updated"
    assert deleted == 1, "Expected 1 CVE to be deleted"
async def test_process_urls_create_update_delete_cves():
    """Check that one feed can report a creation, an update and a deletion.

    NOTE(review): the expected counts presumably depend on documents left in
    the database by earlier tests -- confirm the intended test ordering.
    """
    id_and_score = [("CVE-TEST-1", 9.3), ("CVE-TEST-4", 5.5)]
    feed = {
        "CVE_data_type": "CVE",
        "CVE_Items": [
            {
                "cve": {"CVE_data_meta": {"ID": cve_id}},
                "impact": {
                    "baseMetricV3": {
                        "cvssV3": {"baseScore": score, "version": "3.1"}
                    }
                },
            }
            for cve_id, score in id_and_score
        ],
    }

    # Replace the network fetch so process_urls receives the feed built above.
    with patch("cyhy_cvesync.cve_sync.fetch_cve_data", return_value=feed):
        counts = await process_urls(
            ["https://example.com/cve.json"], cve_data_gzipped=False, concurrency=1
        )

    created, updated, deleted = counts
    assert created == 1, "Expected 1 CVE to be created"
    assert updated == 1, "Expected 1 CVE to be updated"
    assert deleted == 1, "Expected 1 CVE to be deleted"