Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- **security:** add exact dangerous-global coverage for `numpy.load`, `site.main`, `_io.FileIO`, `test.support.script_helper.assert_python_ok`, `_osx_support._read_output`, `_aix_support._read_cmd_output`, `_pyrepl.pager.pipe_pager`, `torch.serialization.load`, and `torch._inductor.codecache.compile_file` (9 PickleScan-only loader and execution primitives)
- **security:** treat legacy `httplib` pickle globals the same as `http.client`, including import-only and `REDUCE` findings in standalone and archived payloads
- **security:** remove `builtins.hasattr` / `__builtin__.hasattr` from the pickle safe-global allowlist so attribute-access primitives stay flagged as dangerous builtins
- **security:** harden TensorFlow weight extraction limits to bound actual tensor payload materialization, including malformed `tensor_content` and string-backed tensors, and continue scanning past oversized `Const` nodes
- **security:** stream TAR members to temp files under size limits instead of buffering whole entries in memory during scan
- **security:** inspect TensorFlow SavedModel function definitions when scanning for dangerous ops and protobuf string abuse, with function-aware finding locations
Expand Down
5 changes: 2 additions & 3 deletions modelaudit/scanners/pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,8 @@ def _is_dangerous_module(mod: str) -> bool:
],
# Python builtins - safe built-in types and functions
# NOTE: eval, exec, compile, __import__, open, file are NOT in this list (they remain dangerous)
# NOTE: getattr, setattr, delattr are also NOT in this list (in ALWAYS_DANGEROUS_FUNCTIONS)
# NOTE: getattr, setattr, delattr, hasattr are NOT in this list
# because attribute-access primitives must never be allowlisted.
"__builtin__": [ # Python 2 builtins
"set",
"frozenset",
Expand Down Expand Up @@ -861,7 +862,6 @@ def _is_dangerous_module(mod: str) -> bool:
"id",
"isinstance",
"issubclass",
"hasattr",
"callable",
"repr",
"ascii",
Expand Down Expand Up @@ -921,7 +921,6 @@ def _is_dangerous_module(mod: str) -> bool:
"id",
"isinstance",
"issubclass",
"hasattr",
"callable",
"repr",
"ascii",
Expand Down
146 changes: 146 additions & 0 deletions tests/scanners/test_pickle_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,152 @@ def _scan_bytes(self, data: bytes, *, suffix: str = ".pkl") -> ScanResult:
finally:
os.unlink(path)

def test_builtins_hasattr_is_critical(self) -> None:
"""builtins.hasattr must not be allowlisted as safe."""
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", "hasattr"))

assert result.success
assert result.has_errors
failed_global_checks = [
check
for check in result.checks
if check.name == "Global Module Reference Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("builtins.hasattr" in check.message for check in failed_global_checks), (
f"Expected CRITICAL Global Module Reference Check for builtins.hasattr, "
f"got: {[check.message for check in failed_global_checks]}"
)

def test_dunder_builtin_hasattr_is_critical(self) -> None:
"""__builtin__.hasattr must not be allowlisted as safe."""
result = self._scan_bytes(self._craft_global_reduce_pickle("__builtin__", "hasattr"))

assert result.success
assert result.has_errors
failed_global_checks = [
check
for check in result.checks
if check.name == "Global Module Reference Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("__builtin__.hasattr" in check.message for check in failed_global_checks), (
f"Expected CRITICAL Global Module Reference Check for __builtin__.hasattr, "
f"got: {[check.message for check in failed_global_checks]}"
)

def test_safe_builtins_remain_allowlisted(self) -> None:
"""Safe reconstruction builtins must remain non-failing."""
for safe_builtin in ["set", "slice", "tuple"]:
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", safe_builtin))

assert result.success
assert not result.has_errors, (
f"Expected builtins.{safe_builtin} to remain non-failing, got: {[i.message for i in result.issues]}"
)
safe_global_checks = [check for check in result.checks if check.name == "Global Module Reference Check"]
failed_builtin_checks = [
check
for check in result.checks
if check.status == CheckStatus.FAILED and f"builtins.{safe_builtin}" in check.message
]
assert any(
check.status == CheckStatus.PASSED and f"builtins.{safe_builtin}" in check.message
for check in safe_global_checks
), f"Expected passed Global Module Reference Check for builtins.{safe_builtin}"
assert not any(check.status == CheckStatus.FAILED for check in safe_global_checks), (
f"Unexpected failed Global Module Reference Check for builtins.{safe_builtin}: "
f"{[check.message for check in safe_global_checks]}"
)
assert not failed_builtin_checks, (
f"Expected builtins.{safe_builtin} to stay non-failing across all checks, "
f"got: {[check.message for check in failed_builtin_checks]}"
)

def test_dangerous_builtins_still_fail(self) -> None:
"""Dangerous builtins must continue to fail after allowlist tightening."""
for dangerous_builtin in ["eval", "open"]:
result = self._scan_bytes(self._craft_global_reduce_pickle("builtins", dangerous_builtin))

assert result.success
assert result.has_errors
failed_checks = [
check
for check in result.checks
if check.status == CheckStatus.FAILED and check.severity == IssueSeverity.CRITICAL
]
assert any(f"builtins.{dangerous_builtin}" in check.message for check in failed_checks), (
f"Expected CRITICAL builtins.{dangerous_builtin} finding, "
f"got: {[check.message for check in failed_checks]}"
)

def test_builtins_hasattr_stack_global_is_critical(self) -> None:
"""STACK_GLOBAL resolution for builtins.hasattr must be flagged."""
payload = b"\x80\x04\x8c\x08builtins\x8c\x07hasattr\x93)R."

result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
failed_stack_global_checks = [
check
for check in result.checks
if check.name == "STACK_GLOBAL Module Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any("builtins.hasattr" in check.message for check in failed_stack_global_checks), (
f"Expected CRITICAL STACK_GLOBAL Module Check for builtins.hasattr, "
f"got: {[check.message for check in failed_stack_global_checks]}"
)

def test_builtins_hasattr_binput_binget_recall_is_critical(self) -> None:
"""Memoized callable recall via BINPUT/BINGET must keep builtins.hasattr dangerous."""
# Memoize the callable, drop the original stack reference, then recall it.
payload = b"\x80\x02cbuiltins\nhasattr\nq\x010h\x01(tR."

result = self._scan_bytes(payload)

assert result.success
assert result.has_errors
failed_reduce_checks = [
check
for check in result.checks
if check.name == "REDUCE Opcode Safety Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any(check.details.get("associated_global") == "builtins.hasattr" for check in failed_reduce_checks), (
f"Expected CRITICAL REDUCE finding for builtins.hasattr memo recall, "
f"got: {[check.details for check in failed_reduce_checks]}"
)

def test_builtins_hasattr_detected_after_benign_stream(self) -> None:
"""Malicious builtins.hasattr stream should be detected after benign warm-up stream."""
import io

buf = io.BytesIO()
pickle.dump({"safe": True}, buf, protocol=2)
buf.write(self._craft_global_reduce_pickle("builtins", "hasattr"))

result = self._scan_bytes(buf.getvalue())

assert result.success
assert result.has_errors
failed_reduce_checks = [
check
for check in result.checks
if check.name == "REDUCE Opcode Safety Check"
and check.status == CheckStatus.FAILED
and check.severity == IssueSeverity.CRITICAL
]
assert any(check.details.get("associated_global") == "builtins.hasattr" for check in failed_reduce_checks), (
f"Expected CRITICAL REDUCE finding for builtins.hasattr after benign stream, "
f"got: {[check.details for check in failed_reduce_checks]}"
)

@staticmethod
def _structural_tamper_checks(result: ScanResult) -> list:
return [issue for issue in result.issues if issue.details.get("tamper_type") is not None]
Expand Down
Loading