Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .ai/workflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,13 @@ contributor_workflows:
steps:
- action: "Create scanner module"
location: "argus/scanners/{name}.py"
purpose: "Implement Scanner protocol — name, scan(path, config), is_available(), install_command()"
purpose: "Implement Scanner protocol — name, scan(), build_args(), is_available(), install_command(), tool_version(), parse_results()"
template: "argus/scanners/bandit.py"
helpers:
scan_template: "Use argus.core.scanner_template.run_subprocess_scan(self, path, config) — collapses the tempdir+subprocess+output-file+error-handling boilerplate into a one-line scan() body. Each scanner declares the WHAT (build_args, parse_results) instead of the HOW (subprocess plumbing). Skip when the tool runs multiple binaries or emits to stdout (see grype.py, clamav.py, supply_chain.py for the bypass shape)."
build_args: "Implement build_args(self, paths: ScanPaths, config: dict) — single source of truth for both local and container execution. Returns the FULL argv (binary name as args[0]). Engine drops args[0] when ``container_entrypoint`` is declared on the class (i.e. the image has ENTRYPOINT). Replaces the legacy _build_command + container_args pair, which used to drift."
tool_version: "Use argus.core.version.parse_tool_version(cmd, regex) — collapses the ~17-line subprocess+regex+exception boilerplate into one line. Only fall back to custom parsing when the tool emits structured output (JSON, etc.) — see grype.py"
secret_redaction: "If raw output ever contains source-code literals (passwords, code excerpts), use argus.core.redact helpers and add a regression test (see CLAUDE.md → secret leak audit)"

- action: "Register in SCANNER_REGISTRY"
location: "argus/scanners/__init__.py"
Expand Down
112 changes: 54 additions & 58 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,94 +100,88 @@ This is the primary way to add a new scanner. Each scanner is a single Python mo

### Step 1: Create the Scanner Module

Create `argus/scanners/my_scanner.py` implementing the `Scanner` protocol:
Create `argus/scanners/my_scanner.py`. The SDK provides two helpers — `parse_tool_version` (for `tool_version()`) and `run_subprocess_scan` (for `scan()`) — so a typical scanner is ~50 lines:

```python
"""My Scanner - brief description of what it scans."""

import json
import shutil
import subprocess
import tempfile
from pathlib import Path

from argus.containers import get_image
from argus.core.models import Finding, ScanResult, Severity
from argus.core.scanner_template import ScanPaths, run_subprocess_scan
from argus.core.version import parse_tool_version


class MyScanner:
"""Wraps MyTool to scan for security issues."""

name = "my-scanner"
description = "What it scans, in one line"
category = "sast" # or "secrets", "iac", "sca", "container", "linter", ...
container_image = get_image("my-scanner")
# Set this if your container image declares ``ENTRYPOINT ["my-tool"]``.
# The engine drops argv[0] when this attr is present, so build_args
# can return the same argv shape for both local and container paths.
container_entrypoint = "my-tool"

def scan(self, path: str, config: dict | None = None) -> ScanResult:
"""Run the scanner against the given path and return results."""
config = config or {}

with tempfile.TemporaryDirectory() as tmp_dir:
output_file = Path(tmp_dir) / "results.json"
cmd = self._build_command(path, output_file, config)

result = subprocess.run(cmd, capture_output=True, text=True)

if result.returncode != 0:
return ScanResult(
scanner=self.name,
metadata={"error": result.stderr.strip()},
)

if not output_file.exists():
return ScanResult(
scanner=self.name,
metadata={"error": "No output file produced"},
)

findings = self.parse_results(output_file)
return ScanResult(
scanner=self.name,
findings=findings,
raw_report=output_file,
)
"""One-line wrapper — the template handles tempdir, subprocess, errors."""
return run_subprocess_scan(self, path, config)

def build_args(self, paths: ScanPaths, config: dict) -> list[str]:
"""Build the FULL argv (binary name as args[0]).

Single source of truth for both local and container execution.
Local path: ``paths.workspace`` is the host path the user gave;
container path: ``paths.workspace`` is ``/workspace`` and
``paths.output`` is ``/output/results.json``. The engine handles
the path translation; this method just consumes whatever paths
it's handed.
"""
args = [
"my-tool", "scan",
paths.workspace,
"--format", "json",
"--output", paths.output,
]
if config.get("exclude"):
args.extend(["--exclude", config["exclude"]])
return args

def is_available(self) -> bool:
"""Check if the underlying tool is installed."""
return shutil.which("my-tool") is not None

def install_command(self) -> str | None:
"""Return the shell command to install the tool, or None."""
return "pip install my-tool"

def container_args(self, config: dict | None = None) -> list[str]:
"""Return CLI args for running the tool in a container."""
return [
"my-tool", "scan", "/workspace",
"-f", "json",
"-o", "/output/results.json",
]
def tool_version(self) -> str | None:
if not self.is_available():
return None
# Tool output: "my-tool X.Y.Z (build info)"
return parse_tool_version(["my-tool", "--version"], r"^my-tool (\S+)")

def parse_results(self, raw_output_path: Path) -> list[Finding]:
"""Parse tool output into normalized Finding objects."""
data = json.loads(raw_output_path.read_text())
return [self._parse_finding(item) for item in data.get("results", [])]
return [
Finding(
id=item.get("rule_id", "UNKNOWN"),
severity=Severity.from_string(item.get("severity", "UNKNOWN")),
title=item.get("message", ""),
description=item.get("message", ""),
location=item.get("file", ""),
scanner=self.name,
)
for item in data.get("results", [])
]
```

def _parse_finding(self, item: dict) -> Finding:
"""Convert a single result into a Finding."""
return Finding(
id=item.get("rule_id", "UNKNOWN"),
severity=Severity.from_string(item.get("severity", "UNKNOWN")),
title=item.get("message", ""),
description=item.get("message", ""),
location=item.get("file", ""),
scanner=self.name,
)
**Why this shape**: `build_args(paths, config)` replaces the historical pair of `_build_command(path, output, config)` (local) + `container_args(config)` (container). The two used to drift — different `--output` vs `--output-file` flag names, different exit-code handling — and the engine had to know about both. With one method that takes a `ScanPaths` value object, local and container execution share the same definition; the engine builds the right `ScanPaths` for the context (host paths locally, `/workspace` + `/output/...` in containers) and the scanner stays oblivious.

def _build_command(
self, path: str, output_file: Path, config: dict
) -> list[str]:
"""Build the CLI command."""
return ["my-tool", "scan", path, "-f", "json", "-o", str(output_file)]
```
**When to skip the template**: if your tool emits results to stdout instead of a file (`grype version -o json`, ClamAV's text output) or runs an orchestration of multiple binaries (`supply_chain` → zizmor + actionlint), write a custom `scan()`. The template doesn't fit every shape and shouldn't be forced — see `grype.py`, `clamav.py`, `supply_chain.py` for examples.

**Scanner protocol requirements** (from `argus/core/scanner.py`):

Expand All @@ -197,9 +191,11 @@ class MyScanner:
| `scan(path, config) -> ScanResult` | Yes | Run the scanner and return normalized results |
| `is_available() -> bool` | Yes | Check if the tool is installed locally |
| `install_command() -> str \| None` | Yes | Shell command to install the tool |
| `tool_version() -> str \| None` | Recommended | Installed tool version. Use `parse_tool_version()` from `argus.core.version` for the common case (regex match against `<tool> --version` output) — see `bandit.py`, `clamav.py`, `trivy.py`, `gitleaks.py` for examples. Only fall back to custom parsing for tools with structured output (JSON, etc.) — see `grype.py` |
| `build_args(paths, config) -> list[str]` | Yes (subprocess scanners) | Build the full argv for the tool. Used by both local and container execution paths — single source of truth. `paths` is a `ScanPaths(workspace, output)` value object. The engine drops `argv[0]` automatically when the image has an `ENTRYPOINT`; to signal that, declare `container_entrypoint = "<bin>"` on the class |
| `container_image: str` | Optional | Docker image for container fallback |
| `container_args(config) -> list[str]` | Optional | CLI args for containerized execution |
| `parse_results(path) -> list[Finding]` | Optional | Parse raw output file into findings |
| `container_entrypoint: str` | Optional | Set when the container image has `ENTRYPOINT ["<bin>"]`; engine drops `argv[0]` from `build_args` output |
| `parse_results(path) -> list[Finding]` | Yes | Parse the raw output file into findings. May instead return a `(list[Finding], dict)` tuple to attach scanner-level metadata (e.g. `passed_count` for linters) |

**Reference implementation**: See `argus/scanners/bandit.py` for a complete, well-documented example.

Expand Down
30 changes: 26 additions & 4 deletions argus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ scanners:
lint-dockerfile:
enabled: true

# Not applicable to this repo as part of the default scan:
# Not applicable to this repo as part of the default source scan:
# - trivy-iac / checkov target Terraform / CloudFormation / K8s — none here
# - container / zap operate on built images / running endpoints, not
# source paths; run them on demand:
# argus scan container --discover .
# argus scan zap --target http://...
# source paths. Container targets are configured in the
# ``containers:`` block below and run via ``argus scan container``;
# ZAP is on-demand: ``argus scan zap --target http://...``
trivy-iac:
enabled: false
checkov:
Expand All @@ -51,6 +51,28 @@ scanners:
zap:
enabled: false

# Container lifecycle targets — drives ``argus scan container`` without
# needing CLI flags. Each entry is a Dockerfile-built image that the
# scanner builds locally (so the scan reflects exactly the bytes that
# ship from this branch) and then hands to trivy + grype + syft. The
# build-containers workflow can read this same list rather than
# hard-coding a parallel matrix — single source of truth.
containers:
images:
- image: ghcr.io/huntridge-labs/argus/scanner-bandit:dev
dockerfile: docker/Dockerfile.bandit
context: .
- image: ghcr.io/huntridge-labs/argus/scanner-opengrep:dev
dockerfile: docker/Dockerfile.opengrep
context: .
- image: ghcr.io/huntridge-labs/argus/scanner-supply-chain:dev
dockerfile: docker/Dockerfile.supply-chain
context: .
- image: ghcr.io/huntridge-labs/argus/cli:dev
dockerfile: docker/Dockerfile.cli
context: .
scanners: [trivy, grype, syft]

reporting:
formats:
- terminal
Expand Down
19 changes: 18 additions & 1 deletion argus/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,24 @@ def _run_in_container(
docker_cmd.extend(["--entrypoint", entrypoint])
logger.debug("Overriding entrypoint: %s", entrypoint)

container_args = scanner.container_args(config)
# Prefer the unified ``build_args(ScanPaths)`` shape (single
# source of truth for both local and container CLI args).
# Fall back to legacy ``container_args(config)`` for scanners
# not yet migrated. Once every scanner declares
# ``build_args``, the legacy branch and container_args
# method go away.
if hasattr(scanner, "build_args"):
from argus.core.scanner_template import ScanPaths
paths = ScanPaths(
workspace="/workspace",
output="/output/results.json",
)
container_args = scanner.build_args(paths, config or {})
# ENTRYPOINT-based images supply the binary; drop argv[0].
if getattr(scanner, "container_entrypoint", None):
container_args = container_args[1:]
else:
container_args = scanner.container_args(config)
docker_cmd.extend([image] + container_args)

logger.debug(
Expand Down
17 changes: 2 additions & 15 deletions argus/core/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Core data models for Argus scan results."""

import functools
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Optional
Expand Down Expand Up @@ -31,6 +32,7 @@
}


@functools.total_ordering
class Severity(Enum):
"""Security finding severity levels with comparison support."""

Expand All @@ -57,21 +59,6 @@ def from_string(cls, value: str) -> "Severity":
def _order(self) -> int:
return _SEVERITY_ORDER[self.value]

def __ge__(self, other: "Severity") -> bool:
if not isinstance(other, Severity):
return NotImplemented
return self._order >= other._order

def __gt__(self, other: "Severity") -> bool:
if not isinstance(other, Severity):
return NotImplemented
return self._order > other._order

def __le__(self, other: "Severity") -> bool:
if not isinstance(other, Severity):
return NotImplemented
return self._order <= other._order

def __lt__(self, other: "Severity") -> bool:
if not isinstance(other, Severity):
return NotImplemented
Expand Down
Loading
Loading