|
2 | 2 |
|
3 | 3 | import argparse |
4 | 4 | import sys |
| 5 | +import threading |
| 6 | +import time |
5 | 7 | from pathlib import Path |
| 8 | +from typing import TextIO |
6 | 9 |
|
7 | 10 | # Exit codes |
8 | 11 | EXIT_SUCCESS = 0 |
|
11 | 14 |
|
12 | 15 | SEVERITY_CHOICES = ["critical", "high", "medium", "low", "none"] |
13 | 16 | FORMAT_CHOICES = ["terminal", "markdown", "sarif", "json"] |
| 17 | +_SPINNER_STYLES = [ |
| 18 | + ["⠁", "⠂", "⠄", "⡀", "⢀", "⠠", "⠐", "⠈"], |
| 19 | + ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"], |
| 20 | + ["⠿", "⣟", "⣯", "⣷", "⣾", "⣽", "⣻", "⢿"], |
| 21 | +] |
| 22 | + |
| 23 | + |
| 24 | +class _TerminalSpinner: |
| 25 | + """Minimal terminal spinner for long-running CLI operations.""" |
| 26 | + |
| 27 | + def __init__( |
| 28 | + self, |
| 29 | + message: str, |
| 30 | + enabled: bool, |
| 31 | + stream: TextIO | None = None, |
| 32 | + interval: float = 0.08, |
| 33 | + ): |
| 34 | + self._message = message |
| 35 | + self._enabled = enabled |
| 36 | + self._stream = stream or sys.stderr |
| 37 | + self._interval = interval |
| 38 | + self._stop_event = threading.Event() |
| 39 | + self._thread: threading.Thread | None = None |
| 40 | + self._start_ts = 0.0 |
| 41 | + |
| 42 | + def __enter__(self): |
| 43 | + self._start_ts = time.monotonic() |
| 44 | + if self._enabled: |
| 45 | + self._thread = threading.Thread(target=self._spin, daemon=True) |
| 46 | + self._thread.start() |
| 47 | + return self |
| 48 | + |
| 49 | + def __exit__(self, exc_type, _exc, _tb): |
| 50 | + elapsed = time.monotonic() - self._start_ts |
| 51 | + if self._enabled: |
| 52 | + self._stop_event.set() |
| 53 | + if self._thread: |
| 54 | + self._thread.join(timeout=0.2) |
| 55 | + self._clear_line() |
| 56 | + |
| 57 | + status = "done" if exc_type is None else "failed" |
| 58 | + print(f"{self._message} [{status} in {elapsed:.1f}s]", file=self._stream) |
| 59 | + return False |
| 60 | + |
| 61 | + def _spin(self) -> None: |
| 62 | + style_index = 0 |
| 63 | + frame_index = 0 |
| 64 | + |
| 65 | + while not self._stop_event.is_set(): |
| 66 | + frames = _SPINNER_STYLES[style_index] |
| 67 | + frame = frames[frame_index] |
| 68 | + self._stream.write(f"\r{self._message} {frame}") |
| 69 | + self._stream.flush() |
| 70 | + |
| 71 | + frame_index += 1 |
| 72 | + if frame_index >= len(frames): |
| 73 | + frame_index = 0 |
| 74 | + style_index = (style_index + 1) % len(_SPINNER_STYLES) |
| 75 | + |
| 76 | + self._stop_event.wait(self._interval) |
| 77 | + |
| 78 | + def _clear_line(self) -> None: |
| 79 | + self._stream.write("\r" + (" " * 80) + "\r") |
| 80 | + self._stream.flush() |
| 81 | + |
| 82 | + |
| 83 | +def _spinner_enabled(args: argparse.Namespace) -> bool: |
| 84 | + """Enable spinner for interactive terminals unless explicitly disabled.""" |
| 85 | + if getattr(args, "no_spinner", False): |
| 86 | + return False |
| 87 | + if getattr(args, "verbose", False): |
| 88 | + return False |
| 89 | + return sys.stderr.isatty() |
14 | 90 |
|
15 | 91 |
|
16 | 92 | def _make_run_dir(base_dir: str) -> str: |
@@ -178,6 +254,11 @@ def _build_scan_parser(subparsers: argparse._SubParsersAction) -> None: |
178 | 254 | action="store_true", |
179 | 255 | help="Enable verbose output", |
180 | 256 | ) |
| 257 | + scan_parser.add_argument( |
| 258 | + "--no-spinner", |
| 259 | + action="store_true", |
| 260 | + help="Disable animated spinner output", |
| 261 | + ) |
181 | 262 |
|
182 | 263 | # Container-specific flags (used with: argus scan container) |
183 | 264 | container_group = scan_parser.add_argument_group( |
@@ -480,7 +561,11 @@ def _cmd_source_scan(args: argparse.Namespace) -> int: |
480 | 561 | try: |
481 | 562 | scanner_names = [args.scanner] if args.scanner else None |
482 | 563 | log.info("Running scanners: %s", scanner_names or "all enabled") |
483 | | - summary = engine.run(scanner_names=scanner_names, path=args.path) |
| 564 | + with _TerminalSpinner( |
| 565 | + message="Running scanners", |
| 566 | + enabled=_spinner_enabled(args), |
| 567 | + ): |
| 568 | + summary = engine.run(scanner_names=scanner_names, path=args.path) |
484 | 569 | log.info( |
485 | 570 | "Scan complete: %d scanner(s), %d finding(s)", |
486 | 571 | len(summary.results), |
@@ -547,7 +632,11 @@ def _cmd_container_scan(args: argparse.Namespace) -> int: |
547 | 632 | # Run |
548 | 633 | try: |
549 | 634 | engine = ContainerEngine(config) |
550 | | - summary = engine.run() |
| 635 | + with _TerminalSpinner( |
| 636 | + message="Running container scan", |
| 637 | + enabled=_spinner_enabled(args), |
| 638 | + ): |
| 639 | + summary = engine.run() |
551 | 640 | except Exception as exc: |
552 | 641 | print(f"Error: container scan failed: {exc}", file=sys.stderr) |
553 | 642 | return EXIT_ERROR |
@@ -627,7 +716,11 @@ def _cmd_dast_scan(args: argparse.Namespace) -> int: |
627 | 716 |
|
628 | 717 | try: |
629 | 718 | engine = DastEngine(config) |
630 | | - summary = engine.run() |
| 719 | + with _TerminalSpinner( |
| 720 | + message="Running DAST scan", |
| 721 | + enabled=_spinner_enabled(args), |
| 722 | + ): |
| 723 | + summary = engine.run() |
631 | 724 | except Exception as exc: |
632 | 725 | print(f"Error: DAST scan failed: {exc}", file=sys.stderr) |
633 | 726 | return EXIT_ERROR |
@@ -1079,8 +1172,6 @@ def _show_logo_easter_egg() -> int: |
1079 | 1172 | Hidden trigger: `argus __logo` |
1080 | 1173 | Not part of argparse, so it stays out of generated docs/help text. |
1081 | 1174 | """ |
1082 | | - import time |
1083 | | - |
1084 | 1175 | try: |
1085 | 1176 | from argus.init import _load_banner |
1086 | 1177 | lines = _load_banner().splitlines() |
|
0 commit comments