Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 78 additions & 19 deletions argus/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,35 @@ def update_message(self, message: str) -> None:
self._stream.flush()


def _canonical_container_metadata(result) -> dict:
"""Build the metadata dict for a container ScanResult row.

Surfaces the originating Dockerfile + context for build-mode
targets so security reviewers and audit archives can trace any
finding back to its source. Empty dockerfile/context fields
(remote-pull entries) are *omitted* rather than written as empty
strings so downstream readers don't have to special-case them.

Extracted from the inline list comprehension that builds
``canonical_results`` in ``_cmd_container_scan`` so the dict
shape is unit-testable without spinning up the full container
engine.
"""
metadata: dict = {
"image_ref": result.image_ref,
"build_success": result.build_success,
}
if getattr(result, "dockerfile", ""):
metadata["dockerfile_path"] = result.dockerfile
if getattr(result, "context", ""):
metadata["context_path"] = result.context
if getattr(result, "scanner_errors", None):
metadata["scanner_errors"] = dict(result.scanner_errors)
if getattr(result, "scan_error", None):
metadata["scan_error"] = result.scan_error
return metadata


def _configure_logger(args: argparse.Namespace, output_dir: str | None = None):
"""Set up the ``argus`` logger at the level the user's flags imply.

Expand Down Expand Up @@ -1909,6 +1938,33 @@ def _cmd_container_scan(
output_dir = _make_run_dir(base_dir)
formats = args.formats or ["terminal", "markdown"]

# Now that we know the output dir, re-attach the logger's file
# handler so engine logs land in <output_dir>/argus.log alongside
# the rest of the audit trail — same shape as the source-scan
# path. ``_configure_logger`` is idempotent on the stream handler;
# passing output_dir adds the file handler when it's missing.
log = _configure_logger(args, output_dir=output_dir)

# Audit manifest — captures the exact targets, config path, and
# outcome so a security reviewer (or CI archive) can trace any
# finding back to its inputs without cross-referencing the
# workflow. Source scans have always done this; container scans
# used to skip it entirely. ``scan_targets`` lists the originating
# source per target — Dockerfile path for build-mode entries,
# image ref for remote-pull entries — so the audit manifest
# answers "which container produced this artifact?" by name.
from argus.audit import create_manifest, finalize_manifest
from argus.container.discovery import parse_container_config
_audit_targets = [
str(t.dockerfile) if t.dockerfile else t.image_ref
for t in parse_container_config(config)
]
manifest = create_manifest(
config_path=getattr(args, "config", None),
scan_targets=_audit_targets,
)
manifest.execution_backend = config.get("backend", "auto")

# Decide whether to persist raw per-scanner outputs alongside the
# canonical argus-results.json. Default is ON — the user just ran
# a scan and would expect those artifacts to be available for
Expand Down Expand Up @@ -1944,6 +2000,7 @@ def _cmd_container_scan(
summary = engine.run()
except Exception as exc:
print(f"Error: container scan failed: {exc}", file=sys.stderr)
finalize_manifest(manifest, exit_code=EXIT_ERROR, output_dir=output_dir)
return EXIT_ERROR

# Build a canonical ScanSummary view of the container results so
Expand All @@ -1958,18 +2015,7 @@ def _cmd_container_scan(
ScanResult(
scanner=f"container/{r.name}",
findings=list(r.combined_findings),
metadata={
"image_ref": r.image_ref,
"build_success": r.build_success,
**(
{"scanner_errors": dict(r.scanner_errors)}
if r.scanner_errors else {}
),
**(
{"scan_error": r.scan_error}
if getattr(r, "scan_error", None) else {}
),
},
metadata=_canonical_container_metadata(r),
)
for r in summary.results
]
Expand Down Expand Up @@ -2009,16 +2055,29 @@ def _cmd_container_scan(
f"\n{scan_failures} scanner failure(s) — results are incomplete",
file=sys.stderr,
)
return EXIT_ERROR

if args.severity_threshold and args.severity_threshold != "none":
exit_code = EXIT_ERROR
elif args.severity_threshold and args.severity_threshold != "none":
from argus.core.models import Severity
threshold = Severity.from_string(args.severity_threshold)
exit_code = EXIT_SUCCESS
for r in summary.results:
for f in r.combined_findings:
if f.severity >= threshold:
return EXIT_FINDINGS
return EXIT_SUCCESS
if any(f.severity >= threshold for f in r.combined_findings):
exit_code = EXIT_FINDINGS
break
else:
exit_code = EXIT_SUCCESS

# Finalize the audit manifest with the canonical summary so the
# archived ``argus-audit.json`` reflects what actually ran. Same
# shape as the source-scan flow.
finalize_manifest(
manifest,
summary=canonical_summary,
exit_code=exit_code,
output_dir=output_dir,
)
log.info("Audit manifest written to %s/argus-audit.json", output_dir)
return exit_code


def _cmd_dast_scan(args: argparse.Namespace) -> int:
Expand Down
6 changes: 6 additions & 0 deletions argus/container/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ def _elapsed() -> int:
return ContainerScanResult(
name=target.name,
image_ref=target.image_ref,
dockerfile=str(target.dockerfile) if target.dockerfile else "",
context=str(target.context) if target.context else "",
build_success=False,
scan_error=error_msg,
)
Expand Down Expand Up @@ -206,13 +208,17 @@ def _elapsed() -> int:
return ContainerScanResult(
name=target.name,
image_ref=target.image_ref,
dockerfile=str(target.dockerfile) if target.dockerfile else "",
context=str(target.context) if target.context else "",
scan_error=f"OS error: {exc}",
)
except Exception:
logger.exception("Scan failed for %s", target.name)
return ContainerScanResult(
name=target.name,
image_ref=target.image_ref,
dockerfile=str(target.dockerfile) if target.dockerfile else "",
context=str(target.context) if target.context else "",
scan_error=f"Scan failed for {target.image_ref}",
)

Expand Down
16 changes: 15 additions & 1 deletion argus/container/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,23 @@

@dataclass
class ContainerScanResult:
"""Results for a single container image scan."""
"""Results for a single container image scan.

``dockerfile`` and ``context`` capture the source the image was
built from — empty strings for remote-pull entries, populated for
local builds. Without these, downstream artifacts (argus-results.
json, per-image markdown, SARIF, audit manifest) only carry the
auto-derived tag like ``scanner-bandit:argus-scan``, which is
meaningless to a security reviewer asking "which Dockerfile
produced this finding?". Plumbing them through here lets every
consumer surface a real source path alongside the image.
"""

name: str
image_ref: str
digest: str = ""
dockerfile: str = ""
context: str = ""
trivy_findings: list[Finding] = field(default_factory=list)
grype_findings: list[Finding] = field(default_factory=list)
combined_findings: list[Finding] = field(default_factory=list)
Expand Down Expand Up @@ -205,6 +217,8 @@ def scan_image(
return ContainerScanResult(
name=target.name,
image_ref=target.image_ref,
dockerfile=str(target.dockerfile) if target.dockerfile else "",
context=str(target.context) if target.context else "",
trivy_findings=trivy_findings,
grype_findings=grype_findings,
combined_findings=combined,
Expand Down
60 changes: 60 additions & 0 deletions argus/tests/test_container_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,66 @@ def test_build_failure_defaults(self):
assert result.total_count == 0


class TestContainerScanResultDockerfileFields:
"""``dockerfile`` / ``context`` should flow with the result so a security
reviewer can trace any artifact back to its source without cross-
referencing the workflow."""

def test_remote_pull_entry_leaves_dockerfile_empty(self):
# Default (remote pull) — both empty strings.
result = ContainerScanResult(name="x", image_ref="x:1")
assert result.dockerfile == ""
assert result.context == ""

def test_build_entry_carries_dockerfile_and_context(self):
result = ContainerScanResult(
name="myapp",
image_ref="myapp:argus-scan",
dockerfile="docker/Dockerfile.app",
context=".",
)
assert result.dockerfile == "docker/Dockerfile.app"
assert result.context == "."


class TestCanonicalContainerMetadata:
"""The cli helper that maps ContainerScanResult → ScanResult metadata.

Locks in the dict shape so security reviewers and the audit-archive
layer always see ``dockerfile_path`` for build-mode targets.
"""

def test_remote_pull_omits_dockerfile_keys(self):
from argus.cli import _canonical_container_metadata
result = ContainerScanResult(name="x", image_ref="x:1")
meta = _canonical_container_metadata(result)
assert meta["image_ref"] == "x:1"
assert meta["build_success"] is True
assert "dockerfile_path" not in meta
assert "context_path" not in meta

def test_build_entry_includes_dockerfile_path(self):
from argus.cli import _canonical_container_metadata
result = ContainerScanResult(
name="myapp",
image_ref="myapp:argus-scan",
dockerfile="docker/Dockerfile.app",
context=".",
)
meta = _canonical_container_metadata(result)
assert meta["dockerfile_path"] == "docker/Dockerfile.app"
assert meta["context_path"] == "."

def test_scanner_errors_surfaced(self):
from argus.cli import _canonical_container_metadata
result = ContainerScanResult(
name="x", image_ref="x:1",
scanner_errors={"trivy": "DB pull failed"},
)
meta = _canonical_container_metadata(result)
assert meta["scanner_errors"] == {"trivy": "DB pull failed"}


class TestDeduplicateFindings:
"""Test deduplicate_findings merging logic."""

Expand Down
132 changes: 132 additions & 0 deletions argus/tests/test_container_scanner_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,135 @@ def fake_grype(image_ref, tmp_path, local=False):
# Combined view doesn't claim grype's missing data is "no
# vulnerabilities" — it's simply trivy's findings.
assert len(result.combined_findings) == 1


# ───────────────────────────────────────────────
# Engine error-path dockerfile propagation
# ───────────────────────────────────────────────


class TestEngineErrorPathsCarryDockerfile:
"""Every engine path that builds a ContainerScanResult must
carry the originating Dockerfile + context, including the failure
paths. Otherwise a build error or an OS error would lose the
dockerfile reference and a security reviewer couldn't trace which
container the error belongs to.
"""

def _engine(self):
from argus.container.engine import ContainerEngine
return ContainerEngine({})

def _build_target(self, tmp_path):
from argus.container.discovery import ContainerTarget
return ContainerTarget(
name="myapp",
image_ref="myapp:argus-scan",
dockerfile=tmp_path / "Dockerfile",
context=tmp_path,
)

def test_build_failure_preserves_dockerfile(self, tmp_path, monkeypatch):
target = self._build_target(tmp_path)
# Pretend build failed.
monkeypatch.setattr(
"argus.container.engine.build_image", lambda t: False,
)
# Disk-space probe — return enough to avoid the OOD message branch.
monkeypatch.setattr(
"argus.container.engine.check_disk_space", lambda: 10 * 1024**3,
)
result = self._engine()._process_target(target)
assert result.build_success is False
assert result.dockerfile == str(tmp_path / "Dockerfile")
assert result.context == str(tmp_path)

def test_oserror_during_scan_preserves_dockerfile(self, tmp_path, monkeypatch):
target = self._build_target(tmp_path)
monkeypatch.setattr(
"argus.container.engine.build_image", lambda t: True,
)
monkeypatch.setattr(
"argus.container.engine.scan_image",
lambda *a, **kw: (_ for _ in ()).throw(OSError("disk full")),
)
result = self._engine()._process_target(target)
assert "OS error" in result.scan_error
assert result.dockerfile == str(tmp_path / "Dockerfile")
assert result.context == str(tmp_path)

def test_generic_exception_during_scan_preserves_dockerfile(
self, tmp_path, monkeypatch,
):
target = self._build_target(tmp_path)
monkeypatch.setattr(
"argus.container.engine.build_image", lambda t: True,
)
monkeypatch.setattr(
"argus.container.engine.scan_image",
lambda *a, **kw: (_ for _ in ()).throw(RuntimeError("oops")),
)
result = self._engine()._process_target(target)
assert "Scan failed" in result.scan_error
assert result.dockerfile == str(tmp_path / "Dockerfile")
assert result.context == str(tmp_path)


# ───────────────────────────────────────────────
# scan_image happy-path threading
# ───────────────────────────────────────────────


class TestScanImageThreadsDockerfile:
"""The happy-path ContainerScanResult from ``scan_image`` must
also carry dockerfile/context from the target."""

def test_scan_image_populates_dockerfile_from_target(
self, tmp_path, monkeypatch,
):
from argus.container.discovery import ContainerTarget
from argus.container.scanner import scan_image

target = ContainerTarget(
name="myapp",
image_ref="myapp:argus-scan",
dockerfile=tmp_path / "Dockerfile.x",
context=tmp_path,
)

# Stub out the actual scanners — we only need scan_image to
# construct the result and return.
monkeypatch.setattr(
"argus.container.scanner._run_trivy",
lambda *a, **kw: [],
)
monkeypatch.setattr(
"argus.container.scanner._run_grype",
lambda *a, **kw: [],
)

result = scan_image(target, scanners=("trivy", "grype"))
assert result.dockerfile == str(tmp_path / "Dockerfile.x")
assert result.context == str(tmp_path)

def test_scan_image_remote_pull_leaves_dockerfile_empty(
self, tmp_path, monkeypatch,
):
from argus.container.discovery import ContainerTarget
from argus.container.scanner import scan_image

# Remote-pull entry — no dockerfile, no context.
target = ContainerTarget(name="webapp", image_ref="myorg/webapp:1.0")

monkeypatch.setattr(
"argus.container.scanner._run_trivy",
lambda *a, **kw: [],
)
monkeypatch.setattr(
"argus.container.scanner._run_grype",
lambda *a, **kw: [],
)

result = scan_image(target, scanners=("trivy", "grype"))
assert result.dockerfile == ""
assert result.context == ""
Loading