Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- **security:** treat legacy `httplib` pickle globals the same as `http.client`, including import-only and `REDUCE` findings in standalone and archived payloads
- **security:** remove `builtins.hasattr` / `__builtin__.hasattr` from the pickle safe-global allowlist so attribute-access primitives stay flagged as dangerous builtins
- **security:** harden TensorFlow weight extraction limits to bound actual tensor payload materialization, including malformed `tensor_content` and string-backed tensors, and continue scanning past oversized `Const` nodes
- **security:** stream TAR members to temp files under size limits instead of buffering whole entries in memory during scan
- **security:** inspect TensorFlow SavedModel function definitions when scanning for dangerous ops and protobuf string abuse, with function-aware finding locations
Expand Down
5 changes: 2 additions & 3 deletions modelaudit/scanners/pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ def _is_dangerous_module(mod: str) -> bool:
],
# Python builtins - safe built-in types and functions
# NOTE: eval, exec, compile, __import__, open, file are NOT in this list (they remain dangerous)
# NOTE: getattr, setattr, delattr are also NOT in this list (in ALWAYS_DANGEROUS_FUNCTIONS)
# NOTE: getattr, setattr, delattr, hasattr are NOT in this list
# because attribute-access primitives must never be allowlisted.
"__builtin__": [ # Python 2 builtins
"set",
"frozenset",
Expand Down Expand Up @@ -751,7 +752,6 @@ def _is_dangerous_module(mod: str) -> bool:
"id",
"isinstance",
"issubclass",
"hasattr",
"callable",
"repr",
"ascii",
Expand Down Expand Up @@ -811,7 +811,6 @@ def _is_dangerous_module(mod: str) -> bool:
"id",
"isinstance",
"issubclass",
"hasattr",
"callable",
"repr",
"ascii",
Expand Down
136 changes: 136 additions & 0 deletions tests/scanners/test_pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,142 @@ def _scan_bytes(self, data: bytes) -> ScanResult:
finally:
os.unlink(path)

def test_builtins_hasattr_is_critical(self) -> None:
"""builtins.hasattr must not be allowlisted as safe."""
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", "hasattr"))

assert result.success
assert result.has_errors
failed_global_checks = [
check
for check in result.checks
if check.name == "Global Module Reference Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("builtins.hasattr" in check.message for check in failed_global_checks), (
f"Expected CRITICAL Global Module Reference Check for builtins.hasattr, "
f"got: {[check.message for check in failed_global_checks]}"
)

def test_dunder_builtin_hasattr_is_critical(self) -> None:
"""__builtin__.hasattr must not be allowlisted as safe."""
result = self._scan_bytes(self._craft_global_reduce_pickle("__builtin__", "hasattr"))

assert result.success
assert result.has_errors
failed_global_checks = [
check
for check in result.checks
if check.name == "Global Module Reference Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("__builtin__.hasattr" in check.message for check in failed_global_checks), (
f"Expected CRITICAL Global Module Reference Check for __builtin__.hasattr, "
f"got: {[check.message for check in failed_global_checks]}"
)

def test_safe_builtins_remain_allowlisted(self) -> None:
"""Safe reconstruction builtins must remain non-failing."""
for safe_builtin in ["set", "slice", "tuple"]:
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", safe_builtin))

assert result.success
assert not result.has_errors, (
f"Expected builtins.{safe_builtin} to remain non-failing, got: {[i.message for i in result.issues]}"
)
safe_global_checks = [check for check in result.checks if check.name == "Global Module Reference Check"]
assert any(
check.status == CheckStatus.PASSED and f"builtins.{safe_builtin}" in check.message
for check in safe_global_checks
), f"Expected passed Global Module Reference Check for builtins.{safe_builtin}"
assert not any(check.status == CheckStatus.FAILED for check in safe_global_checks), (
f"Unexpected failed Global Module Reference Check for builtins.{safe_builtin}: "
f"{[check.message for check in safe_global_checks]}"
)

def test_dangerous_builtins_still_fail(self) -> None:
"""Dangerous builtins must continue to fail after allowlist tightening."""
for dangerous_builtin in ["eval", "open"]:
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", dangerous_builtin))

assert result.success
assert result.has_errors
failed_checks = [
check
for check in result.checks
if check.status == CheckStatus.FAILED and check.severity == IssueSeverity.CRITICAL
]
assert any(f"builtins.{dangerous_builtin}" in check.message for check in failed_checks), (
f"Expected CRITICAL builtins.{dangerous_builtin} finding, "
f"got: {[check.message for check in failed_checks]}"
)

def test_builtins_hasattr_stack_global_is_critical(self) -> None:
"""STACK_GLOBAL resolution for builtins.hasattr must be flagged."""
payload = b"\x80\x04\x8c\x08builtins\x8c\x07hasattr\x93)R."

result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
failed_stack_global_checks = [
check
for check in result.checks
if check.name == "STACK_GLOBAL Module Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("builtins.hasattr" in check.message for check in failed_stack_global_checks), (
f"Expected CRITICAL STACK_GLOBAL Module Check for builtins.hasattr, "
f"got: {[check.message for check in failed_stack_global_checks]}"
)

def test_builtins_hasattr_binput_binget_recall_is_critical(self) -> None:
"""Memoized callable recall via BINPUT/BINGET must keep builtins.hasattr dangerous."""
payload = b"\x80\x02cbuiltins\nhasattr\nq\x01h\x01(tR."

result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
failed_reduce_checks = [
check
for check in result.checks
if check.name == "REDUCE Opcode Safety Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any(check.details.get("associated_global") == "builtins.hasattr" for check in failed_reduce_checks), (
f"Expected CRITICAL REDUCE finding for builtins.hasattr memo recall, "
f"got: {[check.details for check in failed_reduce_checks]}"
)

def test_builtins_hasattr_detected_after_benign_stream(self) -> None:
"""Malicious builtins.hasattr stream should be detected after benign warm-up stream."""
import io

buf = io.BytesIO()
pickle.dump({"safe": True}, buf, protocol=2)
buf.write(self._craft_global_reduce_pickle("builtins", "hasattr"))

result = self._scan_bytes(buf.getvalue())

assert result.success
assert result.has_errors
failed_reduce_checks = [
check
for check in result.checks
if check.name == "REDUCE Opcode Safety Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any(check.details.get("associated_global") == "builtins.hasattr" for check in failed_reduce_checks), (
f"Expected CRITICAL REDUCE finding for builtins.hasattr after benign stream, "
f"got: {[check.details for check in failed_reduce_checks]}"
)

# ------------------------------------------------------------------
# Fix 1: pkgutil trampoline — must be CRITICAL
# ------------------------------------------------------------------
Expand Down
Loading