diff --git a/.ai/errors.yaml b/.ai/errors.yaml index 34413e7..72db8d7 100644 --- a/.ai/errors.yaml +++ b/.ai/errors.yaml @@ -374,3 +374,138 @@ scn_detector_errors: how: "Set enable_ai_fallback: true and provide ANTHROPIC_API_KEY" - description: "Review manually" how: "MANUAL_REVIEW is the expected safe default for unrecognized changes" + + # ============================================================================== + # SCANNER EXECUTION FAILURES + # ============================================================================== + + - pattern: "exit 127|exec.*not found" + category: scanner-execution + context: "Container scanner exits 127 immediately (e.g. osv-scanner)" + + root_cause: | + Docker's --entrypoint override does NOT consult the image's $PATH + when given a bare binary name. Some images declare an absolute + entrypoint (e.g. ghcr.io/google/osv-scanner uses + ENTRYPOINT ["/osv-scanner"]) and require the absolute path on + the override. A bare ``osv-scanner`` resolves nowhere and exits + 127. + + solution: + steps: + - "Run ``docker image inspect `` and read ``Config.Entrypoint``" + - "Set ``container_entrypoint`` on the scanner class to the absolute path from that field" + verification: "Engine strips argv[0] for ENTRYPOINT-based images, so build_args may keep the bare name" + + reference: "argus/scanners/osv.py — see container_entrypoint = '/osv-scanner'" + + - pattern: "yamllint.*PASS.*0 findings|lint-yaml.*passes despite errors" + category: scanner-execution + context: "Linter exits non-zero but reports zero findings and Status: PASS" + + root_cause: | + Tools like yamllint use exit codes to differentiate happy-path + lint failures (exit 1, with parseable findings on stdout) from + runtime errors (exit ≥ 2, empty stdout). Naive callers that map + empty stdout to ``[]`` lose the runtime-error signal entirely + and surface ``Status: PASS`` even when the tool failed to run. + + solution: + steps: + - "Distinguish exit codes: 0 = clean, 1 = findings, ≥2 = real failure" + - "When exit ≥ 2 with empty findings, set metadata['execution_failed']=True and metadata['execution_failure_reason']=" + - "Use the same shape the engine container path emits — the terminal reporter and --fail-on-scanner-error key off these exact field names" + + reference: "argus/linters/yamllint.py — scan() exit-code branching" + + - pattern: "scanner produced no output|execution_failed" + category: scanner-execution + context: "Reporter shows Warning row but Status: PASS contradicts it" + + root_cause: | + Threshold compliance (``ScanSummary.passed``) and execution + success are independent signals. A scanner that fails to run + produces zero findings, which alone passes any threshold. The + reporter must label PASS as ``PASS (degraded — some scanners + did not run)`` whenever any scanner has + metadata['execution_failed']=True, otherwise the Warning above + and the Status below contradict each other. + + solution: + steps: + - "Reporter checks for any result.metadata.get('execution_failed')" + - "If passed and any failed: print 'Status: PASS (degraded — N did not run, M unparsable)'" + - "Add --fail-on-scanner-error in CI for hard-fail behavior" + + reference: "argus/reporters/terminal.py::_print_status" + + - pattern: "0 findings.*known to be vulnerable|JSONDecodeError.*results.json" + category: scanner-execution + context: "Scanner produced output but parser couldn't interpret it (third state)" + + root_cause: | + Distinct from execution-failure: the scanner ran, exited, and + wrote a results file — we just couldn't parse what came out + (schema drift, truncated JSON, mixed text+JSON). Previously + this surfaced as a stack trace from the engine's exception + handler and got rolled up as a generic "scanner failed". + Reporters and CI gates now have a fourth state for it: + ``parse_failed`` + ``parse_failure_reason`` (carries the + exception type and a clipped output head). + + solution: + steps: + - "Engine's container path wraps scanner.parse_results in try/except" + - "On any Exception, set metadata['parse_failed']=True and metadata['parse_failure_reason']" + - "scanner_template.run_subprocess_scan does the same for local execution" + - "TerminalReporter renders parse_failed in its own warning block" + - "--fail-on-scanner-error fires on parse_failed too" + + reference: "argus/core/engine.py::_run_in_container — try/except around parse_results" + + # ============================================================================== + # WINDOWS-SPECIFIC SCANNER ERRORS + # ============================================================================== + + - pattern: "PermissionError.*WinError 5.*Access is denied|yamllint.*Access is denied" + category: scanner-execution + context: "yamllint launches with PermissionError on Windows hosts with AppLocker / SRP" + + root_cause: | + AppLocker or Software Restriction Policy on Windows blocks + executable launches from user AppData paths (where pip --user + and virtualenv typically install scripts). The python + interpreter itself is whitelisted, so loading the same + package via ``python -m yamllint`` works on the same + machine. + + solution: + steps: + - "YamllintLinter._run_with_windows_fallback wraps subprocess.run" + - "On sys.platform == 'win32', PermissionError/OSError triggers a retry with [sys.executable, '-m', 'yamllint'] + cmd[1:]" + - "FileNotFoundError still propagates so 'yamllint not installed' renders cleanly" + - "Linux/macOS bypass the fallback — Linux PermissionError is a genuine bug, not a policy case" + + reference: "argus/linters/yamllint.py::_run_with_windows_fallback" + + - pattern: "UnicodeDecodeError.*charmap codec.*can't decode byte" + category: scanner-execution + context: "Scanner result decode fails on Windows with cp1252 default encoding" + + root_cause: | + Docker container output (and most CLI tool output) is UTF-8. + ``subprocess.run(text=True)`` and ``Path.read_text()`` fall + back to the platform default encoding when ``encoding=`` is + omitted — cp1252 on Windows. Any non-ASCII byte (CVE + descriptions, accented file paths, scanner banners) raises + UnicodeDecodeError mid-scan. + + solution: + steps: + - "Engine docker subprocess: encoding='utf-8', errors='replace'" + - "scanner_template subprocess: same" + - "All scanner.parse_results read_text() calls: same" + - "Yamllint subprocess + Windows fallback: same" + note: "errors='replace' is preferred over 'strict' — a security tool showing � is better than crashing on otherwise-usable output" + + reference: "argus/core/engine.py and every argus/scanners/*.py read_text()" diff --git a/argus/cli.py b/argus/cli.py index 543b035..d588f01 100644 --- a/argus/cli.py +++ b/argus/cli.py @@ -1693,20 +1693,37 @@ def _cmd_source_scan(args: argparse.Namespace) -> int: r.scanner for r in summary.results if r.metadata.get("execution_failed") ] + scanner_parse_failures = [ + r.scanner for r in summary.results + if r.metadata.get("parse_failed") + ] if not summary.passed: exit_code = EXIT_FINDINGS elif sbom_batch_failures: exit_code = EXIT_ERROR elif ( getattr(args, "fail_on_scanner_error", False) - and scanner_execution_failures + and (scanner_execution_failures or scanner_parse_failures) ): - log.error( - "Exiting non-zero: %d scanner(s) produced no output (%s) and " - "--fail-on-scanner-error is set.", - len(scanner_execution_failures), - ", ".join(scanner_execution_failures), - ) + # Both states represent "the scan didn't fully succeed": + # execution_failed = couldn't run; parse_failed = ran but + # output unintelligible. From a CI gating perspective they're + # equivalent — the user asked for a hard fail when scanners + # don't deliver clean results. + if scanner_execution_failures: + log.error( + "Exiting non-zero: %d scanner(s) did not run cleanly " + "(%s) and --fail-on-scanner-error is set.", + len(scanner_execution_failures), + ", ".join(scanner_execution_failures), + ) + if scanner_parse_failures: + log.error( + "Exiting non-zero: %d scanner(s) produced unparsable " + "output (%s) and --fail-on-scanner-error is set.", + len(scanner_parse_failures), + ", ".join(scanner_parse_failures), + ) exit_code = EXIT_ERROR else: exit_code = EXIT_SUCCESS diff --git a/argus/core/engine.py b/argus/core/engine.py index 932fb00..bd3c7cb 100644 --- a/argus/core/engine.py +++ b/argus/core/engine.py @@ -2,6 +2,7 @@ import logging import os +import platform import shutil import subprocess import tempfile @@ -674,7 +675,15 @@ def _run_in_container( # - holds only one scan's transient output (no secrets; # findings travel through ``parse_results`` and end up # in the user-specified output_dir, never here). - os.chmod(output_dir, 0o777) + # + # Skip on Windows: NTFS doesn't honor POSIX bits, ``os.chmod`` + # only flips the read-only attribute, and Docker Desktop on + # Windows handles uid mapping for bind mounts differently + # (it doesn't suffer from the macOS uid-mismatch failure mode + # this guard exists for). Calling ``chmod 0o777`` there is + # at best a no-op and at worst confusing in stack traces. + if platform.system() != "Windows": + os.chmod(output_dir, 0o777) docker_cmd = [ self._runtime, "run", "--rm", @@ -738,10 +747,21 @@ def _run_in_container( ) start = time.monotonic() + # Docker container output is always UTF-8. Without + # ``encoding='utf-8'``, ``text=True`` falls back to the + # platform default — cp1252 on Windows — which raises + # ``UnicodeDecodeError`` on any non-ASCII byte the + # scanner emits (CVE descriptions, file paths with + # non-ASCII characters, etc.). ``errors='replace'`` is + # a safe fallback over ``strict``: a security tool + # showing ``�`` is better than crashing the whole + # scan on output we'd otherwise be able to use. proc = subprocess.run( docker_cmd, capture_output=True, text=True, + encoding="utf-8", + errors="replace", ) elapsed = int((time.monotonic() - start) * 1000) @@ -854,35 +874,65 @@ def _run_in_container( f"no output files and no stdout (exit={proc.returncode})" ) if result_files and hasattr(scanner, "parse_results"): - parsed = scanner.parse_results(result_files[0]) - # parse_results may return either a list of Findings, - # a ``(list, int)`` tuple (legacy passed_count channel, - # used by linters), or a ``(list, dict)`` tuple (extra - # metadata merged into ScanResult.metadata — used by - # Grype to flag "source.target=unknown" which means - # "couldn't identify packages" rather than "nothing - # vulnerable"). - if isinstance(parsed, tuple): - findings, extra = parsed - if isinstance(extra, int): - metadata_extra["passed_count"] = extra - elif isinstance(extra, dict): - metadata_extra.update(extra) - # Warn at the engine layer too so the signal is - # visible even when a reporter doesn't render - # per-scanner metadata. - if "warning" in extra: - logger.warning( - "Scanner '%s': %s", - scanner.name, extra["warning"], - ) + try: + parsed = scanner.parse_results(result_files[0]) + except Exception as exc: + # Scanner produced output but the parser couldn't + # interpret it (e.g. osv-scanner v2 rev'd its + # schema, truncated output, mixed text+JSON). This + # is a third state distinct from "execution failed" + # and "ran clean" — we surface it as + # ``parse_failed`` so the reporter can show "OSV + # produced 12KB of output we couldn't parse" rather + # than the misleading "no output produced". The + # parser bug doesn't crash the rest of the scan; + # other scanners' results are still useful. + head = "" + try: + head = result_files[0].read_text( + encoding="utf-8", errors="replace", + )[:200] + except OSError: + head = "" + metadata_extra["parse_failed"] = True + metadata_extra["parse_failure_reason"] = ( + f"{type(exc).__name__}: {exc}. " + f"output head: {head!r}" + ) + logger.warning( + "Scanner '%s' produced output but parse failed: %s", + scanner.name, exc, + ) + findings = [] else: - findings = parsed - logger.debug( - "Parsed %d finding(s) from %s", - len(findings), - result_files[0].name, - ) + # parse_results may return either a list of Findings, + # a ``(list, int)`` tuple (legacy passed_count channel, + # used by linters), or a ``(list, dict)`` tuple (extra + # metadata merged into ScanResult.metadata — used by + # Grype to flag "source.target=unknown" which means + # "couldn't identify packages" rather than "nothing + # vulnerable"). + if isinstance(parsed, tuple): + findings, extra = parsed + if isinstance(extra, int): + metadata_extra["passed_count"] = extra + elif isinstance(extra, dict): + metadata_extra.update(extra) + # Warn at the engine layer too so the signal is + # visible even when a reporter doesn't render + # per-scanner metadata. + if "warning" in extra: + logger.warning( + "Scanner '%s': %s", + scanner.name, extra["warning"], + ) + else: + findings = parsed + logger.debug( + "Parsed %d finding(s) from %s", + len(findings), + result_files[0].name, + ) return ScanResult( scanner=scanner.name, diff --git a/argus/core/scanner_template.py b/argus/core/scanner_template.py index 6c453c7..aad9217 100644 --- a/argus/core/scanner_template.py +++ b/argus/core/scanner_template.py @@ -118,7 +118,13 @@ def run_subprocess_scan( Returns: A :class:`ScanResult` with ``findings`` populated on success or - an ``error`` metadata key when execution fails. + ``metadata["execution_failed"] = True`` when the underlying + tool failed to run. The terminal reporter, viewers, and + ``--fail-on-scanner-error`` all key off ``execution_failed``; + using the same metadata shape that the engine's container path + emits (see ``argus/core/engine.py::_run_in_container``) keeps + local-execution and container-execution failures uniformly + visible without per-path special-casing. """ config = config or {} @@ -130,21 +136,39 @@ def run_subprocess_scan( logger.debug("[%s] running: %s", scanner.name, " ".join(cmd)) try: + # Explicit UTF-8 encoding for stdout/stderr capture: the + # platform default (cp1252 on Windows) raises + # ``UnicodeDecodeError`` on non-ASCII output (path names, + # CVE descriptions, scanner banners). All container and + # CLI scanner output is UTF-8, so we lock that in + # uniformly across local- and container-execution paths. result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, + encoding="utf-8", + errors="replace", ) except FileNotFoundError as exc: return ScanResult( scanner=scanner.name, - metadata={"error": f"Tool not found: {exc.filename or cmd[0]}"}, + metadata={ + "execution_failed": True, + "execution_failure_reason": ( + f"Tool not found: {exc.filename or cmd[0]}" + ), + }, ) except subprocess.TimeoutExpired: return ScanResult( scanner=scanner.name, - metadata={"error": f"Scanner timed out after {timeout}s"}, + metadata={ + "execution_failed": True, + "execution_failure_reason": ( + f"Scanner timed out after {timeout}s" + ), + }, ) if not output_file.exists(): @@ -158,14 +182,45 @@ def run_subprocess_scan( return ScanResult( scanner=scanner.name, metadata={ - "error": ( + "execution_failed": True, + "execution_failure_reason": ( f"No output produced (exit={result.returncode}). " f"stderr: {(result.stderr or '').strip()[:400]}" ), }, ) - parsed = scanner.parse_results(output_file) + try: + parsed = scanner.parse_results(output_file) + except Exception as exc: + # ``parse_failed`` is a distinct state from ``execution_failed``: + # the scanner *did* run and produced an output file we just + # couldn't interpret. Schema drift (e.g. osv-scanner v2 + # rev'd its JSON), truncated output, or non-JSON content + # land here. Surface it as its own signal — reporters + # render parse failures separately, and ``--fail-on-scanner- + # error`` keys off both flags. We deliberately don't + # re-raise: a parser bug shouldn't crash the whole scan + # when the rest of the run is still useful. + head = "" + try: + head = output_file.read_text( + encoding="utf-8", errors="replace", + )[:200] + except OSError: + head = "" + return ScanResult( + scanner=scanner.name, + raw_report=output_file, + metadata={ + "parse_failed": True, + "parse_failure_reason": ( + f"{type(exc).__name__}: {exc}. " + f"output head: {head!r}" + ), + }, + ) + # ``parse_results`` may return ``list[Finding]`` (most scanners), # a ``(list, int)`` tuple (linter passed-count channel — used by # checkov), or a ``(list, dict)`` tuple (extra metadata). Engine diff --git a/argus/linters/yamllint.py b/argus/linters/yamllint.py index 5787c9c..a1d4550 100644 --- a/argus/linters/yamllint.py +++ b/argus/linters/yamllint.py @@ -1,11 +1,16 @@ """YAML linter wrapping yamllint.""" +import logging import shutil import subprocess +import sys from argus.core.models import Finding, ScanResult, Severity +logger = logging.getLogger("argus.scanner") + + class YamllintLinter: """Wraps yamllint to lint YAML files for syntax and style issues.""" @@ -15,17 +20,79 @@ class YamllintLinter: languages = ["yaml"] def scan(self, path: str, config: dict | None = None) -> ScanResult: - """Run yamllint against the given path and return results.""" + """Run yamllint against the given path and return results. + + yamllint exit codes (per its docs): + * 0 — no problems found + * 1 — lint problems found (this is the expected happy path + when violations exist; NOT an execution failure) + * 2+ — yamllint itself failed to run (bad config file path, + unreadable input, internal error). stderr carries the + reason; stdout is empty in this case. + + Previously, exit ≥ 2 was silently dropped: ``_parse_output("")`` + returned ``[]`` and the caller saw a clean ``ScanResult`` with + zero findings. The terminal reporter would then print + ``Scanner: lint-yaml (0 findings)`` followed by ``Status: PASS``, + masking a real execution failure. Now we set the + ``execution_failed`` metadata flag (same shape the engine's + container path produces) so the reporter's + ``Warning: scanner produced no output`` block surfaces it and + ``--fail-on-scanner-error`` correctly fails the run. + """ config = config or {} cmd = self._build_command(path, config) - result = subprocess.run(cmd, capture_output=True, text=True) + try: + result = self._run_with_windows_fallback(cmd) + except FileNotFoundError as exc: + # ``is_available`` is checked before scan() is called, but + # there's a race between that check and the subprocess + # invocation (the binary could be uninstalled between + # them). Treating this as an execution failure — rather + # than letting it propagate — keeps the engine's exception + # handler from rendering a stack trace and lets the + # reporter surface a clean "yamllint not found" reason + # the user can act on. + return ScanResult( + scanner=self.name, + metadata={ + "execution_failed": True, + "execution_failure_reason": ( + f"yamllint binary not found: " + f"{exc.filename or 'yamllint'}" + ), + }, + ) + except OSError as exc: + # Both the direct binary launch AND the Windows + # ``python -m yamllint`` fallback failed. Surface the + # original error so the user sees the actual permission / + # OS reason instead of a generic stack trace. + return ScanResult( + scanner=self.name, + metadata={ + "execution_failed": True, + "execution_failure_reason": ( + f"yamllint launch failed: {type(exc).__name__}: {exc}" + ), + }, + ) findings = self._parse_output(result.stdout) + metadata: dict = {"returncode": result.returncode} + # Exit ≥ 2 with no findings parsed = real runtime/config error. + # Exit 1 with findings is the lint-violations-found happy path. + if result.returncode > 1 and not findings: + metadata["execution_failed"] = True + metadata["execution_failure_reason"] = ( + f"yamllint exited {result.returncode}. " + f"stderr: {(result.stderr or '').strip()[:400]}" + ) return ScanResult( scanner=self.name, findings=findings, - metadata={"returncode": result.returncode}, + metadata=metadata, ) def is_available(self) -> bool: @@ -56,6 +123,70 @@ def tool_version(self) -> str | None: except (subprocess.TimeoutExpired, FileNotFoundError, Exception): return None + def _run_with_windows_fallback( + self, cmd: list[str], + ) -> subprocess.CompletedProcess: + """Run ``yamllint`` directly, with a Windows-only ``python -m`` + fallback for AppLocker / Software Restriction Policy hosts. + + On Windows machines with AppLocker or SRP, executables installed + under user AppData (typical pip --user / virtualenv install + location) get blocked by policy and ``subprocess.run`` raises + ``PermissionError`` — but loading the same package via the + Python interpreter (which is whitelisted) works. ``python -m + yamllint`` invokes yamllint's ``__main__`` module and is + argv-compatible with the binary, so we can swap argv[0] and + retry without touching the result-parsing logic. + + The fallback is platform-guarded (``sys.platform == 'win32'``) + so the Linux invocation path is **byte-identical** to before + — no PATH lookups change, no extra subprocess on the happy + path, no behavioral drift on existing CI runs. + + FileNotFoundError is re-raised here so the outer ``scan()`` + handler can convert it to a clean ``execution_failed`` + ScanResult. PermissionError / other OSError on Linux is also + re-raised (Linux doesn't have AppLocker; if the binary can't + execute there it's a genuine permission bug, not the policy + case this fallback exists for). + + Encoding: explicit ``encoding='utf-8'`` + ``errors='replace'`` + replaces the platform default (cp1252 on Windows) which would + otherwise raise ``UnicodeDecodeError`` on yamllint output + containing non-ASCII characters in user file paths or YAML + values. + """ + try: + return subprocess.run( + cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + ) + except FileNotFoundError: + # Re-raise so the outer handler renders "binary not found" + # cleanly. The fallback ``python -m yamllint`` would also + # fail (yamllint isn't installed at all), so retrying is + # pointless. + raise + except OSError as exc: + if sys.platform != "win32": + raise + logger.warning( + "yamllint direct invocation blocked on Windows (%s) — " + "retrying via 'python -m yamllint'", + exc, + ) + fallback_cmd = [sys.executable, "-m", "yamllint"] + cmd[1:] + return subprocess.run( + fallback_cmd, + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + ) + def _build_command(self, path: str, config: dict) -> list[str]: """Build the yamllint CLI command.""" cmd = ["yamllint", "--format", "parsable"] diff --git a/argus/reporters/terminal.py b/argus/reporters/terminal.py index 76226fc..030326a 100644 --- a/argus/reporters/terminal.py +++ b/argus/reporters/terminal.py @@ -100,26 +100,64 @@ def _print_scanner_results(self, summary: ScanSummary) -> None: print() def _print_status(self, summary: ScanSummary) -> None: - # Scanner-execution failures (no output produced) are flagged - # separately so a single bad scanner image doesn't quietly slip - # past a "Status: PASS" line. The PASS/FAIL line still reflects - # *threshold* outcome; this row reflects *execution* outcome. - # CI-callers who want hard-fail behavior on missing output use - # ``--fail-on-scanner-error``. - failed = [ - r.scanner for r in summary.results + # Three orthogonal signals get rendered here: + # * threshold outcome — drives Status: PASS / FAIL + # * scanner execution outcome — ran-or-not, surfaced as + # ``execution_failed`` by the engine and adapters + # * parse outcome — ran-and-produced-output-but-couldn't-be- + # interpreted, surfaced as ``parse_failed`` + # Each gets its own warning block carrying the scanner-specific + # reason (engine collects ``execution_failure_reason`` / + # ``parse_failure_reason`` per scanner). Generic "uid mismatch + # / crashed / wrong entrypoint" boilerplate is *not* printed — + # it was misleading for OSV exit-1-with-findings and yamllint + # binary-not-found cases. The reason from the adapter wins. + exec_failed = [ + r for r in summary.results if r.metadata.get("execution_failed") ] - if failed: - names = ", ".join(failed) - print(f"Warning: {len(failed)} scanner(s) produced no output: {names}") - print(" These scanners likely failed to execute (uid mismatch on") - print(" /output mount, crashed, or wrong entrypoint). Re-run with") - print(" --verbose for stderr; pass --fail-on-scanner-error to fail") - print(" the scan when this happens.") + parse_failed = [ + r for r in summary.results + if r.metadata.get("parse_failed") + ] + + if exec_failed: + print(f"Warning: {len(exec_failed)} scanner(s) did not run cleanly:") + for r in exec_failed: + reason = r.metadata.get( + "execution_failure_reason", "no reason recorded", + ) + print(f" - {r.scanner}: {reason}") + print( + " Pass --fail-on-scanner-error to fail the scan when " + "this happens." + ) + + if parse_failed: + print( + f"Warning: {len(parse_failed)} scanner(s) produced " + f"output that could not be parsed:" + ) + for r in parse_failed: + reason = r.metadata.get( + "parse_failure_reason", "no reason recorded", + ) + print(f" - {r.scanner}: {reason}") + print( + " This usually means a scanner output schema changed; " + "report at https://github.com/huntridge-labs/argus/issues." + ) if summary.passed: - print("Status: PASS") + if exec_failed or parse_failed: + parts = [] + if exec_failed: + parts.append(f"{len(exec_failed)} did not run") + if parse_failed: + parts.append(f"{len(parse_failed)} unparsable") + print(f"Status: PASS (degraded — {', '.join(parts)})") + else: + print("Status: PASS") else: threshold = summary.severity_threshold.value if summary.severity_threshold else "none" print(f"Status: FAIL (findings above threshold: {threshold})") diff --git a/argus/scanners/bandit.py b/argus/scanners/bandit.py index aab98fe..5efc0d8 100644 --- a/argus/scanners/bandit.py +++ b/argus/scanners/bandit.py @@ -83,7 +83,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Bandit JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) return [self._parse_finding(item) for item in data.get("results", [])] def _parse_finding(self, item: dict) -> Finding: diff --git a/argus/scanners/checkov.py b/argus/scanners/checkov.py index 3516a56..cd13743 100644 --- a/argus/scanners/checkov.py +++ b/argus/scanners/checkov.py @@ -74,7 +74,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> tuple[list[Finding], int]: """Parse Checkov JSON output into findings and passed count.""" - text = raw_output_path.read_text().strip() + text = raw_output_path.read_text(encoding="utf-8", errors="replace").strip() if not text: return [], 0 diff --git a/argus/scanners/clamav.py b/argus/scanners/clamav.py index 2c84b43..c141404 100644 --- a/argus/scanners/clamav.py +++ b/argus/scanners/clamav.py @@ -83,7 +83,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse ClamAV text output file into findings.""" - text = raw_output_path.read_text() + text = raw_output_path.read_text(encoding="utf-8", errors="replace") return self.parse_results_text(text) def parse_results_text(self, text: str) -> list[Finding]: diff --git a/argus/scanners/container.py b/argus/scanners/container.py index 7b796e9..fa5cd82 100644 --- a/argus/scanners/container.py +++ b/argus/scanners/container.py @@ -133,7 +133,7 @@ def tool_version(self) -> str | None: def parse_trivy_results(self, raw_output_path: Path) -> list[Finding]: """Parse Trivy container JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) findings: list[Finding] = [] for target in data.get("Results", []): @@ -144,7 +144,7 @@ def parse_trivy_results(self, raw_output_path: Path) -> list[Finding]: def parse_grype_results(self, raw_output_path: Path) -> list[Finding]: """Parse Grype JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) findings: list[Finding] = [] for match in data.get("matches", []): diff --git a/argus/scanners/gitleaks.py b/argus/scanners/gitleaks.py index 15e4da8..91de1df 100644 --- a/argus/scanners/gitleaks.py +++ b/argus/scanners/gitleaks.py @@ -62,7 +62,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Gitleaks JSON output into findings.""" - text = raw_output_path.read_text().strip() + text = raw_output_path.read_text(encoding="utf-8", errors="replace").strip() if not text: return [] diff --git a/argus/scanners/grype.py b/argus/scanners/grype.py index 6ff6e09..3f3c2ea 100644 --- a/argus/scanners/grype.py +++ b/argus/scanners/grype.py @@ -127,7 +127,7 @@ def parse_results(self, raw_output_path: Path) -> list[Finding]: the metadata gets attached. """ try: - data = json.loads(Path(raw_output_path).read_text()) + data = json.loads(Path(raw_output_path).read_text(encoding="utf-8", errors="replace")) except (json.JSONDecodeError, OSError): return [] matches = data.get("matches") or [] diff --git a/argus/scanners/opengrep.py b/argus/scanners/opengrep.py index f67881d..3dfc489 100644 --- a/argus/scanners/opengrep.py +++ b/argus/scanners/opengrep.py @@ -58,7 +58,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse OpenGrep JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) results = data.get("results", []) return [self._parse_finding(item) for item in results] diff --git a/argus/scanners/osv.py b/argus/scanners/osv.py index 10c941e..20f3b6c 100644 --- a/argus/scanners/osv.py +++ b/argus/scanners/osv.py @@ -19,9 +19,13 @@ class OsvScanner: languages = ["all"] container_image = get_image("osv-scanner") supports_sbom = True - # The official osv-scanner image uses ENTRYPOINT ["osv-scanner"]; engine - # strips argv[0] for ENTRYPOINT-based images. - container_entrypoint = "osv-scanner" + # The official osv-scanner image uses ENTRYPOINT ["/osv-scanner"] + # — note the absolute path. ``$PATH`` inside the image does NOT + # resolve a bare ``osv-scanner``; ``--entrypoint osv-scanner`` + # would exit 127. Pass the absolute path explicitly. Engine strips + # argv[0] for ENTRYPOINT-based images so the binary name in + # build_args() is informational only. + container_entrypoint = "/osv-scanner" def scan(self, path: str, config: dict | None = None) -> ScanResult: """Run OSV-Scanner against the given path and return results.""" @@ -99,7 +103,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse OSV-Scanner JSON output into findings.""" - text = raw_output_path.read_text().strip() + text = raw_output_path.read_text(encoding="utf-8", errors="replace").strip() if not text: return [] diff --git a/argus/scanners/supply_chain.py b/argus/scanners/supply_chain.py index 0944dad..e3e5165 100644 --- a/argus/scanners/supply_chain.py +++ b/argus/scanners/supply_chain.py @@ -131,7 +131,7 @@ def parse_results(self, raw_output_path: Path) -> list[Finding]: For explicit parsing, use parse_zizmor_results or parse_actionlint_results directly. """ - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) # SARIF format (zizmor) if "$schema" in data or "runs" in data: @@ -145,7 +145,7 @@ def parse_results(self, raw_output_path: Path) -> list[Finding]: def parse_zizmor_results(self, raw_output_path: Path) -> list[Finding]: """Parse zizmor SARIF output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) findings: list[Finding] = [] for run in data.get("runs", []): @@ -162,7 +162,7 @@ def parse_actionlint_results( self, raw_output_path: Path ) -> list[Finding]: """Parse actionlint JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) if not isinstance(data, list): return [] diff --git a/argus/scanners/trivy.py b/argus/scanners/trivy.py index e2088ff..4b77cf5 100644 --- a/argus/scanners/trivy.py +++ b/argus/scanners/trivy.py @@ -106,7 +106,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Trivy JSON output into Finding objects.""" try: - data = json.loads(Path(raw_output_path).read_text()) + data = json.loads(Path(raw_output_path).read_text(encoding="utf-8", errors="replace")) except (json.JSONDecodeError, OSError): return [] findings: list[Finding] = [] diff --git a/argus/scanners/trivy_iac.py b/argus/scanners/trivy_iac.py index 95b24b2..1aaf7db 100644 --- a/argus/scanners/trivy_iac.py +++ b/argus/scanners/trivy_iac.py @@ -99,7 +99,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse Trivy IaC JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) results = data.get("Results", []) findings = [] diff --git a/argus/scanners/zap.py b/argus/scanners/zap.py index f009ef3..1a9bc19 100644 --- a/argus/scanners/zap.py +++ b/argus/scanners/zap.py @@ -95,7 +95,7 @@ def tool_version(self) -> str | None: def parse_results(self, raw_output_path: Path) -> list[Finding]: """Parse ZAP JSON output into findings.""" - data = json.loads(raw_output_path.read_text()) + data = json.loads(raw_output_path.read_text(encoding="utf-8", errors="replace")) findings: list[Finding] = [] for site in data.get("site", []): diff --git a/argus/tests/core/test_scanner_template.py b/argus/tests/core/test_scanner_template.py index 2477e49..677302d 100644 --- a/argus/tests/core/test_scanner_template.py +++ b/argus/tests/core/test_scanner_template.py @@ -66,7 +66,7 @@ def fake_run(cmd, **kwargs): assert result.scanner == "fake" assert result.findings == [finding] - assert "error" not in result.metadata + assert result.metadata.get("execution_failed") is not True def test_passes_workspace_path_through_to_build_args(self, tmp_path): scanner = _FakeScanner() @@ -192,41 +192,67 @@ def fake_run(cmd, **kwargs): class TestFailures: - def test_missing_binary_returns_error_metadata(self, tmp_path): + def test_missing_binary_returns_execution_failed_metadata(self, tmp_path): + # The template uses the same metadata contract as the engine's + # container path: ``execution_failed`` (bool) + + # ``execution_failure_reason`` (string). The terminal reporter, + # ``--fail-on-scanner-error`` gate, and CI viewers all key off + # those exact field names, so the local-execution path has to + # match — otherwise a missing binary would be invisible to the + # warning row and silently roll up as PASS. scanner = _FakeScanner() with patch("subprocess.run", side_effect=FileNotFoundError(2, "no such file", "fake")): result = run_subprocess_scan(scanner, str(tmp_path)) assert result.findings == [] - assert "Tool not found" in result.metadata.get("error", "") + assert result.metadata.get("execution_failed") is True + assert "Tool not found" in result.metadata.get("execution_failure_reason", "") - def test_timeout_returns_error_metadata(self, tmp_path): + def test_timeout_returns_execution_failed_metadata(self, tmp_path): scanner = _FakeScanner() with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("fake", 5)): result = run_subprocess_scan(scanner, str(tmp_path), timeout=5) - assert "timed out" in result.metadata.get("error", "") + assert result.metadata.get("execution_failed") is True + assert "timed out" in result.metadata.get("execution_failure_reason", "") - def test_no_output_no_stdout_returns_error_metadata(self, tmp_path): + def test_no_output_no_stdout_returns_execution_failed_metadata(self, tmp_path): scanner = _FakeScanner() with patch( "subprocess.run", return_value=_completed(stdout="", stderr="boom", returncode=2), ): result = run_subprocess_scan(scanner, str(tmp_path)) - err = result.metadata.get("error", "") - assert "No output produced" in err - assert "boom" in err - - def test_unexpected_exception_propagates(self, tmp_path): - # Bugs in scanner.parse_results shouldn't be silently translated. + assert result.metadata.get("execution_failed") is True + reason = result.metadata.get("execution_failure_reason", "") + assert "No output produced" in reason + assert "boom" in reason + + def test_parse_exception_emits_parse_failed_metadata(self, tmp_path): + # Parse failures are a third state distinct from execution + # failures: the scanner *did* run and produced output we + # couldn't interpret (schema drift, truncated JSON, mixed + # text+JSON output). The template translates parser + # exceptions into ``parse_failed`` metadata + a reason instead + # of crashing the scan — the rest of the run is still useful, + # and reporters render parse failures distinctly from + # execution failures. scanner = _FakeScanner(parse=lambda p: (_ for _ in ()).throw(ValueError("bad json"))) def fake_run(cmd, **kwargs): - Path(cmd[cmd.index("--out") + 1]).write_text("{}") + Path(cmd[cmd.index("--out") + 1]).write_text("{not valid json") return _completed() with patch("subprocess.run", side_effect=fake_run): - with pytest.raises(ValueError, match="bad json"): - run_subprocess_scan(scanner, str(tmp_path)) + result = run_subprocess_scan(scanner, str(tmp_path)) + + assert result.findings == [] + assert result.metadata.get("parse_failed") is True + # ``execution_failed`` must NOT also be set — these are + # mutually distinct signals so reporters can show "OSV + # produced 12KB we couldn't parse" vs "OSV crashed" cleanly. + assert result.metadata.get("execution_failed") is not True + reason = result.metadata.get("parse_failure_reason", "") + assert "ValueError" in reason + assert "bad json" in reason # --------------------------------------------------------------------- # diff --git a/argus/tests/linters/test_yamllint.py b/argus/tests/linters/test_yamllint.py new file mode 100644 index 0000000..4c22d5b --- /dev/null +++ b/argus/tests/linters/test_yamllint.py @@ -0,0 +1,214 @@ +"""Tests for argus.linters.yamllint.YamllintLinter. + +Locks in the exit-code contract that previously allowed exit ≥ 2 to be +silently dropped: ``_parse_output("")`` returned ``[]`` and the caller +saw a clean ``ScanResult`` with zero findings, so a yamllint config +error rendered as ``Status: PASS``. The fix maps exit ≥ 2 + empty +findings to ``execution_failed`` so the terminal warning row, the +``--fail-on-scanner-error`` gate, and the engine's container-path +metadata all see the same shape. +""" + +import subprocess +from unittest.mock import patch + +from argus.core.models import Severity +from argus.linters.yamllint import YamllintLinter + + +def _completed(stdout: str = "", stderr: str = "", returncode: int = 0): + return subprocess.CompletedProcess( + args=["yamllint"], returncode=returncode, stdout=stdout, stderr=stderr, + ) + + +class TestYamllintScanExitCodes: + """Per yamllint docs: 0=clean, 1=lint findings, 2+=runtime/config error.""" + + def test_exit_zero_no_findings_is_clean_pass(self): + linter = YamllintLinter() + with patch("subprocess.run", return_value=_completed(returncode=0)): + result = linter.scan(".") + assert result.findings == [] + assert result.metadata.get("returncode") == 0 + # Exit 0 must NOT be treated as an execution failure. + assert result.metadata.get("execution_failed") is not True + + def test_exit_one_with_findings_is_happy_path(self): + # The "lint problems exist" exit code is the *expected* outcome + # when violations are present — never an execution failure. + sample = "config.yml:3:1: [error] syntax error (key-duplicates)" + linter = YamllintLinter() + with patch("subprocess.run", return_value=_completed(stdout=sample, returncode=1)): + result = linter.scan(".") + assert len(result.findings) == 1 + assert result.findings[0].severity == Severity.INFO + assert result.metadata.get("returncode") == 1 + assert result.metadata.get("execution_failed") is not True + + def test_exit_one_with_findings_does_not_mark_failed_even_if_findings_only_partial(self): + # Even when stdout has content that doesn't fully parse, exit + # 1 still means yamllint *ran*. Don't blame the tool. + linter = YamllintLinter() + with patch( + "subprocess.run", + return_value=_completed(stdout="garbage that won't parse", returncode=1), + ): + result = linter.scan(".") + # Parser may produce zero findings, but exit=1 still isn't a + # runtime error — return code drives the decision, not parse + # success. + assert result.metadata.get("execution_failed") is not True + + def test_exit_two_empty_stdout_marks_execution_failed(self): + """Regression: yamllint exit 2+ with empty stdout means the + tool itself failed (bad config path, unreadable input, + internal error). Previously we silently dropped this and + printed Status: PASS. Now it surfaces via the same + ``execution_failed`` metadata key the engine's container path + emits, so reporters and ``--fail-on-scanner-error`` see it.""" + linter = YamllintLinter() + stderr_msg = "configuration error: file not found" + with patch( + "subprocess.run", + return_value=_completed(stdout="", stderr=stderr_msg, returncode=2), + ): + result = linter.scan(".") + assert result.findings == [] + assert result.metadata.get("execution_failed") is True + reason = result.metadata.get("execution_failure_reason", "") + assert "yamllint exited 2" in reason + # User-actionable: stderr is included so they don't have to + # re-run with --verbose just to see why. + assert stderr_msg in reason + + def test_high_exit_code_with_empty_stdout_marks_execution_failed(self): + # Any exit ≥ 2 with no parsed findings is the same failure mode. + linter = YamllintLinter() + with patch( + "subprocess.run", + return_value=_completed(stdout="", stderr="boom", returncode=99), + ): + result = linter.scan(".") + assert result.metadata.get("execution_failed") is True + assert "exited 99" in result.metadata.get("execution_failure_reason", "") + + def test_returncode_always_recorded_in_metadata(self): + # Returncode is captured even on the happy path so audit logs + # and viewers can show it without re-running the tool. + linter = YamllintLinter() + with patch("subprocess.run", return_value=_completed(returncode=0)): + result = linter.scan(".") + assert "returncode" in result.metadata + + def test_permission_error_on_windows_falls_back_to_python_module(self, monkeypatch): + """On Windows hosts with AppLocker / Software Restriction + Policy blocking executables in user AppData paths, + ``yamllint.exe`` raises PermissionError on direct launch but + loading the same package via ``python -m yamllint`` works + (the interpreter is whitelisted). The linter must detect + that path and retry — without disturbing the Linux flow. + + The test pretends to be on Windows and asserts the second + ``subprocess.run`` call is the ``python -m`` invocation.""" + from argus.linters import yamllint as ymod + monkeypatch.setattr(ymod.sys, "platform", "win32") + + calls = [] + + def fake_run(cmd, **kwargs): + calls.append(cmd) + if len(calls) == 1: + # First call: direct yamllint invocation — simulate + # AppLocker blocking it. + raise PermissionError(13, "Access is denied", "yamllint.exe") + # Second call: Windows fallback — succeeds. + return _completed(stdout="path:1:1: [error] msg (rule)", returncode=1) + + with patch("subprocess.run", side_effect=fake_run): + result = ymod.YamllintLinter().scan(".") + + # Both subprocess.run calls fired (direct + fallback). + assert len(calls) == 2, f"expected 2 calls, got {len(calls)}: {calls}" + # First was the direct binary invocation. + assert calls[0][0] == "yamllint" + # Second was 'python -m yamllint' with sys.executable as argv[0]. + assert calls[1][0] == ymod.sys.executable + assert calls[1][1] == "-m" + assert calls[1][2] == "yamllint" + # Result-parsing logic is unchanged — the finding from the + # fallback's stdout still made it through. + assert len(result.findings) == 1 + assert result.metadata.get("execution_failed") is not True + + def test_permission_error_on_linux_does_not_fall_back(self, monkeypatch): + """Cross-platform safety: Linux must NOT use the Windows + fallback. AppLocker doesn't exist on Linux; a PermissionError + there indicates a genuine permission bug (chmod, mount + options) that the user needs to see, not a policy case the + fallback compensates for. We re-raise and let the outer + handler mark execution_failed with the actual reason.""" + from argus.linters import yamllint as ymod + monkeypatch.setattr(ymod.sys, "platform", "linux") + + calls = [] + + def fake_run(cmd, **kwargs): + calls.append(cmd) + raise PermissionError(13, "Permission denied", "yamllint") + + with patch("subprocess.run", side_effect=fake_run): + result = ymod.YamllintLinter().scan(".") + + # Exactly ONE subprocess.run call — no fallback retry. + # Linux happy-path / failure-path bytes are unchanged from + # before this fix. + assert len(calls) == 1 + # The PermissionError surfaces as execution_failed (caught by + # the outer OSError handler in scan()), not as a stack trace. + assert result.metadata.get("execution_failed") is True + assert "PermissionError" in result.metadata.get( + "execution_failure_reason", "", + ) + + def test_subprocess_uses_utf8_encoding(self, monkeypatch): + """Bug 2 regression: container / CLI scanner output is UTF-8. + Without explicit ``encoding='utf-8'`` the platform default + (cp1252 on Windows) raises UnicodeDecodeError on non-ASCII + bytes in YAML file paths or value content. We assert the + subprocess.run call passes ``encoding='utf-8'`` and + ``errors='replace'`` so Windows decodes container output + the same way Linux does.""" + from argus.linters import yamllint as ymod + + captured: dict = {} + + def fake_run(cmd, **kwargs): + captured.update(kwargs) + return _completed(returncode=0) + + with patch("subprocess.run", side_effect=fake_run): + ymod.YamllintLinter().scan(".") + + assert captured.get("encoding") == "utf-8" + assert captured.get("errors") == "replace" + + def test_filenotfound_returns_execution_failed_not_raised(self): + """If the yamllint binary disappears between is_available() + and subprocess.run() (rare but possible: race in CI cleanup, + manual uninstall mid-scan), scan() must not raise. Letting + FileNotFoundError propagate triggers the engine's exception + handler and renders a stack trace; a clean + ``execution_failed`` ScanResult lets the reporter surface + the actual reason.""" + linter = YamllintLinter() + with patch( + "subprocess.run", + side_effect=FileNotFoundError(2, "no such file", "yamllint"), + ): + result = linter.scan(".") + assert result.findings == [] + assert result.metadata.get("execution_failed") is True + reason = result.metadata.get("execution_failure_reason", "") + assert "yamllint" in reason + assert "not found" in reason diff --git a/argus/tests/reporters/test_terminal.py b/argus/tests/reporters/test_terminal.py index 677c7ce..d67b547 100644 --- a/argus/tests/reporters/test_terminal.py +++ b/argus/tests/reporters/test_terminal.py @@ -77,10 +77,11 @@ def test_report_fail_status_with_threshold(self, capsys): output = capsys.readouterr().out assert "FAIL" in output - def test_report_warns_on_execution_failure_above_pass_status(self, capsys): - """Scanners that produced no output (execution_failed=True in - metadata) get a clear warning in the terminal output so a single - bad scanner image doesn't quietly slip past the PASS line.""" + def test_report_warns_on_execution_failure_with_per_scanner_reason(self, capsys): + """Scanners that did not run cleanly get a clear warning row + with the *actual* scanner-specific reason (not generic 'uid + mismatch / crashed / wrong entrypoint' boilerplate), so the + user can act on it without re-running with --verbose.""" reporter = TerminalReporter() summary = ScanSummary( results=[ @@ -105,16 +106,49 @@ def test_report_warns_on_execution_failure_above_pass_status(self, capsys): reporter.report(summary) output = capsys.readouterr().out - # Failed scanners are named, count is correct, and the hint - # points at --fail-on-scanner-error for hard CI gating. - assert "2 scanner(s) produced no output" in output - assert "bandit" in output + # New header text: count + "did not run cleanly". + assert "2 scanner(s) did not run cleanly" in output + # Per-scanner reason lines (bullet form). Each scanner name + # appears with its specific reason, not a generic guess. + assert "bandit" in output and "permission denied" in output assert "opengrep" in output + # The CTA still points at --fail-on-scanner-error for CI gating. assert "--fail-on-scanner-error" in output # PASS status still renders below — execution failure is a # separate signal from threshold compliance. assert "PASS" in output + def test_report_does_not_emit_generic_failure_guesses(self, capsys): + """Regression: the old reporter printed canned text guessing at + causes (uid mismatch / crashed / wrong entrypoint). That was + misleading for OSV exit-1-with-findings and yamllint + binary-not-found cases. The reason from the adapter is + authoritative — generic boilerplate must not appear.""" + reporter = TerminalReporter() + summary = ScanSummary( + results=[ + ScanResult( + scanner="osv", findings=[], + metadata={ + "execution_failed": True, + "execution_failure_reason": "Tool not found: osv-scanner", + }, + ), + ], + severity_threshold=None, + ) + reporter.report(summary) + output = capsys.readouterr().out + + # The actual reason must be visible. + assert "Tool not found" in output + # The old generic guesses must NOT appear — they were the + # exact source of the user's "misleading warning text" + # complaint after the first patch. + assert "uid mismatch" not in output + assert "wrong entrypoint" not in output + assert "crashed" not in output + def test_report_no_warning_when_all_scanners_produced_output(self, capsys): """Successful runs must not get the warning row.""" reporter = TerminalReporter() @@ -124,6 +158,135 @@ def test_report_no_warning_when_all_scanners_produced_output(self, capsys): output = capsys.readouterr().out assert "produced no output" not in output + def test_report_status_pass_degraded_when_any_scanner_failed(self, capsys): + """Regression: when one scanner fails to execute but the rest + are clean against threshold, the status line previously read + ``Status: PASS`` — directly contradicting the warning row above + it. Status must annotate ``(degraded — some scanners did not + run)`` so a quick scroll to the bottom doesn't lie. The + ``passed`` boolean stays True (threshold-only); the degraded + label is purely a UX cue.""" + reporter = TerminalReporter() + summary = ScanSummary( + results=[ + ScanResult(scanner="gitleaks", findings=[]), + ScanResult( + scanner="bandit", findings=[], + metadata={"execution_failed": True}, + ), + ], + severity_threshold=None, + ) + reporter.report(summary) + output = capsys.readouterr().out + # The threshold check still passes — that's the contract. + assert summary.passed is True + # But the displayed status must call out the degraded run so + # the user doesn't read "Warning: ..." then "PASS" and shrug. + assert "Status: PASS (degraded" in output + # And the plain "Status: PASS" string must not appear *as a + # standalone status line* — only as part of the degraded label. + # We assert the plain newline-bounded form is absent: + assert "\nStatus: PASS\n" not in output + + def test_report_status_pass_clean_when_no_failures(self, capsys): + """Clean runs must still show the unqualified ``Status: PASS``.""" + reporter = TerminalReporter() + summary = _make_summary(threshold=None) + reporter.report(summary) + output = capsys.readouterr().out + # Clean PASS is the default rendering — no degraded suffix. + assert "Status: PASS" in output + assert "degraded" not in output + + def test_report_parse_failed_renders_distinctly_from_execution_failed(self, capsys): + """A scanner that produced output but couldn't be parsed is a + third state — not the same as 'didn't run'. The reporter + renders parse failures in their own warning block so the user + can tell them apart, and the degraded status label counts + them separately.""" + reporter = TerminalReporter() + summary = ScanSummary( + results=[ + ScanResult( + scanner="osv", findings=[], + metadata={ + "parse_failed": True, + "parse_failure_reason": ( + "JSONDecodeError: Expecting value: line 1 column 1. " + "output head: 'unexpected text'" + ), + }, + ), + ], + severity_threshold=None, + ) + reporter.report(summary) + output = capsys.readouterr().out + + # The dedicated parse-failure block must surface the reason. + assert "could not be parsed" in output + assert "osv" in output + assert "JSONDecodeError" in output + # Distinct degraded label — separates execution failures from + # parse failures so the user understands which kind happened. + assert "Status: PASS (degraded — 1 unparsable)" in output + # Must not be miscategorized as an execution failure. + assert "did not run cleanly" not in output + + def test_report_status_label_lists_both_kinds_when_both_present(self, capsys): + """Mixed-failure runs label the degraded status with both + counts so the user knows the breakdown without scrolling up.""" + reporter = TerminalReporter() + summary = ScanSummary( + results=[ + ScanResult( + scanner="bandit", findings=[], + metadata={ + "execution_failed": True, + "execution_failure_reason": "Tool not found: bandit", + }, + ), + ScanResult( + scanner="osv", findings=[], + metadata={ + "parse_failed": True, + "parse_failure_reason": "JSONDecodeError: line 1 col 1", + }, + ), + ], + severity_threshold=None, + ) + reporter.report(summary) + output = capsys.readouterr().out + + assert "Status: PASS (degraded — 1 did not run, 1 unparsable)" in output + + def test_report_status_fail_takes_priority_over_degraded(self, capsys): + """If findings exceed the threshold, the run is FAIL — not + a degraded PASS — even when other scanners failed to execute. + Threshold violations dominate the exit policy.""" + reporter = TerminalReporter() + summary = ScanSummary( + results=[ + ScanResult( + scanner="bandit", + findings=[Finding(id="B102", severity=Severity.HIGH, title="t")], + ), + ScanResult( + scanner="gitleaks", findings=[], + metadata={"execution_failed": True}, + ), + ], + severity_threshold=Severity.MEDIUM, + ) + reporter.report(summary) + output = capsys.readouterr().out + assert "FAIL" in output + # Don't display PASS at all when the run is FAIL — even with + # an executed-but-degraded scanner mixed in. + assert "Status: PASS" not in output + def test_report_empty_results(self, capsys): reporter = TerminalReporter() summary = ScanSummary() diff --git a/argus/tests/scanners/test_osv.py b/argus/tests/scanners/test_osv.py index 2f8394d..b6af802 100644 --- a/argus/tests/scanners/test_osv.py +++ b/argus/tests/scanners/test_osv.py @@ -65,6 +65,102 @@ def test_install_command(self): def test_supports_sbom(self): assert OsvScanner.supports_sbom is True + def test_parse_results_succeeds_when_json_valid_regardless_of_exit_code(self, fixtures_dir): + """Acceptance criterion: OSV exits 1 when vulnerabilities are + found (it's the documented happy path). The engine parses + ``results.json`` whenever the file exists — exit code is + irrelevant. This test locks in that the parser successfully + extracts findings from real OSV output that came out of an + exit-1 run; if it ever stops doing so, the user sees + '0 findings' on a vulnerable workspace.""" + scanner = OsvScanner() + path = fixtures_dir / "osv" / "results-with-findings.json" + findings = scanner.parse_results(path) + # Fixture is captured from a real osv-scanner exit-1 run. + # 4 findings means: 1 critical + 1 high + 1 medium + 1 low. + assert len(findings) == 4 + assert all(f.scanner == "osv" for f in findings) + + def test_parse_results_zero_findings_returns_empty_list_no_exception(self, fixtures_dir): + """OSV exits 0 with an empty ``results`` array when nothing's + vulnerable. Parser returns [] cleanly — must NOT raise (a raise + would be misclassified as parse_failed by the engine).""" + scanner = OsvScanner() + path = fixtures_dir / "osv" / "results-zero-findings.json" + findings = scanner.parse_results(path) + assert findings == [] + + def test_parse_results_handles_non_ascii_utf8_bytes(self, tmp_path): + """Bug 2 regression: OSV vulnerability summaries and file paths + can contain non-ASCII characters (CVE titles in non-English + locales, paths with unicode segments, contributor names with + accents). The parser reads the file with explicit UTF-8 so + Windows hosts (default cp1252) don't raise + UnicodeDecodeError on bytes like 0x8f that decode fine in + UTF-8 but not in cp1252.""" + import json + scanner = OsvScanner() + f = tmp_path / "results.json" + # 'café' contains 0xc3 0xa9 in UTF-8 — both bytes are + # invalid in cp1252's mapping (0xc3 is valid but 0xa9 + # decodes differently and the *combination* breaks). The + # crash byte the user hit was 0x8f at position 18203 — same + # category of failure mode. + f.write_text( + json.dumps({ + "results": [{ + "source": {"path": "/src/café/Krürzungen.json"}, + "packages": [{ + "package": { + "name": "lodash", + "version": "4.17.20", + "ecosystem": "npm", + }, + "vulnerabilities": [{ + "id": "GHSA-xxxx", + "summary": "Précis: prototype pollution attack — café", + "aliases": ["CVE-2021-23337"], + "database_specific": {"severity": "HIGH"}, + }], + }], + }], + }), + encoding="utf-8", + ) + findings = scanner.parse_results(f) + assert len(findings) == 1 + # The non-ASCII content survived the round-trip — no + # mojibake, no replacement chars (because the file was + # genuinely UTF-8 and we read it as UTF-8). + assert "Précis" in findings[0].title + assert "café" in findings[0].location + + def test_parse_results_malformed_json_raises_for_engine_to_catch(self, tmp_path): + """When the fixture is junk, parse_results must raise so the + engine's parse-failed wrapper can mark + ``parse_failed=True`` with a useful reason. Silently + returning [] would hide schema-drift breakages and make every + OSV run report '0 findings' indefinitely.""" + scanner = OsvScanner() + bad = tmp_path / "results.json" + bad.write_text("this is not json at all") + with pytest.raises((ValueError, Exception)): + scanner.parse_results(bad) + + def test_container_entrypoint_uses_absolute_path(self): + """Regression: ``--entrypoint osv-scanner`` (bare) exited 127 + because the official ghcr.io/google/osv-scanner image declares + ``ENTRYPOINT ["/osv-scanner"]`` (absolute) and Docker's + ``--entrypoint`` does NOT consult the image's $PATH for bare + names. We pin the absolute path so the engine's + ``--entrypoint`` override resolves the binary the same way the + image's own ENTRYPOINT does.""" + assert OsvScanner.container_entrypoint == "/osv-scanner" + assert OsvScanner.container_entrypoint.startswith("/"), ( + "container_entrypoint must be absolute — Docker --entrypoint " + "does not resolve bare names against the image $PATH" + ) + class TestOsvSbomMode: """SBOM mode (config['sbom_path'] set) → uses ``-L`` (osv-scanner v2).""" diff --git a/argus/tests/scanners/test_scanner_scan_methods.py b/argus/tests/scanners/test_scanner_scan_methods.py index eaa82c6..205362c 100644 --- a/argus/tests/scanners/test_scanner_scan_methods.py +++ b/argus/tests/scanners/test_scanner_scan_methods.py @@ -216,7 +216,7 @@ def fake_run(cmd, **kwargs): result = scanner.scan("nonexistent/") assert result.scanner == "bandit" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True # ===================================================================== @@ -301,7 +301,7 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "gitleaks" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True # ===================================================================== @@ -359,7 +359,7 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "osv" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True # ===================================================================== @@ -432,7 +432,7 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "checkov" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True def test_scan_no_output(self, monkeypatch): scanner = CheckovScanner() @@ -444,7 +444,7 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "checkov" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True # ===================================================================== @@ -501,4 +501,4 @@ def fake_run(cmd, **kwargs): result = scanner.scan(".") assert result.scanner == "opengrep" - assert "error" in result.metadata + assert result.metadata.get("execution_failed") is True diff --git a/argus/tests/test_engine.py b/argus/tests/test_engine.py index 61c5db0..a1e166b 100644 --- a/argus/tests/test_engine.py +++ b/argus/tests/test_engine.py @@ -852,6 +852,88 @@ def mock_run(cmd, **kwargs): assert result.findings[0].title == "from stdout" assert captured_file["path"].name == "stdout.txt" + def test_chmod_skipped_on_windows(self, monkeypatch): + """On Windows, NTFS doesn't honor POSIX bits and ``os.chmod`` + only flips the read-only attribute — the uid-mismatch failure + mode that the chmod guards against doesn't exist there. We + skip the call so a Windows-native run doesn't trip on confused + permission semantics in stack traces. + + Linux and macOS keep the chmod (covered by the next test) so + existing behavior is unchanged.""" + engine = self._make_engine() + scanner = self._make_scanner() + + monkeypatch.setattr(engine, "_pull_image", lambda img: True) + monkeypatch.setattr(engine, "_get_image_digest", lambda img: "sha256:abc") + # Pretend we're on Windows for this test. + from argus.core import engine as engine_mod + monkeypatch.setattr( + engine_mod.platform, "system", lambda: "Windows", + ) + + chmod_calls = [] + monkeypatch.setattr( + engine_mod.os, "chmod", + lambda *args, **kwargs: chmod_calls.append(args), + ) + + def mock_run(cmd, **_kwargs): + for arg in cmd: + if ":/output" in str(arg): + host_dir = arg.split(":")[0] + Path(host_dir).joinpath("results.json").write_text("{}") + break + return subprocess.CompletedProcess( + args=cmd, returncode=0, stdout="", stderr="", + ) + + monkeypatch.setattr(subprocess, "run", mock_run) + engine._run_in_container(scanner, "/src", {}) + # No chmod on the temp output dir on Windows. Other code paths + # may still call chmod (e.g., cache mount setup), so we filter + # to confirm the *output_dir* chmod specifically didn't fire + # by checking no call used mode 0o777 — that's the unique + # signature of the output-dir chmod this test guards. + assert not any( + len(call) >= 2 and call[1] == 0o777 + for call in chmod_calls + ), f"chmod 0o777 should be skipped on Windows; got: {chmod_calls}" + + def test_chmod_runs_on_non_windows(self, monkeypatch): + """Linux/macOS keep the chmod 0o777 — non-Windows scanners + running as a different uid than the host user need it to + write /output/results.json. This test guards the Linux-safety + side of the Windows fix: skipping on Windows must not also + skip on the platforms where the chmod is load-bearing.""" + engine = self._make_engine() + scanner = self._make_scanner() + + monkeypatch.setattr(engine, "_pull_image", lambda img: True) + monkeypatch.setattr(engine, "_get_image_digest", lambda img: "sha256:abc") + + from argus.core import engine as engine_mod + monkeypatch.setattr(engine_mod.platform, "system", lambda: "Linux") + + captured_mode = {} + + def mock_run(cmd, **_kwargs): + for arg in cmd: + if ":/output" in str(arg): + host_dir = arg.split(":")[0] + captured_mode["mode"] = os.stat(host_dir).st_mode & 0o777 + Path(host_dir).joinpath("results.json").write_text("{}") + break + return subprocess.CompletedProcess( + args=cmd, returncode=0, stdout="", stderr="", + ) + + monkeypatch.setattr(subprocess, "run", mock_run) + engine._run_in_container(scanner, "/src", {}) + # The chmod must have fired before docker run, exactly as + # before the Windows guard was added. + assert captured_mode["mode"] == 0o777 + def test_container_output_dir_is_world_writable(self, monkeypatch): """Regression: scanners running as USER non-root (uid 1000) couldn't write /output/results.json on hosts with uid != 1000 @@ -885,6 +967,87 @@ def mock_run(cmd, **_kwargs): # uid can write to /output regardless of its image's USER. assert captured_mode["mode"] == 0o777 + def test_docker_subprocess_uses_utf8_encoding(self, monkeypatch): + """Bug 2 regression: docker container output is always UTF-8. + Without explicit ``encoding='utf-8'`` on the subprocess call, + ``text=True`` falls back to the platform default — cp1252 on + Windows — and ``UnicodeDecodeError`` fires on any non-ASCII + byte in scanner output (CVE descriptions with accented chars, + file paths with non-ASCII characters, etc.). This test locks + in the explicit encoding + ``errors='replace'`` fallback.""" + engine = self._make_engine() + scanner = self._make_scanner() + + monkeypatch.setattr(engine, "_pull_image", lambda img: True) + monkeypatch.setattr(engine, "_get_image_digest", lambda img: "sha256:abc") + + captured: dict = {} + + def mock_run(cmd, **kwargs): + captured.update(kwargs) + for arg in cmd: + if ":/output" in str(arg): + Path(arg.split(":")[0]).joinpath("results.json").write_text("{}") + break + return subprocess.CompletedProcess( + args=cmd, returncode=0, stdout="", stderr="", + ) + + monkeypatch.setattr(subprocess, "run", mock_run) + engine._run_in_container(scanner, "/src", {}) + + # The docker subprocess must read its output as UTF-8 with + # replace fallback, identical on every host OS. + assert captured.get("encoding") == "utf-8" + assert captured.get("errors") == "replace" + + def test_container_parse_exception_marks_parse_failed_not_raised(self, monkeypatch): + """A parser exception is the third state the user asked for: + the scanner *did* run and produced output we just couldn't + interpret. The engine's container path catches the exception, + sets ``metadata['parse_failed']=True`` with the reason, and + leaves ``execution_failed`` unset — so the reporter can show + 'OSV produced 12KB of output we couldn't parse' rather than + the misleading 'no output produced'. Crucially, the rest of + the scan keeps running; one parser bug doesn't crash the + whole pipeline.""" + engine = self._make_engine() + + def explode(_path): + raise ValueError("Expecting value: line 1 column 1 (char 0)") + + scanner = self._make_scanner(parse_results=explode) + + monkeypatch.setattr(engine, "_pull_image", lambda img: True) + monkeypatch.setattr(engine, "_get_image_digest", lambda img: "sha256:abc") + + def mock_run(cmd, **_kwargs): + for arg in cmd: + if ":/output" in str(arg): + Path(arg.split(":")[0]).joinpath("results.json").write_text( + "this is not valid json" + ) + break + # Scanner exits 1 — findings-found path. A passing test + # here proves "non-zero exit + valid output file path" + # routes through parse, exactly the OSV acceptance case. + return subprocess.CompletedProcess( + args=cmd, returncode=1, stdout="", stderr="", + ) + + monkeypatch.setattr(subprocess, "run", mock_run) + + # Engine must not propagate the parser exception. + result = engine._run_in_container(scanner, "/src", {}) + assert result.findings == [] + assert result.metadata.get("parse_failed") is True + # Distinct from execution_failed — these are orthogonal signals. + assert result.metadata.get("execution_failed") is not True + reason = result.metadata.get("parse_failure_reason", "") + assert "ValueError" in reason + # The output head is included so the user can see what came out. + assert "this is not valid" in reason + def test_container_no_output_marks_execution_failed(self, monkeypatch): """Empty result_files + no stdout means the container ran but produced nothing. Mark the ScanResult so reporters and the