diff --git a/.ai/workflows.yaml b/.ai/workflows.yaml index 69d6cc33..1f003018 100644 --- a/.ai/workflows.yaml +++ b/.ai/workflows.yaml @@ -275,8 +275,13 @@ contributor_workflows: steps: - action: "Create scanner module" location: "argus/scanners/{name}.py" - purpose: "Implement Scanner protocol — name, scan(path, config), is_available(), install_command()" + purpose: "Implement Scanner protocol — name, scan(), build_args(), is_available(), install_command(), tool_version(), parse_results()" template: "argus/scanners/bandit.py" + helpers: + scan_template: "Use argus.core.scanner_template.run_subprocess_scan(self, path, config) — collapses the tempdir+subprocess+output-file+error-handling boilerplate into a one-line scan() body. Each scanner declares the WHAT (build_args, parse_results) instead of the HOW (subprocess plumbing). Skip when the tool runs multiple binaries or emits to stdout (see grype.py, clamav.py, supply_chain.py for the bypass shape)." + build_args: "Implement build_args(self, paths: ScanPaths, config: dict) — single source of truth for both local and container execution. Returns the FULL argv (binary name as args[0]). Engine drops args[0] when ``container_entrypoint`` is declared on the class (i.e. the image has ENTRYPOINT). Replaces the legacy _build_command + container_args pair, which used to drift." + tool_version: "Use argus.core.version.parse_tool_version(cmd, regex) — collapses the ~17-line subprocess+regex+exception boilerplate into one line. Only fall back to custom parsing when the tool emits structured output (JSON, etc.) — see grype.py" + secret_redaction: "If raw output ever contains source-code literals (passwords, code excerpts), use argus.core.redact helpers and add a regression test (see CLAUDE.md → secret leak audit)" - action: "Register in SCANNER_REGISTRY" location: "argus/scanners/__init__.py" diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e448547e..0e7abef6 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -100,94 +100,88 @@ This is the primary way to add a new scanner. Each scanner is a single Python mo ### Step 1: Create the Scanner Module -Create `argus/scanners/my_scanner.py` implementing the `Scanner` protocol: +Create `argus/scanners/my_scanner.py`. The SDK provides two helpers — `parse_tool_version` (for `tool_version()`) and `run_subprocess_scan` (for `scan()`) — so a typical scanner is ~50 lines: ```python """My Scanner - brief description of what it scans.""" import json import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version class MyScanner: """Wraps MyTool to scan for security issues.""" name = "my-scanner" + description = "What it scans, in one line" + category = "sast" # or "secrets", "iac", "sca", "container", "linter", ... container_image = get_image("my-scanner") + # Set this if your container image declares ``ENTRYPOINT ["my-tool"]``. + # The engine drops argv[0] when this attr is present, so build_args + # can return the same argv shape for both local and container paths. + container_entrypoint = "my-tool" def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run the scanner against the given path and return results.""" - config = config or {} - - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "results.json" - cmd = self._build_command(path, output_file, config) - - result = subprocess.run(cmd, capture_output=True, text=True) - - if result.returncode != 0: - return ScanResult( - scanner=self.name, - metadata={"error": result.stderr.strip()}, - ) - - if not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={"error": "No output file produced"}, - ) - - findings = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - ) + """One-line wrapper — the template handles tempdir, subprocess, errors.""" + return run_subprocess_scan(self, path, config) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the FULL argv (binary name as args[0]). + + Single source of truth for both local and container execution. + Local path: ``paths.workspace`` is the host path the user gave; + container path: ``paths.workspace`` is ``/workspace`` and + ``paths.output`` is ``/output/results.json``. The engine handles + the path translation; this method just consumes whatever paths + it's handed. + """ + args = [ + "my-tool", "scan", + paths.workspace, + "--format", "json", + "--output", paths.output, + ] + if config.get("exclude"): + args.extend(["--exclude", config["exclude"]]) + return args def is_available(self) -> bool: - """Check if the underlying tool is installed.""" return shutil.which("my-tool") is not None def install_command(self) -> str | None: - """Return the shell command to install the tool, or None.""" return "pip install my-tool" - def container_args(self, config: dict | None = None) -> list[str]: - """Return CLI args for running the tool in a container.""" - return [ - "my-tool", "scan", "/workspace", - "-f", "json", - "-o", "/output/results.json", - ] + def tool_version(self) -> str | None: + if not self.is_available(): + return None + # Tool output: "my-tool X.Y.Z (build info)" + return parse_tool_version(["my-tool", "--version"], r"^my-tool (\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse tool output into normalized Finding objects.""" data = json.loads(raw_output_path.read_text()) - return [self._parse_finding(item) for item in data.get("results", [])] + return [ + Finding( + id=item.get("rule_id", "UNKNOWN"), + severity=Severity.from_string(item.get("severity", "UNKNOWN")), + title=item.get("message", ""), + description=item.get("message", ""), + location=item.get("file", ""), + scanner=self.name, + ) + for item in data.get("results", []) + ] +``` - def _parse_finding(self, item: dict) -> Finding: - """Convert a single result into a Finding.""" - return Finding( - id=item.get("rule_id", "UNKNOWN"), - severity=Severity.from_string(item.get("severity", "UNKNOWN")), - title=item.get("message", ""), - description=item.get("message", ""), - location=item.get("file", ""), - scanner=self.name, - ) +**Why this shape**: `build_args(paths, config)` replaces the historical pair of `_build_command(path, output, config)` (local) + `container_args(config)` (container). The two used to drift — different `--output` vs `--output-file` flag names, different exit-code handling — and the engine had to know about both. With one method that takes a `ScanPaths` value object, local and container execution share the same definition; the engine builds the right `ScanPaths` for the context (host paths locally, `/workspace` + `/output/...` in containers) and the scanner stays oblivious. - def _build_command( - self, path: str, output_file: Path, config: dict - ) -> list[str]: - """Build the CLI command.""" - return ["my-tool", "scan", path, "-f", "json", "-o", str(output_file)] -``` +**When to skip the template**: if your tool emits results to stdout instead of a file (`grype version -o json`, ClamAV's text output) or runs an orchestration of multiple binaries (`supply_chain` → zizmor + actionlint), write a custom `scan()`. The template doesn't fit every shape and shouldn't be forced — see `grype.py`, `clamav.py`, `supply_chain.py` for examples. **Scanner protocol requirements** (from `argus/core/scanner.py`): @@ -197,9 +191,11 @@ class MyScanner: | `scan(path, config) -> ScanResult` | Yes | Run the scanner and return normalized results | | `is_available() -> bool` | Yes | Check if the tool is installed locally | | `install_command() -> str \| None` | Yes | Shell command to install the tool | +| `tool_version() -> str \| None` | Recommended | Installed tool version. Use `parse_tool_version()` from `argus.core.version` for the common case (regex match against ` --version` output) — see `bandit.py`, `clamav.py`, `trivy.py`, `gitleaks.py` for examples. Only fall back to custom parsing for tools with structured output (JSON, etc.) — see `grype.py` | +| `build_args(paths, config) -> list[str]` | Yes (subprocess scanners) | Build the full argv for the tool. Used by both local and container execution paths — single source of truth. `paths` is a `ScanPaths(workspace, output)` value object. Drop `argv[0]` automatically when the image has an `ENTRYPOINT` — declare `container_entrypoint = ""` on the class | | `container_image: str` | Optional | Docker image for container fallback | -| `container_args(config) -> list[str]` | Optional | CLI args for containerized execution | -| `parse_results(path) -> list[Finding]` | Optional | Parse raw output file into findings | +| `container_entrypoint: str` | Optional | Set when the container image has `ENTRYPOINT [""]`; engine drops `argv[0]` from `build_args` output | +| `parse_results(path) -> list[Finding]` | Yes | Parse raw output file into findings. May return `(list[Finding], dict)` to attach scanner-level metadata (e.g. `passed_count` for linters) | **Reference implementation**: See `argus/scanners/bandit.py` for a complete, well-documented example. diff --git a/argus.yml b/argus.yml index 73e2bafa..d2dcce45 100644 --- a/argus.yml +++ b/argus.yml @@ -36,12 +36,12 @@ scanners: lint-dockerfile: enabled: true - # Not applicable to this repo as part of the default scan: + # Not applicable to this repo as part of the default source scan: # - trivy-iac / checkov target Terraform / CloudFormation / K8s — none here # - container / zap operate on built images / running endpoints, not - # source paths; run them on demand: - # argus scan container --discover . - # argus scan zap --target http://... + # source paths. Container targets are configured in the + # ``containers:`` block below and run via ``argus scan container``; + # ZAP is on-demand: ``argus scan zap --target http://...`` trivy-iac: enabled: false checkov: @@ -51,6 +51,28 @@ scanners: zap: enabled: false +# Container lifecycle targets — drives ``argus scan container`` without +# needing CLI flags. Each entry is a Dockerfile-built image that the +# scanner builds locally (so the scan reflects exactly the bytes that +# ship from this branch) and then hands to trivy + grype + syft. The +# build-containers workflow can read this same list rather than +# hard-coding a parallel matrix — single source of truth. +containers: + images: + - image: ghcr.io/huntridge-labs/argus/scanner-bandit:dev + dockerfile: docker/Dockerfile.bandit + context: . + - image: ghcr.io/huntridge-labs/argus/scanner-opengrep:dev + dockerfile: docker/Dockerfile.opengrep + context: . + - image: ghcr.io/huntridge-labs/argus/scanner-supply-chain:dev + dockerfile: docker/Dockerfile.supply-chain + context: . + - image: ghcr.io/huntridge-labs/argus/cli:dev + dockerfile: docker/Dockerfile.cli + context: . + scanners: [trivy, grype, syft] + reporting: formats: - terminal diff --git a/argus/core/engine.py b/argus/core/engine.py index 703d5f9a..3f7ab971 100644 --- a/argus/core/engine.py +++ b/argus/core/engine.py @@ -674,7 +674,24 @@ def _run_in_container( docker_cmd.extend(["--entrypoint", entrypoint]) logger.debug("Overriding entrypoint: %s", entrypoint) - container_args = scanner.container_args(config) + # Prefer the unified ``build_args(ScanPaths)`` shape (single + # source of truth for both local and container CLI args). + # Fall back to legacy ``container_args(config)`` for scanners + # not yet migrated. Once every scanner declares + # ``build_args``, the legacy branch and container_args + # method go away. + if hasattr(scanner, "build_args"): + from argus.core.scanner_template import ScanPaths + paths = ScanPaths( + workspace="/workspace", + output="/output/results.json", + ) + container_args = scanner.build_args(paths, config or {}) + # ENTRYPOINT-based images supply the binary; drop argv[0]. + if getattr(scanner, "container_entrypoint", None): + container_args = container_args[1:] + else: + container_args = scanner.container_args(config) docker_cmd.extend([image] + container_args) logger.debug( diff --git a/argus/core/models.py b/argus/core/models.py index c41c13a6..df72f230 100644 --- a/argus/core/models.py +++ b/argus/core/models.py @@ -1,5 +1,6 @@ """Core data models for Argus scan results.""" +import functools from dataclasses import dataclass, field, asdict from enum import Enum from typing import Optional @@ -31,6 +32,7 @@ } +@functools.total_ordering class Severity(Enum): """Security finding severity levels with comparison support.""" @@ -57,21 +59,6 @@ def from_string(cls, value: str) -> "Severity": def _order(self) -> int: return _SEVERITY_ORDER[self.value] - def __ge__(self, other: "Severity") -> bool: - if not isinstance(other, Severity): - return NotImplemented - return self._order >= other._order - - def __gt__(self, other: "Severity") -> bool: - if not isinstance(other, Severity): - return NotImplemented - return self._order > other._order - - def __le__(self, other: "Severity") -> bool: - if not isinstance(other, Severity): - return NotImplemented - return self._order <= other._order - def __lt__(self, other: "Severity") -> bool: if not isinstance(other, Severity): return NotImplemented diff --git a/argus/core/scanner_template.py b/argus/core/scanner_template.py new file mode 100644 index 00000000..6c453c7f --- /dev/null +++ b/argus/core/scanner_template.py @@ -0,0 +1,190 @@ +"""Shared template for subprocess-based scanners. + +Every scanner that wraps a CLI tool used to repeat the same shape: + + with tempfile.TemporaryDirectory() as tmp_dir: + output_file = Path(tmp_dir) / "results.json" + cmd = self._build_command(path, output_file, config) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0 and ...: + return ScanResult(error=...) + if not output_file.exists(): + return ScanResult(error=...) + findings = self.parse_results(output_file) + return ScanResult(findings=findings, ...) + +…and a *parallel* ``container_args(config)`` method that built the +same args but with ``/workspace`` and ``/output/results.json`` in place +of the local paths. The two methods drifted (different ``--output`` vs +``--output-file`` flag names; different exit-code handling) and the +boilerplate hid the per-scanner intent — what command to run, how to +parse — under 30+ lines of plumbing. + +This module collapses that into: + +* :class:`ScanPaths` — the file paths a scanner operates on, in whatever + execution environment is current. Local execution sets absolute host + paths; container execution sets ``/workspace`` and ``/output/...``. + +* :func:`run_subprocess_scan` — runs ``scanner.build_args(paths)``, + handles the exit code, reads back the output file, and produces a + uniform :class:`ScanResult`. The scanner only declares *what command + to run* (``build_args``) and *how to parse output* (``parse_results``). + +Scanners with structurally-different flows (Grype JSON-stdout, ClamAV +text-output, ZAP Docker-only, the container orchestrator) keep custom +``scan()`` implementations — the template is a tool for the common +case, not a forced abstraction. +""" + +from __future__ import annotations + +import logging +import subprocess +import tempfile +from dataclasses import dataclass +from pathlib import Path +from typing import Protocol + +from argus.core.models import Finding, ScanResult + + +logger = logging.getLogger("argus.scanner") + + +@dataclass(frozen=True) +class ScanPaths: + """File-system paths the scanner operates on, in the *current* execution environment. + + Local execution: absolute host paths. + Container execution: paths inside the container (``/workspace``, + ``/output/``). + + Deliberately narrow — only what the scanner needs to wire up + ``input → tool → output`` plumbing. Tool-specific knobs (lockfile + selection, recursive flag, exclusions, config-file paths) live in + the ``config`` dict that ``build_args`` also receives, so each + scanner declares its own config schema instead of pushing every + option through this dataclass. + """ + + workspace: str + output: str + + +class _SubprocessScanner(Protocol): + """Structural protocol the template expects of the scanner argument.""" + + name: str + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + ... # pragma: no cover + + def parse_results(self, output_path: Path) -> list[Finding] | tuple: + ... # pragma: no cover + + +def run_subprocess_scan( + scanner: _SubprocessScanner, + path: str, + config: dict | None = None, + *, + output_filename: str = "results.json", + timeout: float | None = None, +) -> ScanResult: + """Run *scanner*'s CLI in a tempdir and return a :class:`ScanResult`. + + The scanner declares the command via ``build_args(paths)`` and the + parser via ``parse_results(output_path)``. Most security tools + intentionally exit non-zero when findings exist — that's the *happy + path*, not an error — so the template only treats a missing/empty + output file as a failure. ``parse_results`` may also legitimately + fall back to stdout for tools that don't write to disk; the + template captures stdout when no output file is produced. + + Args: + scanner: Scanner instance with ``name``, ``build_args``, + ``parse_results``. + path: Workspace path passed through to ``build_args`` as + ``ScanPaths.workspace``. + config: Per-scanner config dict (e.g. SBOM path, custom + config-file path). Forwarded as ``ScanPaths.config_file`` / + ``ScanPaths.sbom`` when present. + output_filename: Name of the output file inside the tempdir. + Most scanners emit ``results.json``; SARIF emitters override + (e.g. ``"results.sarif"``). + timeout: Optional subprocess timeout in seconds. Default: no + timeout (most scanners self-cap). + + Returns: + A :class:`ScanResult` with ``findings`` populated on success or + an ``error`` metadata key when execution fails. + """ + config = config or {} + + with tempfile.TemporaryDirectory() as tmp_dir: + output_file = Path(tmp_dir) / output_filename + paths = ScanPaths(workspace=path, output=str(output_file)) + + cmd = scanner.build_args(paths, config) + logger.debug("[%s] running: %s", scanner.name, " ".join(cmd)) + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + except FileNotFoundError as exc: + return ScanResult( + scanner=scanner.name, + metadata={"error": f"Tool not found: {exc.filename or cmd[0]}"}, + ) + except subprocess.TimeoutExpired: + return ScanResult( + scanner=scanner.name, + metadata={"error": f"Scanner timed out after {timeout}s"}, + ) + + if not output_file.exists(): + # Some scanners emit findings to stdout (no -o flag). When + # the planned output file isn't there but stdout has content, + # fall back to stdout — parse_results decides what to do. + stdout = (result.stdout or "").strip() + if stdout: + output_file.write_text(stdout) + else: + return ScanResult( + scanner=scanner.name, + metadata={ + "error": ( + f"No output produced (exit={result.returncode}). " + f"stderr: {(result.stderr or '').strip()[:400]}" + ), + }, + ) + + parsed = scanner.parse_results(output_file) + # ``parse_results`` may return ``list[Finding]`` (most scanners), + # a ``(list, int)`` tuple (linter passed-count channel — used by + # checkov), or a ``(list, dict)`` tuple (extra metadata). Engine + # also unpacks all three; we mirror that here so callers see the + # same shape regardless of which path produced it. + metadata: dict = {} + if isinstance(parsed, tuple): + findings = parsed[0] + extra = parsed[1] if len(parsed) > 1 else {} + if isinstance(extra, int): + metadata["passed_count"] = extra + elif isinstance(extra, dict): + metadata.update(extra) + else: + findings = parsed + + return ScanResult( + scanner=scanner.name, + findings=list(findings), + raw_report=output_file, + metadata=metadata, + ) diff --git a/argus/core/version.py b/argus/core/version.py new file mode 100644 index 00000000..77ec16fc --- /dev/null +++ b/argus/core/version.py @@ -0,0 +1,66 @@ +"""Shared helper for parsing scanner tool versions. + +Every scanner module historically rolled its own ~17-line +``tool_version()`` that ran `` --version``, swallowed all +exceptions, and parsed the output with ad-hoc string slicing. The +parsers had drifted (some checked the first line, some the last; some +stripped a leading ``v``, some did not), the exception lists had drifted +(``except (TimeoutExpired, FileNotFoundError, Exception)`` — where +``Exception`` already covers the others), and the timeouts had drifted +(5s vs 10s for no clear reason). + +``parse_tool_version`` collapses that boilerplate into a regex match: +each scanner declares its version-discovery command and a pattern, and +the helper returns the captured group or ``None`` when anything goes +wrong. Scanners whose output format doesn't fit a single regex (e.g. +Grype emits JSON via ``grype version -o json``) keep custom parsing — +the goal is a shared shape for the common case, not a forced +abstraction. +""" + +import re +import subprocess + + +def parse_tool_version( + cmd: list[str], + pattern: str | re.Pattern, + *, + group: int = 1, + timeout: float = 5.0, +) -> str | None: + """Run *cmd* and extract a version string via *pattern*. + + Args: + cmd: argv for the version subprocess (e.g. ``["bandit", "--version"]``). + pattern: Compiled regex or pattern string. Compiled with + ``re.MULTILINE`` so anchors like ``^`` match line starts in + multi-line tool output. The captured group is returned. + group: Match group index to return (default 1, the first capture). + timeout: Seconds before giving up on the subprocess. + + Returns: + The captured version string with surrounding whitespace stripped, + or ``None`` when the subprocess fails (missing binary, timeout, + nonzero exit) or the pattern doesn't match. + """ + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + ) + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + return None + + if isinstance(pattern, str): + pattern = re.compile(pattern, re.MULTILINE) + + # Search stdout first (most tools), fall back to stderr (some + # version banners land there — Java tools especially). + for stream in (result.stdout or "", result.stderr or ""): + match = pattern.search(stream) + if match: + return match.group(group).strip() + return None diff --git a/argus/init.py b/argus/init.py index 76eb387e..15f6ee0f 100644 --- a/argus/init.py +++ b/argus/init.py @@ -52,18 +52,8 @@ def run_init( root = Path(target_dir) config_path = root / "argus.yml" - # Show banner on interactive terminals with scroll effect if sys.stderr.isatty(): - import time - lines = _load_banner().splitlines() - for i, line in enumerate(lines): - print(line, file=sys.stderr) - if not line.strip(): - time.sleep(0.15) - elif "A R G U S" in line or "Perception is Protection" in line: - time.sleep(0.20) - else: - time.sleep(0.06) + print(_load_banner(), file=sys.stderr) print(file=sys.stderr) if config_path.exists() and not force: @@ -112,20 +102,21 @@ def _extract_enabled_scanners(config_content: str) -> list[str]: def _check_local_readiness(scanner_names: list[str]) -> dict[str, int] | None: - """Return a {local, container, missing} bucket summary, or None on error. + """Return a {local, container, missing} bucket summary, or None when unavailable. - Swallows all errors from the readiness check — init must never fail - because a scanner registry import blew up. A None return skips the - readiness block entirely in `_print_summary`. + Catches ImportError only — readiness check is best-effort and an + optional preflight import (the [preflight] extra is not always + installed) shouldn't fail init. Bugs inside the readiness logic + itself should surface, not get silently swallowed. """ if not scanner_names: return None try: from argus.preflight.tool_check import check_scanner_readiness, summarize - statuses = check_scanner_readiness(scanner_names, backend="auto") - return summarize(statuses) - except Exception: + except ImportError: return None + statuses = check_scanner_readiness(scanner_names, backend="auto") + return summarize(statuses) def detect_project(root: Path) -> dict[str, list[str]]: diff --git a/argus/linters/hadolint.py b/argus/linters/hadolint.py index 599a8885..247cbeb4 100644 --- a/argus/linters/hadolint.py +++ b/argus/linters/hadolint.py @@ -7,6 +7,7 @@ from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.version import parse_tool_version class HadolintLinter: @@ -49,24 +50,7 @@ def tool_version(self) -> str | None: """Return the installed hadolint version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["hadolint", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "Haskell Dockerfile Linter X.Y.Z-no-git" - text = result.stdout.strip() - if not text: - return None - # Last token is the version, possibly with a suffix - parts = text.splitlines()[0].split() - if parts: - version = parts[-1] - # Strip git/build suffixes like "-no-git" - return version.split("-")[0] if version else None - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version(["hadolint", "--version"], r"Linter (\d+\.\d+\.\d+)") def _find_dockerfiles(self, target: Path) -> list[Path]: """Find all Dockerfile-like files under the target path.""" diff --git a/argus/scanners/bandit.py b/argus/scanners/bandit.py index c85098e8..aab98fe4 100644 --- a/argus/scanners/bandit.py +++ b/argus/scanners/bandit.py @@ -1,13 +1,30 @@ """Bandit SAST scanner for Python code.""" import json +import re import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.redact import REDACTED_PLACEHOLDER +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version + + +# Bandit tests that detect hardcoded credentials. Their ``issue_text`` +# interpolates the matched literal verbatim +# (e.g. ``Possible hardcoded password: 'hunter2'``) and the ``code`` +# excerpt contains the offending source line — both leak the secret to +# terminal output, exports, and (most acutely) the MCP server's +# AI-assistant context. We redact both for these IDs. +_HARDCODED_SECRET_TESTS = frozenset({ + "B105", # hardcoded_password_string + "B106", # hardcoded_password_funcarg + "B107", # hardcoded_password_default +}) + +_QUOTED_LITERAL_RE = re.compile(r":\s*['\"][^'\"]*['\"]") class BanditScanner: @@ -18,43 +35,37 @@ class BanditScanner: category = "sast" languages = ["python"] container_image = get_image("bandit") + # The custom argus bandit image uses ENTRYPOINT ["bandit"]; engine + # strips argv[0] for ENTRYPOINT-based images. + container_entrypoint = "bandit" def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run Bandit against the given path and return results.""" - config = config or {} + """Run Bandit against *path* and return results.""" + return run_subprocess_scan(self, path, config) - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "bandit-results.json" - cmd = self._build_command(path, output_file, config) + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). - result = subprocess.run( - cmd, - capture_output=True, - text=True, - ) - - # Bandit uses --exit-zero so non-zero means a real error - if result.returncode != 0: - return ScanResult( - scanner=self.name, - metadata={ - "error": result.stderr.strip(), - "returncode": result.returncode, - }, - ) - - if not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={"error": "No output file produced"}, - ) - - findings = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - ) + Engine drops argv[0] when the container image declares an + ENTRYPOINT, so the same method works for both local and + container execution. + """ + args = [ + "bandit", + "-r", paths.workspace, + "-f", "json", + "-o", paths.output, + "--exit-zero", + ] + config_file = config.get("config_file") + if config_file: + # Local: caller passes the host path; container: prefix + # /workspace/ since the file is mounted there. + args.extend(["-c", config_file if "/" in config_file else f"{paths.workspace}/{config_file}"]) + exclude = config.get("exclude") + if exclude: + args.extend(["--exclude", exclude]) + return args def is_available(self) -> bool: """Check if Bandit is installed.""" @@ -68,74 +79,22 @@ def tool_version(self) -> str | None: """Return the installed Bandit version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["bandit", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "bandit X.Y.Z ..." - for line in result.stdout.strip().splitlines(): - parts = line.split() - if len(parts) >= 2 and parts[0] == "bandit": - return parts[1] - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None - - def container_args(self, config: dict | None = None) -> list[str]: - """Build container args from config -- mirrors _build_command. - - NOTE: The custom bandit image (ghcr.io/huntridge-labs/argus/ - scanner-bandit) uses ENTRYPOINT ["bandit"], so these args are - appended directly to the bandit command. Do NOT include the - ``bandit`` executable name here; the container entrypoint - already provides it. - """ - config = config or {} - args = [ - "-r", "/workspace", - "-f", "json", - "-o", "/output/results.json", - "--exit-zero", - ] - exclude = config.get("exclude") - if exclude: - args.extend(["--exclude", exclude]) - config_file = config.get("config_file") - if config_file: - args.extend(["-c", f"/workspace/{config_file}"]) - return args + return parse_tool_version(["bandit", "--version"], r"^bandit (\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Bandit JSON output into findings.""" data = json.loads(raw_output_path.read_text()) - results = data.get("results", []) - - return [self._parse_finding(item) for item in results] - - # Bandit tests that detect hardcoded credentials. Their ``issue_text`` - # interpolates the matched literal verbatim - # (e.g. ``Possible hardcoded password: 'hunter2'``) and the ``code`` - # excerpt contains the offending source line — both leak the secret - # to terminal output, exports, and (most acutely) the MCP server's - # AI-assistant context. We redact both for these IDs. - _HARDCODED_SECRET_TESTS = frozenset({ - "B105", # hardcoded_password_string - "B106", # hardcoded_password_funcarg - "B107", # hardcoded_password_default - }) + return [self._parse_finding(item) for item in data.get("results", [])] def _parse_finding(self, item: dict) -> Finding: """Convert a single Bandit result into a Finding. Redaction commitment: for the hardcoded-credential tests - (B105 / B106 / B107) bandit's ``issue_text`` and ``code`` - fields contain the matched secret value literally. Both - are replaced with the redaction placeholder before they - reach the Finding — see ``argus/core/redact.py``. + (B105 / B106 / B107) bandit's ``issue_text`` and ``code`` fields + contain the matched secret value literally. Both are replaced + with the redaction placeholder before they reach the Finding — + see ``argus/core/redact.py``. """ - from argus.core.redact import REDACTED_PLACEHOLDER - severity = Severity.from_string(item.get("issue_severity", "UNKNOWN")) cwe = None @@ -151,16 +110,9 @@ def _parse_finding(self, item: dict) -> Finding: issue_text = item.get("issue_text", "") code_excerpt = item.get("code", "") - if test_id in self._HARDCODED_SECRET_TESTS: - # Strip the quoted literal from the message — bandit's - # template is ``...: ''``. Anchored to the colon-quote - # boundary so we don't false-positive on messages with - # other punctuation. - import re - issue_text = re.sub( - r":\s*['\"][^'\"]*['\"]", - f": {REDACTED_PLACEHOLDER}", - issue_text, + if test_id in _HARDCODED_SECRET_TESTS: + issue_text = _QUOTED_LITERAL_RE.sub( + f": {REDACTED_PLACEHOLDER}", issue_text, ) code_excerpt = REDACTED_PLACEHOLDER @@ -179,25 +131,3 @@ def _parse_finding(self, item: dict) -> Finding: "code": code_excerpt, }, ) - - def _build_command( - self, path: str, output_file: Path, config: dict - ) -> list[str]: - """Build the Bandit CLI command.""" - cmd = [ - "bandit", - "-r", path, - "-f", "json", - "-o", str(output_file), - "--exit-zero", - ] - - exclude = config.get("exclude") - if exclude: - cmd.extend(["--exclude", exclude]) - - config_file = config.get("config_file") - if config_file: - cmd.extend(["-c", config_file]) - - return cmd diff --git a/argus/scanners/checkov.py b/argus/scanners/checkov.py index c1885d33..3516a56a 100644 --- a/argus/scanners/checkov.py +++ b/argus/scanners/checkov.py @@ -2,12 +2,12 @@ import json import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version class CheckovScanner: @@ -18,11 +18,31 @@ class CheckovScanner: category = "iac" languages = ["terraform", "kubernetes", "cloudformation"] container_image = get_image("checkov") + # The official Checkov image uses ENTRYPOINT ["checkov"]; engine strips + # argv[0] for ENTRYPOINT-based images. + container_entrypoint = "checkov" - def container_args(self, config: dict | None = None) -> list[str]: - """Build container args from config — mirrors _build_command.""" - config = config or {} - args = ["-d", "/workspace", "-o", "json", "--quiet", "--output-file-path", "/output"] + def scan(self, path: str, config: dict | None = None) -> ScanResult: + """Run Checkov against the given path and return results.""" + return run_subprocess_scan(self, path, config) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). + + Engine drops argv[0] when the container image declares an + ENTRYPOINT, so the same method works for both local and + container execution. + + Checkov writes JSON to stdout when given ``-o json``. The + template's stdout fallback captures it and writes to the + expected output file automatically. + """ + args = [ + "checkov", + "-d", paths.workspace, + "-o", "json", + "--quiet", + ] framework = config.get("framework") if framework: args.extend(["--framework", framework]) @@ -34,45 +54,6 @@ def container_args(self, config: dict | None = None) -> list[str]: args.extend(["--skip-check", skip_check]) return args - def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run Checkov against the given path and return results.""" - config = config or {} - - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "checkov-results.json" - cmd = self._build_command(path, config) - - result = subprocess.run( - cmd, - capture_output=True, - text=True, - ) - - # Checkov outputs JSON to stdout with -o json - if result.stdout.strip(): - output_file.write_text(result.stdout) - elif result.returncode != 0: - return ScanResult( - scanner=self.name, - metadata={ - "error": result.stderr.strip(), - "returncode": result.returncode, - }, - ) - else: - return ScanResult( - scanner=self.name, - metadata={"error": "No output produced"}, - ) - - findings, passed_count = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - metadata={"passed_count": passed_count}, - ) - def is_available(self) -> bool: """Check if Checkov is installed.""" return shutil.which("checkov") is not None @@ -85,28 +66,11 @@ def tool_version(self) -> str | None: """Return the installed Checkov version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["checkov", "--version"], - capture_output=True, text=True, timeout=10, - ) - # Output: "X.Y.Z" or "checkov X.Y.Z" - version_text = result.stdout.strip() - if not version_text: - return None - # Take the last line in case of extra output - last_line = version_text.splitlines()[-1].strip() - # If it starts with a digit, it's just the version - if last_line and last_line[0].isdigit(): - return last_line - # Otherwise try to extract version after "checkov" - parts = last_line.split() - for part in parts: - if part and part[0].isdigit(): - return part - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version( + ["checkov", "--version"], + r"(?:^|checkov )([0-9][\w.]*)", + timeout=10, + ) def parse_results(self, raw_output_path: Path) -> tuple[list[Finding], int]: """Parse Checkov JSON output into findings and passed count.""" @@ -169,26 +133,3 @@ def _parse_check(self, check: dict) -> Finding: ), }, ) - - def _build_command(self, path: str, config: dict) -> list[str]: - """Build the Checkov CLI command.""" - cmd = [ - "checkov", - "-d", path, - "-o", "json", - "--quiet", - ] - - framework = config.get("framework") - if framework: - cmd.extend(["--framework", framework]) - - check = config.get("check") - if check: - cmd.extend(["--check", check]) - - skip_check = config.get("skip_check") - if skip_check: - cmd.extend(["--skip-check", skip_check]) - - return cmd diff --git a/argus/scanners/clamav.py b/argus/scanners/clamav.py index fa15df5e..2c84b43f 100644 --- a/argus/scanners/clamav.py +++ b/argus/scanners/clamav.py @@ -7,6 +7,7 @@ from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.version import parse_tool_version _FOUND_PATTERN = re.compile(r"^(.+):\s+(.+)\s+FOUND$") @@ -77,24 +78,8 @@ def tool_version(self) -> str | None: """Return the installed ClamAV version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["clamscan", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "ClamAV X.Y.Z/..." or "ClamAV X.Y.Z" - text = result.stdout.strip() - if not text: - return None - # Parse "ClamAV X.Y.Z" from first line - first_line = text.splitlines()[0] - parts = first_line.split() - if len(parts) >= 2 and parts[0] == "ClamAV": - # Version may have "/dbver" suffix - return parts[1].split("/")[0] - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + # Output: "ClamAV X.Y.Z/dbver/..." → take only the X.Y.Z part + return parse_tool_version(["clamscan", "--version"], r"^ClamAV ([0-9.]+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse ClamAV text output file into findings.""" diff --git a/argus/scanners/gitleaks.py b/argus/scanners/gitleaks.py index 2200003c..15e4da88 100644 --- a/argus/scanners/gitleaks.py +++ b/argus/scanners/gitleaks.py @@ -2,12 +2,12 @@ import json import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version class GitleaksScanner: @@ -18,58 +18,34 @@ class GitleaksScanner: category = "secrets" languages = ["all"] container_image = get_image("gitleaks") + # The gitleaks image uses ENTRYPOINT ["gitleaks"]; engine strips argv[0] + # for ENTRYPOINT-based images. + container_entrypoint = "gitleaks" - def container_args(self, config: dict | None = None) -> list[str]: - """Build container args from config — mirrors _build_command.""" - config = config or {} + def scan(self, path: str, config: dict | None = None) -> ScanResult: + """Run Gitleaks against the given path and return results.""" + return run_subprocess_scan(self, path, config) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). + + Engine drops argv[0] when the container image declares an + ENTRYPOINT, so the same method works for both local and + container execution. + """ args = [ - "detect", "--source", "/workspace", + "gitleaks", + "detect", + "--source", paths.workspace, "--report-format", "json", - "--report-path", "/output/results.json", + "--report-path", paths.output, "--exit-code", "0", ] config_file = config.get("config_file") if config_file: - args.extend(["--config", f"/workspace/{config_file}"]) + args.extend(["--config", config_file if "/" in config_file else f"{paths.workspace}/{config_file}"]) return args - def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run Gitleaks against the given path and return results.""" - config = config or {} - - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "gitleaks-results.json" - cmd = self._build_command(path, output_file, config) - - result = subprocess.run( - cmd, - capture_output=True, - text=True, - ) - - # --exit-code 0 means gitleaks always exits 0 - if result.returncode != 0: - return ScanResult( - scanner=self.name, - metadata={ - "error": result.stderr.strip(), - "returncode": result.returncode, - }, - ) - - if not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={"error": "No output file produced"}, - ) - - findings = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - ) - def is_available(self) -> bool: """Check if Gitleaks is installed.""" return shutil.which("gitleaks") is not None @@ -82,18 +58,7 @@ def tool_version(self) -> str | None: """Return the installed Gitleaks version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["gitleaks", "version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "vX.Y.Z" — strip the leading v - version_text = result.stdout.strip() - if not version_text: - return None - return version_text.lstrip("v") - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version(["gitleaks", "version"], r"v?([0-9]\S*)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Gitleaks JSON output into findings.""" @@ -161,22 +126,3 @@ def _parse_finding(self, item: dict) -> Finding: "end_line": item.get("EndLine", 0), }, ) - - def _build_command( - self, path: str, output_file: Path, config: dict - ) -> list[str]: - """Build the Gitleaks CLI command.""" - cmd = [ - "gitleaks", - "detect", - "--source", path, - "--report-format", "json", - "--report-path", str(output_file), - "--exit-code", "0", - ] - - config_file = config.get("config_file") - if config_file: - cmd.extend(["--config", config_file]) - - return cmd diff --git a/argus/scanners/grype.py b/argus/scanners/grype.py index 2da637e1..6ff6e097 100644 --- a/argus/scanners/grype.py +++ b/argus/scanners/grype.py @@ -110,7 +110,7 @@ def tool_version(self) -> str | None: data = json.loads(res.stdout) v = data.get("version") return v if isinstance(v, str) else None - except (subprocess.TimeoutExpired, json.JSONDecodeError, Exception): + except (subprocess.TimeoutExpired, FileNotFoundError, json.JSONDecodeError, OSError): return None def parse_results(self, raw_output_path: Path) -> list[Finding]: diff --git a/argus/scanners/opengrep.py b/argus/scanners/opengrep.py index 1fc33f42..f67881d3 100644 --- a/argus/scanners/opengrep.py +++ b/argus/scanners/opengrep.py @@ -2,12 +2,12 @@ import json import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version class OpengrepScanner: @@ -19,51 +19,29 @@ class OpengrepScanner: languages = ["python", "javascript", "typescript", "go", "java", "ruby", "c", "cpp"] container_image = get_image("semgrep") - def container_args(self, config: dict | None = None) -> list[str]: - """Build container args from config — mirrors _build_command. + def scan(self, path: str, config: dict | None = None) -> ScanResult: + """Run OpenGrep against the given path and return results.""" + return run_subprocess_scan(self, path, config) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). - The semgrep image ENTRYPOINT is not semgrep, so we prefix the command. + Note: local execution uses the ``opengrep`` binary; container + execution (semgrep image) uses ``semgrep scan`` — if you need + the semgrep image to work, override ``container_args`` or set + the container image to one that ships the ``opengrep`` binary. """ - config = config or {} - args = ["semgrep", "scan", "--json", "--output", "/output/results.json"] + args = [ + "opengrep", + "--json", + "--output", paths.output, + ] rules_config = config.get("config") if rules_config: args.extend(["--config", rules_config]) - args.append("/workspace") + args.append(paths.workspace) return args - def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run OpenGrep against the given path and return results.""" - config = config or {} - - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "opengrep-results.json" - cmd = self._build_command(path, output_file, config) - - result = subprocess.run( - cmd, - capture_output=True, - text=True, - ) - - # OpenGrep returns non-zero on findings or errors; - # check for output file to distinguish - if not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={ - "error": result.stderr.strip() or "No output file produced", - "returncode": result.returncode, - }, - ) - - findings = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - ) - def is_available(self) -> bool: """Check if OpenGrep is installed.""" return shutil.which("opengrep") is not None @@ -76,21 +54,7 @@ def tool_version(self) -> str | None: """Return the installed OpenGrep version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["opengrep", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Parse the version string from output - version_text = result.stdout.strip() - if not version_text: - return None - # Take the last token of the first line as the version - first_line = version_text.splitlines()[0] - parts = first_line.split() - return parts[-1] if parts else None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version(["opengrep", "--version"], r"(\d+\.\d+\.\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse OpenGrep JSON output into findings.""" @@ -129,20 +93,3 @@ def _parse_finding(self, item: dict) -> Finding: "column": start.get("col", 0), }, ) - - def _build_command( - self, path: str, output_file: Path, config: dict - ) -> list[str]: - """Build the OpenGrep CLI command.""" - cmd = [ - "opengrep", - "--json", - "--output", str(output_file), - ] - - rules_config = config.get("config") - if rules_config: - cmd.extend(["--config", rules_config]) - - cmd.append(path) - return cmd diff --git a/argus/scanners/osv.py b/argus/scanners/osv.py index d40c7f32..10c941e8 100644 --- a/argus/scanners/osv.py +++ b/argus/scanners/osv.py @@ -2,12 +2,12 @@ import json import shutil -import subprocess -import tempfile from pathlib import Path from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan +from argus.core.version import parse_tool_version class OsvScanner: @@ -19,67 +19,67 @@ class OsvScanner: languages = ["all"] container_image = get_image("osv-scanner") supports_sbom = True + # The official osv-scanner image uses ENTRYPOINT ["osv-scanner"]; engine + # strips argv[0] for ENTRYPOINT-based images. + container_entrypoint = "osv-scanner" - def container_args(self, config: dict | None = None) -> list[str]: - """Build container args from config — mirrors _build_command.""" - config = config or {} + def scan(self, path: str, config: dict | None = None) -> ScanResult: + """Run OSV-Scanner against the given path and return results.""" + return run_subprocess_scan(self, path, config) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). + + Engine drops argv[0] when the container image declares an + ENTRYPOINT, so the same method works for both local and + container execution. + + Two modes: + - SBOM mode (``config["sbom_path"]`` set): passes the SBOM via + ``-L`` to ``scan`` (osv-scanner v2 API). + - Source mode (default): passes the workspace path to + ``scan source``, with optional ``-L `` and + ``--recursive``. + """ sbom_path = config.get("sbom_path") - # osv-scanner v2: `scan --sbom` for SBOMs, `scan source` for source trees if sbom_path: - sbom_in_container = config.get("sbom_mount_path") or f"/workspace/{sbom_path}" - return ["scan", "--format", "json", "--output-file", "/output/results.json", - "-L", sbom_in_container] - args = ["scan", "source", "--format", "json", "--output-file", "/output/results.json"] + # The engine sets ``sbom_mount_path`` for container execution + # (it mounts the host SBOM into a known container path). + # Otherwise: absolute SBOM paths pass through; relative ones + # resolve against the workspace. + mount_path = config.get("sbom_mount_path") + if mount_path: + sbom_in_context = mount_path + elif sbom_path.startswith("/"): + sbom_in_context = sbom_path + else: + sbom_in_context = f"{paths.workspace}/{sbom_path}" + return [ + "osv-scanner", + "scan", + "--format", "json", + "--output-file", paths.output, + "-L", sbom_in_context, + ] + + args = [ + "osv-scanner", + "scan", "source", + "--format", "json", + "--output-file", paths.output, + ] lockfile = config.get("lockfile") if lockfile: - args.extend(["-L", f"/workspace/{lockfile}"]) + args.extend(["-L", f"{paths.workspace}/{lockfile}"]) else: recursive = config.get("recursive", True) if str(recursive).lower() not in ("false", "0", "no"): args.append("--recursive") - args.append("/workspace") + args.append(paths.workspace) if config.get("config_file"): - args.extend(["--config", f"/workspace/{config['config_file']}"]) + args.extend(["--config", f"{paths.workspace}/{config['config_file']}"]) return args - def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run OSV-Scanner against the given path and return results.""" - config = config or {} - - with tempfile.TemporaryDirectory() as tmp_dir: - output_file = Path(tmp_dir) / "osv-results.json" - cmd = self._build_command(path, output_file, config) - - result = subprocess.run( - cmd, - capture_output=True, - text=True, - ) - - # osv-scanner exits non-zero when vulnerabilities are found, - # so we only treat truly unexpected failures as errors - if result.returncode != 0 and not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={ - "error": result.stderr.strip(), - "returncode": result.returncode, - }, - ) - - if not output_file.exists(): - return ScanResult( - scanner=self.name, - metadata={"error": "No output file produced"}, - ) - - findings = self.parse_results(output_file) - return ScanResult( - scanner=self.name, - findings=findings, - raw_report=output_file, - ) - def is_available(self) -> bool: """Check if OSV-Scanner is installed.""" return shutil.which("osv-scanner") is not None @@ -92,24 +92,10 @@ def tool_version(self) -> str | None: """Return the installed OSV-Scanner version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["osv-scanner", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output varies: "osv-scanner version X.Y.Z" or similar - text = result.stdout.strip() - if not text: - return None - # Look for a version-like token (digits and dots) - for line in text.splitlines(): - for part in line.split(): - stripped = part.lstrip("v") - if stripped and stripped[0].isdigit() and "." in stripped: - return stripped - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version( + ["osv-scanner", "--version"], + r"v?(\d+\.\d+(?:\.\d+)?[\w.-]*)", + ) def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse OSV-Scanner JSON output into findings.""" @@ -196,34 +182,3 @@ def _extract_cwe(self, vuln: dict) -> str | None: if cwe_ids: return cwe_ids[0] return None - - def _build_command( - self, path: str, output_file: Path, config: dict - ) -> list[str]: - """Build the OSV-Scanner CLI command.""" - cmd = [ - "osv-scanner", - "scan", - "--format", "json", - "--output", str(output_file), - ] - - config_file = config.get("config_file") - if config_file: - cmd.extend(["--config", config_file]) - - sbom_path = config.get("sbom_path") - if sbom_path: - cmd.extend(["--sbom", str(sbom_path)]) - return cmd - - lockfile = config.get("lockfile") - if lockfile: - cmd.extend(["-L", lockfile]) - else: - recursive = config.get("recursive", True) - if str(recursive).lower() not in ("false", "0", "no"): - cmd.append("--recursive") - cmd.append(path) - - return cmd diff --git a/argus/scanners/supply_chain.py b/argus/scanners/supply_chain.py index a0caa44f..0944dade 100644 --- a/argus/scanners/supply_chain.py +++ b/argus/scanners/supply_chain.py @@ -9,6 +9,7 @@ from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.version import parse_tool_version # zizmor security-severity score thresholds _ZIZMOR_CRITICAL_THRESHOLD = 9.0 @@ -122,21 +123,7 @@ def tool_version(self) -> str | None: """ if shutil.which("zizmor") is None: return None - try: - result = subprocess.run( - ["zizmor", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "zizmor X.Y.Z" - text = result.stdout.strip() - if not text: - return None - parts = text.splitlines()[0].split() - if len(parts) >= 2 and parts[0] == "zizmor": - return parts[1] - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None + return parse_tool_version(["zizmor", "--version"], r"^zizmor (\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse combined results. Detects format automatically. diff --git a/argus/scanners/trivy.py b/argus/scanners/trivy.py index 5a76cbe4..e2088ff2 100644 --- a/argus/scanners/trivy.py +++ b/argus/scanners/trivy.py @@ -16,6 +16,7 @@ from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.version import parse_tool_version from argus.scanners._vuln_parsers import parse_trivy_vuln @@ -100,18 +101,7 @@ def install_command(self) -> str | None: def tool_version(self) -> str | None: if not self.is_available(): return None - try: - res = subprocess.run( - ["trivy", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output: "Version: 0.58.1\n..." - for line in res.stdout.splitlines(): - if line.startswith("Version:"): - return line.split(":", 1)[1].strip() - return None - except (subprocess.TimeoutExpired, Exception): - return None + return parse_tool_version(["trivy", "--version"], r"^Version: (\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Trivy JSON output into Finding objects.""" diff --git a/argus/scanners/trivy_iac.py b/argus/scanners/trivy_iac.py index 25951629..95b24b20 100644 --- a/argus/scanners/trivy_iac.py +++ b/argus/scanners/trivy_iac.py @@ -8,6 +8,8 @@ from argus.containers import get_image from argus.core.models import Finding, ScanResult, Severity +from argus.core.scanner_template import ScanPaths +from argus.core.version import parse_tool_version class TrivyIacScanner: @@ -18,21 +20,24 @@ class TrivyIacScanner: category = "iac" languages = ["terraform", "kubernetes", "dockerfile"] container_image = get_image("trivy") + # The official Trivy image uses ENTRYPOINT ["trivy"]; engine strips + # argv[0] for ENTRYPOINT-based images. + container_entrypoint = "trivy" def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run Trivy IaC scan against the given path and return results.""" + """Run Trivy IaC scan against the given path and return results. + + Runs the JSON scan via the shared template and additionally + produces a SARIF report (best-effort, non-blocking) to attach as + ``sarif_report`` on the returned :class:`ScanResult`. + """ with tempfile.TemporaryDirectory() as tmp_dir: json_output = Path(tmp_dir) / "trivy-iac-results.json" sarif_output = Path(tmp_dir) / "trivy-iac-results.sarif" - # Run JSON scan + json_paths = ScanPaths(workspace=path, output=str(json_output)) json_result = subprocess.run( - [ - "trivy", "config", - "--format", "json", - "--output", str(json_output), - path, - ], + self.build_args(json_paths, config or {}), capture_output=True, text=True, ) @@ -46,17 +51,14 @@ def scan(self, path: str, config: dict | None = None) -> ScanResult: }, ) - # Run SARIF scan (best-effort, non-blocking) - subprocess.run( - [ - "trivy", "config", - "--format", "sarif", - "--output", str(sarif_output), - path, - ], - capture_output=True, - text=True, - ) + # SARIF scan — best-effort, non-blocking + sarif_args = [ + "trivy", "config", + "--format", "sarif", + "--output", str(sarif_output), + path, + ] + subprocess.run(sarif_args, capture_output=True, text=True) findings = self.parse_results(json_output) if json_output.exists() else [] @@ -67,6 +69,20 @@ def scan(self, path: str, config: dict | None = None) -> ScanResult: sarif_report=sarif_output if sarif_output.exists() else None, ) + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + """Build the full argv (including the binary name). + + Engine drops argv[0] when the container image declares an + ENTRYPOINT, so the same method works for both local and + container execution. + """ + return [ + "trivy", "config", + "--format", "json", + "--output", paths.output, + paths.workspace, + ] + def is_available(self) -> bool: """Check if Trivy is installed.""" return shutil.which("trivy") is not None @@ -79,27 +95,7 @@ def tool_version(self) -> str | None: """Return the installed Trivy version, or None if not available.""" if not self.is_available(): return None - try: - result = subprocess.run( - ["trivy", "--version"], - capture_output=True, text=True, timeout=5, - ) - # Output includes "Version: X.Y.Z" among other lines - for line in result.stdout.strip().splitlines(): - if line.startswith("Version:"): - return line.split(":", 1)[1].strip() - return None - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): - return None - - def container_args(self, config: dict | None = None) -> list[str]: - """Return CLI args for running Trivy IaC in a container.""" - return [ - "config", - "--format", "json", - "--output", "/output/results.json", - "/workspace", - ] + return parse_tool_version(["trivy", "--version"], r"^Version: (\S+)") def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Trivy IaC JSON output into findings.""" diff --git a/argus/tests/core/test_scanner_template.py b/argus/tests/core/test_scanner_template.py new file mode 100644 index 00000000..2477e497 --- /dev/null +++ b/argus/tests/core/test_scanner_template.py @@ -0,0 +1,247 @@ +"""Tests for argus.core.scanner_template. + +Locks in the contract: every scanner that calls run_subprocess_scan +goes through this code path, so any regression here breaks every +migrated scanner at once. +""" + +import json +import subprocess +from pathlib import Path +from unittest.mock import patch + +import pytest + +from argus.core.models import Finding, Severity +from argus.core.scanner_template import ScanPaths, run_subprocess_scan + + +# --------------------------------------------------------------------- # +# Test doubles # +# --------------------------------------------------------------------- # + + +class _FakeScanner: + """Minimal scanner satisfying the protocol shape.""" + + name = "fake" + + def __init__(self, args=None, parse=None): + self._args = args or [] + self._parse = parse or (lambda p: []) + + def build_args(self, paths: ScanPaths, config: dict) -> list[str]: + # Echo the paths into the command so tests can assert wiring. + return ["fake", "--in", paths.workspace, "--out", paths.output] + self._args + + def parse_results(self, output_path: Path): + return self._parse(output_path) + + +def _completed(stdout: str = "", stderr: str = "", returncode: int = 0): + return subprocess.CompletedProcess( + args=["fake"], returncode=returncode, stdout=stdout, stderr=stderr, + ) + + +# --------------------------------------------------------------------- # +# Happy paths # +# --------------------------------------------------------------------- # + + +class TestRunSubprocessScan: + + def test_writes_findings_when_output_file_exists(self, tmp_path): + finding = Finding(id="X", severity=Severity.HIGH, title="t", scanner="fake") + scanner = _FakeScanner(parse=lambda p: [finding]) + + def fake_run(cmd, **kwargs): + # The template wires ScanPaths.output into argv; write to it. + output_path = Path(cmd[cmd.index("--out") + 1]) + output_path.write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + result = run_subprocess_scan(scanner, str(tmp_path)) + + assert result.scanner == "fake" + assert result.findings == [finding] + assert "error" not in result.metadata + + def test_passes_workspace_path_through_to_build_args(self, tmp_path): + scanner = _FakeScanner() + + captured: dict = {} + + def fake_run(cmd, **kwargs): + captured["cmd"] = cmd + output_path = Path(cmd[cmd.index("--out") + 1]) + output_path.write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + run_subprocess_scan(scanner, "/some/workspace") + + assert "/some/workspace" in captured["cmd"] + # Output path lives inside a tempdir (not under workspace). + out_idx = captured["cmd"].index("--out") + assert "/some/workspace" not in captured["cmd"][out_idx + 1] + + def test_passes_config_through_to_build_args(self, tmp_path): + # Tool-specific knobs (lockfile, exclude, config_file, sbom_path) live + # in config, not on ScanPaths — the helper hands the config dict + # straight through to ``build_args(paths, config)``. + seen: list[dict] = [] + + class CapturingScanner: + name = "fake" + + def build_args(self, paths, config): + seen.append(config) + return ["fake", "--out", paths.output] + + def parse_results(self, output_path): + return [] + + def fake_run(cmd, **kwargs): + output_path = Path(cmd[cmd.index("--out") + 1]) + output_path.write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + run_subprocess_scan( + CapturingScanner(), + str(tmp_path), + config={"config_file": ".bandit", "sbom_path": "/tmp/x.spdx.json"}, + ) + + assert seen[0]["config_file"] == ".bandit" + assert seen[0]["sbom_path"] == "/tmp/x.spdx.json" + + def test_unpacks_passed_count_tuple(self, tmp_path): + # checkov-style return: (findings, passed_count_int) + scanner = _FakeScanner(parse=lambda p: ([], 42)) + + def fake_run(cmd, **kwargs): + output_path = Path(cmd[cmd.index("--out") + 1]) + output_path.write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + result = run_subprocess_scan(scanner, str(tmp_path)) + + assert result.metadata.get("passed_count") == 42 + + def test_unpacks_metadata_dict_tuple(self, tmp_path): + scanner = _FakeScanner(parse=lambda p: ([], {"warnings": 3})) + + def fake_run(cmd, **kwargs): + output_path = Path(cmd[cmd.index("--out") + 1]) + output_path.write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + result = run_subprocess_scan(scanner, str(tmp_path)) + + assert result.metadata.get("warnings") == 3 + + def test_falls_back_to_stdout_when_no_output_file(self, tmp_path): + scanner = _FakeScanner(parse=lambda p: [Finding( + id="Y", severity=Severity.LOW, title="from stdout", scanner="fake", + )]) + + def fake_run(cmd, **kwargs): + # Don't write the output file — emit on stdout instead. + return _completed(stdout=json.dumps({"data": []})) + + with patch("subprocess.run", side_effect=fake_run): + result = run_subprocess_scan(scanner, str(tmp_path)) + + assert len(result.findings) == 1 + assert result.findings[0].title == "from stdout" + + def test_overrides_output_filename(self, tmp_path): + captured: dict = {} + + class SarifScanner: + name = "sarif-fake" + + def build_args(self, paths, config): + captured["output"] = paths.output + return ["fake", "--out", paths.output] + + def parse_results(self, output_path): + return [] + + def fake_run(cmd, **kwargs): + Path(cmd[cmd.index("--out") + 1]).write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + run_subprocess_scan( + SarifScanner(), str(tmp_path), output_filename="results.sarif", + ) + + assert captured["output"].endswith("results.sarif") + + +# --------------------------------------------------------------------- # +# Failure paths # +# --------------------------------------------------------------------- # + + +class TestFailures: + + def test_missing_binary_returns_error_metadata(self, tmp_path): + scanner = _FakeScanner() + with patch("subprocess.run", side_effect=FileNotFoundError(2, "no such file", "fake")): + result = run_subprocess_scan(scanner, str(tmp_path)) + assert result.findings == [] + assert "Tool not found" in result.metadata.get("error", "") + + def test_timeout_returns_error_metadata(self, tmp_path): + scanner = _FakeScanner() + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("fake", 5)): + result = run_subprocess_scan(scanner, str(tmp_path), timeout=5) + assert "timed out" in result.metadata.get("error", "") + + def test_no_output_no_stdout_returns_error_metadata(self, tmp_path): + scanner = _FakeScanner() + with patch( + "subprocess.run", + return_value=_completed(stdout="", stderr="boom", returncode=2), + ): + result = run_subprocess_scan(scanner, str(tmp_path)) + err = result.metadata.get("error", "") + assert "No output produced" in err + assert "boom" in err + + def test_unexpected_exception_propagates(self, tmp_path): + # Bugs in scanner.parse_results shouldn't be silently translated. + scanner = _FakeScanner(parse=lambda p: (_ for _ in ()).throw(ValueError("bad json"))) + + def fake_run(cmd, **kwargs): + Path(cmd[cmd.index("--out") + 1]).write_text("{}") + return _completed() + + with patch("subprocess.run", side_effect=fake_run): + with pytest.raises(ValueError, match="bad json"): + run_subprocess_scan(scanner, str(tmp_path)) + + +# --------------------------------------------------------------------- # +# ScanPaths # +# --------------------------------------------------------------------- # + + +class TestScanPaths: + + def test_holds_workspace_and_output(self): + paths = ScanPaths(workspace="/in", output="/out/r.json") + assert paths.workspace == "/in" + assert paths.output == "/out/r.json" + + def test_frozen(self): + paths = ScanPaths(workspace="/in", output="/out") + with pytest.raises(Exception): # FrozenInstanceError or AttributeError + paths.workspace = "/other" # type: ignore[misc] diff --git a/argus/tests/core/test_version.py b/argus/tests/core/test_version.py new file mode 100644 index 00000000..81b96aa1 --- /dev/null +++ b/argus/tests/core/test_version.py @@ -0,0 +1,96 @@ +"""Tests for argus.core.version.parse_tool_version. + +Locks in the contract: 9 scanner modules now depend on this helper, so +any regression here breaks every scanner's tool_version() at once. +""" + +import re +import subprocess +from unittest.mock import patch + +import pytest + +from argus.core.version import parse_tool_version + + +def _completed(stdout: str = "", stderr: str = "", returncode: int = 0): + return subprocess.CompletedProcess( + args=["fake"], returncode=returncode, stdout=stdout, stderr=stderr, + ) + + +class TestParseToolVersion: + """Happy-path regex matching across the parser styles in use.""" + + def test_returns_first_capture_group(self): + with patch("subprocess.run", return_value=_completed("bandit 1.7.5")): + assert parse_tool_version(["bandit", "--version"], r"^bandit (\S+)") == "1.7.5" + + def test_strips_surrounding_whitespace(self): + with patch("subprocess.run", return_value=_completed("bandit 1.7.5 \n")): + assert parse_tool_version(["bandit", "--version"], r"^bandit (\S+)") == "1.7.5" + + def test_multiline_anchor_works(self): + # Trivy emits a multi-line banner; the helper passes re.MULTILINE so + # ^Version: matches at start of any line. The first line is the CLI + # version we want; later lines are DB metadata we don't. + output = "Version: 0.58.1\nVulnerability DB:\n Version: 2" + with patch("subprocess.run", return_value=_completed(output)): + assert parse_tool_version(["trivy", "--version"], r"^Version: (\S+)") == "0.58.1" + + def test_falls_back_to_stderr(self): + # Some Java tools emit version strings to stderr. + with patch("subprocess.run", return_value=_completed(stdout="", stderr="checkov 3.2.0")): + assert parse_tool_version(["checkov", "--version"], r"^checkov (\S+)") == "3.2.0" + + def test_optional_v_prefix(self): + # Gitleaks emits vX.Y.Z; the captured group should not include the v. + with patch("subprocess.run", return_value=_completed("v8.18.4")): + assert parse_tool_version(["gitleaks", "version"], r"v?([0-9]\S*)") == "8.18.4" + + def test_pattern_can_be_compiled_regex(self): + compiled = re.compile(r"^bandit (\S+)", re.MULTILINE) + with patch("subprocess.run", return_value=_completed("bandit 1.7.5")): + assert parse_tool_version(["bandit", "--version"], compiled) == "1.7.5" + + def test_custom_group_index(self): + with patch("subprocess.run", return_value=_completed("name=trivy ver=0.58.1")): + assert parse_tool_version( + ["trivy", "--version"], + r"name=(\S+) ver=(\S+)", + group=2, + ) == "0.58.1" + + +class TestParseToolVersionFailures: + """Anything we narrowly expect returns None — never raises. + + Per ADR-016 the broader Exception catch is the anti-pattern; + only missing binary, timeout, and OS errors are translated. + """ + + def test_no_match_returns_none(self): + with patch("subprocess.run", return_value=_completed("unknown")): + assert parse_tool_version(["x", "--version"], r"^expected (\S+)") is None + + def test_missing_binary_returns_none(self): + with patch("subprocess.run", side_effect=FileNotFoundError): + assert parse_tool_version(["nope", "--version"], r"(\S+)") is None + + def test_timeout_returns_none(self): + with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("x", 5)): + assert parse_tool_version(["x", "--version"], r"(\S+)") is None + + def test_oserror_returns_none(self): + with patch("subprocess.run", side_effect=OSError("permission denied")): + assert parse_tool_version(["x", "--version"], r"(\S+)") is None + + def test_empty_output_returns_none(self): + with patch("subprocess.run", return_value=_completed("")): + assert parse_tool_version(["x", "--version"], r"(\S+)") is None + + def test_does_not_swallow_unexpected_exceptions(self): + # Bugs inside subprocess.run shouldn't be silently translated to None. + with patch("subprocess.run", side_effect=ValueError("unexpected")): + with pytest.raises(ValueError): + parse_tool_version(["x", "--version"], r"(\S+)") diff --git a/argus/tests/scanners/test_osv.py b/argus/tests/scanners/test_osv.py index 460fb10c..2f8394de 100644 --- a/argus/tests/scanners/test_osv.py +++ b/argus/tests/scanners/test_osv.py @@ -3,9 +3,14 @@ import pytest from argus.core.models import Severity +from argus.core.scanner_template import ScanPaths from argus.scanners.osv import OsvScanner +_LOCAL = ScanPaths(workspace=".", output="/tmp/out.json") +_CONTAINER = ScanPaths(workspace="/workspace", output="/output/results.json") + + class TestOsvParseResults: """Test OsvScanner.parse_results with fixture data.""" @@ -62,68 +67,66 @@ def test_supports_sbom(self): class TestOsvSbomMode: - """OSV should accept an SBOM via config['sbom_path'] and add --sbom.""" + """SBOM mode (config['sbom_path'] set) → uses ``-L`` (osv-scanner v2).""" - def test_local_command_uses_sbom_flag(self): - from pathlib import Path - scanner = OsvScanner() - cmd = scanner._build_command( - path=".", - output_file=Path("/tmp/out.json"), - config={"sbom_path": "/shared/sbom.json"}, - ) - assert "--sbom" in cmd - assert "/shared/sbom.json" in cmd - # --recursive / path arg should NOT appear in SBOM mode - assert "--recursive" not in cmd - assert "." not in cmd - - def test_container_args_use_sbom_flag(self): - """osv-scanner v2 uses `-L` for SBOM input.""" - args = OsvScanner().container_args({ + def test_local_uses_sbom_flag(self): + args = OsvScanner().build_args(_LOCAL, {"sbom_path": "/shared/sbom.json"}) + assert "-L" in args + assert "/shared/sbom.json" in args + # SBOM mode never adds --recursive or the workspace path. + assert "--recursive" not in args + assert "." not in args + + def test_container_uses_sbom_flag_with_mount_path(self): + args = OsvScanner().build_args(_CONTAINER, { "sbom_path": "/host/sbom.json", "sbom_mount_path": "/sbom/sbom.json", }) - # v2 CLI: scan -L instead of deprecated --sbom assert "-L" in args assert "/sbom/sbom.json" in args assert "scan" in args assert "--format" in args def test_sbom_mode_ignores_lockfile_and_recursive(self): - """SBOM mode ignores lockfile/recursive options — uses only the SBOM.""" - args = OsvScanner().container_args({ + args = OsvScanner().build_args(_CONTAINER, { "sbom_path": "/host/sbom.json", "sbom_mount_path": "/sbom/sbom.json", "lockfile": "requirements.txt", "recursive": True, }) - # SBOM mode uses -L for the SBOM, but lockfile/recursive are ignored assert "-L" in args assert "/sbom/sbom.json" in args - # The specific lockfile should NOT appear assert "requirements.txt" not in " ".join(args) assert "--recursive" not in args - def test_container_args_sbom_fallback_path(self): - """When sbom_mount_path is not provided, fall back to /workspace/{sbom_path}.""" - args = OsvScanner().container_args({ + def test_container_sbom_fallback_to_workspace_path(self): + """No sbom_mount_path → fall back to ``/``.""" + args = OsvScanner().build_args(_CONTAINER, { "sbom_path": "my-sbom.spdx.json", - # No sbom_mount_path — should fall back }) assert "-L" in args assert "/workspace/my-sbom.spdx.json" in args -class TestOsvContainerArgs: - """Test OsvScanner.container_args for non-SBOM modes.""" +class TestOsvSourceMode: + """Non-SBOM mode — ``scan source`` with optional lockfile / recursive / config.""" - def test_container_args_with_config_file(self): - """config_file should be passed via --config flag.""" - args = OsvScanner().container_args({ + def test_config_file_passed_as_workspace_relative_config_flag(self): + args = OsvScanner().build_args(_CONTAINER, { "config_file": "osv-scanner.toml", }) assert "--config" in args assert "/workspace/osv-scanner.toml" in args - # Should be in source scan mode - assert "source" in args + assert "source" in args # source mode + + def test_lockfile_disables_recursive(self): + args = OsvScanner().build_args(_CONTAINER, {"lockfile": "requirements.txt"}) + assert "-L" in args + assert "/workspace/requirements.txt" in args + # When -L lockfile is set, the workspace path and --recursive are dropped. + assert "--recursive" not in args + + def test_recursive_default_true(self): + args = OsvScanner().build_args(_CONTAINER, {}) + assert "--recursive" in args + assert "/workspace" in args diff --git a/argus/tests/scanners/test_scanner_scan_methods.py b/argus/tests/scanners/test_scanner_scan_methods.py index d665efd6..eaa82c65 100644 --- a/argus/tests/scanners/test_scanner_scan_methods.py +++ b/argus/tests/scanners/test_scanner_scan_methods.py @@ -12,6 +12,7 @@ import pytest from argus.core.models import Severity +from argus.core.scanner_template import ScanPaths from argus.scanners.bandit import BanditScanner from argus.scanners.clamav import ClamavScanner from argus.scanners.checkov import CheckovScanner @@ -20,6 +21,12 @@ from argus.scanners.osv import OsvScanner +def _make_paths(tmp_path, filename="results.json"): + """Build a ScanPaths for unit-testing build_args.""" + out = tmp_path / filename + return ScanPaths(workspace="src/", output=str(out)) + + # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -148,31 +155,31 @@ def _write_fixture(path: Path, data): # Bandit # ===================================================================== -class TestBanditBuildCommand: - """Test BanditScanner._build_command construction.""" +class TestBanditBuildArgs: + """Test BanditScanner.build_args construction.""" def test_base_command(self, tmp_path): scanner = BanditScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command("src/", out, {}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {}) assert cmd[0] == "bandit" assert "-r" in cmd - assert "src/" in cmd + assert paths.workspace in cmd assert "--exit-zero" in cmd def test_includes_config_file(self, tmp_path): scanner = BanditScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {"config_file": ".bandit.yml"}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {"config_file": ".bandit.yml"}) assert "-c" in cmd - assert ".bandit.yml" in cmd + assert ".bandit.yml" in " ".join(cmd) def test_includes_exclude(self, tmp_path): scanner = BanditScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {"exclude": "tests/"}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {"exclude": "tests/"}) assert "--exclude" in cmd assert "tests/" in cmd @@ -210,7 +217,6 @@ def fake_run(cmd, **kwargs): assert result.scanner == "bandit" assert "error" in result.metadata - assert result.metadata["returncode"] == 2 # ===================================================================== @@ -245,13 +251,13 @@ def fake_run(cmd, **kwargs): # Gitleaks # ===================================================================== -class TestGitleaksBuildCommand: - """Test GitleaksScanner._build_command construction.""" +class TestGitleaksBuildArgs: + """Test GitleaksScanner.build_args construction.""" def test_base_command(self, tmp_path): scanner = GitleaksScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {}) assert cmd[0] == "gitleaks" assert "detect" in cmd @@ -259,11 +265,11 @@ def test_base_command(self, tmp_path): def test_includes_config_file(self, tmp_path): scanner = GitleaksScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {"config_file": ".gitleaks.toml"}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {"config_file": ".gitleaks.toml"}) assert "--config" in cmd - assert ".gitleaks.toml" in cmd + assert ".gitleaks.toml" in " ".join(cmd) class TestGitleaksScan: @@ -302,25 +308,25 @@ def fake_run(cmd, **kwargs): # OSV # ===================================================================== -class TestOsvBuildCommand: - """Test OsvScanner._build_command construction.""" +class TestOsvBuildArgs: + """Test OsvScanner.build_args construction.""" def test_base_command(self, tmp_path): - scanner = OsvScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {}) + out = str(tmp_path / "out.json") + paths = ScanPaths(workspace=".", output=out) + cmd = OsvScanner().build_args(paths, {}) assert cmd[0] == "osv-scanner" assert "scan" in cmd assert "--format" in cmd def test_includes_config_file(self, tmp_path): - scanner = OsvScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {"config_file": "osv.toml"}) + out = str(tmp_path / "out.json") + paths = ScanPaths(workspace=".", output=out) + cmd = OsvScanner().build_args(paths, {"config_file": "osv.toml"}) assert "--config" in cmd - assert "osv.toml" in cmd + assert "osv.toml" in " ".join(cmd) class TestOsvScan: @@ -330,7 +336,8 @@ def test_successful_scan(self, monkeypatch): scanner = OsvScanner() def fake_run(cmd, **kwargs): - output_idx = cmd.index("--output") + 1 + # build_args uses --output-file (osv-scanner v2 API) + output_idx = cmd.index("--output-file") + 1 output_path = Path(cmd[output_idx]) _write_fixture(output_path, _OSV_FIXTURE) return _completed_process() @@ -359,28 +366,31 @@ def fake_run(cmd, **kwargs): # Checkov # ===================================================================== -class TestCheckovBuildCommand: - """Test CheckovScanner._build_command construction.""" +class TestCheckovBuildArgs: + """Test CheckovScanner.build_args construction.""" - def test_base_command(self): + def test_base_command(self, tmp_path): scanner = CheckovScanner() - cmd = scanner._build_command("infra/", {}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {}) assert cmd[0] == "checkov" assert "-d" in cmd - assert "infra/" in cmd + assert paths.workspace in cmd assert "--quiet" in cmd - def test_includes_framework(self): + def test_includes_framework(self, tmp_path): scanner = CheckovScanner() - cmd = scanner._build_command(".", {"framework": "terraform"}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {"framework": "terraform"}) assert "--framework" in cmd assert "terraform" in cmd - def test_includes_check_and_skip_check(self): + def test_includes_check_and_skip_check(self, tmp_path): scanner = CheckovScanner() - cmd = scanner._build_command(".", { + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, { "check": "CKV_AWS_1", "skip_check": "CKV_AWS_2", }) @@ -434,29 +444,29 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "checkov" - assert result.metadata.get("error") == "No output produced" + assert "error" in result.metadata # ===================================================================== # OpenGrep # ===================================================================== -class TestOpengrepBuildCommand: - """Test OpengrepScanner._build_command construction.""" +class TestOpengrepBuildArgs: + """Test OpengrepScanner.build_args construction.""" def test_base_command(self, tmp_path): scanner = OpengrepScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command("src/", out, {}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {}) assert cmd[0] == "opengrep" assert "--json" in cmd - assert "src/" in cmd + assert paths.workspace in cmd def test_includes_config(self, tmp_path): scanner = OpengrepScanner() - out = tmp_path / "out.json" - cmd = scanner._build_command(".", out, {"config": "p/python"}) + paths = _make_paths(tmp_path) + cmd = scanner.build_args(paths, {"config": "p/python"}) assert "--config" in cmd assert "p/python" in cmd @@ -492,4 +502,3 @@ def fake_run(cmd, **kwargs): assert result.scanner == "opengrep" assert "error" in result.metadata - assert result.metadata["returncode"] == 1 diff --git a/argus/tests/test_containers.py b/argus/tests/test_containers.py index 01ff2d08..2f469cab 100644 --- a/argus/tests/test_containers.py +++ b/argus/tests/test_containers.py @@ -70,11 +70,17 @@ def test_all_scanners_have_container_image(self): ) def test_all_scanners_have_container_args(self): + """Scanners must expose container CLI args via build_args or container_args.""" + from argus.core.scanner_template import ScanPaths + for name, cls in self._security_scanners().items(): scanner = cls() - assert hasattr(scanner, "container_args"), ( - f"{name} missing container_args method" + has_build_args = hasattr(scanner, "build_args") + has_container_args = hasattr(scanner, "container_args") + assert has_build_args or has_container_args, ( + f"{name} missing both build_args and container_args" ) + # SBOM-only scanners refuse to build args without an sbom_path # because the SBOM is required for the invocation; pass a # placeholder so this protocol test still exercises them. @@ -84,9 +90,14 @@ def test_all_scanners_have_container_args(self): "sbom_path": "/host/sbom.json", "sbom_mount_path": "/sbom/sbom.json", } - args = scanner.container_args(cfg) + + if has_build_args: + paths = ScanPaths(workspace="/workspace", output="/output/results.json") + args = scanner.build_args(paths, cfg or {}) + else: + args = scanner.container_args(cfg) assert isinstance(args, list), ( - f"{name}.container_args() should return list" + f"{name} container args method should return list" ) diff --git a/argus/tests/test_init.py b/argus/tests/test_init.py index f3a4f9f5..c8ddfcd9 100644 --- a/argus/tests/test_init.py +++ b/argus/tests/test_init.py @@ -294,14 +294,36 @@ def fake_check(names, backend="auto"): def test_empty_names_returns_none(self): assert _check_local_readiness([]) is None - def test_swallows_exceptions(self, monkeypatch): + def test_swallows_import_error(self, monkeypatch): + """Missing optional preflight import → silently skip readiness.""" + import builtins + + real_import = builtins.__import__ + + def fake_import(name, *args, **kwargs): + if "preflight" in name: + raise ImportError("preflight extra not installed") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + assert _check_local_readiness(["a"]) is None + + def test_runtime_errors_surface(self, monkeypatch): + """Bugs inside check_scanner_readiness propagate (ADR-016). + + Init silently swallowing arbitrary exceptions hid real bugs; + the readiness check is best-effort *only* against the optional + import being absent, not against logic errors in the check + itself. + """ def boom(*args, **kwargs): raise RuntimeError("registry exploded") monkeypatch.setattr( "argus.preflight.tool_check.check_scanner_readiness", boom ) - assert _check_local_readiness(["a"]) is None + with pytest.raises(RuntimeError, match="registry exploded"): + _check_local_readiness(["a"]) class TestRunInitReadinessOutput: