Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **cli:** include streamed artifacts as SBOM components when `scan --stream --sbom` is used
- **cli:** exclude HuggingFace download cache bookkeeping files from remote SBOMs and asset lists
- **security:** require official or explicitly allowlisted JFrog hosts before treating `/artifactory/` URLs as authenticated JFrog endpoints
- **security:** detect CVE-2024-5480 PyTorch torch.distributed.rpc arbitrary function execution via PythonUDF (CVSS 10.0)
- **security:** detect CVE-2024-48063 PyTorch torch.distributed.rpc.RemoteModule deserialization RCE via pickle (CVSS 9.8)
Expand Down
16 changes: 13 additions & 3 deletions modelaudit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1901,9 +1901,19 @@ def enhanced_progress_callback(message, percentage):
if sbom:
from .integrations.sbom_generator import generate_sbom_pydantic

# Use scanned_paths (actual file paths) instead of expanded_paths (original URLs)
# to prevent FileNotFoundError when generating SBOM for downloaded content
paths_for_sbom = scanned_paths if scanned_paths else expanded_paths
# Remote downloads may leave cache internals under the downloaded directory,
# and streamed scans may delete files before SBOM generation runs. Reuse the
# scanned asset list for those cases so the SBOM reflects actual scanned model
# artifacts rather than the raw cache directory contents.
asset_paths = list(
dict.fromkeys(asset.path for asset in audit_result.assets if asset.path and asset.type != "skipped")
)
if asset_paths and final_scan_and_delete:
paths_for_sbom = asset_paths
else:
# Use scanned_paths (actual file paths) instead of expanded_paths (original URLs)
# to prevent FileNotFoundError when generating SBOM for downloaded content
paths_for_sbom = scanned_paths if scanned_paths else expanded_paths
sbom_text = generate_sbom_pydantic(paths_for_sbom, audit_result)
with open(sbom, "w", encoding="utf-8") as f:
f.write(sbom_text)
Expand Down
17 changes: 17 additions & 0 deletions modelaudit/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,13 @@ def scan_model_directory_or_file(
for root, _, files in os.walk(path, followlinks=False):
for file in files:
file_path = os.path.join(root, file)

# HuggingFace cache bookkeeping files should never surface as
# scan assets or SBOM components for downloaded models.
if _is_huggingface_cache_file(file_path):
logger.debug(f"Skipping HuggingFace cache file: {file_path}")
continue
Comment on lines +666 to +670
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't apply the Hugging Face cache skip globally.

_is_huggingface_cache_file() matches names like main and HEAD regardless of location. Because this new branch runs for every directory scan, a normal local artifact with one of those names now disappears from scanning and SBOM generation.

Proposed fix
-                    if _is_huggingface_cache_file(file_path):
+                    if is_hf_cache and _is_huggingface_cache_file(file_path):
                         logger.debug(f"Skipping HuggingFace cache file: {file_path}")
                         continue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelaudit/core.py` around lines 666 - 670, The skip for HuggingFace cache
files is applied too broadly because _is_huggingface_cache_file(file_path)
matches names like "main" or "HEAD" regardless of location; change the check so
we only skip when the file is actually inside a HuggingFace cache directory.
Update the branch where file_path is tested (the code calling
_is_huggingface_cache_file) to require both that
_is_huggingface_cache_file(file_path) is true and that the path is inside a HF
cache root (e.g., inspect pathlib.Path(file_path).parents for known cache
markers like ".cache/huggingface" or implement a helper
_is_within_hf_cache_dir(file_path) and call that together with
_is_huggingface_cache_file), or alternatively modify _is_huggingface_cache_file
to perform the parent-directory check itself; only then log and continue.


resolved_file = Path(file_path).resolve()

# Check if this is a HuggingFace cache symlink scenario
Expand Down Expand Up @@ -1583,6 +1590,15 @@ def scan_model_streaming(

# Merge results
if scan_result:
metadata_dict = dict(scan_result.metadata or {})
metadata_dict.setdefault("file_size", file_path.stat().st_size)

existing_hashes = metadata_dict.get("file_hashes")
if isinstance(existing_hashes, dict):
existing_hashes.setdefault("sha256", file_hash)
else:
metadata_dict["file_hashes"] = {"sha256": file_hash}

# Use dict-based aggregation to avoid import issues
scan_result_dict = {
"bytes_scanned": scan_result.bytes_scanned,
Expand All @@ -1592,6 +1608,7 @@ def scan_model_streaming(
"issues": [issue.__dict__ for issue in (scan_result.issues or [])],
"checks": [check.__dict__ for check in (scan_result.checks or [])],
"scanners": [scan_result.scanner_name] if scan_result.scanner_name else [],
"file_metadata": {str(file_path): metadata_dict},
}
results.aggregate_scan_result(scan_result_dict)

Expand Down
55 changes: 50 additions & 5 deletions modelaudit/integrations/sbom_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,49 @@ def _get_component_type(path: str, metadata: dict[str, Any] | None) -> Component
return ComponentType.FILE


def _resolve_component_size_and_sha256(
    path: str,
    metadata: FileMetadataModel | dict[str, Any] | None,
) -> tuple[int, str]:
    """Determine a component's byte size and SHA-256 digest.

    Prefers reading the file on disk. When the file is absent (e.g. a
    streamed scan deleted it before SBOM generation ran), falls back to the
    size/hash recorded in *metadata*, which may be a ``FileMetadataModel``,
    a plain dict, or ``None``. Returns ``(0, "")`` when neither source is
    available.
    """
    if os.path.exists(path):
        return os.path.getsize(path), _file_sha256(path)

    size = 0
    digest = ""

    if isinstance(metadata, FileMetadataModel):
        if metadata.file_size is not None:
            size = metadata.file_size
        hashes = metadata.file_hashes
        if hashes and hashes.sha256:
            digest = hashes.sha256
    elif isinstance(metadata, dict):
        recorded_size = metadata.get("file_size")
        if isinstance(recorded_size, int):
            size = recorded_size
        recorded_hashes = metadata.get("file_hashes")
        if isinstance(recorded_hashes, dict):
            recorded_digest = recorded_hashes.get("sha256")
            if isinstance(recorded_digest, str):
                digest = recorded_digest

    return size, digest


def _should_skip_sbom_file(path: str) -> bool:
"""Skip cache bookkeeping files that are not model artifacts."""
filename = os.path.basename(path)

if filename.endswith(".metadata") or filename.endswith(".lock"):
return True

if filename in {".gitignore", ".gitattributes", "main", "HEAD"}:
return True

normalized_path = path.replace("\\", "/")
return "/refs/" in normalized_path and filename in {"main", "HEAD"}


def _calculate_risk_score(path: str, issues: list[Issue]) -> int:
"""Calculate risk score for a file based on associated issues."""
score = 0
Expand Down Expand Up @@ -179,8 +222,7 @@ def _component_for_file_pydantic(
issues: list[Issue],
) -> Component:
"""Create a CycloneDX component from Pydantic models (type-safe version)."""
size = os.path.getsize(path) if os.path.exists(path) else 0
sha256 = _file_sha256(path) if os.path.exists(path) else ""
size, sha256 = _resolve_component_size_and_sha256(path, metadata)

# Start with basic properties
props = [Property(name="size", value=str(size))]
Expand Down Expand Up @@ -219,8 +261,7 @@ def _component_for_file(
metadata: dict[str, Any],
issues: Iterable[dict[str, Any]],
) -> Component:
size = os.path.getsize(path)
sha256 = _file_sha256(path)
size, sha256 = _resolve_component_size_and_sha256(path, metadata)
props = [Property(name="size", value=str(size))]

# Compute risk score based on issues related to this file
Expand Down Expand Up @@ -321,7 +362,7 @@ def _component_for_file(
name=os.path.basename(path),
bom_ref=path,
type=component_type,
hashes=[HashType.from_hashlib_alg("sha256", sha256)],
hashes=[HashType.from_hashlib_alg("sha256", sha256)] if sha256 else [],
properties=props,
)

Expand Down Expand Up @@ -350,6 +391,8 @@ def generate_sbom(paths: Iterable[str], results: dict[str, Any] | Any) -> str:
for root, _, files in os.walk(input_path):
for f in files:
fp = os.path.join(root, f)
if _should_skip_sbom_file(fp):
continue
Comment on lines +394 to +395
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Apply the SBOM skip filter to single-file inputs too.

This only filters directory walks. In the new streamed flow, paths_for_sbom is a flat list of asset files, so the else branches below still emit .metadata/.gitignore/lock files if one reaches the asset list.

Proposed fix
         else:
+            if _should_skip_sbom_file(input_path):
+                continue
             meta_model = file_meta.get(input_path)
             # Convert Pydantic model to dict if needed
             if meta_model is not None and hasattr(meta_model, "model_dump"):
                 meta = meta_model.model_dump()
             else:
                 meta = meta_model or {}
             component = _component_for_file(input_path, meta, issues_dicts)
             bom.components.add(component)
         else:
+            if _should_skip_sbom_file(input_path):
+                continue
             metadata = file_metadata.get(input_path)
             component = _component_for_file_pydantic(input_path, metadata, issues)
             bom.components.add(component)

Also applies to: 436-437

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelaudit/integrations/sbom_generator.py` around lines 394 - 395, The SBOM
skip predicate (_should_skip_sbom_file) is only applied when walking
directories, so the flat list paths_for_sbom and the single-file/else branches
still include .metadata/.gitignore/lock files; update the code paths that
iterate over paths_for_sbom and the single-file emission branch to call
_should_skip_sbom_file(fp) (or the local file variable) and continue when it
returns True, ensuring the same skip behavior for streamed/flat inputs as for
directory walks.

meta_model = file_meta.get(fp)
# Convert Pydantic model to dict if needed
if meta_model is not None and hasattr(meta_model, "model_dump"):
Expand Down Expand Up @@ -390,6 +433,8 @@ def generate_sbom_pydantic(paths: Iterable[str], results: ModelAuditResultModel)
for root, _, files in os.walk(input_path):
for f in files:
fp = os.path.join(root, f)
if _should_skip_sbom_file(fp):
continue
metadata = file_metadata.get(fp)
component = _component_for_file_pydantic(fp, metadata, issues)
bom.components.add(component)
Expand Down
179 changes: 141 additions & 38 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import json
import os
import re
from pathlib import Path
from unittest.mock import Mock, patch
from unittest.mock import patch

import pytest
from click.testing import CliRunner

from modelaudit import __version__
from modelaudit.cli import cli, expand_paths, format_text_output
from modelaudit.core import scan_model_directory_or_file
from modelaudit.models import create_initial_audit_result


Expand Down Expand Up @@ -68,6 +68,15 @@ def create_mock_scan_result(**kwargs):
if "scanners" in kwargs:
result.scanner_names = kwargs["scanners"]

# Add file metadata if provided
if "file_metadata" in kwargs:
from modelaudit.models import FileMetadataModel

result.file_metadata = {
path: metadata if isinstance(metadata, FileMetadataModel) else FileMetadataModel(**metadata)
for path, metadata in kwargs["file_metadata"].items()
}

result.finalize_statistics()
return result

Expand Down Expand Up @@ -779,58 +788,152 @@ def file_generator():
@patch("modelaudit.cli.is_huggingface_url")
@patch("modelaudit.utils.sources.huggingface.download_model_streaming")
@patch("modelaudit.core.scan_model_streaming")
def test_scan_huggingface_streaming_sbom_contains_all_components(
mock_scan_streaming: Mock, mock_download_streaming: Mock, mock_is_hf_url: Mock, tmp_path: Path
) -> None:
"""Regression test for issue #671: --stream should still produce full SBOM components."""
def test_scan_huggingface_streaming_sbom_includes_streamed_assets(
mock_scan_streaming, mock_download_streaming, mock_is_hf_url, tmp_path
):
Comment on lines +791 to +793
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add the required test annotations.

These new tests are missing the repo-required -> None return annotations and tmp_path: Path parameter typing.

As per coding guidelines, "Use type hints -> None on all test methods and tmp_path: Path / monkeypatch: pytest.MonkeyPatch on test parameters".

Also applies to: 868-870, 919-919

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_cli.py` around lines 791 - 793, The test function
test_scan_huggingface_streaming_sbom_includes_streamed_assets is missing the
required typing: add a return type annotation "-> None" and annotate the
tmp_path parameter as "tmp_path: Path" (import Path from pathlib if not
already). Apply the same change (add "-> None" and type tmp_path as Path or
monkeypatch as pytest.MonkeyPatch where appropriate) to the other test functions
flagged in the review so all tests follow the repo convention.

"""Streaming scans should build SBOM components from discovered artifacts."""
mock_is_hf_url.return_value = True

# The generator itself is not consumed in this test because scan_model_streaming is mocked.
streamed_files = []
file_hashes = {}
file_sizes = {}
for name in ("config.json", "model.safetensors", "tokenizer.json"):
file_path = tmp_path / name
content = f"streamed content for {name}".encode()
file_path.write_bytes(content)
streamed_files.append(file_path)
import hashlib

file_hashes[str(file_path)] = hashlib.sha256(content).hexdigest()
file_sizes[str(file_path)] = len(content)

def file_generator():
yield (tmp_path / "model-00001-of-00002.safetensors", False)
yield (tmp_path / "model-00002-of-00002.safetensors", True)
for index, file_path in enumerate(streamed_files):
yield (file_path, index == len(streamed_files) - 1)

mock_download_streaming.return_value = file_generator()

streamed_assets = [
{
"path": str(tmp_path / "model-00001-of-00002.safetensors"),
"type": "safetensors",
"size": 123,
},
{
"path": str(tmp_path / "model-00002-of-00002.safetensors"),
"type": "safetensors",
"size": 456,
mock_result = create_mock_scan_result(
bytes_scanned=sum(file_path.stat().st_size for file_path in streamed_files),
files_scanned=len(streamed_files),
has_errors=False,
assets=[
{
"path": str(file_path),
"type": "streamed",
"size": file_sizes[str(file_path)],
}
for file_path in streamed_files
],
file_metadata={
str(file_path): {
"file_size": file_sizes[str(file_path)],
"file_hashes": {"sha256": file_hashes[str(file_path)]},
}
for file_path in streamed_files
},
]
mock_scan_streaming.return_value = create_mock_scan_result(
bytes_scanned=579,
files_scanned=2,
assets=streamed_assets,
)
mock_scan_streaming.return_value = mock_result

for file_path in streamed_files:
file_path.unlink()

sbom_file = tmp_path / "streamed.sbom.json"

sbom_file = tmp_path / "streaming_sbom.json"
runner = CliRunner()
result = runner.invoke(
cli,
[
"scan",
"--stream",
"--sbom",
str(sbom_file),
"https://huggingface.co/test/model",
result = runner.invoke(cli, ["scan", "--stream", "--quiet", "--sbom", str(sbom_file), "hf://test/model"])

assert result.exit_code == 0
assert sbom_file.exists()
sbom_data = json.loads(sbom_file.read_text())
components = {component["name"]: component for component in sbom_data["components"]}

assert set(components) == {file_path.name for file_path in streamed_files}
assert "model" not in components

for file_path in streamed_files:
component = components[file_path.name]
properties = {prop["name"]: prop["value"] for prop in component["properties"]}

assert properties["size"] == str(file_sizes[str(file_path)])
assert component["hashes"][0]["alg"] == "SHA-256"
assert component["hashes"][0]["content"] == file_hashes[str(file_path)]
Comment on lines +854 to +860
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don't couple the test to hash ordering.

component["hashes"][0] makes this test fail if SBOM generation adds another hash or reorders the list while still emitting the correct SHA-256. Match the hash by alg instead.

Suggested assertion change
-        assert component["hashes"][0]["alg"] == "SHA-256"
-        assert component["hashes"][0]["content"] == file_hashes[str(file_path)]
+        sha256_hash = next(hash_entry for hash_entry in component["hashes"] if hash_entry["alg"] == "SHA-256")
+        assert sha256_hash["content"] == file_hashes[str(file_path)]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_cli.py` around lines 855 - 861, The test currently assumes SHA-256
is at index 0 via component["hashes"][0], which breaks if hashes are reordered
or expanded; update the loop in tests/test_cli.py that iterates streamed_files
(using variables file_path, component, properties) to locate the hash entry with
alg == "SHA-256" (e.g. use a generator/filter or next(...) to find the matching
dict) and assert its "content" equals file_hashes[str(file_path)]; also fail the
test with a clear message if no SHA-256 entry is found instead of indexing into
position 0.



@patch("modelaudit.cli.is_huggingface_url")
@patch("modelaudit.cli.is_huggingface_file_url", return_value=False)
@patch("modelaudit.cli.download_model")
@patch("modelaudit.cli.scan_model_directory_or_file")
def test_scan_huggingface_sbom_excludes_download_cache_files(
mock_scan, mock_download, mock_is_hf_file_url, mock_is_hf_url, tmp_path
):
"""Remote SBOM generation should ignore HuggingFace cache bookkeeping files."""
mock_is_hf_url.return_value = True

downloaded_dir = tmp_path / "gpt2"
downloaded_dir.mkdir()
real_files = {
downloaded_dir / "config.json": b'{"model_type":"gpt2"}',
downloaded_dir / "merges.txt": b"merge rules",
downloaded_dir / "model.safetensors": b"weights",
}
for file_path, content in real_files.items():
file_path.write_bytes(content)

cache_dir = downloaded_dir / ".cache" / "huggingface" / "download"
cache_dir.mkdir(parents=True)
(cache_dir / "config.json.metadata").write_text("{}")
(cache_dir / ".gitignore").write_text("*\n")

mock_download.return_value = downloaded_dir
mock_scan.return_value = create_mock_scan_result(
bytes_scanned=sum(len(content) for content in real_files.values()),
files_scanned=len(real_files),
has_errors=False,
assets=[
{
"path": str(file_path),
"type": "streamed",
"size": len(content),
}
for file_path, content in real_files.items()
],
)

sbom_file = tmp_path / "downloaded.sbom.json"

runner = CliRunner()
result = runner.invoke(cli, ["scan", "--quiet", "--sbom", str(sbom_file), "hf://test/model"])

assert result.exit_code == 0
assert sbom_file.exists()

sbom_json = json.loads(sbom_file.read_text(encoding="utf-8"))
component_names = {component["name"] for component in sbom_json.get("components", [])}
sbom_data = json.loads(sbom_file.read_text())
component_names = {component["name"] for component in sbom_data["components"]}

assert component_names == {file_path.name for file_path in real_files}
assert "config.json.metadata" not in component_names
assert ".gitignore" not in component_names


def test_scan_directory_skips_huggingface_cache_bookkeeping(tmp_path):
"""Directory scans should not surface HuggingFace cache bookkeeping files as assets."""
model_dir = tmp_path / "model"
model_dir.mkdir()
(model_dir / "config.json").write_text('{"model_type":"gpt2"}')
(model_dir / "model.safetensors").write_bytes(b"weights")

cache_dir = model_dir / ".cache" / "huggingface" / "download"
cache_dir.mkdir(parents=True)
(cache_dir / "config.json.metadata").write_text("{}")
(cache_dir / ".gitignore").write_text("*\n")

result = scan_model_directory_or_file(str(model_dir))

assert "model-00001-of-00002.safetensors" in component_names
assert "model-00002-of-00002.safetensors" in component_names
asset_names = {os.path.basename(asset.path) for asset in result.assets}
assert "config.json" in asset_names
assert "model.safetensors" in asset_names
assert "config.json.metadata" not in asset_names
assert ".gitignore" not in asset_names


@patch("modelaudit.cli.is_huggingface_url")
Expand Down
Loading