Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **security:** treat legacy `httplib` pickle globals the same as `http.client`, including import-only and `REDUCE` findings in standalone and archived payloads
- **security:** harden TensorFlow weight extraction limits to bound actual tensor payload materialization, including malformed `tensor_content` and string-backed tensors, and continue scanning past oversized `Const` nodes
- **security:** stream TAR members to temp files under size limits instead of buffering whole entries in memory during scan
- **security:** inspect TensorFlow SavedModel function definitions when scanning for dangerous ops and protobuf string abuse, with function-aware finding locations
Expand Down
1 change: 1 addition & 0 deletions modelaudit/scanners/pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ def _compute_pickle_length(path: str) -> int:
"urllib",
"urllib2",
"http",
"httplib",
"ftplib",
"telnetlib",
"pty",
Expand Down
88 changes: 88 additions & 0 deletions tests/scanners/test_pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,58 @@ def test_smtplib_blocked(self) -> None:
f"Expected CRITICAL smtplib issue, got: {[i.message for i in result.issues]}"
)

def test_httplib_import_only_global_blocked(self) -> None:
"""Python 2 httplib alias GLOBAL imports should match http.client danger handling."""
payload = b"\x80\x02chttplib\nHTTPSConnection\n."
result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
failed_checks = [check for check in result.checks if check.status == CheckStatus.FAILED]
assert any(
check.severity == IssueSeverity.CRITICAL
and check.rule_code == "S303"
and "httplib.HTTPSConnection" in check.message
for check in failed_checks
), f"Expected S303 CRITICAL httplib import finding, got: {[c.message for c in failed_checks]}"

def test_httplib_reduce_blocked(self) -> None:
"""Python 2 httplib alias REDUCE payloads should be treated as dangerous."""
result = self._scan_bytes(self._craft_global_reduce_pickle("httplib", "HTTPSConnection"))

assert result.success
assert result.has_errors
failed_checks = [check for check in result.checks if check.status == CheckStatus.FAILED]
assert any(
check.severity == IssueSeverity.CRITICAL
and check.rule_code == "S201"
and "httplib.HTTPSConnection" in check.message
for check in failed_checks
), f"Expected S201 CRITICAL httplib reduce finding, got: {[c.message for c in failed_checks]}"

def test_http_client_coverage_unchanged(self) -> None:
"""Existing http.client dangerous-global coverage should remain stable."""
result = self._scan_bytes(self._craft_global_reduce_pickle("http.client", "HTTPSConnection"))

assert result.success
assert result.has_errors
failed_checks = [check for check in result.checks if check.status == CheckStatus.FAILED]
assert any(
check.severity == IssueSeverity.CRITICAL
and check.rule_code == "S201"
and "http.client.HTTPSConnection" in check.message
for check in failed_checks
), f"Expected CRITICAL http.client reduce finding, got: {[c.message for c in failed_checks]}"

def test_safe_stdlib_import_remains_non_failing(self) -> None:
"""Benign stdlib import-only globals should stay non-failing."""
payload = b"\x80\x02cdatetime\ndatetime\n."
result = self._scan_bytes(payload)

assert result.success
assert not result.has_errors
assert not [check for check in result.checks if check.status == CheckStatus.FAILED]

def test_sqlite3_blocked(self) -> None:
"""sqlite3 module should be flagged as dangerous."""
result = self._scan_bytes(self._craft_global_reduce_pickle("sqlite3", "connect"))
Expand Down Expand Up @@ -993,6 +1045,42 @@ def test_webbrowser_blocked(self) -> None:
f"Expected CRITICAL webbrowser issue, got: {[i.message for i in result.issues]}"
)

def test_multi_stream_httplib_detected_in_second_stream(self) -> None:
"""Scanner should detect httplib payload hidden in a second pickle stream."""
benign_stream = pickle.dumps({"safe": True}, protocol=2)
payload = benign_stream + b"\x80\x02chttplib\nHTTPSConnection\n."

result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
assert any(
issue.severity == IssueSeverity.CRITICAL and "httplib" in issue.message.lower() for issue in result.issues
), f"Expected CRITICAL httplib issue in second stream, got: {[i.message for i in result.issues]}"

def test_zip_entry_with_httplib_payload_is_detected(self) -> None:
"""Scanner should detect httplib payload embedded in a zip entry."""
import zipfile

from modelaudit.core import scan_file

with tempfile.TemporaryDirectory() as tmp_dir:
zip_path = Path(tmp_dir) / "httplib-payload.zip"

with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("nested.pkl", self._craft_global_reduce_pickle("httplib", "HTTPSConnection"))

result = scan_file(str(zip_path))

assert result.success
assert result.has_errors
assert any(
issue.severity == IssueSeverity.CRITICAL
and "httplib.httpsconnection" in issue.message.lower()
and issue.details.get("zip_entry") == "nested.pkl"
for issue in result.issues
), f"Expected CRITICAL httplib zip issue, got: {[i.message for i in result.issues]}"


class TestCVE20251716PipMainBlocklist(unittest.TestCase):
"""Test CVE-2025-1716: pickle bypass via pip.main() as callable."""
Expand Down
Loading