-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_zip_scanner.py
More file actions
457 lines (378 loc) · 19.1 KB
/
test_zip_scanner.py
File metadata and controls
457 lines (378 loc) · 19.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
import os
import tempfile
import zipfile
from pathlib import Path
from modelaudit.scanners.base import IssueSeverity
from modelaudit.scanners.zip_scanner import ZipScanner
class TestZipScanner:
"""Test the ZIP scanner"""
def setup_method(self):
    """Instantiate the argument-less ``ZipScanner`` that tests reach through ``self.scanner``."""
    self.scanner = ZipScanner()
def test_can_handle_zip_files(self):
    """``ZipScanner.can_handle`` accepts a real ZIP archive and rejects ``.txt``/``.pkl`` paths."""
    # delete=False keeps the file on disk after the handle closes; it is
    # removed by hand in the ``finally`` clause below.
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr("test.txt", "Hello World")
        zip_path = handle.name

    rejected_paths = ("/path/to/file.txt", "/path/to/file.pkl")
    try:
        assert ZipScanner.can_handle(zip_path) is True
        for candidate in rejected_paths:
            assert ZipScanner.can_handle(candidate) is False
    finally:
        os.unlink(zip_path)
def test_symlink_outside_extraction_root(self):
    """A symlink entry whose target is ``../evil.txt`` must be reported with an "outside" message."""
    import stat

    # Describe the entry up front: create_system 3 is the ZIP "Unix" host code,
    # and the high 16 bits of external_attr carry the symlink file-type bits.
    # The entry's data is the link target.
    link_entry = zipfile.ZipInfo("link.txt")
    link_entry.create_system = 3
    link_entry.external_attr = (stat.S_IFLNK | 0o777) << 16

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr(link_entry, "../evil.txt")
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        symlink_messages = [
            issue.message.lower() for issue in scan_result.issues if "symlink" in issue.message.lower()
        ]
        assert any("outside" in message for message in symlink_messages)
    finally:
        os.unlink(zip_path)
def test_symlink_to_critical_path(self):
    """A symlink entry targeting ``/etc/passwd`` must be reported with a "critical system" message."""
    import stat

    # Same recipe as any Unix symlink entry: host code 3 plus S_IFLNK mode
    # bits in the upper half of external_attr; the data is the link target.
    link_entry = zipfile.ZipInfo("etc_passwd")
    link_entry.create_system = 3
    link_entry.external_attr = (stat.S_IFLNK | 0o777) << 16

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr(link_entry, "/etc/passwd")
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        symlink_messages = [
            issue.message.lower() for issue in scan_result.issues if "symlink" in issue.message.lower()
        ]
        assert any("critical system" in message for message in symlink_messages)
    finally:
        os.unlink(zip_path)
def test_zip_bytes_scanned_single_count(self):
    """``bytes_scanned`` must equal the combined size of the two embedded pickles, counted once."""
    import pickle

    # Insertion order of the dict is the order the entries are written
    payloads = {
        "one.pkl": pickle.dumps({"a": 1}),
        "two.pkl": pickle.dumps({"b": 2}),
    }

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            for entry_name, blob in payloads.items():
                archive.writestr(entry_name, blob)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        assert scan_result.success is True
        assert scan_result.bytes_scanned == sum(len(blob) for blob in payloads.values())
    finally:
        os.unlink(zip_path)
def test_scan_simple_zip(self):
    """A ZIP holding only a text file and a JSON file scans successfully with no CRITICAL issue."""
    entries = (
        ("readme.txt", "This is a readme file"),
        ("data.json", '{"key": "value"}'),
    )

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            for entry_name, text in entries:
                archive.writestr(entry_name, text)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        assert scan_result.success is True
        assert scan_result.bytes_scanned > 0
        # Lower-severity findings are tolerated; only CRITICAL ones fail the test
        critical_found = any(issue.severity == IssueSeverity.CRITICAL for issue in scan_result.issues)
        assert not critical_found
    finally:
        os.unlink(zip_path)
def test_scan_zip_with_pickle(self):
    """A benign pickle stored as ``model.pkl`` is scanned and at least its bytes are counted."""
    import pickle

    pickle_blob = pickle.dumps({"safe": "data"})

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr("model.pkl", pickle_blob)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        assert scan_result.success is True
        assert scan_result.bytes_scanned > 0
        # The whole embedded pickle must be accounted for in the byte count
        assert scan_result.bytes_scanned >= len(pickle_blob)
    finally:
        os.unlink(zip_path)
def test_scan_nested_zip(self):
    """Scan an archive whose single entry is itself a ZIP and expect the scan to succeed.

    The final assertion is deliberately loose: it passes when any issue
    location mentions ``nested.zip`` or when at least one byte was scanned.

    Every temporary file is recorded the moment it is created, so the
    ``finally`` clause cleans up wherever a failure happens and never touches
    a name that was not bound yet (previously a failure while building the
    outer archive made cleanup raise ``UnboundLocalError`` and hide the real
    error).
    """
    created_paths = []
    try:
        # Inner archive: one small text entry
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as inner_tmp:
            inner_path = inner_tmp.name
            created_paths.append(inner_path)
            with zipfile.ZipFile(inner_path, "w") as inner_z:
                inner_z.writestr("inner.txt", "Inner file content")

        # Outer archive: embeds the inner archive under the name "nested.zip"
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as outer_tmp:
            outer_path = outer_tmp.name
            created_paths.append(outer_path)
            with zipfile.ZipFile(outer_path, "w") as outer_z:
                outer_z.write(inner_path, "nested.zip")

        result = self.scanner.scan(outer_path)
        assert result.success is True

        # Should have scanned the nested content
        nested_mentioned = any(
            "nested.zip" in str(issue.location) for issue in result.issues if hasattr(issue, "location")
        )
        assert nested_mentioned or result.bytes_scanned > 0
    finally:
        for path in created_paths:
            if os.path.exists(path):
                os.unlink(path)
def test_directory_traversal_detection(self):
    """Parent-directory and absolute entry names must yield at least two CRITICAL traversal issues."""
    entries = (
        ("../../../etc/passwd", "malicious content"),
        ("/etc/passwd", "malicious content"),
        ("safe.txt", "safe content"),
    )

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            for entry_name, text in entries:
                archive.writestr(entry_name, text)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        assert scan_result.success is True

        # Either wording counts as a traversal finding
        traversal_issues = []
        for issue in scan_result.issues:
            lowered = issue.message.lower()
            if "path traversal" in lowered or "directory traversal" in lowered:
                traversal_issues.append(issue)

        assert len(traversal_issues) >= 2
        assert all(issue.severity == IssueSeverity.CRITICAL for issue in traversal_issues)
    finally:
        os.unlink(zip_path)
def test_windows_traversal_detection(self):
    """A backslash-separated parent-directory entry must yield a CRITICAL "path traversal" issue."""
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr("..\\evil.txt", "malicious")
            archive.writestr("safe.txt", "ok")
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        flagged = [issue for issue in scan_result.issues if "path traversal" in issue.message.lower()]
        assert len(flagged) >= 1
        assert all(issue.severity == IssueSeverity.CRITICAL for issue in flagged)
    finally:
        os.unlink(zip_path)
def test_zip_bomb_detection(self):
    """A highly compressible deflated entry must trigger a "compression ratio" issue."""
    # 300,000 identical characters: extremely compressible, yet small enough
    # to keep the test quick.
    repetitive_text = "A" * 300000

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("suspicious.txt", repetitive_text)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        assert scan_result.success is True
        ratio_issues = [issue for issue in scan_result.issues if "compression ratio" in issue.message.lower()]
        assert len(ratio_issues) >= 1
    finally:
        os.unlink(zip_path)
def test_max_depth_limit(self):
    """Ten nested archives scanned with ``max_zip_depth`` of 3 must produce a "depth" issue."""
    created = []
    previous_archive = None
    try:
        # Build from the inside out: each new archive wraps the previous one
        for level in range(10):
            with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
                with zipfile.ZipFile(handle.name, "w") as archive:
                    if not previous_archive:
                        archive.writestr("deepest.txt", "Deep content")
                    else:
                        archive.write(previous_archive, f"level{level}.zip")
                created.append(handle.name)
                previous_archive = handle.name

        # The loop always runs, so the outermost archive path is set by now
        assert previous_archive is not None
        shallow_scanner = ZipScanner(config={"max_zip_depth": 3})
        scan_result = shallow_scanner.scan(previous_archive)

        assert scan_result.success is True
        depth_issues = [issue for issue in scan_result.issues if "depth" in issue.message.lower()]
        assert len(depth_issues) >= 1
    finally:
        for leftover in created:
            if os.path.exists(leftover):
                os.unlink(leftover)
def test_scan_zip_with_dangerous_pickle(self):
    """A pickle whose ``__reduce__`` points at ``os.system`` must not stop the scan from completing."""
    import pickle

    class DangerousClass:
        def __reduce__(self):
            return (os.system, ("echo pwned",))

    # The object is only serialized here, never unpickled by this test
    malicious_blob = pickle.dumps(DangerousClass())

    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        with zipfile.ZipFile(handle.name, "w") as archive:
            archive.writestr("dangerous.pkl", malicious_blob)
        zip_path = handle.name

    try:
        scan_result = self.scanner.scan(zip_path)
        # Nothing is asserted about the findings themselves: the test only
        # requires that the scan finishes and that some bytes were processed.
        assert scan_result.success is True
        assert scan_result.bytes_scanned > 0
    finally:
        os.unlink(zip_path)
def test_scan_zip_with_proto0_pickle_disguised_as_text(self, tmp_path: Path) -> None:
    """A protocol-0 ``system`` pickle stored under a ``.txt`` name must still raise a CRITICAL issue."""
    proto0_payload = b'cos\nsystem\n(S"echo pwned"\ntR.'
    archive_path = tmp_path / "proto0_payload.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("payload.txt", proto0_payload)

    scan_result = self.scanner.scan(str(archive_path))

    assert scan_result.success is True
    assert scan_result.has_errors is True

    critical_messages = []
    for issue in scan_result.issues:
        if issue.severity == IssueSeverity.CRITICAL:
            critical_messages.append(issue.message.lower())

    # The callable may be reported under either module name
    flagged = any("os.system" in msg or "posix.system" in msg for msg in critical_messages)
    assert flagged, f"Expected critical os/posix.system issue, got: {critical_messages}"
def test_scan_zip_with_prefixed_proto0_pickle_disguised_as_text(self, tmp_path: Path) -> None:
    """A protocol-0 ``system`` pickle preceded by MARK/LIST opcodes in a ``.txt`` entry must be flagged."""
    prefixed_payload = b'(lp0\n0cos\nsystem\n(S"echo pwned"\ntR.'
    archive_path = tmp_path / "proto0_prefixed_payload.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("payload.txt", prefixed_payload)

    scan_result = self.scanner.scan(str(archive_path))

    assert scan_result.success is True
    assert scan_result.has_errors is True

    critical_messages = []
    for issue in scan_result.issues:
        if issue.severity == IssueSeverity.CRITICAL:
            critical_messages.append(issue.message.lower())

    # The callable may be reported under either module name
    flagged = any("os.system" in msg or "posix.system" in msg for msg in critical_messages)
    assert flagged, f"Expected critical os/posix.system issue, got: {critical_messages}"
def test_scan_npz_with_object_member_recurses_into_pickle(self, tmp_path: Path) -> None:
    """An ``.npz`` holding an object-dtype array must surface the ``exec`` payload of ``payload.npy``.

    Two things are required of the scan result: a failed check that mentions
    CVE-2019-6446, and an issue mentioning ``exec`` whose ``zip_entry``
    detail is ``payload.npy``.
    """
    import numpy as np

    class _ExecPayload:
        def __reduce__(self):
            return (exec, ("print('owned')",))

    archive_path = tmp_path / "payload.npz"
    object_array = np.array([_ExecPayload()], dtype=object)
    np.savez(archive_path, safe=np.arange(3), payload=object_array)

    scan_result = self.scanner.scan(str(archive_path))
    assert scan_result.success is True

    failed_text = [
        (check.name + check.message).lower() for check in scan_result.checks if check.status.value == "failed"
    ]
    assert any("cve-2019-6446" in text for text in failed_text)
    assert any(
        "exec" in issue.message.lower() and issue.details.get("zip_entry") == "payload.npy"
        for issue in scan_result.issues
    )
def test_scan_zip_with_plain_text_global_prefix_not_treated_as_pickle(self, tmp_path: Path) -> None:
    """Text that merely opens with a lone ``c`` line must not yield a corrupted-pickle issue."""
    plain_text = b"c\nthis is plain text\nnot a pickle stream"
    archive_path = tmp_path / "plain_text_payload.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("notes.txt", plain_text)

    scan_result = self.scanner.scan(str(archive_path))
    assert scan_result.success is True

    noisy_pickle_warnings = []
    for issue in scan_result.issues:
        if "incomplete or corrupted pickle file" in issue.message.lower():
            noisy_pickle_warnings.append(issue)

    assert not noisy_pickle_warnings, (
        f"Expected no noisy pickle warning for plain text, got: {[i.message for i in noisy_pickle_warnings]}"
    )
def test_scan_zip_with_proto0_pickle_with_single_comment_token_bypass_regression(self, tmp_path: Path) -> None:
    """A single ``#`` byte ahead of a protocol-0 ``system`` pickle must not hide it from the scan."""
    archive_path = tmp_path / "proto0_comment_prefixed_payload.zip"
    payload = b"#" + b'cos\nsystem\n(S"echo pwned"\ntR.'
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("payload.txt", payload)

    scan_result = self.scanner.scan(str(archive_path))

    assert scan_result.success is True
    assert scan_result.has_errors is True

    critical_messages = []
    for issue in scan_result.issues:
        if issue.severity == IssueSeverity.CRITICAL:
            critical_messages.append(issue.message.lower())

    # The callable may be reported under either module name
    flagged = any("os.system" in msg or "posix.system" in msg for msg in critical_messages)
    assert flagged, f"Expected critical os/posix.system issue, got: {critical_messages}"
def test_scan_nonexistent_file(self):
    """Scanning ``/nonexistent/file.zip`` must fail and report a "does not exist" issue."""
    scan_result = self.scanner.scan("/nonexistent/file.zip")

    assert scan_result.success is False
    assert len(scan_result.issues) > 0
    messages = [issue.message for issue in scan_result.issues]
    assert any("does not exist" in message for message in messages)
def test_scan_invalid_zip(self):
    """A ``.zip``-named file with non-ZIP content must fail with a "not a valid zip" issue."""
    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as handle:
        handle.write(b"This is not a zip file")
        bogus_path = handle.name

    try:
        scan_result = self.scanner.scan(bogus_path)
        assert scan_result.success is False
        assert len(scan_result.issues) > 0
        lowered_messages = [issue.message.lower() for issue in scan_result.issues]
        assert any("not a valid zip" in message for message in lowered_messages)
    finally:
        os.unlink(bogus_path)
def test_scan_empty_zip(self, tmp_path: Path) -> None:
    """An archive with no entries scans successfully, counts zero bytes and has no CRITICAL issue."""
    archive_path = tmp_path / "empty.zip"
    # Opening for write and closing straight away leaves an archive with no entries
    zipfile.ZipFile(archive_path, "w").close()

    scan_result = self.scanner.scan(str(archive_path))

    assert scan_result.success is True
    assert scan_result.bytes_scanned == 0
    critical_found = any(issue.severity == IssueSeverity.CRITICAL for issue in scan_result.issues)
    assert not critical_found
def test_scan_zip_with_multiple_model_formats(self, tmp_path: Path) -> None:
    """Every entry of a ZIP mixing ``.pkl``, ``.json`` and ``.pt`` files is scanned and listed."""
    import pickle

    # Insertion order of the dict is the order the entries are written.
    # The ".pt" entry is a plain pickle blob stored under that name.
    entries = {
        "model.pkl": pickle.dumps({"weights": [1, 2, 3]}),
        "config.json": b'{"model_type": "linear", "version": "1.0"}',
        "weights.pt": pickle.dumps({"state_dict": {}}),
    }

    archive_path = tmp_path / "multi_format.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        for entry_name, blob in entries.items():
            archive.writestr(entry_name, blob)

    scan_result = self.scanner.scan(str(archive_path))
    assert scan_result.success is True

    # The byte count must be exactly the sum of the three payloads
    assert scan_result.bytes_scanned == sum(len(blob) for blob in entries.values())

    listed_paths = {item.get("path", "") for item in scan_result.metadata.get("contents", [])}
    for entry_name in entries:
        assert any(entry_name in listed for listed in listed_paths)
def test_scan_zip_with_very_long_filename(self, tmp_path: Path) -> None:
    """A benign pickle stored under a 204-character entry name scans successfully and fully."""
    import pickle

    entry_name = "a" * 200 + ".pkl"  # 200 letters plus the 4-character suffix
    blob = pickle.dumps({"key": "value"})

    archive_path = tmp_path / "long_name.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr(entry_name, blob)

    scan_result = self.scanner.scan(str(archive_path))

    # The scan has to complete normally and count exactly the payload bytes
    assert scan_result.success is True
    assert scan_result.bytes_scanned == len(blob)
def test_scan_truncated_zip(self, tmp_path: Path) -> None:
    """An archive cut down to its first half must be reported as a failed scan with issues."""
    intact_path = tmp_path / "valid.zip"
    with zipfile.ZipFile(intact_path, "w") as archive:
        archive.writestr("file.txt", "some content")

    # Keep only the first half of the valid archive's bytes
    intact_bytes = intact_path.read_bytes()
    half = len(intact_bytes) // 2
    truncated_path = tmp_path / "truncated.zip"
    truncated_path.write_bytes(intact_bytes[:half])

    scan_result = self.scanner.scan(str(truncated_path))

    # The scanner must return a failure result rather than raise
    assert scan_result.success is False
    assert len(scan_result.issues) > 0