64 changes: 59 additions & 5 deletions modelaudit/scanners/skops_scanner.py
@@ -26,6 +26,20 @@ def __init__(self, config: dict[str, Any] | None = None):
         # Security limits for decompression bomb protection
         self.max_file_size = self.config.get("max_skops_file_size", 500 * 1024 * 1024)  # 500MB
         self.max_files_in_archive = self.config.get("max_files_in_archive", 10000)
+        self.max_zip_entry_read_size = self.config.get("max_zip_entry_read_size", 10 * 1024 * 1024)
⚠️ Potential issue | 🟠 Major

Enforce a hard 10 MB upper bound for ZIP entry reads.

Line 29 accepts arbitrarily large config values, which weakens the decompression-bomb protection. Clamp to 10 MB and allow only lowering it.

🔧 Proposed fix
 class SkopsScanner(BaseScanner):
     """Scanner for skops serialized files (.skops format)."""

+    MAX_ZIP_ENTRY_READ_SIZE: ClassVar[int] = 10 * 1024 * 1024
+
     name = "skops"
@@
-        self.max_zip_entry_read_size = self.config.get("max_zip_entry_read_size", 10 * 1024 * 1024)
+        configured_zip_entry_limit = int(
+            self.config.get("max_zip_entry_read_size", self.MAX_ZIP_ENTRY_READ_SIZE)
+        )
+        self.max_zip_entry_read_size = min(configured_zip_entry_limit, self.MAX_ZIP_ENTRY_READ_SIZE)

As per coding guidelines: "Cap archive member reads to 10 MB for metadata validation to prevent memory spikes on large pickles".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelaudit/scanners/skops_scanner.py` at line 29, clamp the configured ZIP
entry read size so it can never exceed 10 MB: replace the current assignment to
self.max_zip_entry_read_size with logic that reads the configured value via
self.config.get("max_zip_entry_read_size", 10 * 1024 * 1024) and sets
self.max_zip_entry_read_size to the minimum of that value and 10 * 1024 * 1024
(thus allowing only lowered values), keeping the 10 MB default when unset.
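As a standalone illustration of the clamp the review is asking for (helper name `clamp_entry_limit` is hypothetical, not part of the scanner's API):

```python
# Hypothetical standalone version of the proposed clamp: user config may
# lower the per-entry read limit but can never raise it past the hard cap.
HARD_CAP = 10 * 1024 * 1024  # 10 MB ceiling

def clamp_entry_limit(config: dict, key: str = "max_zip_entry_read_size") -> int:
    return min(int(config.get(key, HARD_CAP)), HARD_CAP)

print(clamp_entry_limit({}))                                   # default kept
print(clamp_entry_limit({"max_zip_entry_read_size": 1024}))    # lowered value honored
print(clamp_entry_limit({"max_zip_entry_read_size": 10**12}))  # oversized value clamped
```

Values above the cap are silently reduced rather than rejected, which matches the "only allow lowering it" intent.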


+    def _read_zip_entry_safely(self, zip_file: zipfile.ZipFile, file_info: zipfile.ZipInfo) -> bytes | None:
+        """Read a ZIP entry with a bounded memory limit."""
+        if file_info.file_size > self.max_zip_entry_read_size:
+            return None
+
+        with zip_file.open(file_info, "r") as entry:
+            content = entry.read(self.max_zip_entry_read_size + 1)
+
+        if len(content) > self.max_zip_entry_read_size:
+            return None
+
+        return content
Comment on lines +31 to +42
⚠️ Potential issue | 🟠 Major

Don’t silently skip oversized entries without reporting reduced coverage.

Lines 33-40 return None, and callers silently continue. That can yield a successful scan while unread oversized files may contain exploit indicators. Record skipped entries and emit a dedicated check/metadata flag for partial scan visibility.

🔧 Proposed fix
     def __init__(self, config: dict[str, Any] | None = None):
         super().__init__(config)
@@
         self.max_zip_entry_read_size = self.config.get("max_zip_entry_read_size", 10 * 1024 * 1024)
+        self._skipped_oversized_entries: list[str] = []

     def _read_zip_entry_safely(self, zip_file: zipfile.ZipFile, file_info: zipfile.ZipInfo) -> bytes | None:
         """Read a ZIP entry with a bounded memory limit."""
         if file_info.file_size > self.max_zip_entry_read_size:
+            self._skipped_oversized_entries.append(file_info.filename)
             return None
@@
         if len(content) > self.max_zip_entry_read_size:
+            self._skipped_oversized_entries.append(file_info.filename)
             return None
@@
     def scan(self, path: str) -> ScanResult:
@@
+        self._skipped_oversized_entries = []
@@
                 self._check_unsafe_joblib_fallback(zip_file, result, path)
+
+                if self._skipped_oversized_entries:
+                    result.add_check(
+                        name="Oversized ZIP Entries Skipped",
+                        passed=False,
+                        message=(
+                            f"Skipped {len(self._skipped_oversized_entries)} oversized archive entries; "
+                            "security detection coverage is partial"
+                        ),
+                        severity=IssueSeverity.WARNING,
+                        location=path,
+                        details={
+                            "max_zip_entry_read_size": self.max_zip_entry_read_size,
+                            "skipped_entries": self._skipped_oversized_entries[:20],
+                            "skipped_count": len(self._skipped_oversized_entries),
+                        },
+                    )

Based on learnings: "Preserve or strengthen security detections; test both benign and malicious samples when adding scanner/feature changes".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelaudit/scanners/skops_scanner.py` around lines 31-42, the
_read_zip_entry_safely function currently returns None for oversized entries and
callers silently continue; modify it to record skipped entries and mark the scan
as partial: when file_info.file_size > self.max_zip_entry_read_size or the read
exceeds the limit, append the file name/path to a new self._skipped_zip_entries
list (or similar attribute), and ensure the scanner emits a metadata flag (e.g.,
partial_scan=True plus a skipped_entries list) in the final scan result; update any
callers that iterate zip entries to record the skipped names so results/reporting
include the skipped files and a clear partial-scan indicator.


     @classmethod
     def can_handle(cls, path: str) -> bool:
@@ -84,7 +98,9 @@ def _detect_cve_2025_54412(
         # Scan file contents for suspicious patterns
         for file_info in zip_file.filelist:
             try:
-                content = zip_file.read(file_info)
+                content = self._read_zip_entry_safely(zip_file, file_info)
+                if content is None:
+                    continue
                 for binary_pattern in suspicious_binary_patterns:
                     if binary_pattern in content:
                         found_patterns.append(
@@ -149,7 +165,9 @@ def _detect_cve_2025_54413(
         # Scan file contents for suspicious patterns
         for file_info in zip_file.filelist:
             try:
-                content = zip_file.read(file_info)
+                content = self._read_zip_entry_safely(zip_file, file_info)
+                if content is None:
+                    continue
                 for binary_pattern in suspicious_binary_patterns:
                     if binary_pattern in content:
                         found_patterns.append(
@@ -194,7 +212,11 @@ def _detect_cve_2025_54886(self, zip_file: zipfile.ZipFile, result: ScanResult,
             # Check for Card/model card files
             if "card" in file_name or "model_card" in file_name or "readme" in file_name:
                 try:
-                    content = zip_file.read(file_info).decode("utf-8", errors="ignore")
+                    raw_content = self._read_zip_entry_safely(zip_file, file_info)
+                    if raw_content is None:
+                        continue
+
+                    content = raw_content.decode("utf-8", errors="ignore")
                     # Check for get_model or joblib references
                     if "get_model" in content or "joblib" in content or "load" in content:
                         suspicious_files.append(file_info.filename)
@@ -231,7 +253,12 @@ def _check_protocol_version(self, zip_file: zipfile.ZipFile, result: ScanResult,
         # Check for schema or version files
         for file_name in zip_file.namelist():
             if "schema" in file_name.lower() or "version" in file_name.lower() or "protocol" in file_name.lower():
-                content = zip_file.read(file_name).decode("utf-8", errors="ignore")
+                file_info = zip_file.getinfo(file_name)
+                raw_content = self._read_zip_entry_safely(zip_file, file_info)
+                if raw_content is None:
+                    continue
+
+                content = raw_content.decode("utf-8", errors="ignore")
 
                 # Check for protocol version indicators
                 if "PROTOCOL" in content or "version" in content.lower():
@@ -293,7 +320,9 @@ def _check_unsafe_joblib_fallback(self, zip_file: zipfile.ZipFile, result: ScanR
             is_metadata = self._is_skops_metadata(file_info.filename)
 
             try:
-                content = zip_file.read(file_info)
+                content = self._read_zip_entry_safely(zip_file, file_info)
+                if content is None:
+                    continue
                 for pattern in joblib_patterns:
                     # Only suppress the broad `sklearn` heuristic for metadata
                     # files; explicit signals like `joblib.load` / `pickle.load`
@@ -388,6 +417,31 @@ def scan(self, path: str) -> ScanResult:
             result.metadata["file_count"] = len(file_list)
             result.metadata["files"] = file_list[:100]  # Store first 100 files
 
+            total_uncompressed_size = sum(file_info.file_size for file_info in zip_file.filelist)
+            result.metadata["archive_uncompressed_size"] = total_uncompressed_size
+
+            if total_uncompressed_size > self.max_file_size:
+                result.add_check(
+                    name="Archive Uncompressed Size Limit",
+                    passed=False,
+                    message=(
+                        "Suspicious: Archive uncompressed content exceeds configured skops size limit "
+                        f"({total_uncompressed_size} bytes > {self.max_file_size} bytes)"
+                    ),
+                    severity=IssueSeverity.INFO,
+                    location=path,
+                    details={
+                        "archive_uncompressed_size": total_uncompressed_size,
+                        "max_skops_file_size": self.max_file_size,
+                    },
+                    why=(
+                        "Large uncompressed archive content may indicate a decompression bomb that can "
+                        "exhaust memory during scanning."
+                    ),
+                )
+                result.finish(success=False)
+                return result
+
             # Run CVE detection checks (with content analysis)
             self._detect_cve_2025_54412(zip_file, result, path, file_list)
             self._detect_cve_2025_54413(zip_file, result, path, file_list)
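The size check added above sums `ZipInfo.file_size` before any entry is decompressed, so a deflate bomb is rejected without ever inflating it. A minimal sketch of the idea (illustrative only, not the scanner's code):

```python
import io
import zipfile

# Build a small archive whose payload is highly compressible.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("payload.bin", b"\0" * 1_000_000)

with zipfile.ZipFile(buf) as zf:
    declared = sum(info.file_size for info in zf.filelist)    # declared uncompressed bytes
    stored = sum(info.compress_size for info in zf.filelist)  # bytes actually stored

print(declared, stored)  # a large ratio signals a potential decompression bomb
```

Both figures come straight from the central directory, which is why the check is cheap; the bounded per-entry reads elsewhere in the diff handle the case where that metadata lies.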
29 changes: 29 additions & 0 deletions tests/scanners/test_skops_scanner.py
@@ -341,6 +341,35 @@ def test_handles_decompression_bomb(self, tmp_path: Path) -> None:
         assert len(bomb_checks) > 0
         assert bomb_checks[0].status == CheckStatus.FAILED

+    def test_rejects_archive_exceeding_uncompressed_size_limit(self, tmp_path: Path) -> None:
+        """Archive should fail when total uncompressed bytes exceed max_skops_file_size."""
+        skops_file = tmp_path / "size_limit.skops"
+        with zipfile.ZipFile(skops_file, "w", compression=zipfile.ZIP_DEFLATED) as zf:
+            zf.writestr("README.md", "A" * 4096)
+            zf.writestr("schema.json", '{"version": "1.0"}')
+
+        scanner = SkopsScanner(config={"max_skops_file_size": 2048})
+        result = scanner.scan(str(skops_file))
+
+        assert result.success is False
+        size_checks = [c for c in result.checks if "Archive Uncompressed Size Limit" in c.name]
+        assert len(size_checks) > 0
+        assert size_checks[0].status == CheckStatus.FAILED
+
+    def test_skips_oversized_readme_entry_without_crashing(self, tmp_path: Path) -> None:
+        """Oversized archive entries should be skipped by bounded reads."""
+        skops_file = tmp_path / "oversized_readme.skops"
+        with zipfile.ZipFile(skops_file, "w", compression=zipfile.ZIP_DEFLATED) as zf:
+            zf.writestr("README.md", "get_model via joblib.load" * 512)
+            zf.writestr("schema.json", '{"version": "1.0"}')
+
+        scanner = SkopsScanner(config={"max_zip_entry_read_size": 128, "max_skops_file_size": 10 * 1024 * 1024})
+        result = scanner.scan(str(skops_file))
+
+        assert result.success is True
+        cve_checks = [c for c in result.checks if "CVE-2025-54886" in c.name and c.status == CheckStatus.FAILED]
+        assert len(cve_checks) == 0
Comment on lines +359 to +371
🧹 Nitpick | 🔵 Trivial

Strengthen this test to assert partial-scan visibility for oversized entries.

Right now it only verifies “no crash/no CVE flag.” Once skipped-entry reporting is added, assert that check/metadata too, so silent detection gaps can’t regress.

Based on learnings: "Preserve or strengthen security detections; test both benign and malicious samples when adding scanner/feature changes".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/scanners/test_skops_scanner.py` around lines 359-371, update
test_skips_oversized_readme_entry_without_crashing to also assert that the
scanner reports the skipped oversized entry (e.g., via a specific check or
metadata field on the result) so partial-scan visibility is verified: after
calling SkopsScanner.scan(...), inspect result.checks and result.metadata (or
the scanner's skip-reporting check name) and assert there is a check or metadata
entry indicating README.md was skipped for exceeding max_zip_entry_read_size,
while still asserting no failed CVE-2025-54886 checks; reference the test
function name, SkopsScanner, result.checks, result.metadata, and CheckStatus to
locate and add the new assertions.



 class TestSkopsScannerMultipleCVEs:
     """Test detection of multiple CVEs in a single file."""