Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b7cb604
fix: recurse into numpy object pickle payloads
mldangelo Mar 13, 2026
407652a
test: type annotate numpy recursion regressions
mldangelo Mar 13, 2026
56782ee
fix(numpy): preserve npz member check context
mldangelo Mar 13, 2026
fa0abda
Merge remote-tracking branch 'refs/remotes/origin/feat/numpy-object-p…
mldangelo Mar 13, 2026
f42f854
test: format asset extraction regressions
mldangelo Mar 13, 2026
d45df12
Merge origin/main into feat/numpy-object-pickle-recursion
mldangelo Mar 14, 2026
432c71e
Merge remote-tracking branch 'origin/feat/numpy-object-pickle-recursi…
mldangelo Mar 14, 2026
d42949a
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 14, 2026
2e16362
Merge remote-tracking branch 'origin/feat/numpy-object-pickle-recursi…
mldangelo Mar 14, 2026
5a114b2
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 14, 2026
95ae02c
test: type annotate numpy trailing-bytes regression
mldangelo Mar 14, 2026
644d00a
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 14, 2026
2098e6c
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 15, 2026
0e141f4
fix: harden numpy recursion follow-up checks
mldangelo Mar 15, 2026
e146683
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 15, 2026
74e1117
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 15, 2026
fb19fc1
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 16, 2026
9535dac
Merge remote-tracking branch 'origin/main' into feat/numpy-object-pic…
mldangelo Mar 16, 2026
81229c0
fix: harden numpy recursion and local streaming
mldangelo Mar 16, 2026
383c543
Merge branch 'main' into review-pr-699
mldangelo Mar 17, 2026
827d186
Merge remote-tracking branch 'origin/main' into audit-pr699-mainmerge
mldangelo Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **security:** recurse into object-dtype `.npy` payloads and `.npz` object members with the pickle scanner while preserving CVE-2019-6446 warnings and archive-member context
- **security:** harden TensorFlow weight extraction limits to bound actual tensor payload materialization, including malformed `tensor_content` and string-backed tensors, and continue scanning past oversized `Const` nodes
- **security:** stream TAR members to temp files under size limits instead of buffering whole entries in memory during scan
- **security:** inspect TensorFlow SavedModel function definitions when scanning for dangerous ops and protobuf string abuse, with function-aware finding locations
Expand Down
23 changes: 22 additions & 1 deletion modelaudit/scanners/numpy_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

import sys
import warnings
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar

from .base import BaseScanner, IssueSeverity, ScanResult
from .pickle_scanner import PickleScanner

# Import NumPy with compatibility handling
try:
Expand Down Expand Up @@ -88,6 +89,17 @@ def _validate_array_dimensions(self, shape: tuple[int, ...]) -> None:
CVE_2019_6446_CVSS = 9.8
CVE_2019_6446_CWE = "CWE-502"

def _scan_embedded_pickle_payload(
    self,
    file_obj: BinaryIO,
    payload_size: int,
    context_path: str,
) -> ScanResult:
    """Run the pickle scanner over the pickled body of an object-dtype array.

    Args:
        file_obj: Binary stream forwarded unchanged to the pickle scanner; it
            is not repositioned here, so the caller must seek it first.
        payload_size: Byte count forwarded to the pickle scanner.
        context_path: Path stored on the nested scanner as its current file.

    Returns:
        The ``ScanResult`` produced by ``PickleScanner._scan_pickle_bytes``.
    """
    nested_scanner = PickleScanner(config=self.config)
    # Set before scanning; presumably used for finding locations -- confirm
    # against PickleScanner.
    nested_scanner.current_file_path = context_path
    nested_result = nested_scanner._scan_pickle_bytes(file_obj, payload_size)
    return nested_result

def _validate_dtype(self, dtype: Any) -> None:
"""Validate numpy dtype for security"""
# Check for problematic data types
Expand Down Expand Up @@ -299,6 +311,15 @@ def scan(self, path: str) -> ScanResult:
),
)

f.seek(data_offset)
embedded_result = self._scan_embedded_pickle_payload(
f,
file_size - data_offset,
path,
)
result.issues.extend(embedded_result.issues)
result.checks.extend(embedded_result.checks)

self._validate_dtype(dtype)
result.add_check(
name="Data Type Safety Check",
Expand Down
119 changes: 119 additions & 0 deletions tests/scanners/test_numpy_scanner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections.abc import Callable
from pathlib import Path
from typing import Any

import numpy as np

from modelaudit.scanners.base import IssueSeverity
Expand Down Expand Up @@ -100,3 +102,120 @@ def test_structured_with_object_field_triggers_cve(self, tmp_path):

cve_checks = [c for c in result.checks if "CVE-2019-6446" in (c.name + c.message)]
assert len(cve_checks) > 0, "Structured dtype with object field should trigger CVE"


class _ExecPayload:
    """Test fixture whose reduction is a call to the builtin ``exec``."""

    def __reduce__(self) -> tuple[Callable[..., object], tuple[str]]:
        """Return the ``(callable, args)`` pair; nothing is executed here."""
        return (exec, ("print('owned')",))


class _SSLPayload:
    """Test fixture whose reduction is a call to ``ssl.get_server_certificate``."""

    def __reduce__(self) -> tuple[Callable[..., object], tuple[tuple[str, int]]]:
        """Return the ``(callable, args)`` pair; no network call is made here."""
        # Kept as a function-scope import, as in the original fixture.
        import ssl

        return (ssl.get_server_certificate, (("example.com", 443),))


def _failed_checks(result: Any) -> list[Any]:
    """Return the entries of ``result.checks`` whose ``status.value`` is ``"failed"``.

    Order is preserved; an empty list is returned when nothing failed.
    """
    return [check for check in result.checks if check.status.value == "failed"]
Comment on lines +115 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add type hints to newly introduced helpers.

The new helper methods/functions are untyped, which violates the repository Python typing rule.

Proposed fix
+from typing import Any, Callable
+
 class _ExecPayload:
-    def __reduce__(self):
+    def __reduce__(self) -> tuple[Callable[..., object], tuple[str]]:
         return (exec, ("print('owned')",))
@@
 class _SSLPayload:
-    def __reduce__(self):
+    def __reduce__(self) -> tuple[Callable[..., object], tuple[tuple[str, int]]]:
         import ssl
 
         return (ssl.get_server_certificate, (("example.com", 443),))
@@
-def _failed_checks(result):
+def _failed_checks(result: Any) -> list[Any]:
     return [c for c in result.checks if c.status.value == "failed"]

As per coding guidelines: "Always include type hints in Python code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/scanners/test_numpy_scanner.py` around lines 107 - 120, Add Python type
hints to the new helpers: annotate _ExecPayload.__reduce__ and
_SSLPayload.__reduce__ to return a Tuple[Callable[..., Any], Tuple[Any, ...]]
and annotate their self parameter as usual; import the needed typing names
(e.g., Callable, Tuple, Any). Also annotate _failed_checks to accept result: Any
(or the specific result type if available) and return List[Any] (or
List[CheckType] if you have a Check type); import List/Any as needed. Ensure all
new function/method signatures use these type annotations to satisfy the
repository typing rule.



def test_object_dtype_numpy_recurses_into_pickle_exec(tmp_path: Path) -> None:
    """An object-dtype .npy with an exec reducer fails both the CVE and exec checks."""
    npy_file = tmp_path / "malicious_object.npy"
    np.save(npy_file, np.array([_ExecPayload()], dtype=object), allow_pickle=True)

    result = NumPyScanner().scan(str(npy_file))
    failures = _failed_checks(result)

    # The CVE-2019-6446 warning must survive alongside the nested finding.
    cve_reported = any("CVE-2019-6446" in (check.name + check.message) for check in failures)
    assert cve_reported
    exec_reported = any("exec" in check.message.lower() for check in failures)
    assert exec_reported


def test_object_dtype_numpy_recurses_into_pickle_ssl(tmp_path: Path) -> None:
    """An object-dtype .npy with an ssl reducer fails both the CVE and ssl checks."""
    npy_file = tmp_path / "malicious_ssl_object.npy"
    np.save(npy_file, np.array([_SSLPayload()], dtype=object), allow_pickle=True)

    result = NumPyScanner().scan(str(npy_file))
    failures = _failed_checks(result)

    cve_reported = any("CVE-2019-6446" in (check.name + check.message) for check in failures)
    assert cve_reported
    # The nested scan must name the fully qualified callable.
    ssl_reported = any("ssl.get_server_certificate" in check.message for check in failures)
    assert ssl_reported


def test_numeric_npz_has_no_pickle_recursion_findings(tmp_path: Path) -> None:
    """A purely numeric .npz produces neither CVE nor exec findings."""
    from modelaudit.scanners.zip_scanner import ZipScanner

    archive = tmp_path / "numeric_only.npz"
    np.savez(archive, a=np.arange(4), b=np.ones((2, 2), dtype=np.float32))

    result = ZipScanner().scan(str(archive))

    assert all("CVE-2019-6446" not in (check.name + check.message) for check in result.checks)
    assert all("exec" not in check.message.lower() for check in result.checks)


def test_object_npz_member_recurses_into_pickle_exec_with_member_context(tmp_path: Path) -> None:
    """A malicious object member of an .npz is reported against that member."""
    from modelaudit.scanners.zip_scanner import ZipScanner

    safe_member = np.array([1, 2, 3], dtype=np.int64)
    object_member = np.array([_ExecPayload()], dtype=object)
    archive = tmp_path / "mixed_object.npz"
    np.savez(archive, safe=safe_member, payload=object_member)

    result = ZipScanner().scan(str(archive))
    failures = _failed_checks(result)

    # The CVE check location must point at the offending archive entry.
    cve_on_member = any(
        "CVE-2019-6446" in (check.name + check.message) and "payload.npy" in str(check.location)
        for check in failures
    )
    assert cve_on_member
    # The nested exec issue must carry the entry name in its details.
    exec_on_member = any(
        "exec" in issue.message.lower() and issue.details.get("zip_entry") == "payload.npy"
        for issue in result.issues
    )
    assert exec_on_member


def test_benign_object_dtype_numpy_no_nested_critical(tmp_path: Path) -> None:
    """Benign object arrays keep the CVE check but add no other critical issue."""
    npy_file = tmp_path / "benign_object.npy"
    benign = np.array([{"k": "v"}, [1, 2, 3]], dtype=object)
    np.save(npy_file, benign, allow_pickle=True)

    result = NumPyScanner().scan(str(npy_file))

    cve_present = any("CVE-2019-6446" in (check.name + check.message) for check in result.checks)
    assert cve_present
    # Only issues that do not mention the CVE are inspected for severity.
    non_cve_issues = (issue for issue in result.issues if "CVE-2019-6446" not in issue.message)
    assert not any(issue.severity == IssueSeverity.CRITICAL for issue in non_cve_issues)


def test_benign_object_dtype_npz_no_nested_critical(tmp_path: Path) -> None:
    """A benign object member in an .npz keeps the CVE check without critical issues."""
    from modelaudit.scanners.zip_scanner import ZipScanner

    archive = tmp_path / "benign_object.npz"
    np.savez(archive, safe=np.array([{"x": 1}], dtype=object))

    result = ZipScanner().scan(str(archive))

    cve_present = any("CVE-2019-6446" in (check.name + check.message) for check in result.checks)
    assert cve_present
    has_critical = any(issue.severity == IssueSeverity.CRITICAL for issue in result.issues)
    assert not has_critical


def test_truncated_npy_fails_safely(tmp_path: Path) -> None:
    """Scanning an .npy with its last 8 bytes removed reports an INFO issue."""
    npy_file = tmp_path / "truncated.npy"
    np.save(npy_file, np.array([_ExecPayload()], dtype=object), allow_pickle=True)

    # Chop the tail off the saved file so the pickle payload is incomplete.
    intact_bytes = npy_file.read_bytes()
    npy_file.write_bytes(intact_bytes[:-8])

    result = NumPyScanner().scan(str(npy_file))

    info_reported = any(issue.severity == IssueSeverity.INFO for issue in result.issues)
    assert info_reported


def test_corrupted_npz_fails_safely(tmp_path: Path) -> None:
    """A non-ZIP file named .npz yields an unsuccessful result with an INFO issue."""
    from modelaudit.scanners.zip_scanner import ZipScanner

    bogus_archive = tmp_path / "corrupt.npz"
    bogus_archive.write_bytes(b"not-a-zip")

    result = ZipScanner().scan(str(bogus_archive))

    assert result.success is False
    info_reported = any(issue.severity == IssueSeverity.INFO for issue in result.issues)
    assert info_reported
17 changes: 17 additions & 0 deletions tests/scanners/test_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,23 @@ def test_scan_zip_with_prefixed_proto0_pickle_disguised_as_text(self, tmp_path:
f"Expected critical os/posix.system issue, got: {critical_messages}"
)

def test_scan_npz_with_object_member_recurses_into_pickle(self, tmp_path: Path) -> None:
    """An .npz with a malicious object member fails the CVE check and reports exec."""
    import numpy as np

    class _ExecPayload:
        """Fixture whose reduction is a call to the builtin ``exec``."""

        # Builtin generics only: this file's import block is not visible here.
        def __reduce__(self) -> tuple[object, tuple[str]]:
            return (exec, ("print('owned')",))

    archive_path = tmp_path / "payload.npz"
    np.savez(archive_path, safe=np.arange(3), payload=np.array([_ExecPayload()], dtype=object))

    result = self.scanner.scan(str(archive_path))
    # The scan itself must complete; findings are reported, not raised.
    assert result.success is True

    failed_checks = [c for c in result.checks if c.status.value == "failed"]
    assert any("cve-2019-6446" in (c.name + c.message).lower() for c in failed_checks)
    # The nested exec issue must be attributed to the offending archive entry.
    assert any("exec" in i.message.lower() and i.details.get("zip_entry") == "payload.npy" for i in result.issues)

def test_scan_zip_with_plain_text_global_prefix_not_treated_as_pickle(self, tmp_path: Path) -> None:
"""Plain text entries that start with GLOBAL-like bytes should not trigger pickle parse warnings."""
archive_path = tmp_path / "plain_text_payload.zip"
Expand Down
Loading